Skip to content

Commit

Permalink
added new concurrent merger class for IndexCell RWI data
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5735 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Mar 20, 2009
1 parent 8c494af commit 37f892b
Show file tree
Hide file tree
Showing 7 changed files with 324 additions and 126 deletions.
2 changes: 1 addition & 1 deletion source/de/anomic/crawler/Latency.java
Expand Up @@ -150,7 +150,7 @@ public static long waitingRemaining(yacyURL url, final long minimumLocalDelta, f
waiting = Math.min(60000, waiting);

// return time that is remaining
System.out.println("Latency: " + (waiting - timeSinceLastAccess));
//System.out.println("Latency: " + (waiting - timeSinceLastAccess));
return Math.max(0, waiting - timeSinceLastAccess);
}

Expand Down
2 changes: 1 addition & 1 deletion source/de/anomic/crawler/NoticedURL.java
Expand Up @@ -43,7 +43,7 @@ public class NoticedURL {
public static final int STACK_TYPE_MOVIE = 12; // put on movie stack
public static final int STACK_TYPE_MUSIC = 13; // put on music stack

public static final long minimumLocalDeltaInit = 0; // the minimum time difference between access of the same local domain
public static final long minimumLocalDeltaInit = 10; // the minimum time difference between access of the same local domain
public static final long minimumGlobalDeltaInit = 500; // the minimum time difference between access of the same global domain

private Balancer coreStack; // links found by crawling to depth-1
Expand Down
41 changes: 39 additions & 2 deletions source/de/anomic/kelondro/blob/BLOBArray.java
Expand Up @@ -160,9 +160,46 @@ public synchronized void unmountBLOB(File location, boolean writeIDX) {
}
}

