Skip to content

Commit

Permalink
refactoring and enhanced concurrency
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@7155 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Sep 15, 2010
1 parent 83ac078 commit 0cf0068
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 146 deletions.
110 changes: 47 additions & 63 deletions source/de/anomic/search/ReferenceOrder.java
Expand Up @@ -64,9 +64,10 @@ public ReferenceOrder(final RankingProfile profile, String language) {
}

public BlockingQueue<WordReferenceVars> normalizeWith(final ReferenceContainer<WordReference> container) {
BlockingQueue<WordReferenceVars> vars = WordReferenceVars.transform(container);
LinkedBlockingQueue<WordReferenceVars> out = new LinkedBlockingQueue<WordReferenceVars>();
Thread distributor = new NormalizeDistributor(vars, out);
int threads = cores + 1;
if (container.size() < 20) threads = 2;
Thread distributor = new NormalizeDistributor(container, out, threads);
distributor.start();

// return the resulting queue while the processing queues are still working
Expand All @@ -75,21 +76,25 @@ public BlockingQueue<WordReferenceVars> normalizeWith(final ReferenceContainer<W

public class NormalizeDistributor extends Thread {

BlockingQueue<WordReferenceVars> vars;
ReferenceContainer<WordReference> container;
LinkedBlockingQueue<WordReferenceVars> out;
private int threads;

public NormalizeDistributor(BlockingQueue<WordReferenceVars> vars, LinkedBlockingQueue<WordReferenceVars> out) {
this.vars = vars;
public NormalizeDistributor(ReferenceContainer<WordReference> container, LinkedBlockingQueue<WordReferenceVars> out, int threads) {
this.container = container;
this.out = out;
this.threads = threads;
}

@Override
public void run() {
// transform the reference container into a stream of parsed entries
BlockingQueue<WordReferenceVars> vars = WordReferenceVars.transform(container);

// start the transformation threads
int cores0 = cores + 1;
Semaphore termination = new Semaphore(cores0);
NormalizeWorker[] worker = new NormalizeWorker[cores0];
for (int i = 0; i < cores0; i++) {
Semaphore termination = new Semaphore(this.threads);
NormalizeWorker[] worker = new NormalizeWorker[this.threads];
for (int i = 0; i < this.threads; i++) {
worker[i] = new NormalizeWorker(out, termination);
worker[i].start();
}
Expand All @@ -99,17 +104,20 @@ public void run() {
int p = 0;
try {
while ((iEntry = vars.take()) != WordReferenceVars.poison) {
worker[p % cores0].add(iEntry);
worker[p % this.threads].add(iEntry);
p++;
}
} catch (InterruptedException e) {
}

// insert poison to stop the queues
for (int i = 0; i < cores0; i++) worker[i].add(WordReferenceVars.poison);
for (int i = 0; i < this.threads; i++) worker[i].add(WordReferenceVars.poison);
}
}

/**
* normalize ranking: find minimum and maximum of separate ranking criteria
*/
public class NormalizeWorker extends Thread {

private final BlockingQueue<WordReferenceVars> out;
Expand All @@ -131,7 +139,34 @@ public void add(WordReferenceVars entry) {

public void run() {
try {
addNormalizer(decodedEntries, out);
WordReferenceVars iEntry;
Map<String, Integer> doms0 = new HashMap<String, Integer>();
String dom;
Integer count;
final Integer int1 = 1;
while ((iEntry = decodedEntries.take()) != WordReferenceVars.poison) {
out.put(iEntry);
// find min/max
if (min == null) min = iEntry.clone(); else min.min(iEntry);
if (max == null) max = iEntry.clone(); else max.max(iEntry);
// update domcount
dom = new String(iEntry.metadataHash(), 6, 6);
count = doms0.get(dom);
if (count == null) {
doms0.put(dom, int1);
} else {
doms0.put(dom, Integer.valueOf(count.intValue() + 1));
}
}

// update domain score
Map.Entry<String, Integer> entry;
final Iterator<Map.Entry<String, Integer>> di = doms0.entrySet().iterator();
while (di.hasNext()) {
entry = di.next();
doms.addScore(entry.getKey(), (entry.getValue()).intValue());
}
if (!doms.isEmpty()) maxdomcount = doms.getMaxScore();
} catch (InterruptedException e) {
Log.logException(e);
} catch (Exception e) {
Expand All @@ -146,57 +181,6 @@ public void run() {
}
}

/**
 * Normalize ranking: drain a queue of decoded entries, pass each one on to the
 * output queue and track the minimum and maximum of the separate ranking criteria.
 * Occurrences per domain key are counted locally first and merged into the
 * shared domain scores once the input is exhausted.
 *
 * @param decodedEntries queue to read from; reading stops when WordReferenceVars.poison is taken
 *                       (the poison element itself is not forwarded)
 * @param out queue that receives every entry taken from decodedEntries
 * @throws InterruptedException if the thread is interrupted while blocking on a queue
 */
public void addNormalizer(BlockingQueue<WordReferenceVars> decodedEntries, final BlockingQueue<WordReferenceVars> out) throws InterruptedException {
    // per-call domain counters, merged into doms after the loop
    final Map<String, Integer> localCounts = new HashMap<String, Integer>();

    while (true) {
        final WordReferenceVars current = decodedEntries.take();
        if (current == WordReferenceVars.poison) {
            break;
        }
        out.put(current);

        // find min/max
        if (min == null) {
            min = current.clone();
        } else {
            min.min(current);
        }
        if (max == null) {
            max = current.clone();
        } else {
            max.max(current);
        }

        // update domcount; the key is built from 6 bytes at offset 6 of the metadata hash
        final String domKey = new String(current.metadataHash(), 6, 6);
        final Integer seen = localCounts.get(domKey);
        localCounts.put(domKey, seen == null ? 1 : seen.intValue() + 1);
    }

    // update domain score
    for (final Map.Entry<String, Integer> counted : localCounts.entrySet()) {
        doms.addScore(counted.getKey(), counted.getValue().intValue());
    }
    if (!doms.isEmpty()) {
        this.maxdomcount = doms.getMaxScore();
    }
}

/**
 * Normalize ranking for a single entry: forward it to the output queue, fold it
 * into the running minimum and maximum, and increase the score of its domain key.
 *
 * @param iEntry the entry to forward and evaluate
 * @param out queue that receives the entry
 * @throws InterruptedException if the thread is interrupted while blocking on the output queue
 */
public void addNormalizer(WordReferenceVars iEntry, final BlockingQueue<WordReferenceVars> out) throws InterruptedException {
    out.put(iEntry);

    // find min/max
    if (min == null) {
        min = iEntry.clone();
    } else {
        min.min(iEntry);
    }
    if (max == null) {
        max = iEntry.clone();
    } else {
        max.max(iEntry);
    }

    // update domcount; the key is built from 6 bytes at offset 6 of the metadata hash
    doms.incScore(new String(iEntry.metadataHash(), 6, 6));

    if (doms.isEmpty()) {
        return;
    }
    this.maxdomcount = doms.getMaxScore();
}

/**
 * Compute an authority value for a url hash from its domain score.
 * The score of the domain key (6 bytes at offset 6 of the hash) is shifted
 * left by 8 bits and divided by (1 + maxdomcount); the added 1 keeps the
 * divisor non-zero when maxdomcount is 0.
 *
 * @param urlHash the url hash; must hold at least 12 bytes
 * @return the scaled domain score
 */
public int authority(final byte[] urlHash) {
    final String domKey = new String(urlHash, 6, 6);
    final int scaledScore = doms.getScore(domKey) << 8;
    final int divisor = 1 + this.maxdomcount;
    return scaledScore / divisor;
}
Expand Down
4 changes: 2 additions & 2 deletions source/de/anomic/search/ResultFetcher.java
Expand Up @@ -357,10 +357,10 @@ private int fillImagesCache() {
public ArrayList<ReverseElement<ResultEntry>> completeResults(final long waitingtime) {
final long timeout = System.currentTimeMillis() + waitingtime;
while ((result.sizeAvailable() < query.neededResults()) && (anyWorkerAlive()) && (System.currentTimeMillis() < timeout)) {
try {Thread.sleep(100);} catch (final InterruptedException e) {}
try {Thread.sleep(20);} catch (final InterruptedException e) {}
//System.out.println("+++DEBUG-completeResults+++ sleeping " + 200);
}
return this.result.list(this.result.sizeAvailable());
return this.result.list(Math.min(query.neededResults(), this.result.sizeAvailable()));
}

public long postRanking(
Expand Down
137 changes: 64 additions & 73 deletions source/de/anomic/yacy/yacyClient.java
Expand Up @@ -401,7 +401,7 @@ public static int search(
final long timestamp = System.currentTimeMillis();
SearchResult result;
try {
result = searchClient(
result = new SearchResult(
yacyNetwork.basicRequestParts(Switchboard.getSwitchboard(), target.hash, crypt.randomSalt()),
mySeed, wordhashes, excludehashes, urlhashes, prefer, filter, language,
sitehash, authorhash, count, maxDistance, global, partitions, target.getHexHash() + ".yacyh", target.getClusterAddress(),
Expand All @@ -411,7 +411,6 @@ public static int search(
//yacyCore.peerActions.peerDeparture(target, "search request to peer created io exception: " + e.getMessage());
return -1;
}
if (result == null) return -1;
// computation time
final long totalrequesttime = System.currentTimeMillis() - timestamp;

Expand Down Expand Up @@ -553,69 +552,6 @@ public static int search(
return result.urlcount;
}

/**
 * Send a search request to a remote peer and wrap the answer in a SearchResult.
 *
 * The given parts map is extended in place with the request fields below and
 * then posted to http://&lt;hostaddress&gt;/yacy/search.html:
 * <ul>
 * <li>myseed     : seed string of the requesting peer, generated with the "key" part
 *                  already contained in parts; empty if mySeed is null</li>
 * <li>count      : number of wanted results, at least 10 is always requested</li>
 * <li>resource   : "global" or "local", depending on the global flag</li>
 * <li>partitions : number of remote peers that are asked (for evaluation of QPM)</li>
 * <li>query, exclude, urls : word hashes, exclude hashes and url hashes</li>
 * <li>duetime    : fixed to "1000"; maximum time that a peer should spend to create a result</li>
 * <li>prefer, filter, language, sitehash, authorhash, maxdist : passed through</li>
 * <li>ttl        : fixed to "0"</li>
 * <li>profile    : the encoded external string of the ranking profile</li>
 * <li>constraint : base64 export of the constraint; empty if constraint is null</li>
 * <li>abstracts  : "auto", only set if a secondarySearchSuperviser is given</li>
 * </ul>
 *
 * @param parts request parts to extend; NOTE(review): expected to already hold a "key" part when mySeed is not null
 * @param hostname passed to the post call together with the value 60000 (presumably a timeout in milliseconds - confirm)
 * @param hostaddress address of the target peer used to build the request url
 * @return the parsed search result
 * @throws IOException if the request fails or the response table is null or empty
 */
public static SearchResult searchClient(
        LinkedHashMap<String,ContentBody> parts,
        final yacySeed mySeed,
        final String wordhashes,
        final String excludehashes,
        final String urlhashes,
        final Pattern prefer,
        final Pattern filter,
        final String language,
        final String sitehash,
        final String authorhash,
        final int count,
        final int maxDistance,
        final boolean global,
        final int partitions,
        String hostname,
        String hostaddress,
        final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser,
        final RankingProfile rankingProfile,
        final Bitfield constraint
) throws IOException {
    // assemble the request; the parts are added in a fixed order
    String seedString = "";
    if (mySeed != null) {
        seedString = mySeed.genSeedStr(parts.get("key").toString());
    }
    parts.put("myseed", new StringBody(seedString));

    final int wantedCount = Math.max(10, count);
    parts.put("count", new StringBody(Integer.toString(wantedCount)));

    final String resource;
    if (global) {
        resource = "global";
    } else {
        resource = "local";
    }
    parts.put("resource", new StringBody(resource));

    parts.put("partitions", new StringBody(Integer.toString(partitions)));
    parts.put("query", new StringBody(wordhashes));
    parts.put("exclude", new StringBody(excludehashes));
    parts.put("duetime", new StringBody("1000"));
    parts.put("urls", new StringBody(urlhashes));
    parts.put("prefer", new StringBody(prefer.toString()));
    parts.put("filter", new StringBody(filter.toString()));
    parts.put("language", new StringBody(language));
    parts.put("sitehash", new StringBody(sitehash));
    parts.put("authorhash", new StringBody(authorhash));
    parts.put("ttl", new StringBody("0"));
    parts.put("maxdist", new StringBody(Integer.toString(maxDistance)));
    parts.put("profile", new StringBody(crypt.simpleEncode(rankingProfile.toExternalString())));

    String constraintString = "";
    if (constraint != null) {
        constraintString = constraint.exportB64();
    }
    parts.put("constraint", new StringBody(constraintString));

    if (secondarySearchSuperviser != null) {
        parts.put("abstracts", new StringBody("auto"));
    }

    // send request and read the response into a key/value table
    final Map<String, String> resultMap = FileUtils.table(
            HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(
                    new MultiProtocolURI("http://" + hostaddress + "/yacy/search.html"),
                    60000,
                    hostname,
                    parts));

    // a missing or empty table is reported as a failed request
    if (resultMap == null || resultMap.isEmpty()) {
        throw new IOException("resultMap is NULL");
    }
    return new SearchResult(resultMap);
}

public static class SearchResult {

public String version; // version : application version of responder
Expand All @@ -631,11 +567,70 @@ public static class SearchResult {
public List<URIMetadataRow> links; // LURLs of search
public Map<byte[], String> indexabstract; // index abstracts, a collection of url-hashes per word

public SearchResult(Map<String, String> resultMap) throws IOException {
public SearchResult(
LinkedHashMap<String,ContentBody> parts,
final yacySeed mySeed,
final String wordhashes,
final String excludehashes,
final String urlhashes,
final Pattern prefer,
final Pattern filter,
final String language,
final String sitehash,
final String authorhash,
final int count,
final int maxDistance,
final boolean global,
final int partitions,
String hostname,
String hostaddress,
final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser,
final RankingProfile rankingProfile,
final Bitfield constraint) throws IOException {
// send a search request to peer with remote Hash

// INPUT:
// iam : complete seed of the requesting peer
// youare : seed hash of the target peer, used for testing network stability
// key : transmission key for response
// search : a list of search words
// hsearch : a string of word hashes
// fwdep : forward depth. if "0" then peer may NOT ask another peer for more results
// fwden : forward deny, a list of seed hashes. They may NOT be target of forward hopping
// count : maximum number of wanted results
// global : if "true", then result may consist of answers from other peers
// partitions : number of remote peers that are asked (for evaluation of QPM)
// duetime : maximum time that a peer should spend to create a result

// send request
Map<String, String> resultMap = null;
parts.put("myseed", new StringBody((mySeed == null) ? "" : mySeed.genSeedStr(parts.get("key").toString())));
parts.put("count", new StringBody(Integer.toString(Math.max(10, count))));
parts.put("resource", new StringBody(((global) ? "global" : "local")));
parts.put("partitions", new StringBody(Integer.toString(partitions)));
parts.put("query", new StringBody(wordhashes));
parts.put("exclude", new StringBody(excludehashes));
parts.put("duetime", new StringBody("1000"));
parts.put("urls", new StringBody(urlhashes));
parts.put("prefer", new StringBody(prefer.toString()));
parts.put("filter", new StringBody(filter.toString()));
parts.put("language", new StringBody(language));
parts.put("sitehash", new StringBody(sitehash));
parts.put("authorhash", new StringBody(authorhash));
parts.put("ttl", new StringBody("0"));
parts.put("maxdist", new StringBody(Integer.toString(maxDistance)));
parts.put("profile", new StringBody(crypt.simpleEncode(rankingProfile.toExternalString())));
parts.put("constraint", new StringBody((constraint == null) ? "" : constraint.exportB64()));
if (secondarySearchSuperviser != null) parts.put("abstracts", new StringBody("auto"));
resultMap = FileUtils.table(HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(new MultiProtocolURI("http://" + hostaddress + "/yacy/search.html"), 60000, hostname, parts));
//resultMap = FileUtils.table(HTTPConnector.getConnector(HTTPLoader.crawlerUserAgent).post(new MultiProtocolURI("http://" + target.getClusterAddress() + "/yacy/search.html"), 60000, target.getHexHash() + ".yacyh", parts));

// evaluate request result
if (resultMap == null || resultMap.isEmpty()) throw new IOException("resultMap is NULL");
try {
this.searchtime = Integer.parseInt(resultMap.get("searchtime"));
} catch (final NumberFormatException e) {
throw new IOException("wrong output format for searchtime: " + e.getMessage());
throw new IOException("wrong output format for searchtime: " + e.getMessage() + ", map = " + resultMap.toString());
}
try {
this.joincount = Integer.parseInt(resultMap.get("joincount")); // the complete number of hits at remote site
Expand Down Expand Up @@ -1104,7 +1099,7 @@ public static void main(final String[] args) {
long time = System.currentTimeMillis();
SearchResult result;
try {
result = searchClient(
result = new SearchResult(
yacyNetwork.basicRequestParts((String) null, (String) null, "freeworld"),
null, // sb.peers.mySeed(),
new String(wordhashe),
Expand All @@ -1124,13 +1119,9 @@ public static void main(final String[] args) {
new RankingProfile(ContentDomain.TEXT), // rankingProfile,
null // constraint);
);
if (result == null) {
System.out.println("no response");
} else {
for (URIMetadataRow link: result.links) {
for (URIMetadataRow link: result.links) {
System.out.println(link.metadata().url().toNormalform(true, false));
System.out.println(link.snippet());
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
Expand Down

0 comments on commit 0cf0068

Please sign in to comment.