Skip to content

Commit

Permalink
fixes and enhancements for balancer:
Browse files Browse the repository at this point in the history
- crawl lists for each domain now use a HandleSet, which should use less memory than LinkedLists
- but: fill more entries into the domain lists (all available entries)
- fixes to selection criteria (best domain selection)

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@6909 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Jun 1, 2010
1 parent 9cde054 commit a83772c
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 68 deletions.
114 changes: 55 additions & 59 deletions source/de/anomic/crawler/Balancer.java
Expand Up @@ -26,7 +26,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -54,7 +53,7 @@ public class Balancer {
private static final String localhost = "localhost";

// class variables
private final ConcurrentHashMap<String, LinkedList<byte[]>> domainStacks; // a map from host name to lists with url hashs
private final ConcurrentHashMap<String, HandleSet> domainStacks; // a map from host name to lists with url hashs
private final ConcurrentLinkedQueue<byte[]> top;
private final TreeMap<Long, byte[]> delayed;
private BufferedObjectIndex urlFileIndex;
Expand All @@ -72,7 +71,7 @@ public Balancer(
final boolean useTailCache,
final boolean exceed134217727) {
this.cacheStacksPath = cachePath;
this.domainStacks = new ConcurrentHashMap<String, LinkedList<byte[]>>();
this.domainStacks = new ConcurrentHashMap<String, HandleSet>();
this.top = new ConcurrentLinkedQueue<byte[]>();
this.delayed = new TreeMap<Long, byte[]>();
this.minimumLocalDelta = minimumLocalDelta;
Expand Down Expand Up @@ -198,16 +197,11 @@ public synchronized int remove(final HandleSet urlHashes) throws IOException {
}

// iterate through the domain stacks
final Iterator<Map.Entry<String, LinkedList<byte[]>>> q = domainStacks.entrySet().iterator();
Map.Entry<String, LinkedList<byte[]>> se;
LinkedList<byte[]> stack;
final Iterator<Map.Entry<String, HandleSet>> q = domainStacks.entrySet().iterator();
HandleSet stack;
while (q.hasNext()) {
se = q.next();
stack = se.getValue();
final Iterator<byte[]> i = stack.iterator();
while (i.hasNext()) {
if (urlHashes.has(i.next())) i.remove();
}
stack = q.next().getValue();
for (byte[] handle: urlHashes) stack.remove(handle);
if (stack.isEmpty()) q.remove();
}

Expand Down Expand Up @@ -235,7 +229,7 @@ public boolean isEmpty() {
private boolean domainStacksNotEmpty() {
if (domainStacks == null) return false;
synchronized (domainStacks) {
for (LinkedList<byte[]> l: domainStacks.values()) {
for (HandleSet l: domainStacks.values()) {
if (!l.isEmpty()) return true;
}
}
Expand All @@ -257,37 +251,35 @@ public void push(final Request entry) throws IOException, RowSpaceExceededExcept
assert urlFileIndex.has(hash) : "hash = " + new String(hash);

// add the hash to a queue
pushHashToDomainStacks(entry.url().getHost(), entry.url().hash(), 50);
pushHashToDomainStacks(entry.url().getHost(), entry.url().hash());
}
}

private void pushHashToDomainStacks(String host, final byte[] urlhash, final int maxstacksize) {
private void pushHashToDomainStacks(String host, final byte[] urlhash) throws RowSpaceExceededException {
// extend domain stack
if (host == null) host = localhost;
LinkedList<byte[]> domainList = domainStacks.get(host);
HandleSet domainList = domainStacks.get(host);
if (domainList == null) {
// create new list
domainList = new LinkedList<byte[]>();
domainList.add(urlhash);
domainList = new HandleSet(12, Base64Order.enhancedCoder, 1);
domainList.put(urlhash);
domainStacks.put(host, domainList);
} else {
// extend existent domain list
if (domainList.size() < maxstacksize) domainList.addLast(urlhash);
domainList.put(urlhash);
}
}

private void removeHashFromDomainStacks(String host, final byte[] urlhash) {
// extend domain stack
// reduce domain stack
if (host == null) host = localhost;
final LinkedList<byte[]> domainList = domainStacks.get(host);
if (domainList == null) return;
final Iterator<byte[]> i = domainList.iterator();
while (i.hasNext()) {
if (Base64Order.enhancedCoder.equal(i.next(), urlhash)) {
i.remove();
return;
}
final HandleSet domainList = domainStacks.get(host);
if (domainList == null) {
domainStacks.remove(host);
return;
}
domainList.remove(urlhash);
if (domainList.size() == 0) domainStacks.remove(host);
}

private byte[] nextFromDelayed() {
Expand Down Expand Up @@ -320,23 +312,25 @@ private byte[] anyFromDelayed() {
public Request pop(final boolean delay, final CrawlProfile profile) throws IOException {
// returns a crawl entry from the stack and ensures minimum delta times

filltop(delay, -600000, false);
filltop(delay, -60000, false);
filltop(delay, -10000, false);
filltop(delay, -6000, false);
filltop(delay, -4000, false);
filltop(delay, -3000, false);
filltop(delay, -2000, false);
filltop(delay, -1000, false);
filltop(delay, -500, false);
filltop(delay, 0, true);
filltop(delay, 500, true);
filltop(delay, 1000, true);
filltop(delay, 2000, true);
filltop(delay, 3000, true);
filltop(delay, 4000, true);
filltop(delay, 6000, true);
filltop(delay, Long.MAX_VALUE, true);
try {
filltop(delay, -600000, false);
filltop(delay, -60000, false);
filltop(delay, -10000, false);
filltop(delay, -6000, false);
filltop(delay, -4000, false);
filltop(delay, -3000, false);
filltop(delay, -2000, false);
filltop(delay, -1000, false);
filltop(delay, -500, false);
filltop(delay, 0, true);
filltop(delay, 500, true);
filltop(delay, 1000, true);
filltop(delay, 2000, true);
filltop(delay, 3000, true);
filltop(delay, 4000, true);
filltop(delay, 6000, true);
filltop(delay, Long.MAX_VALUE, true);
} catch (RowSpaceExceededException e) {}

long sleeptime = 0;
Request crawlEntry = null;
Expand Down Expand Up @@ -440,21 +434,21 @@ public Request pop(final boolean delay, final CrawlProfile profile) throws IOExc
return crawlEntry;
}

private void filltop(final boolean delay, final long maximumwaiting, final boolean acceptonebest) {
private void filltop(final boolean delay, final long maximumwaiting, final boolean acceptonebest) throws RowSpaceExceededException {
if (!this.top.isEmpty()) return;

//System.out.println("*** DEBUG started filltop delay=" + ((delay) ? "true":"false") + ", maximumwaiting=" + maximumwaiting + ", acceptonebest=" + ((acceptonebest) ? "true":"false"));

// check if we need to get entries from the file index
try {
fillDomainStacks(200);
fillDomainStacks();
} catch (IOException e) {
Log.logException(e);
}

// iterate over the domain stacks
final Iterator<Map.Entry<String, LinkedList<byte[]>>> i = this.domainStacks.entrySet().iterator();
Map.Entry<String, LinkedList<byte[]>> entry;
final Iterator<Map.Entry<String, HandleSet>> i = this.domainStacks.entrySet().iterator();
Map.Entry<String, HandleSet> entry;
long smallestWaiting = Long.MAX_VALUE;
byte[] besturlhash = null;
String besthost = null;
Expand All @@ -467,22 +461,21 @@ private void filltop(final boolean delay, final long maximumwaiting, final boole
continue;
}

byte[] n = entry.getValue().getFirst();
byte[] n = entry.getValue().removeOne();
if (n == null) continue;
besthost = entry.getKey();
if (delay) {
final long w = Latency.waitingRemainingGuessed(besthost, minimumLocalDelta, minimumGlobalDelta);
final long w = Latency.waitingRemainingGuessed(entry.getKey(), minimumLocalDelta, minimumGlobalDelta);
if (w > maximumwaiting) {
if (w < smallestWaiting) {
smallestWaiting = w;
besturlhash = n;
besthost = entry.getKey();
}
entry.getValue().put(n); // put entry back
continue;
}
}

n = entry.getValue().removeFirst();
this.top.add(n);
if (entry.getValue().isEmpty()) i.remove();
}
Expand All @@ -494,10 +487,9 @@ private void filltop(final boolean delay, final long maximumwaiting, final boole
}
}

private void fillDomainStacks(final int maxdomstacksize) throws IOException {
private void fillDomainStacks() throws IOException {
if (!this.domainStacks.isEmpty() && System.currentTimeMillis() - lastDomainStackFill < 120000L) return;
this.domainStacks.clear();
//synchronized (this.delayed) { delayed.clear(); }
this.lastDomainStackFill = System.currentTimeMillis();
final HandleSet handles = this.urlFileIndex.keysFromBuffer(objectIndexBufferSize / 2);
final CloneableIterator<byte[]> i = handles.keys(true, null);
Expand All @@ -508,8 +500,11 @@ private void fillDomainStacks(final int maxdomstacksize) throws IOException {
handle = i.next();
request = new Request(this.urlFileIndex.get(handle));
host = request.url().getHost();
pushHashToDomainStacks(host, handle, 1000);
if (this.domainStacks.size() > maxdomstacksize) break;
try {
pushHashToDomainStacks(host, handle);
} catch (RowSpaceExceededException e) {
break;
}
}
Log.logInfo("BALANCER", "re-fill of domain stacks; fileIndex.size() = " + this.urlFileIndex.size() + ", domainStacks.size = " + domainStacks.size() + ", collection time = " + (System.currentTimeMillis() - this.lastDomainStackFill) + " ms");
this.domStackInitSize = this.domainStacks.size();
Expand All @@ -536,9 +531,10 @@ public ArrayList<Request> top(int count) {
loop: while (count > 0) {
// iterate over the domain stacks
int celsize = cel.size();
ll: for (LinkedList<byte[]> list: this.domainStacks.values()) {
ll: for (HandleSet list: this.domainStacks.values()) {
if (list.size() <= depth) continue ll;
byte[] n = list.get(depth);
byte[] n = list.getOne(depth);
if (n == null) continue ll;
try {
Row.Entry rowEntry = urlFileIndex.get(n);
if (rowEntry == null) continue;
Expand Down
19 changes: 11 additions & 8 deletions source/net/yacy/cora/protocol/ProxySettings.java
Expand Up @@ -20,8 +20,8 @@

package net.yacy.cora.protocol;

import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.commons.httpclient.HostConfiguration;
import org.apache.commons.httpclient.HttpClient;
Expand All @@ -32,12 +32,15 @@
*/
public final class ProxySettings {

// Dummy value to associate with an Object in the backing Map
private static final Object PRESENT = new Object();

public static boolean use = false, use4YaCy = false, use4ssl = false;
public static String host = null, user = "", password = "";
public static int port = 0;
public static String[] noProxy = null;
public static final Set<String> allowProxy = new HashSet<String>();
public static final Set<String> disallowProxy = new HashSet<String>();
public static final Map<String, Object> allowProxy = new ConcurrentHashMap<String, Object>();
public static final Map<String, Object> disallowProxy = new ConcurrentHashMap<String, Object>();

/**
* produce a HostConfiguration (apache object) with the proxy access information included
Expand All @@ -59,15 +62,15 @@ public static HostConfiguration getProxyHostConfig(HttpClient apacheHttpClient)
*/
public static boolean useForHost(final String host) {
if (!use) return false;
if (allowProxy.contains(host)) return true;
if (disallowProxy.contains(host)) return false;
if (allowProxy.containsKey(host)) return true;
if (disallowProxy.containsKey(host)) return false;
for (String pattern: noProxy) {
if (host.matches(pattern)) {
disallowProxy.add(host);
disallowProxy.put(host, PRESENT);
return false;
}
}
allowProxy.add(host);
allowProxy.put(host, PRESENT);
return true;
}

Expand Down
16 changes: 15 additions & 1 deletion source/net/yacy/kelondro/index/HandleSet.java
Expand Up @@ -164,13 +164,27 @@ public final synchronized boolean remove(final byte[] key) {
return indexentry != null;
}

public final synchronized byte[] removeone() {
public final synchronized byte[] removeOne() {
Row.Entry indexentry;
indexentry = index.removeOne();
if (indexentry == null) return null;
return indexentry.getColBytes(0, true);
}

/**
 * Get one entry without removing it; entries are addressed from the end of the list.
 * As noted by the original author, a getOne(0) would return the same object as
 * removeOne() would remove.
 *
 * @param idx zero-based offset counted from the end of the list (0 addresses the last entry)
 * @return the bytes of column 0 of the addressed entry, or null if idx is negative,
 *         if idx is not smaller than the current size, or if the index yields no entry
 */
public final synchronized byte[] getOne(int idx) {
    final int size = this.size();
    // Reject offsets outside [0, size). Without the lower bound a negative idx
    // would compute a position (size - 1 - idx) that lies at or beyond size.
    if (idx < 0 || idx >= size) return null;
    final Row.Entry indexentry = index.get(size - 1 - idx, true);
    if (indexentry == null) return null;
    return indexentry.getColBytes(0, true);
}

/**
 * Tell whether this set currently holds no entries.
 *
 * @return whatever the backing index reports from its own isEmpty()
 */
public final synchronized boolean isEmpty() {
    return this.index.isEmpty();
}
Expand Down

0 comments on commit a83772c

Please sign in to comment.