redesign of index abstract processing - currently disabled until enough peers have fix in SVN 6928

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6929 6c8d7289-2bf4-0310-a012-ef5d649a1542
orbiter committed Jun 18, 2010
1 parent 090eae2 commit fbf021b
Showing 5 changed files with 124 additions and 54 deletions.
120 changes: 93 additions & 27 deletions source/de/anomic/search/SearchEvent.java
@@ -70,8 +70,7 @@ public final class SearchEvent {
private RankingProcess rankedCache; // ordered search results, grows dynamically as all the query threads enrich this container
private ResultFetcher results;

// class variables for search abstracts
private final IndexAbstracts rcAbstracts; // cache for index abstracts; word:TreeMap mapping where the embedded TreeMap is a urlhash:peerlist relation
private final SecondarySearchSuperviser secondarySearchSuperviser;

// class variables for remote searches
private yacySearch[] primarySearchThreads, secondarySearchThreads;
@@ -93,7 +92,8 @@ public final class SearchEvent {
this.peers = peers;
this.crawlResults = crawlResults;
this.query = query;
this.rcAbstracts = (query.queryHashes.size() > 1) ? new IndexAbstracts() : null; // generate abstracts only for combined searches
this.secondarySearchSuperviser = (query.queryHashes.size() > 1) ? new SecondarySearchSuperviser() : null; // generate abstracts only for combined searches
//if (this.secondarySearchSuperviser != null) this.secondarySearchSuperviser.start();
this.primarySearchThreads = null;
this.secondarySearchThreads = null;
this.preselectedPeerHashes = preselectedPeerHashes;
@@ -134,7 +134,7 @@ public final class SearchEvent {
peers,
crawlResults,
rankedCache,
rcAbstracts,
secondarySearchSuperviser,
fetchpeers,
Switchboard.urlBlacklist,
query.ranking,
@@ -331,25 +331,91 @@ public ResultEntry oneResult(final int item) {

boolean secondarySearchStartet = false;

void prepareSecondarySearch() {
if (secondarySearchStartet) return; // don't do this twice
public class SecondarySearchSuperviser extends Thread {

if ((rcAbstracts == null) || (rcAbstracts.size() != query.queryHashes.size())) return; // secondary search not possible (yet)
this.secondarySearchStartet = true;
// cache for index abstracts; word:TreeMap mapping where the embedded TreeMap is a urlhash:peerlist relation
// this relation contains the information where specific urls can be found in specific peers
TreeMap<String, TreeMap<String, String>> abstractsCache;

/*
// catch up index abstracts and join them; then call peers again to submit their urls
System.out.println("DEBUG-INDEXABSTRACT: " + rcAbstracts.size() + " word references caught, " + query.queryHashes.size() + " needed");
Iterator i = rcAbstracts.entrySet().iterator();
Map.Entry entry;
while (i.hasNext()) {
entry = (Map.Entry) i.next();
System.out.println("DEBUG-INDEXABSTRACT: hash " + (String) entry.getKey() + ": " + ((query.queryHashes.contains((String) entry.getKey())) ? "NEEDED" : "NOT NEEDED") + "; " + ((TreeMap) entry.getValue()).size() + " entries");
public SecondarySearchSuperviser() {
this.abstractsCache = new TreeMap<String, TreeMap<String, String>>();
}

/**
* add a single abstract to the existing set of abstracts
* @param wordhash
* @param singleAbstract // a mapping from url-hashes to a string of peer-hashes
*/
final TreeMap<String, String> abstractJoin = (rcAbstracts.size() == query.queryHashes.size()) ? SetTools.joinConstructive(rcAbstracts.values(), true) : new TreeMap<String, String>();
if (!abstractJoin.isEmpty()) {
public void addAbstract(String wordhash, TreeMap<String, String> singleAbstract) {
synchronized (abstractsCache) {
TreeMap<String, String> oldAbstract = abstractsCache.get(wordhash);
if (oldAbstract == null) {
// new abstracts in the cache
abstractsCache.put(wordhash, singleAbstract);
} else {
// extend the abstracts in the cache: join the single abstracts
for (Map.Entry<String, String> oneref: singleAbstract.entrySet()) {
String urlhash = oneref.getKey();
String peerlistNew = oneref.getValue();
String peerlistOld = oldAbstract.get(urlhash);
if (peerlistOld == null) {
oldAbstract.put(urlhash, peerlistNew);
} else {
oldAbstract.put(urlhash, peerlistOld + peerlistNew);
}
}
// abstractsCache.put(wordhash, oldAbstract);
}
}
}

private String wordsFromPeer(final String peerhash, final String urls) {
Map.Entry<String, TreeMap<String, String>> entry;
String word, peerlist, url, wordlist = "";
TreeMap<String, String> urlPeerlist;
int p;
boolean hasURL;
synchronized (this) {
final Iterator<Map.Entry <String, TreeMap<String, String>>> i = this.abstractsCache.entrySet().iterator();
while (i.hasNext()) {
entry = i.next();
word = entry.getKey();
urlPeerlist = entry.getValue();
hasURL = true;
for (int j = 0; j < urls.length(); j = j + 12) {
url = urls.substring(j, j + 12);
peerlist = urlPeerlist.get(url);
p = (peerlist == null) ? -1 : peerlist.indexOf(peerhash);
if ((p < 0) || (p % 12 != 0)) {
hasURL = false;
break;
}
}
if (hasURL) wordlist += word;
}
}
return wordlist;
}

public void run() {
try {Thread.sleep(5000);} catch (InterruptedException e) {}
prepareSecondarySearch();
}

private void prepareSecondarySearch() {

if (abstractsCache == null || abstractsCache.size() != query.queryHashes.size()) return; // secondary search not possible (yet)


// catch up index abstracts and join them; then call peers again to submit their urls
System.out.println("DEBUG-INDEXABSTRACT: " + abstractsCache.size() + " word references caught, " + query.queryHashes.size() + " needed");
for (Map.Entry<String, TreeMap<String, String>> entry: abstractsCache.entrySet()) {
System.out.println("DEBUG-INDEXABSTRACT: hash " + entry.getKey() + ": " + ((query.queryHashes.has(entry.getKey().getBytes()) ? "NEEDED" : "NOT NEEDED") + "; " + entry.getValue().size() + " entries"));
}

final TreeMap<String, String> abstractJoin = (abstractsCache.size() == query.queryHashes.size()) ? SetTools.joinConstructive(abstractsCache.values(), true) : new TreeMap<String, String>();
if (abstractJoin.isEmpty()) return;

//System.out.println("DEBUG-INDEXABSTRACT: index abstracts delivered " + abstractJoin.size() + " additional results for secondary search");
// generate query for secondary search
final TreeMap<String, String> secondarySearchURLs = new TreeMap<String, String>(); // a (peerhash:urlhash-liststring) mapping
@@ -363,7 +429,7 @@ void prepareSecondarySearch() {
entry1 = i1.next();
url = entry1.getKey();
ps = entry1.getValue();
//System.out.println("DEBUG-INDEXABSTRACT: url " + url + ": from peers " + peers);
System.out.println("DEBUG-INDEXABSTRACT: url " + url + ": from peers " + peers);
mypeercount = 0;
for (int j = 0; j < ps.length(); j = j + 12) {
peer = ps.substring(j, j + 12);
@@ -386,20 +452,20 @@ void prepareSecondarySearch() {
peer = entry1.getKey();
if (peer.equals(mypeerhash)) continue; // we dont need to ask ourself
urls = entry1.getValue();
words = rcAbstracts.wordsFromPeer(peer, urls);
words = wordsFromPeer(peer, urls);
assert words.length() >= 12 : "words = " + words;
//System.out.println("DEBUG-INDEXABSTRACT ***: peer " + peer + " has urls: " + urls);
//System.out.println("DEBUG-INDEXABSTRACT ***: peer " + peer + " from words: " + words);
System.out.println("DEBUG-INDEXABSTRACT ***: peer " + peer + " has urls: " + urls);
System.out.println("DEBUG-INDEXABSTRACT ***: peer " + peer + " from words: " + words);
secondarySearchThreads[c++] = yacySearch.secondaryRemoteSearch(
words, "", urls, this.query.getSegment(), peers, crawlResults, this.rankedCache, peer, Switchboard.urlBlacklist,
words, "", urls, query.getSegment(), peers, crawlResults, rankedCache, peer, Switchboard.urlBlacklist,
query.ranking, query.constraint, preselectedPeerHashes);

}
//} else {
//System.out.println("DEBUG-INDEXABSTRACT: no success using index abstracts from remote peers");

}

}


public void remove(final WordReferenceVars reference) {
this.rankedCache.remove(reference);
}
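For context on the SecondarySearchSuperviser introduced above, here is a minimal standalone sketch of its two core operations. The class name and the sample hash values are hypothetical 12-character stand-ins; the merge and scan logic mirrors the diff: addAbstract joins per-word abstracts by concatenating peer-hash strings per url-hash, and wordsFromPeer collects the words for which a peer is listed for every url in a concatenated url-hash string.

import java.util.Map;
import java.util.TreeMap;

// Sketch only; not part of the commit.
public class SecondarySearchSketch {

    // word-hash -> (url-hash -> concatenated peer-hashes), as in abstractsCache above
    private final TreeMap<String, TreeMap<String, String>> abstractsCache =
            new TreeMap<String, TreeMap<String, String>>();

    // merge one incoming abstract into the cache (mirrors addAbstract in the diff)
    public void addAbstract(String wordhash, TreeMap<String, String> singleAbstract) {
        synchronized (abstractsCache) {
            TreeMap<String, String> oldAbstract = abstractsCache.get(wordhash);
            if (oldAbstract == null) {
                abstractsCache.put(wordhash, singleAbstract);
                return;
            }
            for (Map.Entry<String, String> oneref : singleAbstract.entrySet()) {
                String peerlistOld = oldAbstract.get(oneref.getKey());
                oldAbstract.put(oneref.getKey(),
                        peerlistOld == null ? oneref.getValue() : peerlistOld + oneref.getValue());
            }
        }
    }

    // which cached words does this peer hold for ALL urls in the given string of
    // 12-character url-hashes? (mirrors wordsFromPeer: a peer-hash match only
    // counts if it starts at a 12-character boundary)
    public String wordsFromPeer(String peerhash, String urls) {
        StringBuilder wordlist = new StringBuilder();
        synchronized (abstractsCache) {
            for (Map.Entry<String, TreeMap<String, String>> entry : abstractsCache.entrySet()) {
                boolean hasURL = true;
                for (int j = 0; j < urls.length(); j += 12) {
                    String peerlist = entry.getValue().get(urls.substring(j, j + 12));
                    int p = (peerlist == null) ? -1 : peerlist.indexOf(peerhash);
                    if (p < 0 || p % 12 != 0) { hasURL = false; break; }
                }
                if (hasURL) wordlist.append(entry.getKey());
            }
        }
        return wordlist.toString();
    }

    public static void main(String[] args) {
        SecondarySearchSketch s = new SecondarySearchSketch();
        TreeMap<String, String> a = new TreeMap<String, String>();
        a.put("urlhash00001", "peerhash0002");
        s.addAbstract("wordhash0001", a);
        TreeMap<String, String> b = new TreeMap<String, String>();
        b.put("urlhash00001", "peerhash0003");
        s.addAbstract("wordhash0001", b);
        // urlhash00001 is now known at peerhash0002 and peerhash0003
        System.out.println(s.wordsFromPeer("peerhash0003", "urlhash00001")); // -> wordhash0001
    }
}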
2 changes: 1 addition & 1 deletion source/de/anomic/server/serverCore.java
@@ -701,7 +701,7 @@ private void listen() {
else reqProtocol = null;

if (this.request == null) break;
if (reqProtocol.equals("HTTP")) {
if (reqProtocol != null && reqProtocol.equals("HTTP")) {
this.commandObj = handlerPrototype.clone();
}
}
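The serverCore change guards the protocol check against a null reqProtocol, which previously threw a NullPointerException when the request line could not be parsed. A tiny self-contained illustration (the variable value is made up); an equivalent idiom is the constant-first comparison, which is null-safe without an explicit guard:

public class ProtocolGuardSketch {
    public static void main(String[] args) {
        String reqProtocol = null; // e.g. the request line could not be parsed
        // unguarded, reqProtocol.equals("HTTP") would throw a NullPointerException;
        // the committed fix adds an explicit null test:
        if (reqProtocol != null && reqProtocol.equals("HTTP")) {
            System.out.println("clone HTTP handler");
        }
        // equivalent null-safe idiom: a null reqProtocol simply compares false
        if ("HTTP".equals(reqProtocol)) {
            System.out.println("clone HTTP handler");
        }
    }
}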
36 changes: 18 additions & 18 deletions source/de/anomic/yacy/yacyClient.java
@@ -88,6 +88,7 @@
import de.anomic.http.server.RequestHeader;
import de.anomic.search.RankingProfile;
import de.anomic.search.RankingProcess;
import de.anomic.search.SearchEvent;
import de.anomic.search.Segment;
import de.anomic.search.Switchboard;
import de.anomic.search.SwitchboardConstants;
@@ -386,7 +387,7 @@ public static int search(
final Segment indexSegment,
final ResultURLs crawlResults,
final RankingProcess containerCache,
final Map<String, TreeMap<String, String>> abstractCache,
final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser,
final Blacklist blacklist,
final RankingProfile rankingProfile,
final Bitfield constraint
@@ -426,7 +427,7 @@ public static int search(
post.add(new DefaultCharsetStringPart("maxdist", Integer.toString(maxDistance)));
post.add(new DefaultCharsetStringPart("profile", crypt.simpleEncode(rankingProfile.toExternalString())));
post.add(new DefaultCharsetStringPart("constraint", (constraint == null) ? "" : constraint.exportB64()));
if (abstractCache != null) post.add(new DefaultCharsetStringPart("abstracts", "auto"));
if (secondarySearchSuperviser != null) post.add(new DefaultCharsetStringPart("abstracts", "auto"));
final long timestamp = System.currentTimeMillis();

// send request
@@ -579,32 +580,31 @@ public static int search(
}

// read index abstract
if (abstractCache != null) {
if (secondarySearchSuperviser != null) {
final Iterator<Map.Entry<String, String>> i = result.entrySet().iterator();
Map.Entry<String, String> entry;
TreeMap<String, String> singleAbstract;
String wordhash;
String whacc = "";
ByteBuffer ci;
while (i.hasNext()) {
int ac = 0;
abstractparser: while (i.hasNext()) {
entry = i.next();
if (entry.getKey().startsWith("indexabstract.")) {
wordhash = entry.getKey().substring(14);
synchronized (abstractCache) {
singleAbstract = abstractCache.get(wordhash); // a mapping from url-hashes to a string of peer-hashes
if (singleAbstract == null) singleAbstract = new TreeMap<String, String>();
try {
ci = new ByteBuffer(entry.getValue().getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
Log.logException(e);
return -1;
}
//System.out.println("DEBUG-ABSTRACTFETCH: for word hash " + wordhash + " received " + ci.toString());
ReferenceContainer.decompressIndex(singleAbstract, ci, target.hash);
abstractCache.put(wordhash, singleAbstract);
if (wordhash.charAt(0) == '[') break abstractparser;
whacc += wordhash;
try {
ci = new ByteBuffer(entry.getValue().getBytes("UTF-8"));
} catch (UnsupportedEncodingException e) {
Log.logException(e);
return -1;
}
//System.out.println("DEBUG-ABSTRACTFETCH: for word hash " + wordhash + " received " + ci.toString());
secondarySearchSuperviser.addAbstract(wordhash, ReferenceContainer.decompressIndex(ci, target.hash));
ac++;
}
}
if (abstractCache.size() > 0) yacyCore.log.logInfo("remote search: peer " + target.getName() + " sent " + abstractCache.size() + " index abstracts");
if (ac > 0) yacyCore.log.logInfo("remote search: peer " + target.getName() + " sent " + ac + " index abstracts for words "+ whacc);
}

// generate statistics
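The reworked loop above streams each indexabstract.<wordhash> entry straight into the superviser instead of maintaining a local abstractCache. A compact sketch of just the parsing and accounting part (the result map content is invented; decompression and the superviser call are stubbed out as a comment):

import java.util.Map;
import java.util.TreeMap;

// Sketch only; mirrors the key-prefix, '[' sentinel and counting logic in the diff.
public class AbstractFetchSketch {
    public static void main(String[] args) {
        Map<String, String> result = new TreeMap<String, String>();
        result.put("indexabstract.wordhash0001", "{compressed-abstract-payload}");
        result.put("count", "10"); // non-abstract entries are skipped

        int ac = 0;
        String whacc = "";
        for (Map.Entry<String, String> entry : result.entrySet()) {
            if (!entry.getKey().startsWith("indexabstract.")) continue;
            String wordhash = entry.getKey().substring("indexabstract.".length());
            if (wordhash.charAt(0) == '[') break; // sentinel key: stop parsing abstracts
            whacc += wordhash;
            // the real code decompresses the payload here and calls
            // secondarySearchSuperviser.addAbstract(wordhash, ...)
            ac++;
        }
        System.out.println(ac + " index abstracts for words " + whacc);
    }
}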
15 changes: 8 additions & 7 deletions source/de/anomic/yacy/yacySearch.java
@@ -41,6 +41,7 @@
import de.anomic.search.QueryParams;
import de.anomic.search.RankingProfile;
import de.anomic.search.RankingProcess;
import de.anomic.search.SearchEvent;
import de.anomic.search.Segment;
import de.anomic.yacy.dht.PeerSelection;

@@ -53,7 +54,7 @@ public class yacySearch extends Thread {
final private int partitions;
final private Segment indexSegment;
final private RankingProcess containerCache;
final private Map<String, TreeMap<String, String>> abstractCache;
final private SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser;
final private Blacklist blacklist;
final private yacySeed targetPeer;
private int urls;
@@ -79,7 +80,7 @@ public yacySearch(
final yacySeedDB peers,
final ResultURLs crawlResults,
final RankingProcess containerCache,
final Map<String, TreeMap<String, String>> abstractCache,
final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser,
final Blacklist blacklist,
final RankingProfile rankingProfile,
final Bitfield constraint) {
@@ -100,7 +101,7 @@ public yacySearch(
this.peers = peers;
this.crawlResults = crawlResults;
this.containerCache = containerCache;
this.abstractCache = abstractCache;
this.secondarySearchSuperviser = secondarySearchSuperviser;
this.blacklist = blacklist;
this.targetPeer = targetPeer;
this.urls = -1;
@@ -117,7 +118,7 @@ public void run() {
wordhashes, excludehashes, urlhashes, prefer, filter, language,
sitehash, authorhash,
count, maxDistance, global, partitions,
targetPeer, indexSegment, crawlResults, containerCache, abstractCache,
targetPeer, indexSegment, crawlResults, containerCache, secondarySearchSuperviser,
blacklist, rankingProfile, constraint);
if (urls >= 0) {
// urls is an array of url hashes. this is only used for log output
@@ -260,7 +261,7 @@ public static yacySearch[] primaryRemoteSearches(
final yacySeedDB peers,
final ResultURLs crawlResults,
final RankingProcess containerCache,
final Map<String, TreeMap<String, String>> abstractCache,
final SearchEvent.SecondarySearchSuperviser secondarySearchSuperviser,
int targets,
final Blacklist blacklist,
final RankingProfile rankingProfile,
@@ -290,7 +291,7 @@ public static yacySearch[] primaryRemoteSearches(
wordhashes, excludehashes, urlhashes, prefer, filter, language,
sitehash, authorhash,
count, maxDist, true, targets, targetPeers[i],
indexSegment, peers, crawlResults, containerCache, abstractCache, blacklist, rankingProfile, constraint);
indexSegment, peers, crawlResults, containerCache, secondarySearchSuperviser, blacklist, rankingProfile, constraint);
searchThreads[i].start();
}
return searchThreads;
@@ -316,7 +317,7 @@ public static yacySearch secondaryRemoteSearch(
if (clusterselection != null) targetPeer.setAlternativeAddress(clusterselection.get(targetPeer.hash.getBytes()));
final yacySearch searchThread = new yacySearch(
wordhashes, excludehashes, urlhashes, Pattern.compile(""), Pattern.compile(".*"), "", "", "", 0, 9999, true, 0, targetPeer,
indexSegment, peers, crawlResults, containerCache, new TreeMap<String, TreeMap<String, String>>(), blacklist, rankingProfile, constraint);
indexSegment, peers, crawlResults, containerCache, null, blacklist, rankingProfile, constraint);
searchThread.start();
return searchThread;
}
5 changes: 4 additions & 1 deletion source/net/yacy/kelondro/rwi/ReferenceContainer.java
@@ -554,7 +554,8 @@ public static final <ReferenceType extends Reference> ByteBuffer compressIndex(f
return bb;
}

public static final <ReferenceType extends Reference> void decompressIndex(final TreeMap<String, String> target, ByteBuffer ci, final String peerhash) {
public static final TreeMap<String, String> decompressIndex(ByteBuffer ci, final String peerhash) {
TreeMap<String, String> target = new TreeMap<String, String>();
// target is a mapping from url-hashes to a string of peer-hashes
if ((ci.byteAt(0) == '{') && (ci.byteAt(ci.length() - 1) == '}')) {
//System.out.println("DEBUG-DECOMPRESS: input is " + ci.toString());
@@ -579,5 +580,7 @@ public static final <ReferenceType extends Reference> void decompressIndex(final
if (ci.byteAt(0) == ',') ci.trim(1);
}
}
return target;
}

}
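decompressIndex now allocates and returns the target map itself rather than filling a caller-supplied one, which is what lets yacyClient hand the result directly to addAbstract. A rough sketch of that contract: the {url:peers,...} layout is inferred from the brace and comma handling visible in the diff, how the sending peer's hash is attributed to bare entries is an assumption, and the real method also undoes per-entry compression.

import java.util.TreeMap;

// Sketch only; not the committed implementation.
public class DecompressIndexSketch {

    // returns a fresh urlhash -> peerlist map, like the reworked decompressIndex
    public static TreeMap<String, String> decompressIndex(String ci, String peerhash) {
        TreeMap<String, String> target = new TreeMap<String, String>();
        if (ci.length() < 2 || ci.charAt(0) != '{' || ci.charAt(ci.length() - 1) != '}') return target;
        String body = ci.substring(1, ci.length() - 1);
        if (body.isEmpty()) return target;
        for (String entry : body.split(",")) {
            int p = entry.indexOf(':');
            if (p < 0) {
                target.put(entry, peerhash); // assumption: bare url-hash is known only at the sender
            } else {
                target.put(entry.substring(0, p), entry.substring(p + 1));
            }
        }
        return target;
    }

    public static void main(String[] args) {
        System.out.println(decompressIndex("{urlhash00001,urlhash00002:peerhash0002}", "peerhash0001"));
        // -> {urlhash00001=peerhash0001, urlhash00002=peerhash0002}
    }
}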
