Skip to content

Commit

Permalink
speed enhancement for BLOBHeap opening process
Browse files Browse the repository at this point in the history
using concurrency of FileIO and content processing

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5360 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Nov 23, 2008
1 parent 1545e54 commit 2e21200
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 22 deletions.
32 changes: 19 additions & 13 deletions source/de/anomic/kelondro/kelondroBLOBHeap.java
Expand Up @@ -33,6 +33,7 @@
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;

import de.anomic.server.serverMemory;
import de.anomic.server.logging.serverLog;
Expand Down Expand Up @@ -79,13 +80,14 @@ public kelondroBLOBHeap(final File heapFile, final int keylength, final kelondro
this.ordering = ordering;
this.heapFile = heapFile;

this.index = new kelondroBytesLongMap(keylength, this.ordering, 0);
this.index = null; // will be created as result of initialization process
this.free = new TreeMap<Long, Integer>();
this.file = new RandomAccessFile(heapFile, "rw");
final byte[] key = new byte[keylength];
byte[] key = new byte[keylength];
int reclen;
long seek = 0;

kelondroBytesLongMap.initDataConsumer indexready = kelondroBytesLongMap.asynchronusInitializer(keylength, this.ordering, 0, Math.max(10, (int) (Runtime.getRuntime().freeMemory() / (10 * 1024 * 1024))));

loop: while (true) { // don't test available() here because this does not work for files > 2GB

try {
Expand Down Expand Up @@ -115,21 +117,17 @@ public kelondroBLOBHeap(final File heapFile, final int keylength, final kelondro
// it is an empty record, store to free list
if (reclen > 0) free.put(seek, reclen);
} else {
// store key and access address of entry in index
try {
if (this.ordering.wellformed(key)) {
index.addl(key, seek);
} else {
serverLog.logWarning("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": skiped not wellformed key " + new String(key) + " at seek pos " + seek);
}
} catch (final IOException e) {
e.printStackTrace();
break loop;
if (this.ordering.wellformed(key)) {
indexready.consume(key, seek);
key = new byte[keylength];
} else {
serverLog.logWarning("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": skiped not wellformed key " + new String(key) + " at seek pos " + seek);
}
}
// new seek position
seek += 4L + reclen;
}
indexready.finish();

// try to merge free entries
if (this.free.size() > 1) {
Expand Down Expand Up @@ -157,6 +155,14 @@ public kelondroBLOBHeap(final File heapFile, final int keylength, final kelondro
serverLog.logInfo("kelondroBLOBHeap", "BLOB " + heapFile.getName() + ": merged " + merged + " free records");
}

try {
this.index = indexready.result();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}

// DEBUG
/*
Iterator<byte[]> i = index.keys(true, null);
Expand Down
85 changes: 77 additions & 8 deletions source/de/anomic/kelondro/kelondroBytesLongMap.java
Expand Up @@ -27,18 +27,18 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class kelondroBytesLongMap {

private final kelondroRow rowdef;
private kelondroIndex index;

/**
 * Wraps an existing index as a key-to-long map.
 * The assertions require the index row to consist of exactly two columns,
 * the second of which is 8 bytes wide.
 *
 * @param ki the index to wrap; its row definition is also used as this map's row definition
 */
public kelondroBytesLongMap(final kelondroIndex ki) {
assert (ki.row().columns() == 2); // must be a key/index relation
assert (ki.row().width(1) == 8); // the value column must be 8 bytes wide (a b256-encoded long, not a 4-byte int)
this.index = ki;
this.rowdef = ki.row();
}
private kelondroRAMIndex index;

public kelondroBytesLongMap(final int keylength, final kelondroByteOrder objectOrder, final int space) {
this.rowdef = new kelondroRow(new kelondroColumn[]{new kelondroColumn("key", kelondroColumn.celltype_binary, kelondroColumn.encoder_bytes, keylength, "key"), new kelondroColumn("long c-8 {b256}")}, objectOrder, 0);
Expand Down Expand Up @@ -132,4 +132,73 @@ public synchronized void close() {
index = null;
}

/**
 * Creates an empty map and starts a single background worker that fills it
 * from entries handed to the returned consumer.
 *
 * The executor is shut down right after the task is submitted; an orderly
 * shutdown still runs the already-submitted task, so the worker thread ends
 * once the consumer's {@code call()} returns.
 *
 * @param keylength   key length passed to the {@code kelondroBytesLongMap} constructor
 * @param objectOrder ordering passed to the {@code kelondroBytesLongMap} constructor
 * @param space       space argument passed to the {@code kelondroBytesLongMap} constructor
 * @param bufferSize  capacity of the hand-over queue between producer and worker
 * @return the consumer; feed it with {@code consume}, end with {@code finish},
 *         then fetch the filled map with {@code result}
 */
public static initDataConsumer asynchronusInitializer(final int keylength, final kelondroByteOrder objectOrder, final int space, int bufferSize) {
    final kelondroBytesLongMap target = new kelondroBytesLongMap(keylength, objectOrder, space);
    final initDataConsumer consumer = new initDataConsumer(target, bufferSize);

    final ExecutorService worker = Executors.newSingleThreadExecutor();
    final Future<kelondroBytesLongMap> pending = worker.submit(consumer);
    consumer.setResult(pending);

    // no further tasks will follow; let the thread terminate after this one
    worker.shutdown();
    return consumer;
}

/**
 * Simple mutable pair of a key and the long value that belongs to it.
 */
public static class entry {

    /** The key bytes; the array passed to the constructor is stored as-is, not copied. */
    public byte[] key;

    /** The long value associated with {@link #key}. */
    public long l;

    /**
     * @param key the key bytes (the reference is kept)
     * @param l   the value for that key
     */
    public entry(final byte[] key, final long l) {
        this.l = l;
        this.key = key;
    }
}

/**
 * Hands (key, long) pairs from a producer thread over a bounded queue to a
 * worker that inserts them into a {@link kelondroBytesLongMap}.
 *
 * Usage: the producer calls {@link #consume(byte[], long)} for every pair,
 * then {@link #finish()} exactly once, then {@link #result()} to wait for the
 * filled map. The worker side is {@link #call()}, which is expected to run on
 * another thread.
 */
public static class initDataConsumer implements Callable<kelondroBytesLongMap> {

    private final BlockingQueue<entry> cache;
    // end-of-input marker; recognised by identity, never by content
    private final entry poison = new entry(new byte[0], 0);
    private final kelondroBytesLongMap map;
    private Future<kelondroBytesLongMap> result;

    /**
     * @param map         the map that the worker fills
     * @param bufferCount capacity of the hand-over queue; the producer blocks while it is full
     */
    public initDataConsumer(kelondroBytesLongMap map, int bufferCount) {
        this.map = map;
        this.cache = new ArrayBlockingQueue<entry>(bufferCount);
    }

    /**
     * Stores the future of the submitted worker task so that {@link #result()} can wait on it.
     *
     * @param result the future returned when this object was submitted to an executor
     */
    protected void setResult(Future<kelondroBytesLongMap> result) {
        this.result = result;
    }

    /**
     * Queues one pair for insertion; blocks while the queue is full.
     * The key array is not copied, so the caller must not modify it afterwards.
     * If the calling thread is interrupted while waiting, the pair is not queued
     * and the thread's interrupt status is restored.
     *
     * @param key the key bytes
     * @param l   the value for that key
     */
    public void consume(final byte[] key, final long l) {
        enqueue(new entry(key, l));
    }

    /**
     * Signals end of input by queueing the poison entry; blocks while the queue is full.
     */
    public void finish() {
        enqueue(poison);
    }

    /**
     * Waits for the worker to finish and returns the filled map.
     *
     * @return the map populated by the worker
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the worker failed, e.g. with an IOException from the map
     */
    public kelondroBytesLongMap result() throws InterruptedException, ExecutionException {
        return this.result.get();
    }

    /**
     * Worker loop: takes entries from the queue and adds them to the map until
     * the poison entry arrives, then finishes the index initialization.
     *
     * If adding an entry fails, the loop keeps draining (and discarding) entries
     * until the poison entry arrives. Otherwise the producer could block forever
     * on a full queue that nobody reads. The first failure is rethrown afterwards.
     *
     * @return the filled map
     * @throws IOException the first failure reported by the map while adding entries
     */
    public kelondroBytesLongMap call() throws IOException {
        IOException failure = null;
        try {
            entry c;
            while ((c = cache.take()) != poison) {
                if (failure != null) {
                    continue; // already failed: only drain so the producer never blocks
                }
                try {
                    map.addl(c.key, c.l);
                } catch (IOException e) {
                    failure = e;
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            // keep the interrupt visible to whoever owns this thread
            Thread.currentThread().interrupt();
        }
        if (failure != null) {
            throw failure;
        }
        map.index.finishInitialization();
        return map;
    }

    // Blocking put shared by consume() and finish(); restores the interrupt status on interruption.
    private void enqueue(final entry e) {
        try {
            cache.put(e);
        } catch (InterruptedException ex) {
            ex.printStackTrace();
            Thread.currentThread().interrupt();
        }
    }

}
}
2 changes: 1 addition & 1 deletion source/de/anomic/kelondro/kelondroRAMIndex.java
Expand Up @@ -57,7 +57,7 @@ public kelondroRow row() {
return index0.row();
}

private final void finishInitialization() {
protected final void finishInitialization() {
if (index1 == null) {
// finish initialization phase
index0.sort();
Expand Down

0 comments on commit 2e21200

Please sign in to comment.