Skip to content

Commit

Permalink
- removed deprecated threads
Browse files Browse the repository at this point in the history
- added automatic http client reset. this was necessary because excessive intranet crawling caused deadlocks. this hack solved the problem.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5768 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Apr 1, 2009
1 parent 293290c commit 9bfb264
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 64 deletions.
8 changes: 6 additions & 2 deletions source/de/anomic/crawler/CrawlQueues.java
Expand Up @@ -36,6 +36,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import de.anomic.http.httpClient;
import de.anomic.kelondro.table.FlexWidthArray;
import de.anomic.kelondro.text.Document;
import de.anomic.kelondro.util.DateFormatter;
Expand Down Expand Up @@ -571,7 +572,8 @@ public void run() {
1,
"denied by robots.txt");
eentry.store();
errorURL.push(eentry);
errorURL.push(eentry);
this.entry.setStatus("worker-disallowed", serverProcessorJob.STATUS_FINISHED);
} else {
// starting a load from the internet
this.entry.setStatus("worker-loading", serverProcessorJob.STATUS_RUNNING);
Expand All @@ -585,6 +587,7 @@ public void run() {
"cannot load: " + result);
eentry.store();
errorURL.push(eentry);
this.entry.setStatus("worker-error", serverProcessorJob.STATUS_FINISHED);
} else {
this.entry.setStatus("worker-processed", serverProcessorJob.STATUS_FINISHED);
}
Expand All @@ -599,9 +602,10 @@ public void run() {
eentry.store();
errorURL.push(eentry);
e.printStackTrace();
httpClient.initConnectionManager();
this.entry.setStatus("worker-exception", serverProcessorJob.STATUS_FINISHED);
} finally {
workers.remove(code);
this.entry.setStatus("worker-finalized", serverProcessorJob.STATUS_FINISHED);
}
}

