Skip to content

Commit

Permalink
*) more failsafe threadpools
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1446 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
theli committed Jan 26, 2006
1 parent 3feeba3 commit f5abfe8
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 127 deletions.
38 changes: 33 additions & 5 deletions source/de/anomic/plasma/plasmaCrawlLoader.java
Expand Up @@ -48,9 +48,11 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

import org.apache.commons.pool.impl.GenericObjectPool;

import de.anomic.server.serverSemaphore;
import de.anomic.server.logging.serverLog;
import org.apache.commons.pool.impl.GenericObjectPool;

public final class plasmaCrawlLoader extends Thread {

Expand Down Expand Up @@ -291,8 +293,33 @@ public Object borrowObject() throws Exception {
return super.borrowObject();
}

public void returnObject(Object obj) throws Exception {
super.returnObject(obj);
public void returnObject(Object obj) {
if (obj == null) return;
if (obj instanceof plasmaCrawlWorker) {
try {
((plasmaCrawlWorker)obj).setName(plasmaCrawlWorker.threadBaseName + "_inPool");
super.returnObject(obj);
} catch (Exception e) {
((plasmaCrawlWorker)obj).setStopped(true);
serverLog.logSevere("CRAWLER-POOL","Unable to return crawler thread to pool.",e);
}
} else {
serverLog.logSevere("CRAWLER-POOL","Object of wront type '" + obj.getClass().getName() +
"' returned to pool.");
}
}

public void invalidateObject(Object obj) {
if (obj == null) return;
if (this.isClosed) return;
if (obj instanceof plasmaCrawlWorker) {
try {
((plasmaCrawlWorker)obj).setStopped(true);
super.invalidateObject(obj);
} catch (Exception e) {
serverLog.logSevere("CRAWLER-POOL","Unable to invalidate crawling thread.",e);
}
}
}

public synchronized void close() throws Exception {
Expand Down Expand Up @@ -395,6 +422,7 @@ public Object makeObject() {
* @see org.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object)
*/
public void destroyObject(Object obj) {
if (obj == null) return;
if (obj instanceof plasmaCrawlWorker) {
plasmaCrawlWorker theWorker = (plasmaCrawlWorker) obj;
theWorker.setStopped(true);
Expand All @@ -405,8 +433,8 @@ public void destroyObject(Object obj) {
* @see org.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object)
*/
public boolean validateObject(Object obj) {
if (obj instanceof plasmaCrawlWorker)
{
if (obj == null) return false;
if (obj instanceof plasmaCrawlWorker) {
plasmaCrawlWorker theWorker = (plasmaCrawlWorker) obj;
if (!theWorker.isAlive() || theWorker.isInterrupted()) return false;
if (theWorker.isRunning()) return true;
Expand Down
93 changes: 56 additions & 37 deletions source/de/anomic/plasma/plasmaCrawlStacker.java
Expand Up @@ -414,7 +414,7 @@ public stackCrawlMessage(String urlHash, byte[][] entryBytes) {
this.handle = Integer.parseInt(new String(entryBytes[11], "UTF-8"));
} catch (Exception e) {
e.printStackTrace();
throw new IllegalStateException();
throw new IllegalStateException(e);
}
}

Expand Down Expand Up @@ -596,13 +596,15 @@ public stackCrawlMessage waitForMessage() throws InterruptedException, IOExcepti
String urlHash = null;
byte[][] entryBytes = null;
stackCrawlMessage newMessage = null;
synchronized(this.urlEntryHashCache) {
urlHash = (String) this.urlEntryHashCache.removeFirst();
entryBytes = this.urlEntryCache.remove(urlHash.getBytes());
try {
synchronized(this.urlEntryHashCache) {
urlHash = (String) this.urlEntryHashCache.removeFirst();
entryBytes = this.urlEntryCache.remove(urlHash.getBytes());
}
} finally {
this.writeSync.V();
}

this.writeSync.V();

newMessage = new stackCrawlMessage(urlHash,entryBytes);
return newMessage;
}
Expand Down Expand Up @@ -693,11 +695,36 @@ public WorkerPool(plasmaCrawlStacker.WorkterFactory objFactory,
public Object borrowObject() throws Exception {
return super.borrowObject();
}

public void returnObject(Object obj) throws Exception {
super.returnObject(obj);

public void returnObject(Object obj) {
if (obj == null) return;
if (obj instanceof Worker) {
try {
((Worker)obj).setName("stackCrawlThread_inPool");
super.returnObject(obj);
} catch (Exception e) {
((Worker)obj).setStopped(true);
serverLog.logSevere("STACKCRAWL-POOL","Unable to return stackcrawl thread to pool.",e);
}
} else {
serverLog.logSevere("STACKCRAWL-POOL","Object of wront type '" + obj.getClass().getName() +
"' returned to pool.");
}
}

public void invalidateObject(Object obj) {
if (obj == null) return;
if (this.isClosed) return;
if (obj instanceof Worker) {
try {
((Worker)obj).setStopped(true);
super.invalidateObject(obj);
} catch (Exception e) {
serverLog.logSevere("STACKCRAWL-POOL","Unable to invalidate stackcrawl thread.",e);
}
}
}

public synchronized void close() throws Exception {

/*
Expand Down Expand Up @@ -807,38 +834,30 @@ public boolean isRunning() {
public void run() {
this.running = true;

// The thread keeps running.
while (!this.stopped && !Thread.interrupted()) {
if (this.done) {
// We are waiting for a task now.
synchronized (this) {
try {
this.wait(); //Wait until we get a request to process.
} catch (InterruptedException e) {
this.stopped = true;
// log.error("", e);
}
}
} else {
//There is a task....let us execute it.
try {
execute();
} catch (Exception e) {
// log.error("", e);
} finally {
reset();
try {
// The thread keeps running.
while (!this.stopped && !this.isInterrupted() && !plasmaCrawlStacker.this.theWorkerPool.isClosed) {
if (this.done) {
// return thread back into pool
plasmaCrawlStacker.this.theWorkerPool.returnObject(this);

if (!this.stopped && !this.isInterrupted() && !plasmaCrawlStacker.this.theWorkerPool.isClosed) {
try {
this.setName("stackCrawlThread_inPool");
plasmaCrawlStacker.this.theWorkerPool.returnObject(this);
} catch (Exception e1) {
// e1.printStackTrace();
this.stopped = true;
}
// We are waiting for a new task now.
synchronized (this) { this.wait(); }
} else {
try {
// executing the new task
execute();
} finally {
// reset thread
reset();
}
}
}
} catch (InterruptedException ex) {
serverLog.logInfo("STACKCRAWL-POOL","Interruption of thread '" + this.getName() + "' detected.");
} finally {
if (plasmaCrawlStacker.this.theWorkerPool != null)
plasmaCrawlStacker.this.theWorkerPool.invalidateObject(this);
}
}

Expand Down
51 changes: 20 additions & 31 deletions source/de/anomic/plasma/plasmaCrawlWorker.java
Expand Up @@ -65,7 +65,7 @@
public final class plasmaCrawlWorker extends Thread {

private static final int DEFAULT_CRAWLING_RETRY_COUNT = 5;
private static final String threadBaseName = "CrawlerWorker";
static final String threadBaseName = "CrawlerWorker";

private final CrawlerPool myPool;
private final plasmaSwitchboard sb;
Expand Down Expand Up @@ -165,40 +165,29 @@ public void reset() {
public void run() {
this.running = true;

// The thread keeps running.
while (!this.stopped && !Thread.interrupted()) {
if (this.done) {
// We are waiting for a task now.
synchronized (this) {
try {
// The thread keeps running.
while (!this.stopped && !this.isInterrupted() && !this.myPool.isClosed) {
if (this.done) {
// return thread back into pool
this.myPool.returnObject(this);

// We are waiting for a new task now.
synchronized (this) { this.wait(); }
} else {
try {
this.wait(); //Wait until we get a request to process.
}
catch (InterruptedException e) {
this.stopped = true;
// log.error("", e);
}
}
} else {
//There is a task....let us execute it.
try {
execute();
} catch (Exception e) {
// log.error("", e);
}
finally {
reset();

if (!this.stopped && !this.isInterrupted()) {
try {
this.myPool.returnObject(this);
this.setName(plasmaCrawlWorker.threadBaseName + "_inPool");
}
catch (Exception e1) {
log.logSevere("pool error", e1);
}
// executing the new task
execute();
} finally {
reset();
}
}
}
} catch (InterruptedException ex) {
serverLog.logInfo("CRAWLER-POOL","Interruption of thread '" + this.getName() + "' detected.");
} finally {
if (this.myPool != null)
this.myPool.invalidateObject(this);
}
}

Expand Down

0 comments on commit f5abfe8

Please sign in to comment.