From 027b971bde409261b1cded66f8d1e26a7e3907dd Mon Sep 17 00:00:00 2001 From: orbiter Date: Wed, 21 Apr 2010 13:44:59 +0000 Subject: [PATCH] fix for concurrent quicksort: catch jobs from ThreadPoolExecutor that had been rejected because of full processing queues. Non-catched jobs may have been the cause for blockings and freezes in case of overloading during strong processing git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6827 6c8d7289-2bf4-0310-a012-ef5d649a1542 --- .../yacy/kelondro/index/RowCollection.java | 85 ++++++++++++++++--- 1 file changed, 71 insertions(+), 14 deletions(-) diff --git a/source/net/yacy/kelondro/index/RowCollection.java b/source/net/yacy/kelondro/index/RowCollection.java index 45d4ceb91d..82ec3bd639 100644 --- a/source/net/yacy/kelondro/index/RowCollection.java +++ b/source/net/yacy/kelondro/index/RowCollection.java @@ -35,6 +35,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import net.yacy.kelondro.logging.Log; import net.yacy.kelondro.order.Base64Order; @@ -60,8 +64,27 @@ public class RowCollection implements Iterable, Cloneable { private static final int exp_order_bound = 4; private static final int exp_collection = 5; - public static final ExecutorService sortingthreadexecutor = (availableCPU > 1) ? Executors.newCachedThreadPool(new NamePrefixThreadFactory("sorting")) : null; - private static final ExecutorService partitionthreadexecutor = (availableCPU > 1) ? Executors.newCachedThreadPool(new NamePrefixThreadFactory("partition")) : null; + public static final ExecutorService sortingthreadexecutor = + (availableCPU > 1) + ? new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + Integer.MAX_VALUE, + 120L, TimeUnit.SECONDS, + new SynchronousQueue(), + new NamePrefixThreadFactory("sorting"), + new ThreadPoolExecutor.CallerRunsPolicy()) + : null; + + private static final ExecutorService partitionthreadexecutor = + (availableCPU > 1) + ? new ThreadPoolExecutor( + Runtime.getRuntime().availableProcessors(), + Integer.MAX_VALUE, + 120L, TimeUnit.SECONDS, + new SynchronousQueue(), + new NamePrefixThreadFactory("partition"), + new ThreadPoolExecutor.CallerRunsPolicy()) + : null; public final Row rowdef; protected byte[] chunkcache; @@ -588,19 +611,53 @@ public synchronized final void sort() { this.chunkcount - p > isortlimit * 5 ) { // sort this using multi-threading - final Future part0 = partitionthreadexecutor.submit(new partitionthread(this, 0, p, 0)); - final Future part1 = partitionthreadexecutor.submit(new partitionthread(this, p, this.chunkcount, p)); + Future part0, part1; + int p0 = -1, p1 = -1; + try { + part0 = partitionthreadexecutor.submit(new partitionthread(this, 0, p, 0)); + } catch (RejectedExecutionException e) { + part0 = null; + try {p0 = new partitionthread(this, 0, p, 0).call().intValue();} catch (Exception ee) {} + } + try { + part1 = partitionthreadexecutor.submit(new partitionthread(this, p, this.chunkcount, p)); + } catch (RejectedExecutionException e) { + part1 = null; + try {p1 = new partitionthread(this, p, this.chunkcount, p).call().intValue();} catch (Exception ee) {} + } try { - final int p0 = part0.get().intValue(); - final Future sort0 = sortingthreadexecutor.submit(new qsortthread(this, 0, p0, 0)); - final Future sort1 = sortingthreadexecutor.submit(new qsortthread(this, p0, p, p0)); - final int p1 = part1.get().intValue(); - final Future sort2 = sortingthreadexecutor.submit(new qsortthread(this, p, p1, p)); - final Future sort3 = sortingthreadexecutor.submit(new qsortthread(this, p1, this.chunkcount, p1)); - sort0.get(); - sort1.get(); - sort2.get(); - sort3.get(); + if (part0 != null) p0 = part0.get().intValue(); + Future sort0, sort1, sort2, sort3; + try { + sort0 = sortingthreadexecutor.submit(new qsortthread(this, 0, p0, 0)); + } catch (RejectedExecutionException e) { + sort0 = null; + try {new qsortthread(this, 0, p0, 0).call();} catch (Exception ee) {} + } + try { + sort1 = sortingthreadexecutor.submit(new qsortthread(this, p0, p, p0)); + } catch (RejectedExecutionException e) { + sort1 = null; + try {new qsortthread(this, p0, p, p0).call();} catch (Exception ee) {} + } + if (part1 != null) p1 = part1.get().intValue(); + try { + sort2 = sortingthreadexecutor.submit(new qsortthread(this, p, p1, p)); + } catch (RejectedExecutionException e) { + sort2 = null; + try {new qsortthread(this, p, p1, p).call();} catch (Exception ee) {} + } + try { + sort3 = sortingthreadexecutor.submit(new qsortthread(this, p1, this.chunkcount, p1)); + } catch (RejectedExecutionException e) { + sort3 = null; + try {new qsortthread(this, p1, this.chunkcount, p1).call();} catch (Exception ee) {} + } + // wait for all results + if (sort0 != null) sort0.get(); + if (sort1 != null) sort1.get(); + if (sort2 != null) sort2.get(); + if (sort3 != null) sort3.get(); } catch (final InterruptedException e) { Log.logSevere("RowCollection", "", e); } catch (final ExecutionException e) {