Skip to content

Commit

Permalink
fix for mediawikiIndex surrogate producer + added concurrency
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5880 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Apr 25, 2009
1 parent 6f5ea7b commit 2e31861
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
2 changes: 1 addition & 1 deletion source/de/anomic/plasma/plasmaParser.java
Expand Up @@ -544,7 +544,7 @@ public plasmaParserDocument parseSource(final yacyURL location, final String mim

// testing if the resource is not empty
if (sourceArray == null || sourceArray.length == 0) {
final String errorMsg = "No resource content available (1) " + ((sourceArray == null) ? "source == null" : "source.length() == 0");
final String errorMsg = "No resource content available (1) " + (((sourceArray == null) ? "source == null" : "source.length() == 0") + ", url = " + location.toNormalform(true, false));
theLogger.logInfo("Unable to parse '" + location + "'. " + errorMsg);
throw new ParserException(errorMsg,location, errorMsg);
}
Expand Down
48 changes: 32 additions & 16 deletions source/de/anomic/tools/mediawikiIndex.java
Expand Up @@ -441,18 +441,19 @@ record = in.take();
try {
record.genHTML();
record.genDocument();
out.put(record);
} catch (RuntimeException e) {
e.printStackTrace();
} catch (MalformedURLException e) {
e.printStackTrace();
} catch (ParserException e) {
e.printStackTrace();
}
out.put(record);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("*** convertConsumer has terminated");
return Integer.valueOf(0);
}

Expand Down Expand Up @@ -500,7 +501,7 @@ record = in.take();
osw.write("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<surrogates xmlns:dc=\"http://purl.org/dc/elements/1.1/\">\n");
}

System.out.println("Title: " + record.title);
System.out.println("[CONSUME] Title: " + record.title);
record.document.writeXML(osw, new Date());
rc++;
if (rc >= 10000) {
Expand All @@ -515,10 +516,7 @@ record = in.take();
osw.write("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n<surrogates xmlns:dc=\"http://purl.org/dc/elements/1.1/\">\n");
}

osw.write("</surrogates>\n");
osw.close();
String finalfilename = targetstub + "." + fc + ".xml";
new File(targetdir, outputfilename).renameTo(new File(targetdir, finalfilename));

}
} catch (InterruptedException e) {
e.printStackTrace();
Expand All @@ -528,8 +526,17 @@ record = in.take();
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
osw.write("</surrogates>\n");
osw.close();
String finalfilename = targetstub + "." + fc + ".xml";
new File(targetdir, outputfilename).renameTo(new File(targetdir, finalfilename));
} catch (IOException e) {
e.printStackTrace();
}
}

System.out.println("*** convertWriter has terminated");
return Integer.valueOf(0);
}

Expand All @@ -546,20 +553,25 @@ public static void convert(File sourcefile, File targetdir, String urlStub) thro
if (b != 'Z') throw new IOException("Invalid bz2 content.");
is = new CBZip2InputStream(is);
}
BufferedReader r = new BufferedReader(new java.io.InputStreamReader(is, "UTF-8"));
BufferedReader r = new BufferedReader(new java.io.InputStreamReader(is, "UTF-8"), 10 * 1024 * 1024);
String t;
StringBuilder sb = new StringBuilder();
boolean page = false, text = false;
String title = null;
plasmaParser.initHTMLParsableMimeTypes("text/html");
plasmaParser.initParseableMimeTypes(plasmaParser.PARSER_MODE_CRAWLER, "text/html");
mediawikiIndex mi = new mediawikiIndex(urlStub);
BlockingQueue<wikiparserrecord> in = new ArrayBlockingQueue<wikiparserrecord>(10);
BlockingQueue<wikiparserrecord> out = new ArrayBlockingQueue<wikiparserrecord>(10);
wikiparserrecord poison = mi.newRecord();
ExecutorService service = Executors.newFixedThreadPool(2);
convertConsumer consumer = new convertConsumer(in, out, poison);
Future<Integer> consumerResult = service.submit(consumer);
int threads = Math.max(1, Runtime.getRuntime().availableProcessors() - 1);
BlockingQueue<wikiparserrecord> in = new ArrayBlockingQueue<wikiparserrecord>(threads * 10);
BlockingQueue<wikiparserrecord> out = new ArrayBlockingQueue<wikiparserrecord>(threads * 10);
ExecutorService service = Executors.newFixedThreadPool(threads + 1);
convertConsumer[] consumers = new convertConsumer[threads];
Future<Integer>[] consumerResults = new Future[threads];
for (int i = 0; i < threads; i++) {
consumers[i] = new convertConsumer(in, out, poison);
consumerResults[i] = service.submit(consumers[i]);
}
convertWriter writer = new convertWriter(out, poison, targetdir, targetstub);
Future<Integer> writerResult = service.submit(writer);

Expand All @@ -575,7 +587,7 @@ public static void convert(File sourcefile, File targetdir, String urlStub) thro
}
if (t.indexOf(textend) >= 0) {
text = false;
System.out.println("Title: " + title);
System.out.println("[INJECT] Title: " + title);
record = mi.newRecord(title, sb);
try {
in.put(record);
Expand Down Expand Up @@ -603,8 +615,12 @@ record = mi.newRecord(title, sb);
r.close();

try {
in.put(poison);
consumerResult.get(10000, TimeUnit.MILLISECONDS);
for (int i = 0; i < threads; i++) {
in.put(poison);
}
for (int i = 0; i < threads; i++) {
consumerResults[i].get(10000, TimeUnit.MILLISECONDS);
}
out.put(poison);
writerResult.get(10000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e1) {
Expand Down

0 comments on commit 2e31861

Please sign in to comment.