implemented multithreading of indexing
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@221 6c8d7289-2bf4-0310-a012-ef5d649a1542
orbiter committed Jun 8, 2005
1 parent 7c318f8 commit 33f9315
Showing 4 changed files with 49 additions and 30 deletions.
12 changes: 6 additions & 6 deletions htroot/IndexCreate_p.java
@@ -209,10 +209,10 @@ public static serverObjects respond(httpHeader header, serverObjects post, serve
prop.put("xdstopwChecked", env.getConfig("xdstopw", "").equals("true") ? 1 : 0);
prop.put("xpstopwChecked", env.getConfig("xpstopw", "").equals("true") ? 1 : 0);

int processStackSize = switchboard.processStack.size();
int queueStackSize = switchboard.queueStack.size();
int loaderThreadsSize = switchboard.cacheLoader.size();
int crawlerListSize = switchboard.noticeURL.stackSize();
int completequeue = processStackSize + loaderThreadsSize + crawlerListSize;
int completequeue = queueStackSize + loaderThreadsSize + crawlerListSize;

if ((completequeue > 0) || ((post != null) && (post.containsKey("refreshpage")))) {
prop.put("refreshbutton", 1);
@@ -318,15 +318,15 @@ public static serverObjects respond(httpHeader header, serverObjects post, serve

yacySeed initiator;

if (switchboard.processStack.size() == 0) {
if (switchboard.queueStack.size() == 0) {
prop.put("indexing-queue", 0); //is empty
} else {
prop.put("indexing-queue", 1);
prop.put("indexing-queue_num", switchboard.processStack.size());//num entries in queue
prop.put("indexing-queue_num", switchboard.queueStack.size());//num entries in queue
dark = true;
plasmaHTCache.Entry pcentry;
for (i = 0; i < switchboard.processStack.size(); i++) {
pcentry = (plasmaHTCache.Entry) switchboard.processStack.get(i);
for (i = 0; i < switchboard.queueStack.size(); i++) {
pcentry = (plasmaHTCache.Entry) switchboard.queueStack.get(i);
if (pcentry != null) {
initiator = yacyCore.seedDB.getConnected(pcentry.initiator());
prop.put("indexing-queue_list_"+i+"_dark", ((dark) ? 1 : 0));
2 changes: 1 addition & 1 deletion htroot/ProxyIndexingMonitor_p.java
@@ -1,5 +1,5 @@
// ProxyIndexingMonitor_p.java
// -----------------------
// ---------------------------
// part of the AnomicHTTPD caching proxy
// (C) by Michael Peter Christen; mc@anomic.de
// first published on http://www.anomic.de
52 changes: 31 additions & 21 deletions source/de/anomic/plasma/plasmaSwitchboard.java
@@ -173,7 +173,7 @@ public final class plasmaSwitchboard extends serverAbstractSwitch implements ser
public plasmaHTCache cacheManager;
public plasmaSnippetCache snippetCache;
public plasmaCrawlLoader cacheLoader;
public LinkedList processStack = new LinkedList();
public LinkedList queueStack = new LinkedList();
public messageBoard messageDB;
public wikiBoard wikiDB;
public String remoteProxyHost;
@@ -334,14 +334,18 @@ public plasmaSwitchboard(String rootPath, String initPath, String configPath) th

// deploy threads
log.logSystem("Starting Threads");
int indexing_cluster = Integer.parseInt(getConfig("?80_indexing_cluster", "1"));
if (indexing_cluster < 1) indexing_cluster = 1;
deployThread("90_cleanup", "Cleanup", "simple cleaning process for monitoring information" ,
new serverInstantThread(this, "cleanupJob", "cleanupJobSize"), 10000); // all 5 Minutes
deployThread("80_dequeue", "Indexing Dequeue", "thread that creates database entries from scraped web content and performes indexing" ,
deployThread("80_indexing", "Parsing/Indexing", "thread that performes document parsing and indexing" ,
new serverInstantThread(this, "deQueue", "queueSize"), 10000);
setConfig("81_dequeue_idlesleep" , getConfig("80_dequeue_idlesleep", ""));
setConfig("81_dequeue_busysleep" , getConfig("80_dequeue_busysleep", ""));
deployThread("81_dequeue", "Indexing Dequeue (second job, test run)", "thread that creates database entries from scraped web content and performes indexing" ,
new serverInstantThread(this, "deQueue", "queueSize"), 11000);
for (int i = 1; i < indexing_cluster; i++) {
setConfig((i + 80) + "_indexing_idlesleep", getConfig("80_indexing_idlesleep", ""));
setConfig((i + 80) + "_indexing_busysleep", getConfig("80_indexing_busysleep", ""));
deployThread((i + 80) + "_indexing", "Parsing/Indexing (cluster job)", "thread that performes document parsing and indexing" ,
new serverInstantThread(this, "deQueue", "queueSize"), 10000 + (i * 1000));
}
deployThread("70_cachemanager", "Proxy Cache Enqueue", "job takes new proxy files from RAM stack, stores them, and hands over to the Indexing Stack",
new serverInstantThread(cacheManager, "job", "size"), 10000);
deployThread("62_remotetriggeredcrawl", "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer",
@@ -406,7 +410,7 @@ private void resetProfiles() {
} catch (IOException e) {}
}
private void cleanProfiles() {
if (queueSize() > 0) return;
if ((queueStack.size() > 0) || (cacheLoader.size() > 0) || (noticeURL.stackSize() > 0)) return;
Iterator i = profiles.profiles(true);
plasmaCrawlProfile.entry entry;
try {
@@ -467,7 +471,8 @@ public int totalSize() {
*/

public int queueSize() {
return processStack.size() + cacheLoader.size() + noticeURL.stackSize();
return queueStack.size();
//return processStack.size() + cacheLoader.size() + noticeURL.stackSize();
}

public int lUrlSize() {
@@ -480,13 +485,13 @@ public int cacheSizeMin() {

public void enQueue(Object job) {
plasmaHTCache.Entry entry = (plasmaHTCache.Entry) job;
processStack.addLast(entry);
queueStack.addLast(entry);
}

public boolean deQueue() {
// work off fresh entries from the proxy or from the crawler

if (processStack.size() == 0) {
if (queueStack.size() == 0) {
//log.logDebug("DEQUEUE: queue is empty");
return false; // nothing to do
}
@@ -496,12 +501,12 @@ public boolean deQueue() {

// do one processing step
log.logDebug("DEQUEUE: cacheManager=" + ((cacheManager.idle()) ? "idle" : "busy") +
", processStack=" + processStack.size() +
", queueStack=" + queueStack.size() +
", coreStackSize=" + noticeURL.coreStackSize() +
", limitStackSize=" + noticeURL.limitStackSize() +
", overhangStackSize=" + noticeURL.overhangStackSize() +
", remoteStackSize=" + noticeURL.remoteStackSize());
processResourceStack((plasmaHTCache.Entry) processStack.removeFirst());
processResourceStack((plasmaHTCache.Entry) queueStack.removeFirst());
return true;
}

@@ -574,9 +579,9 @@ public boolean coreCrawlJob() {
//log.logDebug("CoreCrawl: queue is empty");
return false;
}
if (processStack.size() >= crawlSlots) {
if (queueStack.size() >= crawlSlots) {
log.logDebug("CoreCrawl: too many processes in queue, dismissed (" +
"processStack=" + processStack.size() + ")");
"queueStack=" + queueStack.size() + ")");
return false;
}
if (cacheLoader.size() >= crawlSlots) {
@@ -652,7 +657,7 @@ public boolean limitCrawlTriggerJob() {
", permission=" + ((yacyCore.seedDB == null) ? "undefined" : (((yacyCore.seedDB.mySeed.isSenior()) || (yacyCore.seedDB.mySeed.isPrincipal())) ? "true" : "false")));

boolean tryRemote =
((noticeURL.coreStackSize() != 0) || (processStack.size() != 0)) /* should do ourself */ &&
((noticeURL.coreStackSize() != 0) || (queueStack.size() != 0)) /* should do ourself */ &&
(profile.remoteIndexing()) /* granted */ &&
(urlEntry.initiator() != null) && (!(urlEntry.initiator().equals(plasmaURL.dummyHash))) /* not proxy */ &&
((yacyCore.seedDB.mySeed.isSenior()) ||
@@ -664,9 +669,9 @@ }
}

// alternatively do a local crawl
if (processStack.size() >= crawlSlots) {
if (queueStack.size() >= crawlSlots) {
log.logDebug("LimitCrawl: too many processes in queue, dismissed (" +
"processStack=" + processStack.size() + ")");
"queueStack=" + queueStack.size() + ")");
return false;
}
if (cacheLoader.size() >= crawlSlots) {
@@ -692,16 +697,18 @@ public boolean remoteTriggeredCrawlJob() {
//log.logDebug("GlobalCrawl: queue is empty");
return false;
}
if (processStack.size() > 0) {
/*
if (queueStack.size() > 0) {
log.logDebug("GlobalCrawl: any processe is in queue, dismissed (" +
"processStack=" + processStack.size() + ")");
"processStack=" + queueStack.size() + ")");
return false;
}
if (noticeURL.coreStackSize() > 0) {
log.logDebug("GlobalCrawl: any local crawl is in queue, dismissed (" +
"coreStackSize=" + noticeURL.coreStackSize() + ")");
return false;
}
*/

// if the server is busy, we do this more slowly
if (!(cacheManager.idle())) try {Thread.currentThread().sleep(2000);} catch (InterruptedException e) {}
@@ -1377,7 +1384,7 @@ public serverObjects action(String actionName, serverObjects actionInput) {
public String toString() {
// it is possible to use this method in the cgi pages.
// actually it is used there for testing purpose
return "PROPS: " + super.toString() + "; QUEUE: " + processStack.toString();
return "PROPS: " + super.toString() + "; QUEUE: " + queueStack.toString();
}

// method for index deletion
@@ -1448,7 +1455,10 @@ public boolean job() {
int transferred;
long starttime = System.currentTimeMillis();
try {
if ((queueSize() == 0) &&
if (
(queueStack.size() == 0) &&
(cacheLoader.size() == 0) &&
(noticeURL.stackSize() == 0) &&
(getConfig("allowDistributeIndex", "false").equals("true")) &&
((transferred = performTransferIndex(indexCount, peerCount, true)) > 0)) {
indexCount = transferred;
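To make the change easier to follow, here is a minimal standalone sketch (plain Java, not YaCy code; class and method names are illustrative) of the pattern the commit introduces in plasmaSwitchboard: a configurable number of indexing worker threads, all calling the same deQueue() against the shared queueStack. The real code registers its workers through deployThread and serverInstantThread rather than raw threads.

    // Sketch only, not YaCy code: N worker threads draining one shared queue,
    // the same shape as the deployThread loop added to plasmaSwitchboard above.
    import java.util.LinkedList;

    public class IndexingClusterSketch {

        // stands in for plasmaSwitchboard.queueStack (real entries are plasmaHTCache.Entry)
        private final LinkedList queueStack = new LinkedList();

        // stands in for plasmaSwitchboard.enQueue(Object job)
        public synchronized void enQueue(Object job) {
            queueStack.addLast(job);
        }

        // stands in for plasmaSwitchboard.deQueue(): take one entry and index it,
        // return false when there is nothing to do
        public synchronized boolean deQueue() {
            if (queueStack.size() == 0) return false;
            Object entry = queueStack.removeFirst();
            // ... parse and index 'entry' here ...
            return true;
        }

        // stands in for the new deployment loop: one indexing thread per cluster slot
        public void deployIndexingThreads(int indexingCluster) {
            if (indexingCluster < 1) indexingCluster = 1; // zero is not allowed, as in the commit
            for (int i = 0; i < indexingCluster; i++) {
                final String name = (80 + i) + "_indexing"; // mirrors the 80_indexing, 81_indexing, ... naming
                Thread worker = new Thread(name) {
                    public void run() {
                        while (!isInterrupted()) {
                            if (!deQueue()) {
                                try { sleep(10000); } catch (InterruptedException e) { return; } // idle sleep
                            }
                        }
                    }
                };
                worker.start();
            }
        }
    }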
13 changes: 11 additions & 2 deletions yacy.init
@@ -420,11 +420,20 @@ xpstopw=true
62_remotetriggeredcrawl_busysleep=0
70_cachemanager_idlesleep=10000
70_cachemanager_busysleep=0
80_dequeue_idlesleep=10000
80_dequeue_busysleep=0
80_indexing_idlesleep=10000
80_indexing_busysleep=0
90_cleanup_idlesleep=300000
90_cleanup_busysleep=300000

# multiprocessor-settings
# you may want to run time-consuming processes on several processors
# the most time-consuming process is the indexing-Process
# We implemented an option to run several of these processes here
# setting the number of processes to Zero is not allowed
# If you have a double-processor system,
# a cluster value of '2' would be appropriate
80_indexing_cluster=1

# ram cache for database files

# ram cache for indexCache.db
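Usage note (not part of the diff): with the deployment loop added in plasmaSwitchboard, setting the cluster value to 2 makes the switchboard deploy a second 81_indexing thread and copy its idle/busy sleep values from the 80_indexing entries, so with the defaults above the effective settings would be roughly:

    80_indexing_cluster=2
    81_indexing_idlesleep=10000
    81_indexing_busysleep=0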
