*) allow pausing/resuming of crawlJob Threads separately
   - pausing/resuming localCrawls
   - pausing/resuming remoteTriggeredCrawls
   - pausing/resuming globalCrawlTrigger
   See: http://www.yacy-forum.de/viewtopic.php?t=1591

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@1723 6c8d7289-2bf4-0310-a012-ef5d649a1542
theli committed Feb 21, 2006
1 parent af1da09 commit 2336f0f
Showing 7 changed files with 150 additions and 59 deletions.
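
For orientation, a minimal sketch of how the new per-job API introduced by this commit is meant to be called (method and constant names are taken from the diffs below; the servlet-style setup around it is assumed):

    // sketch only: obtain the switchboard the same way the servlets below do
    plasmaSwitchboard sb = (plasmaSwitchboard) env;

    // pause just the local crawl; remote triggered crawl and global crawl trigger keep running
    sb.pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);

    // query the per-job flag, e.g. for rendering a status page
    boolean paused = sb.crawlJobIsPaused(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);

    // resume it again; this wakes the job thread waiting on the job's sync object
    sb.continueCrawlJob(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);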
6 changes: 3 additions & 3 deletions htroot/IndexCreate_p.java
@@ -281,12 +281,12 @@ public static serverObjects respond(httpHeader header, serverObjects post, serve


if (post.containsKey("pausecrawlqueue")) {
switchboard.pauseCrawling();
switchboard.pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);
prop.put("info", 4);//crawling paused
}

if (post.containsKey("continuecrawlqueue")) {
switchboard.continueCrawling();
switchboard.continueCrawlJob(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);
prop.put("info", 5);//crawling continued
}
}
@@ -454,7 +454,7 @@ record = yacyCore.newsPool.get(yacyNewsPool.PROCESSED_DB, c);
}


prop.put("crawler-paused",(switchboard.crawlingIsPaused())?0:1);
prop.put("crawler-paused",(switchboard.crawlJobIsPaused(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL))?0:1);

// return rewrite properties
return prop;
28 changes: 22 additions & 6 deletions htroot/ScreenSaver.java
@@ -74,7 +74,9 @@ public class ScreenSaver {
public static serverObjects respond(httpHeader header, serverObjects post, serverSwitch env) {

plasmaSwitchboard sb = (plasmaSwitchboard)env;
boolean crawlingStarted = false;
boolean localCrawlStarted = false;
boolean remoteTriggeredCrawlStarted = false;
boolean globalCrawlTriggerStarted = false;
try {
InputStream input = (InputStream) header.get("INPUTSTREAM");
OutputStream output = (OutputStream) header.get("OUTPUTSTREAM");
@@ -110,10 +112,18 @@ public static serverObjects respond(httpHeader header, serverObjects post, serve

outputWriter.println(currentURL);
} else if (line.equals("CONTINUECRAWLING")) {
if (sb.crawlingIsPaused()) {
crawlingStarted = true;
sb.continueCrawling();
if (sb.crawlJobIsPaused(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL)) {
localCrawlStarted = true;
sb.continueCrawlJob(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);
}
if (sb.crawlJobIsPaused(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL)) {
remoteTriggeredCrawlStarted = true;
sb.continueCrawlJob(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
}
if (sb.crawlJobIsPaused(plasmaSwitchboard.CRAWLJOB_GLOBAL_CRAWL_TRIGGER)) {
globalCrawlTriggerStarted = true;
sb.continueCrawlJob(plasmaSwitchboard.CRAWLJOB_GLOBAL_CRAWL_TRIGGER);
}
} else if (line.equals("EXIT")) {
outputWriter.println("OK");
outputWriter.flush();
@@ -128,9 +138,15 @@ public static serverObjects respond(httpHeader header, serverObjects post, serve
} catch (Exception e) {
return null;
} finally {
if (crawlingStarted) {
sb.pauseCrawling();
if (localCrawlStarted) {
sb.pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);
}
if (remoteTriggeredCrawlStarted) {
sb.pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
}
if (globalCrawlTriggerStarted) {
sb.pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_GLOBAL_CRAWL_TRIGGER);
}
}
}

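The change above makes the screensaver track, per job type, whether it resumed a paused job, so that the finally block re-pauses exactly those jobs and leaves anything the user paused deliberately untouched. A purely illustrative helper (not part of the commit) that captures the repeated check:

    // sketch only: resume a crawl job if it is paused and report whether we did,
    // so the caller can restore the previous state later
    private static boolean resumeIfPaused(plasmaSwitchboard sb, String jobType) {
        if (sb.crawlJobIsPaused(jobType)) {
            sb.continueCrawlJob(jobType);
            return true;
        }
        return false;
    }
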
35 changes: 26 additions & 9 deletions htroot/Status.java
@@ -78,10 +78,22 @@ public static serverObjects respond(httpHeader header, serverObjects post, serve
prop.put("LOCATION","");
}
} else {
if (post.containsKey("pausecrawlqueue")) {
((plasmaSwitchboard)env).pauseCrawling();
} else if (post.containsKey("continuecrawlqueue")) {
((plasmaSwitchboard)env).continueCrawling();
if (post.containsKey("pauseCrawlJob")) {
String jobType = (String) post.get("jobType");
if (jobType.equals("localCrawl"))
((plasmaSwitchboard)env).pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);
else if (jobType.equals("remoteTriggeredCrawl"))
((plasmaSwitchboard)env).pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
else if (jobType.equals("globalCrawlTrigger"))
((plasmaSwitchboard)env).pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_GLOBAL_CRAWL_TRIGGER);
} else if (post.containsKey("continueCrawlJob")) {
String jobType = (String) post.get("jobType");
if (jobType.equals("localCrawl"))
((plasmaSwitchboard)env).continueCrawlJob(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);
else if (jobType.equals("remoteTriggeredCrawl"))
((plasmaSwitchboard)env).continueCrawlJob(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
else if (jobType.equals("globalCrawlTrigger"))
((plasmaSwitchboard)env).continueCrawlJob(plasmaSwitchboard.CRAWLJOB_GLOBAL_CRAWL_TRIGGER);
} else if (post.containsKey("ResetTraffic")) {
httpdByteCountInputStream.resetCount();
httpdByteCountOutputStream.resetCount();
@@ -278,13 +290,18 @@ public static serverObjects respond(httpHeader header, serverObjects post, serve
prop.put("indexingQueueMax", Integer.toString(plasmaSwitchboard.indexingSlots));

prop.put("loaderQueueSize", Integer.toString(sb.cacheLoader.size()));
prop.put("loaderQueueMax", Integer.toString(plasmaSwitchboard.crawlSlots));
prop.put("loaderPaused",sb.crawlingIsPaused()?1:0);
prop.put("loaderQueueMax", Integer.toString(plasmaSwitchboard.crawlSlots));

prop.put("localCrawlQueueSize", Integer.toString(sb.getThread("50_localcrawl").getJobCount()));
prop.put("localCrawlQueueSize", Integer.toString(sb.getThread(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL).getJobCount()));
prop.put("localCrawlPaused",sb.crawlJobIsPaused(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL)?1:0);

prop.put("remoteTriggeredCrawlQueueSize", Integer.toString(sb.getThread(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL).getJobCount()));
prop.put("remoteTriggeredCrawlPaused",sb.crawlJobIsPaused(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL)?1:0);

prop.put("globalCrawlTriggerQueueSize", Integer.toString(sb.getThread(plasmaSwitchboard.CRAWLJOB_GLOBAL_CRAWL_TRIGGER).getJobCount()));
prop.put("globalCrawlTriggerPaused",sb.crawlJobIsPaused(plasmaSwitchboard.CRAWLJOB_GLOBAL_CRAWL_TRIGGER)?1:0);

prop.put("stackCrawlQueueSize", Integer.toString(sb.sbStackCrawlThread.size()));
prop.put("remoteCrawlQueueSize", Integer.toString(sb.getThread("62_remotetriggeredcrawl").getJobCount()));
prop.put("remoteCrawlTriggerQueueSize", Integer.toString(sb.getThread("61_globalcrawltrigger").getJobCount()));

// return rewrite properties
prop.put("date",(new Date()).toString());
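One detail worth noting in the new dispatch above: post.get("jobType") can return null when the parameter is missing, in which case jobType.equals(...) throws a NullPointerException. A defensive variant of the pause branch (illustrative only, not part of the commit):

    String jobType = (String) post.get("jobType");
    if (jobType != null) {
        if (jobType.equals("localCrawl"))
            ((plasmaSwitchboard) env).pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);
        else if (jobType.equals("remoteTriggeredCrawl"))
            ((plasmaSwitchboard) env).pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
        else if (jobType.equals("globalCrawlTrigger"))
            ((plasmaSwitchboard) env).pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_GLOBAL_CRAWL_TRIGGER);
    }
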
45 changes: 35 additions & 10 deletions htroot/Status_p.inc
@@ -1,4 +1,4 @@
<table border="0" cellpadding="2" cellspacing="1" width="100%">
<table border="1" cellpadding="2" cellspacing="1" width="100%">
<tr class="TableHeader">
<td colspan="3"><b>Private System Properties</b></td>
</tr>
@@ -71,17 +71,42 @@
</tr>
<tr class="TableCellLight">
<td>Loader Queue</td>
<td>#[loaderQueueSize]# | #[loaderQueueMax]# #(loaderPaused)#::(paused)#(/loaderPaused)#&nbsp;<a href="Status.html?#(loaderPaused)#pausecrawlqueue::continuecrawlqueue#(/loaderPaused)#=" title="#(loaderPaused)#pause crawling::continue crawling#(/loaderPaused)#"><img src="env/grafics/#(loaderPaused)#stop.gif::start.gif#(/loaderPaused)#" border="0" width="12" height="12"></a></td>
<td>#[loaderQueueSize]# | #[loaderQueueMax]# </td>
<td>[<a href="IndexCreateLoaderQueue_p.html">Details</a>]</td>
</tr>
<tr class="TableCellDark">
<td>Crawler Queue</td>
<td>Enqueued from: local=#[localCrawlQueueSize]# remote=#[remoteCrawlQueueSize]# | Pending: #[stackCrawlQueueSize]#</td>
<td>[<a href="IndexCreateWWWLocalQueue_p.html">Details</a>]</td>
</tr>
<tr class="TableCellLight">
<td>Remote Crawl Trigger Queue</td>
<td>#[remoteCrawlTriggerQueueSize]#</td>
<td>[<a href="IndexCreateWWWGlobalQueue_p.html">Details</a>]</td>
<td>Crawler Queues</td>
<td>
<table>
<tr>
<td>Local Crawl</td>
<td>#[localCrawlQueueSize]#</td>
<td><a href="Status.html?#(localCrawlPaused)#pauseCrawlJob::continueCrawlJob#(/localCrawlPaused)#=&jobType=localCrawl" title="#(localCrawlPaused)#pause local crawl::continue local crawl#(/localCrawlPaused)#"><img src="env/grafics/#(localCrawlPaused)#stop.gif::start.gif#(/localCrawlPaused)#" border="0" width="12" height="12"></a></td>
<td>#(localCrawlPaused)#&nbsp;::(paused)#(/localCrawlPaused)#</td>
</tr>
<tr>
<td>Remote triggered Crawl</td>
<td>#[remoteTriggeredCrawlQueueSize]#</td>
<td><a href="Status.html?#(remoteTriggeredCrawlPaused)#pauseCrawlJob::continueCrawlJob#(/remoteTriggeredCrawlPaused)#=&jobType=remoteTriggeredCrawl" title="#(remoteTriggeredCrawlPaused)#pause remote triggered crawl::continue remote triggered crawl#(/remoteTriggeredCrawlPaused)#"><img src="env/grafics/#(remoteTriggeredCrawlPaused)#stop.gif::start.gif#(/remoteTriggeredCrawlPaused)#" border="0" width="12" height="12"></a></td>
<td>#(remoteTriggeredCrawlPaused)#&nbsp;::(paused)#(/remoteTriggeredCrawlPaused)#</td>
</tr>
<tr>
<td>Global Crawl Trigger</td>
<td>#[globalCrawlTriggerQueueSize]#</td>
<td><a href="Status.html?#(globalCrawlTriggerPaused)#pauseCrawlJob::continueCrawlJob#(/globalCrawlTriggerPaused)#=&jobType=globalCrawlTrigger" title="#(globalCrawlTriggerPaused)#pause global crawl trigger::continue global crawl trigger#(/globalCrawlTriggerPaused)#"><img src="env/grafics/#(globalCrawlTriggerPaused)#stop.gif::start.gif#(/globalCrawlTriggerPaused)#" border="0" width="12" height="12"></a></td>
<td>#(globalCrawlTriggerPaused)#&nbsp;::(paused)#(/globalCrawlTriggerPaused)#</td>
</tr>
<tr>
<td>Pending Crawl</td>
<td>#[stackCrawlQueueSize]#</td>
<td>&nbsp;</td>
<td>&nbsp;</td>
</tr>
</table>
</td>
<td valign="top">[<a href="IndexCreateWWWLocalQueue_p.html">Details</a>]<br>
&nbsp;<br>
[<a href="IndexCreateWWWGlobalQueue_p.html">Details</a>]
</td>
</tr>
</table>
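
For readers unfamiliar with the YaCy template syntax used above: #[name]# is replaced with the value stored via prop.put("name", ...), and #(name)#A::B#(/name)# renders alternative A when that value is 0 and B when it is 1. The servlet side (see the Status.java hunk above) therefore drives each new row roughly like this:

    // 0 = running, 1 = paused (values set in Status.java above)
    prop.put("localCrawlQueueSize", Integer.toString(sb.getThread(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL).getJobCount()));
    prop.put("localCrawlPaused", sb.crawlJobIsPaused(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL) ? 1 : 0);
    // localCrawlPaused = 0  ->  "pauseCrawlJob" link with stop.gif
    // localCrawlPaused = 1  ->  "continueCrawlJob" link with start.gif and "(paused)"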
3 changes: 2 additions & 1 deletion source/de/anomic/plasma/plasmaCrawlWorker.java
@@ -530,7 +530,8 @@ private static plasmaHTCache.Entry load(
} else if ((errorMsg != null) && (errorMsg.indexOf("There is not enough space on the disk") >= 0)) {
log.logSevere("CRAWLER Not enough space on the disk detected while crawling '" + url.toString() + "'. " +
"Pausing crawlers. ");
plasmaCrawlLoader.switchboard.pauseCrawling();
plasmaCrawlLoader.switchboard.pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL);
plasmaCrawlLoader.switchboard.pauseCrawlJob(plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
} else if ((errorMsg != null) && (errorMsg.indexOf("Network is unreachable") >=0)) {
log.logSevere("CRAWLER Network is unreachable while trying to crawl URL '" + url.toString() + "'. ");
} else if ((errorMsg != null) && (errorMsg.indexOf("No trusted certificate found")>= 0)) {
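Note that the disk-full handler above pauses the local crawl and the remote triggered crawl, but leaves the global crawl trigger running. If all three job types were to be paused on such a fatal condition, a sketch (not part of the commit) could simply iterate over the constants:

    // illustrative only: pause every crawl job type
    String[] crawlJobs = {
        plasmaSwitchboard.CRAWLJOB_LOCAL_CRAWL,
        plasmaSwitchboard.CRAWLJOB_REMOTE_TRIGGERED_CRAWL,
        plasmaSwitchboard.CRAWLJOB_GLOBAL_CRAWL_TRIGGER
    };
    for (int i = 0; i < crawlJobs.length; i++) {
        plasmaCrawlLoader.switchboard.pauseCrawlJob(crawlJobs[i]);
    }
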
88 changes: 59 additions & 29 deletions source/de/anomic/plasma/plasmaSwitchboard.java
@@ -113,6 +113,7 @@ this class is also the core of the http crawling.
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -228,8 +229,17 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
private serverSemaphore shutdownSync = new serverSemaphore(0);
private boolean terminate = false;

private Object crawlingPausedSync = new Object();
private boolean crawlingIsPaused = false;
//private Object crawlingPausedSync = new Object();
//private boolean crawlingIsPaused = false;

public static final String CRAWLJOB_LOCAL_CRAWL = "50_localcrawl";
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL = "62_remotetriggeredcrawl";
public static final String CRAWLJOB_GLOBAL_CRAWL_TRIGGER = "61_globalcrawltrigger";
private static final int CRAWLJOB_SYNC = 0;
private static final int CRAWLJOB_STATUS = 1;

private Hashtable crawlJobsStatus = new Hashtable();

private static plasmaSwitchboard sb;

public plasmaSwitchboard(String rootPath, String initPath, String configPath) {
@@ -421,11 +431,25 @@ public plasmaSwitchboard(String rootPath, String initPath, String configPath) {

// start a loader
log.logConfig("Starting Crawl Loader");

crawlSlots = Integer.parseInt(getConfig("crawler.MaxActiveThreads", "10"));
this.crawlingIsPaused = Boolean.valueOf(getConfig("crawler.isPaused", "false")).booleanValue();
plasmaCrawlLoader.switchboard = this;
this.cacheLoader = new plasmaCrawlLoader(this.cacheManager, this.log);

/*
* Creating sync objects and loading status for the crawl jobs
* a) local crawl
* b) remote triggered crawl
* c) global crawl trigger
*/
this.crawlJobsStatus.put(CRAWLJOB_LOCAL_CRAWL, new Object[]{
new Object(),
Boolean.valueOf(getConfig(CRAWLJOB_LOCAL_CRAWL + "_isPaused", "false"))});
this.crawlJobsStatus.put(CRAWLJOB_REMOTE_TRIGGERED_CRAWL, new Object[]{
new Object(),
Boolean.valueOf(getConfig(CRAWLJOB_REMOTE_TRIGGERED_CRAWL + "_isPaused", "false"))});
this.crawlJobsStatus.put(CRAWLJOB_GLOBAL_CRAWL_TRIGGER, new Object[]{
new Object(),
Boolean.valueOf(getConfig(CRAWLJOB_GLOBAL_CRAWL_TRIGGER + "_isPaused", "false"))});

// starting board
initMessages(ramMessage);
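
Each value placed into crawlJobsStatus here is a two-element Object[]: index CRAWLJOB_SYNC (0) holds a dedicated monitor object the job thread waits on, and index CRAWLJOB_STATUS (1) holds the Boolean pause flag, initialised from the per-job <jobType>_isPaused setting. Read back, an entry looks roughly like this:

    // sketch of one entry, as consumed by pauseCrawlJob/continueCrawlJob below
    Object[] status   = (Object[]) this.crawlJobsStatus.get(CRAWLJOB_LOCAL_CRAWL);
    Object   monitor  = status[CRAWLJOB_SYNC];                               // per-job lock/wait object
    boolean  isPaused = ((Boolean) status[CRAWLJOB_STATUS]).booleanValue();  // true while the job is paused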
@@ -956,32 +980,35 @@ public File getOwnSeedFile() {
/**
* With this function the crawling process can be paused
*/
public void pauseCrawling() {
synchronized(this.crawlingPausedSync) {
this.crawlingIsPaused = true;
public void pauseCrawlJob(String jobType) {
Object[] status = (Object[])this.crawlJobsStatus.get(jobType);
synchronized(status[CRAWLJOB_SYNC]) {
status[CRAWLJOB_STATUS] = Boolean.TRUE;
}
setConfig("crawler.isPaused", "true");
}
setConfig(jobType + "_isPaused", "true");
}

/**
* Continue the previously paused crawling
*/
public void continueCrawling() {
synchronized(this.crawlingPausedSync) {
if (this.crawlingIsPaused) {
this.crawlingIsPaused = false;
this.crawlingPausedSync.notifyAll();
public void continueCrawlJob(String jobType) {
Object[] status = (Object[])this.crawlJobsStatus.get(jobType);
synchronized(status[CRAWLJOB_SYNC]) {
if (((Boolean)status[CRAWLJOB_STATUS]).booleanValue()) {
status[CRAWLJOB_STATUS] = Boolean.FALSE;
status[CRAWLJOB_SYNC].notifyAll();
}
}
setConfig("crawler.isPaused", "false");
}
setConfig(jobType + "_isPaused", "false");
}

/**
* @return <code>true</code> if crawling was paused or <code>false</code> otherwise
*/
public boolean crawlingIsPaused() {
synchronized(this.crawlingPausedSync) {
return this.crawlingIsPaused;
public boolean crawlJobIsPaused(String jobType) {
Object[] status = (Object[])this.crawlJobsStatus.get(jobType);
synchronized(status[CRAWLJOB_SYNC]) {
return ((Boolean)status[CRAWLJOB_STATUS]).booleanValue();
}
}
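
The three job methods in the following hunks (coreCrawlJob, limitCrawlTriggerJob, remoteTriggeredCrawlJob) all repeat the same handshake: check the per-job flag while holding the per-job monitor and wait() until continueCrawlJob() clears the flag and calls notifyAll(). A self-contained sketch of that protocol, stripped of the YaCy classes (the while loop also guards against spurious wake-ups; the original code uses a plain if):

    // minimal illustration of the pause/resume handshake used by the crawl job threads
    final class PauseGate {
        private final Object sync = new Object();
        private boolean paused = false;

        void pause() {
            synchronized (sync) { paused = true; }
        }

        void resume() {
            synchronized (sync) {
                if (paused) { paused = false; sync.notifyAll(); }
            }
        }

        // called by the job thread at the start of each cycle; blocks while paused
        boolean awaitIfPaused() {
            synchronized (sync) {
                while (paused) {
                    try { sync.wait(); } catch (InterruptedException e) { return false; }
                }
            }
            return true;
        }
    }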

@@ -1012,10 +1039,11 @@ public boolean coreCrawlJob() {
//if (!(cacheManager.idle())) try {Thread.currentThread().sleep(2000);} catch (InterruptedException e) {}

// if crawling was paused we have to wait until we were notified to continue
synchronized(this.crawlingPausedSync) {
if (this.crawlingIsPaused) {
Object[] status = (Object[])this.crawlJobsStatus.get(CRAWLJOB_LOCAL_CRAWL);
synchronized(status[CRAWLJOB_SYNC]) {
if (((Boolean)status[CRAWLJOB_STATUS]).booleanValue()) {
try {
this.crawlingPausedSync.wait();
status[CRAWLJOB_SYNC].wait();
}
catch (InterruptedException e){ return false;}
}
@@ -1090,10 +1118,11 @@ public boolean limitCrawlTriggerJob() {
//if (!(cacheManager.idle())) try {Thread.currentThread().sleep(2000);} catch (InterruptedException e) {}

// if crawling was paused we have to wait until we were notified to continue
synchronized(this.crawlingPausedSync) {
if (this.crawlingIsPaused) {
Object[] status = (Object[])this.crawlJobsStatus.get(CRAWLJOB_GLOBAL_CRAWL_TRIGGER);
synchronized(status[CRAWLJOB_SYNC]) {
if (((Boolean)status[CRAWLJOB_STATUS]).booleanValue()) {
try {
this.crawlingPausedSync.wait();
status[CRAWLJOB_SYNC].wait();
}
catch (InterruptedException e){ return false;}
}
@@ -1152,12 +1181,13 @@ public boolean remoteTriggeredCrawlJob() {
}

// if crawling was paused we have to wait until we were notified to continue
synchronized(this.crawlingPausedSync) {
if (this.crawlingIsPaused) {
Object[] status = (Object[])this.crawlJobsStatus.get(CRAWLJOB_REMOTE_TRIGGERED_CRAWL);
synchronized(status[CRAWLJOB_SYNC]) {
if (((Boolean)status[CRAWLJOB_STATUS]).booleanValue()) {
try {
this.crawlingPausedSync.wait();
status[CRAWLJOB_SYNC].wait();
}
catch (InterruptedException e){ return false; }
catch (InterruptedException e){ return false;}
}
}

4 changes: 3 additions & 1 deletion yacy.init
@@ -429,12 +429,15 @@ xpstopw=true
50_localcrawl_idlesleep=10000
50_localcrawl_busysleep=200
50_localcrawl_memprereq=1048576
50_localcrawl_isPaused=false
61_globalcrawltrigger_idlesleep=10000
61_globalcrawltrigger_busysleep=200
61_globalcrawltrigger_memprereq=1048576
61_globalcrawltrigger_isPaused=false
62_remotetriggeredcrawl_idlesleep=10000
62_remotetriggeredcrawl_busysleep=200
62_remotetriggeredcrawl_memprereq=1048576
62_remotetriggeredcrawl_isPaused=false
70_cachemanager_idlesleep=5000
70_cachemanager_busysleep=0
70_cachemanager_memprereq=1048576
@@ -561,7 +564,6 @@ onlineCautionDelay=30000
crawler.acceptLanguage=en-us,en;q=0.5
crawler.acceptCharset=ISO-8859-1,utf-8;q=0.7,*;q=0.7
crawler.clientTimeout=9000
crawler.isPaused=false

# maximum number of crawler threads
crawler.MaxActiveThreads = 10
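
The single crawler.isPaused flag removed above is superseded by the three per-job *_isPaused keys added in the first hunk of this file; plasmaSwitchboard reads them in its constructor (see the switchboard diff above), roughly:

    // inside plasmaSwitchboard: per-job pause flag, e.g. key "50_localcrawl_isPaused" in yacy.init
    boolean localCrawlPaused = Boolean.valueOf(
            getConfig(CRAWLJOB_LOCAL_CRAWL + "_isPaused", "false")).booleanValue();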
