replaced the storing procedure for the index ram cache with a method that generates BLOBHeap-compatible dumps

this is a migration step to support a new method of storing the web index, which will also be based on the same data structure. also did a lot of refactoring to give the BLOBHeap class a better structure.

git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5430 6c8d7289-2bf4-0310-a012-ef5d649a1542
orbiter committed Jan 1, 2009
1 parent db1cfae commit b6bba18
Showing 11 changed files with 780 additions and 365 deletions.
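In short: on startup the cache is still restored from whichever dump exists on disk, but on shutdown it is now written back only in the new BLOBHeap-compatible format. A minimal sketch of that lifecycle, mirroring the constructor logic in indexRAMRI below (the file names and the databaseRoot variable are hypothetical; the method names come from the diff):

    indexContainerHeap heap = new indexContainerHeap(payloadrow, log);
    File oldDump = new File(databaseRoot, "index.dump.heap"); // hypothetical names
    File newDump = new File(databaseRoot, "index.dump.blob");
    if (oldDump.exists() && newDump.exists()) oldDump.delete();     // only one is needed
    if (oldDump.exists()) heap.initWriteModeFromHeap(oldDump);      // legacy heap dump
    else if (newDump.exists()) heap.initWriteModeFromBLOB(newDump); // new BLOB dump
    else heap.initWriteMode();                                      // start empty
    // ... fill and use the cache ...
    heap.dump(newDump); // dump() now always writes the BLOBHeap format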
97 changes: 82 additions & 15 deletions source/de/anomic/index/indexContainerHeap.java
@@ -27,12 +27,9 @@
package de.anomic.index;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -42,8 +39,8 @@
import java.util.SortedMap;
import java.util.TreeMap;

import de.anomic.kelondro.kelondroBLOB;
import de.anomic.kelondro.kelondroBLOBHeap;
import de.anomic.kelondro.kelondroBLOBHeapReader;
import de.anomic.kelondro.kelondroBLOBHeapWriter;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroByteOrder;
import de.anomic.kelondro.kelondroCloneableIterator;
@@ -90,7 +87,7 @@ public void initWriteMode() {
* @param heapFile
* @throws IOException
*/
public void initWriteMode(final File heapFile) throws IOException {
public void initWriteModeFromHeap(final File heapFile) throws IOException {
if (log != null) log.logInfo("restoring dump for rwi heap '" + heapFile.getName() + "'");
final long start = System.currentTimeMillis();
this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering())));
@@ -103,10 +100,33 @@ public void initWriteMode(final File heapFile) throws IOException {
urlCount += container.size();
}
}
if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds");
}

public void dump(final File heapFile) throws IOException {
/**
* this is the new cache file format initialization
* @param heapFile
* @throws IOException
*/
public void initWriteModeFromBLOB(final File blobFile) throws IOException {
if (log != null) log.logInfo("restoring rwi blob dump '" + blobFile.getName() + "'");
final long start = System.currentTimeMillis();
this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering())));
int urlCount = 0;
synchronized (cache) {
for (final indexContainer container : new blobFileEntries(blobFile, this.payloadrow)) {
// TODO: in this loop a lot of memory may be allocated. A check if the memory gets low is necessary. But what to do when the memory is low?
if (container == null) break;
cache.put(container.getWordHash(), container);
urlCount += container.size();
}
}
// remove idx and gap files if they exist here
kelondroBLOBHeapWriter.deleteAllFingerprints(blobFile);
if (log != null) log.logInfo("finished rwi blob restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + (System.currentTimeMillis() - start) + " milliseconds");
}
/*
public void dumpold(final File heapFile) throws IOException {
assert this.cache != null;
if (log != null) log.logInfo("creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
if (heapFile.exists()) heapFile.delete();
@@ -137,14 +157,14 @@ public void dump(final File heapFile) throws IOException {
}
os.flush();
os.close();
if (log != null) log.logInfo("finished rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
if (log != null) log.logInfo("finished rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + (System.currentTimeMillis() - startTime) + " milliseconds");
}

public void dump2(final File heapFile) throws IOException {
*/
public void dump(final File heapFile) throws IOException {
assert this.cache != null;
if (log != null) log.logInfo("creating alternative rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
if (heapFile.exists()) heapFile.delete();
final kelondroBLOB dump = new kelondroBLOBHeap(heapFile, payloadrow.primaryKeyLength, kelondroBase64Order.enhancedCoder, 1024 * 1024 * 10);
final kelondroBLOBHeapWriter dump = new kelondroBLOBHeapWriter(heapFile, payloadrow.primaryKeyLength, kelondroBase64Order.enhancedCoder);
final long startTime = System.currentTimeMillis();
long wordcount = 0, urlcount = 0;
String wordHash;
@@ -159,14 +179,14 @@ public void dump2(final File heapFile) throws IOException {

// put entries on heap
if (container != null && wordHash.length() == payloadrow.primaryKeyLength) {
dump.put(wordHash.getBytes(), container.exportCollection());
dump.add(wordHash.getBytes(), container.exportCollection());
urlcount += container.size();
}
wordcount++;
}
}
dump.close();
if (log != null) log.logInfo("finished alternative rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
if (log != null) log.logInfo("finished alternative rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + (System.currentTimeMillis() - startTime) + " milliseconds");
}

public int size() {
@@ -184,7 +204,7 @@ public static class heapFileEntries implements Iterator<indexContainer>, Iterabl

public heapFileEntries(final File heapFile, final kelondroRow payloadrow) throws IOException {
if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist");
is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 64*1024));
is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 1024*1024));
word = new byte[payloadrow.primaryKeyLength];
this.payloadrow = payloadrow;
this.nextContainer = next0();
@@ -231,6 +251,53 @@ protected void finalize() {
}
}

/**
* static iterator of BLOBHeap files: is used to import heap dumps into a write-enabled index heap
*/
public static class blobFileEntries implements Iterator<indexContainer>, Iterable<indexContainer> {
Iterator<Map.Entry<String, byte[]>> blobs;
kelondroRow payloadrow;

public blobFileEntries(final File blobFile, final kelondroRow payloadrow) throws IOException {
this.blobs = new kelondroBLOBHeapReader.entries(blobFile, payloadrow.primaryKeyLength);
this.payloadrow = payloadrow;
}

public boolean hasNext() {
return blobs.hasNext();
}

/**
* return an index container
* because they may get very large, it is wise to deallocate some memory before calling next()
*/
public indexContainer next() {
try {
Map.Entry<String, byte[]> entry = blobs.next();
byte[] payload = entry.getValue();
return new indexContainer(entry.getKey(), kelondroRowSet.importRowSet(payload, payloadrow));
} catch (final IOException e) {
return null;
}
}

public void remove() {
throw new UnsupportedOperationException("heap dumps are read-only");
}

public Iterator<indexContainer> iterator() {
return this;
}

public void close() {
blobs = null;
}

protected void finalize() {
this.close();
}
}

public synchronized int maxReferences() {
// iterate to find the max score
int max = 0;
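As the javadoc on next() notes, the containers coming back from blobFileEntries can get very large, so a consumer should drop its references to each container before the following call. A minimal usage sketch of the new iterator (the dump path and the println consumer are hypothetical; payloadrow is the row definition the dump was written with, and the enclosing method must declare throws IOException for the constructor):

    File blobFile = new File("DATA/INDEX/index.dump.blob"); // hypothetical path
    for (final indexContainer container : new indexContainerHeap.blobFileEntries(blobFile, payloadrow)) {
        if (container == null) break; // next() returns null when a read fails
        System.out.println(container.getWordHash() + ": " + container.size() + " references");
    }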
60 changes: 40 additions & 20 deletions source/de/anomic/index/indexRAMRI.java
@@ -47,11 +47,19 @@ public final class indexRAMRI implements indexRI, indexRIReader {
public int cacheReferenceCountLimit; // the maximum number of references to a single RWI entity
public long cacheReferenceAgeLimit; // the maximum age (= time not changed) of a RWI entity
private final serverLog log;
private final File indexHeapFile;
private final File oldDumpFile, newDumpFile;
private indexContainerHeap heap;

@SuppressWarnings("unchecked")
public indexRAMRI(final File databaseRoot, final kelondroRow payloadrow, final int entityCacheMaxSize, final int wCacheReferenceCountLimitInit, final long wCacheReferenceAgeLimitInit, final String newHeapName, final serverLog log) {
public indexRAMRI(
final File databaseRoot,
final kelondroRow payloadrow,
final int entityCacheMaxSize,
final int wCacheReferenceCountLimitInit,
final long wCacheReferenceAgeLimitInit,
final String oldHeapName,
final String newHeapName,
final serverLog log) {

// creates a new index cache
// the cache has a back-end where indexes that do not fit in the cache are flushed
@@ -62,25 +70,37 @@ public indexRAMRI(final File databaseRoot, final kelondroRow payloadrow, final i
this.cacheReferenceCountLimit = wCacheReferenceCountLimitInit;
this.cacheReferenceAgeLimit = wCacheReferenceAgeLimitInit;
this.log = log;
this.indexHeapFile = new File(databaseRoot, newHeapName);
this.oldDumpFile = new File(databaseRoot, oldHeapName);
this.newDumpFile = new File(databaseRoot, newHeapName);
this.heap = new indexContainerHeap(payloadrow, log);

// read in dump of last session
if (indexHeapFile.exists()) {
try {
heap.initWriteMode(indexHeapFile);
for (final indexContainer ic : (Iterable<indexContainer>) heap.wordContainers(null, false)) {
this.hashDate.setScore(ic.getWordHash(), intTime(ic.lastWrote()));
this.hashScore.setScore(ic.getWordHash(), ic.size());
}
} catch (final IOException e){
log.logSevere("unable to restore cache dump: " + e.getMessage(), e);
// get empty dump
heap.initWriteMode();
} catch (final NegativeArraySizeException e){
log.logSevere("unable to restore cache dump: " + e.getMessage(), e);
// get empty dump
heap.initWriteMode();
boolean initFailed = false;
if (newDumpFile.exists() && oldDumpFile.exists()) {
// we need only one, delete the old
oldDumpFile.delete();
}
if (oldDumpFile.exists()) try {
heap.initWriteModeFromHeap(oldDumpFile);
} catch (IOException e) {
initFailed = true;
e.printStackTrace();
}
if (newDumpFile.exists()) try {
heap.initWriteModeFromBLOB(newDumpFile);
} catch (IOException e) {
initFailed = true;
e.printStackTrace();
}
if (initFailed) {
log.logSevere("unable to restore cache dump");
// get empty dump
heap.initWriteMode();
} else if (oldDumpFile.exists() || newDumpFile.exists()) {
// initialize scores for cache organization
for (final indexContainer ic : (Iterable<indexContainer>) heap.wordContainers(null, false)) {
this.hashDate.setScore(ic.getWordHash(), intTime(ic.lastWrote()));
this.hashScore.setScore(ic.getWordHash(), ic.size());
}
} else {
heap.initWriteMode();
@@ -319,8 +339,8 @@ public synchronized void addEntry(final String wordHash, final indexRWIRowEntry
public synchronized void close() {
// dump cache
try {
heap.dump(this.indexHeapFile);
//heap.dump2(new File(this.indexHeapFile.getAbsolutePath() + ".blob"));
//heap.dumpold(this.oldDumpFile);
heap.dump(this.newDumpFile);
} catch (final IOException e){
log.logSevere("unable to dump cache: " + e.getMessage(), e);
}
31 changes: 17 additions & 14 deletions source/de/anomic/kelondro/kelondroBLOBGap.java
@@ -28,12 +28,12 @@

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
@@ -56,21 +56,24 @@ public kelondroBLOBGap() {
* initialize a kelondroBLOBGap with the content of a dump
* @param file
* @throws IOException
* @throws IOException
*/
public kelondroBLOBGap(final File file) throws IOException {
super();
// read the index dump and fill the index
InputStream is = new BufferedInputStream(new FileInputStream(file), 1024 * 1024);
byte[] k = new byte[8];
byte[] v = new byte[4];
int c;
DataInputStream is = new DataInputStream(new BufferedInputStream(new FileInputStream(file), 1024 * 1024));
long p;
int l;
while (true) {
c = is.read(k);
if (c <= 0) break;
c = is.read(v);
if (c <= 0) break;
this.put(new Long(kelondroNaturalOrder.decodeLong(k)), new Integer((int) kelondroNaturalOrder.decodeLong(v)));
try {
p = is.readLong();
l = is.readInt();
this.put(new Long(p), new Integer(l));
} catch (IOException e) {
break;
}
}
is.close();
}

/**
@@ -81,13 +84,13 @@ public kelondroBLOBGap(final File file) throws IOException {
*/
public int dump(File file) throws IOException {
Iterator<Map.Entry<Long, Integer>> i = this.entrySet().iterator();
OutputStream os = new BufferedOutputStream(new FileOutputStream(file), 1024 * 1024);
DataOutputStream os = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(file), 1024 * 1024));
int c = 0;
Map.Entry<Long, Integer> e;
while (i.hasNext()) {
e = i.next();
os.write(kelondroNaturalOrder.encodeLong(e.getKey().longValue(), 8));
os.write(kelondroNaturalOrder.encodeLong(e.getValue().longValue(), 4));
os.writeLong(e.getKey().longValue());
os.writeInt(e.getValue().intValue());
c++;
}
os.flush();
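The rewritten kelondroBLOBGap dump is a flat sequence of fixed-width records, one 12-byte record (an 8-byte long offset plus a 4-byte int length) per free chunk; DataInput/DataOutput write big-endian, and the read loop simply runs until the stream is exhausted. A self-contained sketch of the same round trip under those assumptions (file name and sample values are hypothetical):

    import java.io.*;
    import java.util.Map;
    import java.util.TreeMap;

    public class GapDumpSketch {
        public static void main(final String[] args) throws IOException {
            final File file = new File("test.gap"); // hypothetical file name
            final TreeMap<Long, Integer> gaps = new TreeMap<Long, Integer>();
            gaps.put(Long.valueOf(1024L), Integer.valueOf(80)); // offset -> length of a free chunk

            // write: one 12-byte record per gap, as in dump()
            final DataOutputStream os = new DataOutputStream(
                    new BufferedOutputStream(new FileOutputStream(file), 1024 * 1024));
            for (final Map.Entry<Long, Integer> e : gaps.entrySet()) {
                os.writeLong(e.getKey().longValue());
                os.writeInt(e.getValue().intValue());
            }
            os.close();

            // read: loop until end of file, as in the constructor above;
            // readLong() throws EOFException (an IOException) once the dump is exhausted
            final TreeMap<Long, Integer> restored = new TreeMap<Long, Integer>();
            final DataInputStream is = new DataInputStream(
                    new BufferedInputStream(new FileInputStream(file), 1024 * 1024));
            try {
                while (true) restored.put(Long.valueOf(is.readLong()), Integer.valueOf(is.readInt()));
            } catch (final EOFException endOfDump) {
                // normal termination
            }
            is.close();
            System.out.println(restored); // prints {1024=80}
        }
    }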