public synchronized File unmountOldestBLOB() {
public synchronized File unmountSmallestBLOB() {
if (this.blobs.size() == 0) return null;
blobItem b = this.blobs.remove(0);
int bestIndex = -1;
long smallest = Long.MAX_VALUE;
for (int i = 0; i < this.blobs.size(); i++) {
if (this.blobs.get(i).location.length() < smallest) {
smallest = this.blobs.get(i).location.length();
bestIndex = i;
}
}
blobItem b = this.blobs.remove(bestIndex);
b.blob.close(false);
return b.location;
}

public synchronized File unmountOldestBLOB(boolean smallestFromFirst2) {
if (this.blobs.size() == 0) return null;
int idx = 0;
if (smallestFromFirst2 && this.blobs.get(1).location.length() < this.blobs.get(0).location.length()) idx = 1;
blobItem b = this.blobs.remove(idx);
b.blob.close(false);
return b.location;
}

public synchronized File unmountSimilarSizeBLOB(long otherSize) {
if (this.blobs.size() == 0 || otherSize == 0) return null;
blobItem b;
double delta, bestDelta = Double.MAX_VALUE;
int bestIndex = -1;
for (int i = 0; i < this.blobs.size(); i++) {
b = this.blobs.get(i);
if (b.location.length() == 0) continue;
delta = ((double) b.location.length()) / ((double) otherSize);
if (delta < 1.0) delta = 1.0 / delta;
if (delta < bestDelta) {
bestDelta = delta;
bestIndex = i;
}
}
b = this.blobs.remove(bestIndex);
b.blob.close(false);
return b.location;
}
Expand Down
7 changes: 4 additions & 3 deletions source/de/anomic/kelondro/text/IndexCell.java
Expand Up @@ -61,9 +61,10 @@ public IndexCell(
final ByteOrder wordOrder,
final Row payloadrow,
final int maxRamEntries,
final int maxArrayFiles
final int maxArrayFiles,
ReferenceContainerMerger merger
) throws IOException {
this.array = new ReferenceContainerArray(cellPath, wordOrder, payloadrow);
this.array = new ReferenceContainerArray(cellPath, wordOrder, payloadrow, merger);
this.ram = new ReferenceContainerCache(payloadrow, wordOrder);
this.ram.initWriteMode();
this.maxRamEntries = maxRamEntries;
Expand Down Expand Up @@ -269,7 +270,7 @@ private synchronized void cacheDump() throws IOException {
this.array.mountBLOBContainer(dumpFile);
int c = 0;
while (this.array.entries() > this.maxArrayFiles && c++ < 3) {
if (!this.array.mergeOldest()) break;
if (!this.array.merge(true)) break;
}
}

Expand Down
126 changes: 9 additions & 117 deletions source/de/anomic/kelondro/text/ReferenceContainerArray.java
@@ -1,9 +1,7 @@
// indexContainerBLOBHeap.java
// ReferenceContainerArray.java
// (C) 2009 by Michael Peter Christen; mc@yacy.net, Frankfurt a. M., Germany
// first published 04.01.2009 on http://yacy.net
//
// This is a part of YaCy, a peer-to-peer based web search engine
//
// $LastChangedDate: 2008-03-14 01:16:04 +0100 (Fr, 14 Mrz 2008) $
// $LastChangedRevision: 4558 $
// $LastChangedBy: orbiter $
Expand Down Expand Up @@ -34,17 +32,16 @@

import de.anomic.kelondro.blob.BLOB;
import de.anomic.kelondro.blob.BLOBArray;
import de.anomic.kelondro.blob.HeapWriter;
import de.anomic.kelondro.index.Row;
import de.anomic.kelondro.index.RowSet;
import de.anomic.kelondro.order.ByteOrder;
import de.anomic.kelondro.order.CloneableIterator;
import de.anomic.kelondro.text.ReferenceContainerCache.blobFileEntries;

public final class ReferenceContainerArray {

private final Row payloadrow;
private final BLOBArray array;
private final ReferenceContainerMerger merger;

/**
* open a index container based on a BLOB dump. The content of the BLOB will not be read
Expand All @@ -59,14 +56,16 @@ public final class ReferenceContainerArray {
public ReferenceContainerArray(
final File heapLocation,
final ByteOrder wordOrder,
final Row payloadrow) throws IOException {
final Row payloadrow,
ReferenceContainerMerger merger) throws IOException {
this.payloadrow = payloadrow;
this.array = new BLOBArray(
heapLocation,
"index",
payloadrow.primaryKeyLength,
wordOrder,
0);
this.merger = merger;
}

public synchronized void close() {
Expand Down Expand Up @@ -244,120 +243,13 @@ public int entries() {
return this.array.entries();
}

public synchronized boolean mergeOldest() throws IOException {
public synchronized boolean merge(boolean similar) throws IOException {
if (this.array.entries() < 2) return false;
File f1 = this.array.unmountOldestBLOB();
File f2 = this.array.unmountOldestBLOB();
System.out.println("*** DEBUG mergeOldest: vvvvvvvvv array has " + this.array.entries() + " entries vvvvvvvvv");
System.out.println("*** DEBUG mergeOldest: unmounted " + f1.getName());
System.out.println("*** DEBUG mergeOldest: unmounted " + f2.getName());
File newFile = merge(f1, f2);
if (newFile == null) return true;
this.array.mountBLOB(newFile);
System.out.println("*** DEBUG mergeOldest: mounted " + newFile.getName());
System.out.println("*** DEBUG mergeOldest: ^^^^^^^^^^^ array has " + this.array.entries() + " entries ^^^^^^^^^^^");
File f1 = this.array.unmountOldestBLOB(similar);
File f2 = (similar) ? this.array.unmountSimilarSizeBLOB(f1.length()) : this.array.unmountOldestBLOB(false);
merger.merge(f1, f2, this.array, this.payloadrow, newContainerBLOBFile());
return true;
}

private synchronized File merge(File f1, File f2) throws IOException {
// iterate both files and write a new one

CloneableIterator<ReferenceContainer> i1 = new blobFileEntries(f1, this.payloadrow);
CloneableIterator<ReferenceContainer> i2 = new blobFileEntries(f2, this.payloadrow);
if (!i1.hasNext()) {
if (i2.hasNext()) {
if (!f1.delete()) f1.deleteOnExit();
return f2;
} else {
if (!f1.delete()) f1.deleteOnExit();
if (!f2.delete()) f2.deleteOnExit();
return null;
}
} else if (!i2.hasNext()) {
if (!f2.delete()) f2.deleteOnExit();
return f1;
}
assert i1.hasNext();
assert i2.hasNext();
File newFile = newContainerBLOBFile();
HeapWriter writer = new HeapWriter(newFile, this.array.keylength(), this.array.ordering());
merge(i1, i2, writer);
writer.close(true);
// we don't need the old files any more
if (!f1.delete()) f1.deleteOnExit();
if (!f2.delete()) f2.deleteOnExit();
return newFile;
}

private synchronized void merge(CloneableIterator<ReferenceContainer> i1, CloneableIterator<ReferenceContainer> i2, HeapWriter writer) throws IOException {
assert i1.hasNext();
assert i2.hasNext();
ReferenceContainer c1, c2, c1o, c2o;
c1 = i1.next();
c2 = i2.next();
int e;
while (true) {
assert c1 != null;
assert c2 != null;
e = this.array.ordering().compare(c1.getWordHash().getBytes(), c2.getWordHash().getBytes());
if (e < 0) {
writer.add(c1.getWordHash().getBytes(), c1.exportCollection());
if (i1.hasNext()) {
c1o = c1;
c1 = i1.next();
assert this.array.ordering().compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0;
continue;
}
break;
}
if (e > 0) {
writer.add(c2.getWordHash().getBytes(), c2.exportCollection());
if (i2.hasNext()) {
c2o = c2;
c2 = i2.next();
assert this.array.ordering().compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0;
continue;
}
break;
}
assert e == 0;
// merge the entries
writer.add(c1.getWordHash().getBytes(), (c1.merge(c2)).exportCollection());
if (i1.hasNext() && i2.hasNext()) {
c1 = i1.next();
c2 = i2.next();
continue;
}
if (i1.hasNext()) c1 = i1.next();
if (i2.hasNext()) c2 = i2.next();
break;

}
// catch up remaining entries
assert !(i1.hasNext() && i2.hasNext());
while (i1.hasNext()) {
//System.out.println("FLUSH REMAINING 1: " + c1.getWordHash());
writer.add(c1.getWordHash().getBytes(), c1.exportCollection());
if (i1.hasNext()) {
c1o = c1;
c1 = i1.next();
assert this.array.ordering().compare(c1.getWordHash().getBytes(), c1o.getWordHash().getBytes()) > 0;
continue;
}
break;
}
while (i2.hasNext()) {
//System.out.println("FLUSH REMAINING 2: " + c2.getWordHash());
writer.add(c2.getWordHash().getBytes(), c2.exportCollection());
if (i2.hasNext()) {
c2o = c2;
c2 = i2.next();
assert this.array.ordering().compare(c2.getWordHash().getBytes(), c2o.getWordHash().getBytes()) > 0;
continue;
}
break;
}
// finished with writing
}

}

0 comments on commit 37f892b

Please sign in to comment.