Skip to content

Commit

Permalink
more multithreading support:
Browse files Browse the repository at this point in the history
- replaced some synchronized classes by classes from util.concurrent
- used a util.concurrent.SynchronousQueue to implement a persistent sorting thread in
  the very basic kelondroRowCollection which supports sorting with a second thread
  in case that a double-core processing CPU is used

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@4517 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Feb 27, 2008
1 parent 6779b45 commit 1dce2f1
Show file tree
Hide file tree
Showing 13 changed files with 127 additions and 44 deletions.
3 changes: 2 additions & 1 deletion htroot/Status.java
Expand Up @@ -58,6 +58,7 @@
import de.anomic.server.serverDomains;
import de.anomic.server.serverMemory;
import de.anomic.server.serverObjects;
import de.anomic.server.serverProcessor;
import de.anomic.server.serverSwitch;
import de.anomic.tools.yFormatter;
import de.anomic.yacy.yacyCore;
Expand Down Expand Up @@ -293,7 +294,7 @@ else if (jobType.equals("remoteTriggeredCrawl"))
prop.put("freeMemory", serverMemory.bytesToString(rt.freeMemory()));
prop.put("totalMemory", serverMemory.bytesToString(rt.totalMemory()));
prop.put("maxMemory", serverMemory.bytesToString(rt.maxMemory()));
prop.put("processors", rt.availableProcessors());
prop.put("processors", serverProcessor.availableCPU);

// proxy traffic
//prop.put("trafficIn",bytesToString(httpdByteCountInputStream.getGlobalCount()));
Expand Down
3 changes: 2 additions & 1 deletion htroot/xml/status_p.java
Expand Up @@ -44,6 +44,7 @@
import de.anomic.http.httpdByteCountOutputStream;
import de.anomic.plasma.plasmaSwitchboard;
import de.anomic.server.serverObjects;
import de.anomic.server.serverProcessor;
import de.anomic.server.serverSwitch;
import de.anomic.yacy.yacyCore;

