Skip to content

Commit

Permalink
reverting again the changes to new concurrent chunkIterator
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5362 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed Nov 23, 2008
1 parent 45ad1c3 commit 9d64693
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 132 deletions.
157 changes: 32 additions & 125 deletions source/de/anomic/index/indexContainerHeap.java
Expand Up @@ -34,16 +34,18 @@
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

import de.anomic.kelondro.kelondroBLOB;
import de.anomic.kelondro.kelondroBLOBBuffer;
import de.anomic.kelondro.kelondroBLOBHeap;
import de.anomic.kelondro.kelondroBase64Order;
import de.anomic.kelondro.kelondroByteOrder;
import de.anomic.kelondro.kelondroBytesLongMap;
import de.anomic.kelondro.kelondroCloneableIterator;
import de.anomic.kelondro.kelondroRow;
import de.anomic.kelondro.kelondroRowSet;
Expand All @@ -53,24 +55,7 @@ public final class indexContainerHeap {

private final kelondroRow payloadrow;
private final serverLog log;
private kelondroBytesLongMap index;
private SortedMap<String, indexContainer> cache;
private File backupFile;
private boolean readOnlyMode;
// index xor cache is used. If one is not null, then the other must be null

/*
* An indexContainerHeap is a caching structure for indexContainer objects
* A heap can have the following stati:
* write: the heap can be extended with more indexContainer entries.
* a heap that is open to be written may be dumped to a heap file.
* after that, the heap is still accessible, but only in read-status,
* which is not reversible. Once a heap is dumped, it can never be extended with new
* indexConatiner entries.
* A write-heap can also initiated using a restore of a dumped heap.
* read: a dumped head can be accessed using a heap index. when the heap is
* accessed the first time, all entries are scanned and an index is computed
*/

/**
* opens an existing heap file in undefined mode
Expand All @@ -83,13 +68,9 @@ public indexContainerHeap(final kelondroRow payloadrow, final serverLog log) {
this.payloadrow = payloadrow;
this.log = log;
this.cache = null;
this.index = null;
this.backupFile = null;
this.readOnlyMode = false;
}

public void clear() throws IOException {
if (index != null) index.clear();
if (cache != null) cache.clear();
initWriteMode();
}
Expand All @@ -100,7 +81,6 @@ public void clear() throws IOException {
*/
public void initWriteMode() {
this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering())));
this.readOnlyMode = false;
}