Expand Down
8 changes: 4 additions & 4 deletions source/de/anomic/crawler/CrawlStacker.java
Expand Up @@ -289,25 +289,25 @@ public String stackCrawl(final CrawlEntry entry) {
//int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT);
nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_LIMIT, entry);
//assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT);
this.log.logInfo("stacked/global: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT));
//this.log.logInfo("stacked/global: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_LIMIT));
} else if (local) {
if (proxy) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: local = true, proxy = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
if (remote) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: local = true, remote = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
//int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE);
nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_CORE, entry);
//assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE);
this.log.logInfo("stacked/local: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE));
//this.log.logInfo("stacked/local: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE));
} else if (proxy) {
if (remote) this.log.logWarning("URL '" + entry.url().toString() + "' has conflicting initiator properties: proxy = true, remote = true, initiator = " + entry.initiator() + ", profile.handle = " + profile.handle());
//int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE);
nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_CORE, entry);
//assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE);
this.log.logInfo("stacked/proxy: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE));
//this.log.logInfo("stacked/proxy: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_CORE));
} else if (remote) {
//int b = nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE);
nextQueue.noticeURL.push(NoticedURL.STACK_TYPE_REMOTE, entry);
//assert b < nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE);
this.log.logInfo("stacked/remote: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE));
//this.log.logInfo("stacked/remote: " + entry.url().toString() + ", stacksize = " + nextQueue.noticeURL.stackSize(NoticedURL.STACK_TYPE_REMOTE));
}

return null;
Expand Down
2 changes: 1 addition & 1 deletion source/de/anomic/crawler/Latency.java
Expand Up @@ -185,7 +185,7 @@ public void update(long time) {
}
public void slowdown() {
this.lastacc = System.currentTimeMillis();
this.timeacc = Math.min(60000, average() * 5);
this.timeacc = Math.min(60000, average() * 2);
this.count = 1;
}
public int count() {
Expand Down
51 changes: 30 additions & 21 deletions source/de/anomic/http/httpClient.java
Expand Up @@ -76,8 +76,8 @@ public class httpClient {
* "the HttpClient instance and connection manager should be shared among all threads for maximum efficiency."
* (Concurrent execution of HTTP methods, http://hc.apache.org/httpclient-3.x/performance.html)
*/
private final static MultiThreadedHttpConnectionManager conManager = new MultiThreadedHttpConnectionManager();
private final static HttpClient apacheHttpClient = new HttpClient(conManager);
private static MultiThreadedHttpConnectionManager conManager = null;
private static HttpClient apacheHttpClient = null;

// last ; must be before location (this is parsed)
private final static String jakartaUserAgent = " " +
Expand All @@ -87,25 +87,8 @@ public class httpClient {
/**
* set options for client
*/
// simple user agent
setUserAgent("yacy (www.yacy.net; " + getSystemOST() + ")");
// only one retry
apacheHttpClient.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
new DefaultHttpMethodRetryHandler(1, false));
/**
* set options for connection manager
*/
// conManager.getParams().setDefaultMaxConnectionsPerHost(4); // default 2
HostConfiguration localHostConfiguration = new HostConfiguration();
conManager.getParams().setMaxTotalConnections(200); // Proxy may need many connections
conManager.getParams().setConnectionTimeout(60000); // set a default timeout
conManager.getParams().setDefaultMaxConnectionsPerHost(3); // prevent DoS by mistake
localHostConfiguration.setHost("localhost");
conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 100);
localHostConfiguration.setHost("127.0.0.1");
conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 100);
// TODO should this be configurable?

initConnectionManager();

// accept self-signed or untrusted certificates
Protocol.registerProtocol("https", new Protocol("https",
(ProtocolSocketFactory) new AcceptEverythingSSLProtcolSocketFactory(), 443));
Expand All @@ -125,6 +108,32 @@ public class httpClient {
System.setProperty("sun.net.client.defaultReadTimeout", "60000");
}

public static void initConnectionManager() {
MultiThreadedHttpConnectionManager.shutdownAll();
conManager = new MultiThreadedHttpConnectionManager();
apacheHttpClient = new HttpClient(conManager);

/**
* set options for connection manager
*/
// conManager.getParams().setDefaultMaxConnectionsPerHost(4); // default 2
HostConfiguration localHostConfiguration = new HostConfiguration();
conManager.getParams().setMaxTotalConnections(200); // Proxy may need many connections
conManager.getParams().setConnectionTimeout(60000); // set a default timeout
conManager.getParams().setDefaultMaxConnectionsPerHost(10);
localHostConfiguration.setHost("localhost");
conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 100);
localHostConfiguration.setHost("127.0.0.1");
conManager.getParams().setMaxConnectionsPerHost(localHostConfiguration, 100);

// only one retry
apacheHttpClient.getParams().setParameter(HttpMethodParams.RETRY_HANDLER,
new DefaultHttpMethodRetryHandler(1, false));
// simple user agent
setUserAgent("yacy (www.yacy.net; " + getSystemOST() + ")");

}

/**
* every x milliseconds do a cleanup (close old connections)
*
Expand Down
1 change: 1 addition & 0 deletions source/de/anomic/kelondro/text/IndexCell.java
Expand Up @@ -284,6 +284,7 @@ private synchronized void cleanCache() {

// clean-up the cache
if (this.lastCleanup + cleanupCycle > System.currentTimeMillis()) return;
//System.out.println("----cleanup check");
this.array.shrink(this.targetFileSize, this.maxFileSize);
this.lastCleanup = System.currentTimeMillis();
}
Expand Down
12 changes: 0 additions & 12 deletions source/de/anomic/plasma/plasmaSwitchboard.java
Expand Up @@ -605,8 +605,6 @@ public plasmaSwitchboard(final File rootPath, final String initPath, final Strin

deployThread(plasmaSwitchboardConstants.CLEANUP, "Cleanup", "simple cleaning process for monitoring information", null,
new serverInstantBusyThread(this, plasmaSwitchboardConstants.CLEANUP_METHOD_START, plasmaSwitchboardConstants.CLEANUP_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CLEANUP_METHOD_FREEMEM), 600000); // all 5 Minutes, wait 10 minutes until first run
deployThread(plasmaSwitchboardConstants.CACHEFLUSH, "Cache Flush", "thread that flushes the index cache", "",
new serverInstantBusyThread(this, plasmaSwitchboardConstants.CACHEFLUSH_METHOD_START, plasmaSwitchboardConstants.CACHEFLUSH_METHOD_JOBCOUNT, plasmaSwitchboardConstants.CACHEFLUSH_METHOD_FREEMEM), 120000); // the cache flush does not need to be started soon, start it late after 2 minutes
deployThread(plasmaSwitchboardConstants.INDEXER, "Indexing", "thread that either initiates a parsing/indexing queue, distributes the index into the DHT, stores parsed documents", "/IndexCreateIndexingQueue_p.html",
new serverInstantBusyThread(this, plasmaSwitchboardConstants.INDEXER_METHOD_START, plasmaSwitchboardConstants.INDEXER_METHOD_JOBCOUNT, plasmaSwitchboardConstants.INDEXER_METHOD_FREEMEM), 10000);
deployThread(plasmaSwitchboardConstants.CRAWLJOB_REMOTE_TRIGGERED_CRAWL, "Remote Crawl Job", "thread that performes a single crawl/indexing step triggered by a remote peer", null,
Expand Down Expand Up @@ -1122,16 +1120,6 @@ public void close() {
log.logConfig("SWITCHBOARD SHUTDOWN TERMINATED");
}

public int rwiCacheSize() {
return webIndex.index().getBufferSize();
}

public boolean rwiCacheFlush() {
if (rwiCacheSize() == 0) return false;
webIndex.index().cleanupBuffer((int) ((this.getConfigLong(plasmaSwitchboardConstants.CACHEFLUSH_BUSYSLEEP, 10000) * this.getConfigLong("performanceIO", 10)) / 100));
return true;
}

public int queueSize() {
return webIndex.queuePreStack.size();
}
Expand Down
24 changes: 0 additions & 24 deletions source/de/anomic/plasma/plasmaSwitchboardConstants.java
Expand Up @@ -110,18 +110,6 @@ public final class plasmaSwitchboardConstants {
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_METHOD_FREEMEM = null;
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_IDLESLEEP = "62_remotetriggeredcrawl_idlesleep";
public static final String CRAWLJOB_REMOTE_TRIGGERED_CRAWL_BUSYSLEEP = "62_remotetriggeredcrawl_busysleep";
// 74_parsing
/**
* <p><code>public static final String <strong>INDEXER</strong> = "80_indexing"</code></p>
* <p>Name of the indexer thread, performing the actual indexing of a website</p>
*/
public static final String PARSER = "74_indexing";
public static final String PARSER_MEMPREREQ = "74_indexing_memprereq";
public static final String PARSER_IDLESLEEP = "74_indexing_idlesleep";
public static final String PARSER_BUSYSLEEP = "74_indexing_busysleep";
public static final String PARSER_METHOD_START = "deQueueProcess";
public static final String PARSER_METHOD_JOBCOUNT = "queueSize";
public static final String PARSER_METHOD_FREEMEM = "deQueueFreeMem";
// 80_indexing
/**
* <p><code>public static final String <strong>INDEXER</strong> = "80_indexing"</code></p>
Expand All @@ -135,18 +123,6 @@ public final class plasmaSwitchboardConstants {
public static final String INDEXER_METHOD_JOBCOUNT = "queueSize";
public static final String INDEXER_METHOD_FREEMEM = "deQueueFreeMem";
public static final String INDEXER_SLOTS = "indexer.slots";
// 85_cacheflush
/**
* the cache flush thread starts a flush of the RAM cache.
* This periodic flushing replaces the permanent flushing
*/
public static final String CACHEFLUSH = "85_cacheflush";
public static final String CACHEFLUSH_MEMPREREQ = "85_cacheflush_memprereq";
public static final String CACHEFLUSH_IDLESLEEP = "85_cacheflush_idlesleep";
public static final String CACHEFLUSH_BUSYSLEEP = "85_cacheflush_busysleep";
public static final String CACHEFLUSH_METHOD_START = "rwiCacheFlush";
public static final String CACHEFLUSH_METHOD_JOBCOUNT = "rwiCacheSize";
public static final String CACHEFLUSH_METHOD_FREEMEM = "deQueueFreeMem";
// 90_cleanup
/**
* <p><code>public static final String <strong>CLEANUP</strong> = "90_cleanup"</code></p>
Expand Down

0 comments on commit 9bfb264

Please sign in to comment.