Expand Down Expand Up @@ -75,7 +76,7 @@ public static serverObjects respond(httpHeader header, serverObjects post, serve
prop.putNum("freeMemory", rt.freeMemory());
prop.putNum("totalMemory", rt.totalMemory());
prop.putNum("maxMemory", rt.maxMemory());
prop.putNum("processors", rt.availableProcessors());
prop.putNum("processors", serverProcessor.availableCPU);

// proxy traffic
prop.put("trafficIn", httpdByteCountInputStream.getGlobalCount());
Expand Down
5 changes: 2 additions & 3 deletions source/de/anomic/index/indexRWIEntryOrder.java
Expand Up @@ -36,6 +36,7 @@
import de.anomic.plasma.plasmaCondenser;
import de.anomic.plasma.plasmaSearchRankingProcess;
import de.anomic.plasma.plasmaSearchRankingProfile;
import de.anomic.server.serverProcessor;
import de.anomic.yacy.yacyURL;

public class indexRWIEntryOrder {
Expand All @@ -44,8 +45,6 @@ public class indexRWIEntryOrder {
private kelondroMScoreCluster<String> doms; // collected for "authority" heuristic
private int maxdomcount;

private static final int processors = Runtime.getRuntime().availableProcessors(); // for multiprocessor support, used during normalization

public indexRWIEntryOrder(plasmaSearchRankingProfile profile) {
this.min = null;
this.max = null;
Expand All @@ -60,7 +59,7 @@ public ArrayList<indexRWIVarEntry> normalizeWith(indexContainer container) {
ArrayList<indexRWIVarEntry> result = null;

//long s0 = System.currentTimeMillis();
if ((processors > 1) && (container.size() > 600)) {
if ((serverProcessor.useCPU > 1) && (container.size() > 600)) {
// run minmax with two threads
int middle = container.size() / 2;
minmaxfinder mmf0 = new minmaxfinder(container, 0, middle);
Expand Down
79 changes: 63 additions & 16 deletions source/de/anomic/kelondro/kelondroRowCollection.java
Expand Up @@ -31,17 +31,29 @@
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.SynchronousQueue;

import de.anomic.server.serverFileUtils;
import de.anomic.server.serverMemory;
import de.anomic.server.serverProcessor;
import de.anomic.server.logging.serverLog;
import de.anomic.yacy.yacySeedDB;

public class kelondroRowCollection {

public static final double growfactor = 1.4;
private static final int isortlimit = 20;
private static final Integer dummy = new Integer(0);

public static final qsortthread sortingthread;
static {
if (serverProcessor.useCPU > 1) {
sortingthread = new qsortthread();
sortingthread.start();
} else {
sortingthread = null;
}
}

protected byte[] chunkcache;
protected int chunkcount;
Expand All @@ -57,8 +69,6 @@ public class kelondroRowCollection {
private static final int exp_order_bound = 5;
private static final int exp_collection = 6;

private static int processors = Runtime.getRuntime().availableProcessors();

public kelondroRowCollection(kelondroRowCollection rc) {
this.rowdef = rc.rowdef;
this.chunkcache = rc.chunkcache;
Expand Down Expand Up @@ -465,12 +475,11 @@ public synchronized final void sort() {
}
byte[] swapspace = new byte[this.rowdef.objectsize];
int p = partition(0, this.chunkcount, this.sortBound, swapspace);
if ((processors > 1) && (this.chunkcount >= 10000)) {
// sort this using multi-threading; use one second thread
qsortthread qs = new qsortthread(0, p, 0);
qs.start();
if ((sortingthread != null) && (p > 50) && (sortingthread.isAlive())) {
// sort this using multi-threading
sortingthread.process(this, 0, p, 0);
qsort(p, this.chunkcount, 0, swapspace);
try {qs.join();} catch (InterruptedException e) {e.printStackTrace();}
sortingthread.waitFinish();
} else {
qsort(0, p, 0, swapspace);
qsort(p, this.chunkcount, 0, swapspace);
Expand All @@ -479,18 +488,56 @@ public synchronized final void sort() {
//assert this.isSorted();
}

private class qsortthread extends Thread {
private int sl, sr, sb;
public qsortthread(int L, int R, int S) {
this.sl = L;
this.sr = R;
this.sb = S;
public static class qsortthread extends Thread {
private boolean terminate;
private SynchronousQueue<qsortobject> startObject;
private SynchronousQueue<Integer> finishObject;
public qsortthread() {
this.terminate = false;
this.startObject = new SynchronousQueue<qsortobject>();
this.finishObject = new SynchronousQueue<Integer>();
this.setName("kelondroRowCollection SORT THREAD");
}
public void process(kelondroRowCollection rc, int L, int R, int S) {
assert rc != null;
synchronized (startObject) {
try {this.startObject.put(new qsortobject(rc, L, R, S));} catch (InterruptedException e) {}
}
}
public void waitFinish() {
try {this.finishObject.take();} catch (InterruptedException e) {}
}
public void terminate() {
this.terminate = true;
this.interrupt();
}
public void run() {
qsort(sl, sr, sb, new byte[rowdef.objectsize]);
qsortobject so = null;
while (!terminate) {
try {so = this.startObject.take();} catch (InterruptedException e) {
break;
}
assert so != null;
so.rc.qsort(so.sl, so.sr, so.sb, new byte[so.rc.rowdef.objectsize]);
try {this.finishObject.put(dummy);} catch (InterruptedException e1) {
break;
}
so = null;
}
}
}

private static class qsortobject {
protected kelondroRowCollection rc;
protected int sl, sr, sb;
public qsortobject(kelondroRowCollection rc, int L, int R, int S) {
this.rc = rc;
this.sl = L;
this.sr = R;
this.sb = S;
}
}

private final void qsort(int L, int R, int S, byte[] swapspace) {
if (R - L < isortlimit) {
isort(L, R, swapspace);
Expand Down Expand Up @@ -790,11 +837,11 @@ public static void test(int testsize) {
}
long t2 = System.currentTimeMillis();
System.out.println("copy c -> d: " + (t2 - t1) + " milliseconds, " + d(testsize, (t2 - t1)) + " entries/millisecond");
processors = 1;
serverProcessor.useCPU = 1;
c.sort();
long t3 = System.currentTimeMillis();
System.out.println("sort c (1) : " + (t3 - t2) + " milliseconds, " + d(testsize, (t3 - t2)) + " entries/millisecond");
processors = 2;
serverProcessor.useCPU = 2;
d.sort();
long t4 = System.currentTimeMillis();
System.out.println("sort d (2) : " + (t4 - t3) + " milliseconds, " + d(testsize, (t4 - t3)) + " entries/millisecond");
Expand Down
2 changes: 1 addition & 1 deletion source/de/anomic/kelondro/kelondroSortStack.java
Expand Up @@ -53,7 +53,7 @@ public int size() {
return this.onstack.size();
}

public synchronized void push(stackElement se) {
public void push(stackElement se) {
push(se.element, se.weight);
}

Expand Down
2 changes: 1 addition & 1 deletion source/de/anomic/plasma/plasmaSearchAPI.java
Expand Up @@ -90,7 +90,7 @@ public static void listHosts(serverObjects prop, String startHash) {

public static plasmaSearchRankingProcess genSearchresult(serverObjects prop, plasmaSwitchboard sb, String keyhash, kelondroBitfield filter, int sortorder) {
plasmaSearchQuery query = new plasmaSearchQuery(keyhash, -1, sb.getRanking(), filter);
plasmaSearchRankingProcess ranked = new plasmaSearchRankingProcess(sb.wordIndex, query, sortorder, Integer.MAX_VALUE);
plasmaSearchRankingProcess ranked = new plasmaSearchRankingProcess(sb.wordIndex, query, sortorder, Integer.MAX_VALUE, 1);
ranked.execQuery();

if (ranked.filteredCount() == 0) {
Expand Down
4 changes: 2 additions & 2 deletions source/de/anomic/plasma/plasmaSearchEvent.java
Expand Up @@ -123,7 +123,7 @@ private plasmaSearchEvent(plasmaSearchQuery query,
if ((query.domType == plasmaSearchQuery.SEARCHDOM_GLOBALDHT) ||
(query.domType == plasmaSearchQuery.SEARCHDOM_CLUSTERALL)) {
// do a global search
this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation);
this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation, 16);

int fetchpeers = 30;

Expand Down Expand Up @@ -156,7 +156,7 @@ private plasmaSearchEvent(plasmaSearchQuery query,
serverLog.logFine("SEARCH_EVENT", "SEARCH TIME AFTER GLOBAL-TRIGGER TO " + primarySearchThreads.length + " PEERS: " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
} else {
// do a local search
this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation);
this.rankedCache = new plasmaSearchRankingProcess(wordIndex, query, 2, max_results_preparation, 2);
this.rankedCache.execQuery();
//plasmaWordIndex.Finding finding = wordIndex.retrieveURLs(query, false, 2, ranking, process);

Expand Down
11 changes: 6 additions & 5 deletions source/de/anomic/plasma/plasmaSearchRankingProcess.java
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

import de.anomic.htmlFilter.htmlFilterContentScraper;
import de.anomic.index.indexContainer;
Expand Down Expand Up @@ -62,14 +63,14 @@ public final class plasmaSearchRankingProcess {
private int maxentries;
private int remote_peerCount, remote_indexCount, remote_resourceSize, local_resourceSize;
private indexRWIEntryOrder order;
private HashMap<String, Integer> urlhashes; // map for double-check; String/Long relation, addresses ranking number (backreference for deletion)
private ConcurrentHashMap<String, Integer> urlhashes; // map for double-check; String/Long relation, addresses ranking number (backreference for deletion)
private kelondroMScoreCluster<String> ref; // reference score computation for the commonSense heuristic
private int[] flagcount; // flag counter
private TreeSet<String> misses; // contains url-hashes that could not been found in the LURL-DB
private plasmaWordIndex wordIndex;
private Map<String, indexContainer>[] localSearchContainerMaps;
private HashMap<String, indexContainer>[] localSearchContainerMaps;

public plasmaSearchRankingProcess(plasmaWordIndex wordIndex, plasmaSearchQuery query, int sortorder, int maxentries) {
public plasmaSearchRankingProcess(plasmaWordIndex wordIndex, plasmaSearchQuery query, int sortorder, int maxentries, int concurrency) {
// we collect the urlhashes and construct a list with urlEntry objects
// attention: if minEntries is too high, this method will not terminate within the maxTime
// sortorder: 0 = hash, 1 = url, 2 = ranking
Expand All @@ -84,7 +85,7 @@ public plasmaSearchRankingProcess(plasmaWordIndex wordIndex, plasmaSearchQuery q
this.remote_indexCount = 0;
this.remote_resourceSize = 0;
this.local_resourceSize = 0;
this.urlhashes = new HashMap<String, Integer>();
this.urlhashes = new ConcurrentHashMap<String, Integer>(0, 0.75f, concurrency);
this.ref = new kelondroMScoreCluster<String>();
this.misses = new TreeSet<String>();
this.wordIndex = wordIndex;
Expand Down Expand Up @@ -262,7 +263,7 @@ private boolean testFlags(indexRWIEntry ientry) {
return false;
}

public synchronized Map<String, indexContainer>[] searchContainerMaps() {
public Map<String, indexContainer>[] searchContainerMaps() {
// direct access to the result maps is needed for abstract generation
// this is only available if execQuery() was called before
return localSearchContainerMaps;
Expand Down
16 changes: 8 additions & 8 deletions source/de/anomic/plasma/plasmaWordIndex.java
Expand Up @@ -385,11 +385,11 @@ public indexContainer getContainer(String wordHash, Set<String> urlselection) {
return container;
}

public Map<String, indexContainer> getContainers(Set<String> wordHashes, Set<String> urlselection, boolean deleteIfEmpty, boolean interruptIfEmpty) {
public HashMap<String, indexContainer> getContainers(Set<String> wordHashes, Set<String> urlselection, boolean deleteIfEmpty, boolean interruptIfEmpty) {
// return map of wordhash:indexContainer

// retrieve entities that belong to the hashes
HashMap<String, indexContainer> containers = new HashMap<String, indexContainer>();
HashMap<String, indexContainer> containers = new HashMap<String, indexContainer>(wordHashes.size());
String singleHash;
indexContainer singleContainer;
Iterator<String> i = wordHashes.iterator();
Expand All @@ -402,30 +402,30 @@ public Map<String, indexContainer> getContainers(Set<String> wordHashes, Set<Str
singleContainer = getContainer(singleHash, urlselection);

// check result
if (((singleContainer == null) || (singleContainer.size() == 0)) && (interruptIfEmpty)) return new HashMap<String, indexContainer>();
if (((singleContainer == null) || (singleContainer.size() == 0)) && (interruptIfEmpty)) return new HashMap<String, indexContainer>(0);

containers.put(singleHash, singleContainer);
}
return containers;
}

@SuppressWarnings("unchecked")
public Map<String, indexContainer>[] localSearchContainers(plasmaSearchQuery query, Set<String> urlselection) {
public HashMap<String, indexContainer>[] localSearchContainers(plasmaSearchQuery query, Set<String> urlselection) {
// search for the set of hashes and return a map of of wordhash:indexContainer containing the seach result

// retrieve entities that belong to the hashes
Map<String, indexContainer> inclusionContainers = (query.queryHashes.size() == 0) ? new HashMap<String, indexContainer>() : getContainers(
HashMap<String, indexContainer> inclusionContainers = (query.queryHashes.size() == 0) ? new HashMap<String, indexContainer>(0) : getContainers(
query.queryHashes,
urlselection,
true,
true);
if ((inclusionContainers.size() != 0) && (inclusionContainers.size() < query.queryHashes.size())) inclusionContainers = new HashMap<String, indexContainer>(); // prevent that only a subset is returned
Map<String, indexContainer> exclusionContainers = (inclusionContainers.size() == 0) ? new HashMap<String, indexContainer>() : getContainers(
if ((inclusionContainers.size() != 0) && (inclusionContainers.size() < query.queryHashes.size())) inclusionContainers = new HashMap<String, indexContainer>(0); // prevent that only a subset is returned
HashMap<String, indexContainer> exclusionContainers = (inclusionContainers.size() == 0) ? new HashMap<String, indexContainer>(0) : getContainers(
query.excludeHashes,
urlselection,
true,
true);
return new Map[]{inclusionContainers, exclusionContainers};
return new HashMap[]{inclusionContainers, exclusionContainers};
}

public int size() {
Expand Down
4 changes: 2 additions & 2 deletions source/de/anomic/server/serverDomains.java
Expand Up @@ -28,21 +28,21 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import de.anomic.kelondro.kelondroMScoreCluster;
import de.anomic.plasma.plasmaSwitchboard;

public class serverDomains {

// a dns cache
private static final Map<String, InetAddress> nameCacheHit = Collections.synchronizedMap(new HashMap<String, InetAddress>()); // a not-synchronized map resulted in deadlocks
private static final Map<String, InetAddress> nameCacheHit = new ConcurrentHashMap<String, InetAddress>(); // a not-synchronized map resulted in deadlocks
private static final Set<String> nameCacheMiss = Collections.synchronizedSet(new HashSet<String>());
private static final kelondroMScoreCluster<String> nameCacheHitAges = new kelondroMScoreCluster<String>();
private static final kelondroMScoreCluster<String> nameCacheMissAges = new kelondroMScoreCluster<String>();
Expand Down
33 changes: 33 additions & 0 deletions source/de/anomic/server/serverProcessor.java
@@ -0,0 +1,33 @@
// serverProcessor.java
// (C) 2008 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 27.02.2008 on http://yacy.net
//
// $LastChangedDate: 2006-04-02 22:40:07 +0200 (So, 02 Apr 2006) $
// $LastChangedRevision: 1986 $
// $LastChangedBy: orbiter $
//
// LICENSE
//
// This program is free software; you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 2 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA

package de.anomic.server;


public class serverProcessor {

public static final int availableCPU = Runtime.getRuntime().availableProcessors();
public static int useCPU = availableCPU;

}

0 comments on commit 1dce2f1

Please sign in to comment.