Skip to content

Commit

Permalink
serialized dhtChunk deletion with indexing
Browse files Browse the repository at this point in the history
DHT selection, transmission and deletion are now completely serialized with indexing


git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1731 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Feb 21, 2006
1 parent 76b167e commit 1d8ca6e
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 61 deletions.
2 changes: 1 addition & 1 deletion build.properties
Expand Up @@ -3,7 +3,7 @@ javacSource=1.4
javacTarget=1.4

# Release Configuration
releaseVersion=0.431
releaseVersion=0.432
releaseFile=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
#releaseFile=yacy_v${releaseVersion}_${DSTAMP}_${releaseNr}.tar.gz
releaseDir=yacy_dev_v${releaseVersion}_${DSTAMP}_${releaseNr}
Expand Down
9 changes: 0 additions & 9 deletions source/de/anomic/plasma/plasmaDHTChunk.java
Expand Up @@ -68,7 +68,6 @@ public class plasmaDHTChunk {
private plasmaCrawlLURL lurls;

private int status = chunkStatus_UNDEFINED;
private String statusMessage = "";
private String startPointHash;
private plasmaWordIndexEntryContainer[] indexContainers = null;
private HashMap urlCache; // String (url-hash) / plasmaCrawlLURL.Entry
Expand Down Expand Up @@ -114,14 +113,6 @@ public int getStatus() {
return this.status;
}

/**
 * Replaces the status message text kept for this chunk.
 *
 * @param message the new status message
 */
public void setStatusMessage(String message) {
    statusMessage = message;
}

/**
 * @return the status message currently stored for this chunk
 *         (the field is initialised to the empty string)
 */
public String getStatusMessage() {
    return this.statusMessage;
}

public plasmaDHTChunk(serverLog log, plasmaWordIndex wordIndex, plasmaCrawlLURL lurls, int minCount, int maxCount) {
this.log = log;
this.wordIndex = wordIndex;
Expand Down
2 changes: 1 addition & 1 deletion source/de/anomic/plasma/plasmaDHTFlush.java
Expand Up @@ -130,7 +130,7 @@ public yacySeed getSeed() {
/**
 * Reports the flush status together with the status message of the transfer worker.
 *
 * @return a two-element array: {@code this.status} followed by the worker's status
 *         message, or by {@code "Not running"} when no worker is set
 */
public String[] getStatus() {
// read the field once so that the null check and the call below use the same reference
plasmaDHTTransfer workerThread = this.worker;
if (workerThread != null) {
// NOTE(review): this span is a diff rendering (1 addition & 1 deletion). The first
// return is the pre-commit line that read the message from the chunk; the second is
// its replacement, which reads the message from the transfer worker itself.
return new String[]{this.status,workerThread.dhtChunk.getStatusMessage()};
return new String[]{this.status,workerThread.getStatusMessage()};
}
return new String[]{this.status,"Not running"};
}
Expand Down
26 changes: 18 additions & 8 deletions source/de/anomic/plasma/plasmaDHTTransfer.java
Expand Up @@ -56,6 +56,8 @@ public class plasmaDHTTransfer extends Thread {
// status fields
private boolean stopped = false;
private long transferTime = 0;
private int transferStatus = plasmaDHTChunk.chunkStatus_UNDEFINED;
private String transferStatusMessage = "";

// delivery destination
yacySeed seed = null;
Expand Down Expand Up @@ -102,30 +104,38 @@ public void stopIt() {
/**
 * @return the duration in milliseconds recorded by {@code uploadIndex()} when a
 *         transfer succeeded, measured from the start of that call; 0 if no
 *         transfer has succeeded yet
 */
public long getTransferTime() {
    return transferTime;
}

/**
 * @return the status of this transfer as one of the {@code plasmaDHTChunk.chunkStatus_*}
 *         constants; starts as {@code chunkStatus_UNDEFINED} and is updated by
 *         {@code uploadIndex()}
 */
public int getStatus() {
    return this.transferStatus;
}

/**
 * @return the progress text last written by {@code uploadIndex()};
 *         the empty string before any transfer was attempted
 */
public String getStatusMessage() {
    return this.transferStatusMessage;
}

public void uploadIndex() throws InterruptedException {

/* loop until we
* - have successfully transferred the words list or
* - the retry counter limit was exceeded
*/
transferStatus = plasmaDHTChunk.chunkStatus_RUNNING;
long retryCount = 0, start = System.currentTimeMillis();
while (true) {
// testing if we were aborted
if (isAborted()) return;

// transferring selected words to remote peer
dhtChunk.setStatusMessage("Running: Transfering chunk to target " + seed.hash + "/" + seed.getName());
dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_RUNNING);
transferStatusMessage = "Running: Transfering chunk to target " + seed.hash + "/" + seed.getName();
String error = yacyClient.transferIndex(seed, dhtChunk.containers(), dhtChunk.urlCacheMap(), gzipBody4Transfer, timeout4Transfer);
if (error == null) {
// words successfully transferred
transferTime = System.currentTimeMillis() - start;
this.log.logInfo("Index transfer of " + dhtChunk.indexCount() + " words [" + dhtChunk.firstContainer().wordHash() + " .. " + dhtChunk.lastContainer().wordHash() + "]" + " to peer " + seed.getName() + ":" + seed.hash + " in " + (transferTime / 1000) + " seconds successfull ("
+ (1000 * dhtChunk.indexCount() / (transferTime + 1)) + " words/s)");
retryCount = 0;
dhtChunk.setStatusMessage("Finished: Transfer of chunk to target " + seed.hash + "/" + seed.getName());
dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_COMPLETE);
transferStatusMessage = "Finished: Transfer of chunk to target " + seed.hash + "/" + seed.getName();
transferStatus = plasmaDHTChunk.chunkStatus_COMPLETE;
break;
} else {
// words transfer failed
Expand All @@ -141,9 +151,9 @@ public void uploadIndex() throws InterruptedException {
yacyCore.peerActions.peerDeparture(seed);

// if the retry counter limit was not exceeded we'll retry it in a few seconds
dhtChunk.setStatusMessage("Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount));
transferStatusMessage = "Disconnected peer: " + ((retryCount > 5) ? error + ". Transfer aborted" : "Retry " + retryCount);
if (retryCount > maxRetry) {
dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_FAILED);
transferStatus = plasmaDHTChunk.chunkStatus_FAILED;
return;
}
Thread.sleep(retryCount * 5000);
Expand All @@ -162,13 +172,13 @@ public void uploadIndex() throws InterruptedException {
if (added < 0) {
// inc. retry counter
retryCount++;
dhtChunk.setStatusMessage("Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount));
transferStatusMessage = "Disconnected peer: Peer ping failed. " + ((retryCount > 5) ? "Transfer aborted." : "Retry " + retryCount);
if (retryCount > maxRetry) return;
Thread.sleep(retryCount * 5000);
continue;
} else {
yacyCore.seedDB.getConnected(seed.hash);
dhtChunk.setStatusMessage("running");
transferStatusMessage = "running";
break;
}
}
Expand Down
90 changes: 49 additions & 41 deletions source/de/anomic/plasma/plasmaSwitchboard.java
Expand Up @@ -864,11 +864,20 @@ public boolean deQueue() {

boolean doneSomething = false;

// possibly delete entries from last chunk
if ((this.dhtTransferChunk != null) &&
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE)) {
int deletedURLs = this.dhtTransferChunk.deleteTransferIndexes();
this.log.logFine("Deleted from " + this.dhtTransferChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references");
this.dhtTransferChunk = null;
}

// generate a dht chunk
if ((this.dhtTransferChunk == null) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_UNDEFINED) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED)) {
if ((dhtShallTransfer() == null) &&
((this.dhtTransferChunk == null) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_UNDEFINED) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) ||
(this.dhtTransferChunk.getStatus() == plasmaDHTChunk.chunkStatus_FAILED))) {
// generate new chunk
dhtTransferChunk = new plasmaDHTChunk(this.log, this.wordIndex, this.urlPool.loadedURL, 30, dhtTransferIndexCount);
doneSomething = true;
Expand Down Expand Up @@ -1980,69 +1989,73 @@ public void abortTransferWholeIndex(boolean wait) {
}
}

public boolean dhtTransferJob() {

/**
 * Checks the preconditions for a DHT index distribution.
 *
 * NOTE(review): this span is a diff rendering. Each {@code log.logFine(...)} /
 * {@code return false;} pair is a pre-commit line from the old {@code dhtTransferJob()};
 * the {@code return "..."} line that follows it is the post-commit replacement.
 *
 * @return {@code null} if none of the checks objects to a transfer, otherwise a
 *         message naming the first check that failed
 */
public String dhtShallTransfer() {
// the seed database and the own seed must exist
if (yacyCore.seedDB == null) {
log.logFine("no DHT distribution: seedDB == null");
return false;
return "no DHT distribution: seedDB == null";
}
if (yacyCore.seedDB.mySeed == null) {
log.logFine("no DHT distribution: mySeed == null");
return false;
return "no DHT distribution: mySeed == null";
}
// no distribution while the own seed reports isVirgin()
if (yacyCore.seedDB.mySeed.isVirgin()) {
log.logFine("no DHT distribution: status is virgin");
return false;
return "no DHT distribution: status is virgin";
}
// distribution must be switched on in the configuration; a missing key counts as "false"
if (getConfig("allowDistributeIndex","false").equalsIgnoreCase("false")) {
log.logFine("no DHT distribution: not enabled");
return false;
return "no DHT distribution: not enabled";
}
// require at least 10 loaded URLs ...
if (urlPool.loadedURL.size() < 10) {
log.logFine("no DHT distribution: loadedURL.size() = " + urlPool.loadedURL.size());
return false;
return "no DHT distribution: loadedURL.size() = " + urlPool.loadedURL.size();
}
// ... and at least 100 entries in the word index
if (wordIndex.size() < 100) {
log.logFine("no DHT distribution: not enough words - wordIndex.size() = " + wordIndex.size());
return false;
return "no DHT distribution: not enough words - wordIndex.size() = " + wordIndex.size();
}
// unless explicitly allowed, do not distribute while the notice-URL stack is non-empty
if ((getConfig("allowDistributeIndexWhileCrawling","false").equalsIgnoreCase("false")) && (urlPool.noticeURL.stackSize() > 0)) {
log.logFine("no DHT distribution: crawl in progress - noticeURL.stackSize() = " + urlPool.noticeURL.stackSize());
return "no DHT distribution: crawl in progress - noticeURL.stackSize() = " + urlPool.noticeURL.stackSize();
}
// all checks passed
return null;
}

/**
 * Transfers the currently prepared DHT chunk to remote peers.
 *
 * The job runs only if {@code dhtShallTransfer()} returns {@code null} and
 * {@code dhtTransferChunk} exists with status {@code chunkStatus_FILLED}. The result of
 * {@code dhtTransferProcess} is written back to the chunk as {@code chunkStatus_COMPLETE}
 * or {@code chunkStatus_FAILED}.
 *
 * NOTE(review): this span is a diff rendering, so pre-commit and post-commit lines are
 * interleaved: the two log lines for "in progress", the two {@code dhtTransferProcess}
 * calls, the {@code if (!ok)} fragment, and the trailing "adapt transfer count" section.
 *
 * @return {@code true} if the transfer succeeded, {@code false} if it was rejected or failed
 */
public boolean dhtTransferJob() {
// a non-null reason means distribution is not permitted right now
String rejectReason = dhtShallTransfer();
if (rejectReason != null) {
log.logFine(rejectReason);
return false;
}
if (this.dhtTransferChunk == null) {
log.logFine("no DHT distribution: no transfer chunk defined");
return false;
}
// only a chunk in state FILLED is transferred; any other state is rejected here
if ((this.dhtTransferChunk != null) && (this.dhtTransferChunk.getStatus() != plasmaDHTChunk.chunkStatus_FILLED)) {
log.logFine("no DHT distribution: index distribution is in progress");
log.logFine("no DHT distribution: index distribution is in progress, status=" + this.dhtTransferChunk.getStatus());
return false;
}

// do the transfer
// a junior peer passes a peer count of 1, any other peer passes 3
int peerCount = (yacyCore.seedDB.mySeed.isJunior()) ? 1 : 3;
long starttime = System.currentTimeMillis();

boolean ok = dhtTransferProcess(dhtTransferChunk, peerCount, true);
boolean ok = dhtTransferProcess(dhtTransferChunk, peerCount);

if (!ok) {
log.logFine("no word distribution: transfer failed");
if (ok) {
dhtTransferChunk.setStatus(plasmaDHTChunk.chunkStatus_COMPLETE);
log.logFine("DHT distribution: transfer COMPLETE");
// adapt transfer count: decrement if the transfer took longer than 10000 ms per peer, else increment
if ((System.currentTimeMillis() - starttime) > (10000 * peerCount)) {
dhtTransferIndexCount--;
} else {
dhtTransferIndexCount++;
}
// never let the transfer count drop below 50
if (dhtTransferIndexCount < 50) dhtTransferIndexCount = 50;

// show success
return true;
} else {
dhtTransferChunk.setStatus(plasmaDHTChunk.chunkStatus_FAILED);
log.logFine("DHT distribution: transfer FAILED");
return false;
}

// adapt transfer count
if ((System.currentTimeMillis() - starttime) > (10000 * peerCount))
dhtTransferIndexCount--;
else
dhtTransferIndexCount++;
if (dhtTransferIndexCount < 50) dhtTransferIndexCount = 50;

// show success
return true;

}

public boolean dhtTransferProcess(plasmaDHTChunk dhtChunk, int peerCount, boolean delete) {
public boolean dhtTransferProcess(plasmaDHTChunk dhtChunk, int peerCount) {
if ((yacyCore.seedDB == null) || (yacyCore.seedDB.sizeConnected() == 0)) return false;

// find a list of DHT-peers
Expand All @@ -2063,7 +2076,7 @@ public boolean dhtTransferProcess(plasmaDHTChunk dhtChunk, int peerCount, boolea
(int)getConfigLong("indexDistribution.timeout",60000), 0);
try {transfer.uploadIndex();} catch (InterruptedException e) {}

if (transfer.dhtChunk.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) {
if (transfer.getStatus() == plasmaDHTChunk.chunkStatus_COMPLETE) {
peerNames += ", " + seeds[i].getName();
hc1++;
}
Expand All @@ -2074,11 +2087,6 @@ public boolean dhtTransferProcess(plasmaDHTChunk dhtChunk, int peerCount, boolea
// clean up and finish with deletion of indexes
if (hc1 >= peerCount) {
// success
if (delete) {
int deletedURLs = dhtChunk.deleteTransferIndexes();
this.log.logFine("Deleted from " + dhtChunk.containers().length + " transferred RWIs locally, removed " + deletedURLs + " URL references");
}
dhtChunk.setStatus(plasmaDHTChunk.chunkStatus_COMPLETE);
return true;
}
this.log.logSevere("Index distribution failed. Too few peers (" + hc1 + ") received the index, not deleted locally.");
Expand Down
2 changes: 1 addition & 1 deletion source/de/anomic/plasma/plasmaWordIndex.java
Expand Up @@ -149,7 +149,7 @@ private synchronized void flushCacheToBackend(String wordHash) {
}
}

public int addEntriesBackend(plasmaWordIndexEntryContainer entries) {
private int addEntriesBackend(plasmaWordIndexEntryContainer entries) {
plasmaWordIndexEntryContainer feedback = assortmentCluster.storeTry(entries.wordHash(), entries);
if (feedback == null) {
return entries.size();
Expand Down

0 comments on commit 1d8ca6e

Please sign in to comment.