Skip to content

Commit

Permalink
fix for concurrent quicksort: catch jobs from ThreadPoolExecutor that…
Browse files Browse the repository at this point in the history
… had been rejected because of full processing queues.

Non-catched jobs may have been the cause for blockings and freezes in case of overloading during strong processing

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6827 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Apr 21, 2010
1 parent 8c40f1c commit 027b971
Showing 1 changed file with 71 additions and 14 deletions.
85 changes: 71 additions & 14 deletions source/net/yacy/kelondro/index/RowCollection.java
Expand Up @@ -35,6 +35,10 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import net.yacy.kelondro.logging.Log;
import net.yacy.kelondro.order.Base64Order;
Expand All @@ -60,8 +64,27 @@ public class RowCollection implements Iterable<Row.Entry>, Cloneable {
private static final int exp_order_bound = 4;
private static final int exp_collection = 5;

public static final ExecutorService sortingthreadexecutor = (availableCPU > 1) ? Executors.newCachedThreadPool(new NamePrefixThreadFactory("sorting")) : null;
private static final ExecutorService partitionthreadexecutor = (availableCPU > 1) ? Executors.newCachedThreadPool(new NamePrefixThreadFactory("partition")) : null;
public static final ExecutorService sortingthreadexecutor =
(availableCPU > 1)
? new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Integer.MAX_VALUE,
120L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new NamePrefixThreadFactory("sorting"),
new ThreadPoolExecutor.CallerRunsPolicy())
: null;

private static final ExecutorService partitionthreadexecutor =
(availableCPU > 1)
? new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors(),
Integer.MAX_VALUE,
120L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new NamePrefixThreadFactory("partition"),
new ThreadPoolExecutor.CallerRunsPolicy())
: null;

public final Row rowdef;
protected byte[] chunkcache;
Expand Down Expand Up @@ -588,19 +611,53 @@ public synchronized final void sort() {
this.chunkcount - p > isortlimit * 5
) {
// sort this using multi-threading
final Future<Integer> part0 = partitionthreadexecutor.submit(new partitionthread(this, 0, p, 0));
final Future<Integer> part1 = partitionthreadexecutor.submit(new partitionthread(this, p, this.chunkcount, p));
Future<Integer> part0, part1;
int p0 = -1, p1 = -1;
try {
part0 = partitionthreadexecutor.submit(new partitionthread(this, 0, p, 0));
} catch (RejectedExecutionException e) {
part0 = null;
try {p0 = new partitionthread(this, 0, p, 0).call().intValue();} catch (Exception ee) {}
}
try {
part1 = partitionthreadexecutor.submit(new partitionthread(this, p, this.chunkcount, p));
} catch (RejectedExecutionException e) {
part1 = null;
try {p1 = new partitionthread(this, p, this.chunkcount, p).call().intValue();} catch (Exception ee) {}
}
try {
final int p0 = part0.get().intValue();
final Future<Object> sort0 = sortingthreadexecutor.submit(new qsortthread(this, 0, p0, 0));
final Future<Object> sort1 = sortingthreadexecutor.submit(new qsortthread(this, p0, p, p0));
final int p1 = part1.get().intValue();
final Future<Object> sort2 = sortingthreadexecutor.submit(new qsortthread(this, p, p1, p));
final Future<Object> sort3 = sortingthreadexecutor.submit(new qsortthread(this, p1, this.chunkcount, p1));
sort0.get();
sort1.get();
sort2.get();
sort3.get();
if (part0 != null) p0 = part0.get().intValue();
Future<Object> sort0, sort1, sort2, sort3;
try {
sort0 = sortingthreadexecutor.submit(new qsortthread(this, 0, p0, 0));
} catch (RejectedExecutionException e) {
sort0 = null;
try {new qsortthread(this, 0, p0, 0).call();} catch (Exception ee) {}
}
try {
sort1 = sortingthreadexecutor.submit(new qsortthread(this, p0, p, p0));
} catch (RejectedExecutionException e) {
sort1 = null;
try {new qsortthread(this, p0, p, p0).call();} catch (Exception ee) {}
}
if (part1 != null) p1 = part1.get().intValue();
try {
sort2 = sortingthreadexecutor.submit(new qsortthread(this, p, p1, p));
} catch (RejectedExecutionException e) {
sort2 = null;
try {new qsortthread(this, p, p1, p).call();} catch (Exception ee) {}
}
try {
sort3 = sortingthreadexecutor.submit(new qsortthread(this, p1, this.chunkcount, p1));
} catch (RejectedExecutionException e) {
sort3 = null;
try {new qsortthread(this, p1, this.chunkcount, p1).call();} catch (Exception ee) {}
}
// wait for all results
if (sort0 != null) sort0.get();
if (sort1 != null) sort1.get();
if (sort2 != null) sort2.get();
if (sort3 != null) sort3.get();
} catch (final InterruptedException e) {
Log.logSevere("RowCollection", "", e);
} catch (final ExecutionException e) {
Expand Down

0 comments on commit 027b971

Please sign in to comment.