/**
Expand All @@ -111,7 +91,6 @@ public void initWriteMode() {
* @throws IOException
*/
public void initWriteMode(final File heapFile) throws IOException {
this.readOnlyMode = false;
if (log != null) log.logInfo("restoring dump for rwi heap '" + heapFile.getName() + "'");
final long start = System.currentTimeMillis();
this.cache = Collections.synchronizedSortedMap(new TreeMap<String, indexContainer>(new kelondroByteOrder.StringOrder(payloadrow.getOrdering())));
Expand All @@ -127,60 +106,6 @@ public void initWriteMode(final File heapFile) throws IOException {
if (log != null) log.logInfo("finished rwi heap restore: " + cache.size() + " words, " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
}

/**
* init a heap file in read-only mode
* this initiates a heap index generation which is then be used to access elements of the heap
* during the life-time of this object, the file is _not_ open; it is opened each time
* the heap is accessed for reading
* @param heapFile
* @throws IOException
*/
public void initReadMode(final File heapFile) throws IOException {
this.readOnlyMode = true;
assert this.cache == null;
assert this.index == null;
this.backupFile = heapFile;
if (!(heapFile.exists())) throw new IOException("file " + heapFile + " does not exist");
if (heapFile.length() >= Integer.MAX_VALUE) throw new IOException("file " + heapFile + " too large, index can only be crated for files less than 2GB");
if (log != null) log.logInfo("creating index for rwi heap '" + heapFile.getName() + "'");

final long start = System.currentTimeMillis();
this.index = new kelondroBytesLongMap(payloadrow.primaryKeyLength, (kelondroByteOrder) payloadrow.getOrdering(), 0);
DataInputStream is = null;
final long urlCount = 0;
String wordHash;
final byte[] word = new byte[payloadrow.primaryKeyLength];
long seek = 0, seek0;
synchronized (index) {
is = new DataInputStream(new BufferedInputStream(new FileInputStream(heapFile), 64*1024));

// don't test available() here because this does not work for files > 2GB
loop: while (true) {
// remember seek position
seek0 = seek;

// read word
try {
is.readFully(word);
} catch (final IOException e) {
break loop; // terminate loop
}
wordHash = new String(word);
seek += wordHash.length();

// read collection
try {
seek += kelondroRowSet.skipNextRowSet(is, payloadrow);
} catch (final IOException e) {
break loop; // terminate loop
}
index.addl(word, seek0);
}
}
is.close();
if (log != null) log.logInfo("finished rwi heap indexing: " + urlCount + " word/URL relations in " + ((System.currentTimeMillis() - start) / 1000) + " seconds");
}

public void dump(final File heapFile) throws IOException {
assert this.cache != null;
if (log != null) log.logInfo("creating rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
Expand Down Expand Up @@ -213,10 +138,35 @@ public void dump(final File heapFile) throws IOException {
os.flush();
os.close();
if (log != null) log.logInfo("finished rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
}

public void dump2(final File heapFile) throws IOException {
assert this.cache != null;
if (log != null) log.logInfo("creating alternative rwi heap dump '" + heapFile.getName() + "', " + cache.size() + " rwi's");
if (heapFile.exists()) heapFile.delete();
final kelondroBLOB dump = new kelondroBLOBBuffer(new kelondroBLOBHeap(heapFile, payloadrow.primaryKeyLength, kelondroBase64Order.enhancedCoder), 1024 * 1024 * 2, true);
final long startTime = System.currentTimeMillis();
long wordcount = 0, urlcount = 0;
String wordHash;
indexContainer container;

// finally delete the internal cache to switch handling to read-only mode
this.cache = null;
// if the cache will be used in read-only mode afterwards, it must be initiated with initReadMode(file);
// write wCache
synchronized (cache) {
for (final Map.Entry<String, indexContainer> entry: cache.entrySet()) {
// get entries
wordHash = entry.getKey();
container = entry.getValue();

// put entries on heap
if (container != null && wordHash.length() == payloadrow.primaryKeyLength) {
dump.put(wordHash.getBytes(), container.exportCollection());
urlcount += container.size();
}
wordcount++;
}
}
dump.close();
if (log != null) log.logInfo("finished alternative rwi heap dump: " + wordcount + " words, " + urlcount + " word/URL relations in " + ((System.currentTimeMillis() - startTime) / 1000) + " seconds");
}

public int size() {
Expand Down Expand Up @@ -348,18 +298,6 @@ public Iterator<indexContainer> iterator() {
* @return true, if the key is used in the heap; false othervise
*/
public boolean has(final String key) {
if (this.readOnlyMode) {
assert index != null;
assert index.row().primaryKeyLength == key.length();

// check if the index contains the key
try {
return index.getl(key.getBytes()) >= 0;
} catch (final IOException e) {
e.printStackTrace();
return false;
}
}
return this.cache.containsKey(key);
}

Expand All @@ -369,32 +307,6 @@ public boolean has(final String key) {
* @return the indexContainer if one exist, null otherwise
*/
public indexContainer get(final String key) {
if (this.readOnlyMode) try {
assert index != null;
assert index.row().primaryKeyLength == key.length();

// check if the index contains the key
final long pos = index.getl(key.getBytes());
if (pos < 0) return null;

// access the file and read the container
final RandomAccessFile raf = new RandomAccessFile(backupFile, "r");
final byte[] word = new byte[index.row().primaryKeyLength];

raf.seek(pos);
final int bytesRead = raf.read(word);
assert bytesRead == word.length;
assert key.equals(new String(word));

// read collection
final indexContainer container = new indexContainer(key, kelondroRowSet.importRowSet(raf, payloadrow));
raf.close();
return container;
} catch (final IOException e) {
log.logSevere("error accessing entry in heap file " + this.backupFile + ": " + e.getMessage());
return null;
}

return this.cache.get(key);
}

Expand All @@ -406,14 +318,12 @@ public indexContainer get(final String key) {
public synchronized indexContainer delete(final String wordHash) {
// returns the index that had been deleted
assert this.cache != null;
assert !this.readOnlyMode;
return cache.remove(wordHash);
}


public synchronized boolean removeReference(final String wordHash, final String urlHash) {
assert this.cache != null;
assert !this.readOnlyMode;
final indexContainer c = cache.get(wordHash);
if ((c != null) && (c.remove(urlHash) != null)) {
// removal successful
Expand All @@ -429,7 +339,6 @@ public synchronized boolean removeReference(final String wordHash, final String

public synchronized int removeReferences(final String wordHash, final Set<String> urlHashes) {
assert this.cache != null;
assert !this.readOnlyMode;
if (urlHashes.size() == 0) return 0;
final indexContainer c = cache.get(wordHash);
int count;
Expand All @@ -450,7 +359,6 @@ public synchronized int add(final indexContainer container) {
int added = 0;
if ((container == null) || (container.size() == 0)) return 0;
assert this.cache != null;
assert !this.readOnlyMode;

// put new words into cache
final String wordHash = container.getWordHash();
Expand All @@ -470,7 +378,6 @@ public synchronized int add(final indexContainer container) {

public synchronized void addEntry(final String wordHash, final indexRWIRowEntry newEntry) {
assert this.cache != null;
assert !this.readOnlyMode;
indexContainer container = cache.get(wordHash);
if (container == null) container = new indexContainer(wordHash, this.payloadrow, 1);
container.put(newEntry);
Expand Down
1 change: 1 addition & 0 deletions source/de/anomic/index/indexRAMRI.java
Expand Up @@ -320,6 +320,7 @@ public synchronized void close() {
// dump cache
try {
heap.dump(this.indexHeapFile);
//heap.dump2(new File(this.indexHeapFile.getAbsolutePath() + ".blob"));
} catch (final IOException e){
log.logSevere("unable to dump cache: " + e.getMessage(), e);
}
Expand Down
20 changes: 13 additions & 7 deletions source/de/anomic/kelondro/kelondroChunkIterator.java
Expand Up @@ -24,6 +24,8 @@

package de.anomic.kelondro;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand All @@ -49,7 +51,7 @@ public class kelondroChunkIterator implements Iterator<byte[]> {
* @param chunksize: the size of the chunks that are returned by next(). remaining bytes until the lenght of recordsize are skipped
* @throws FileNotFoundException
*/
/*


private final DataInputStream stream;
private byte[] nextBytes;
Expand Down Expand Up @@ -99,7 +101,7 @@ public void remove() {
}


*/
/*
ExecutorService service = Executors.newFixedThreadPool(2);
filechunkProducer producer;
filechunkSlicer slicer;
Expand Down Expand Up @@ -142,7 +144,7 @@ public byte[] next() {
public void remove() {
throw new UnsupportedOperationException();
}

*/
private static class filechunkSlicer implements Callable<Integer> {

private filechunkProducer producer;
Expand All @@ -160,7 +162,7 @@ public filechunkSlicer(filechunkProducer producer, final int slicesize, int head

public byte[] consume() {
try {
byte[] b = slices.take(); // leer
byte[] b = slices.take();
if (b == poison) return null; else return b;
} catch (InterruptedException e) {
e.printStackTrace();
Expand Down Expand Up @@ -212,15 +214,14 @@ public Integer call() {
}
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

return new Integer(0);
}

}

private static class filechunk {
public byte[] b;
public int n;
Expand Down Expand Up @@ -265,14 +266,19 @@ public filechunk consume() {
if (f == poison) return null; else return f;
} catch (InterruptedException e) {
e.printStackTrace();
try {
this.fis.close();
} catch (IOException e1) {
e1.printStackTrace();
}
return null;
}
}

public Integer call() {
try {
filechunk c;
while(true) {
while (true) {
c = empty.take(); // leer
c.n = fis.read(c.b);
if (c.n <= 0) break;
Expand Down

0 comments on commit 9d64693

Please sign in to comment.