Skip to content

Commit

Permalink
more refactoring of the index classes
Browse files Browse the repository at this point in the history
git-svn-id: https://svn.berlios.de/svnroot/repos/yacy/trunk@5995 6c8d7289-2bf4-0310-a012-ef5d649a1542
  • Loading branch information
orbiter committed May 29, 2009
1 parent 3d5f2ff commit 1c69d9b
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 220 deletions.
202 changes: 0 additions & 202 deletions source/de/anomic/kelondro/table/ChunkIterator.java
Expand Up @@ -31,9 +31,6 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

public class ChunkIterator implements Iterator<byte[]> {

Expand Down Expand Up @@ -96,203 +93,4 @@ public byte[] next() {
/**
 * Removal is not supported by this read-only iterator.
 * @throws UnsupportedOperationException always
 */
public void remove() {
throw new UnsupportedOperationException();
}


/*
ExecutorService service = Executors.newFixedThreadPool(2);
filechunkProducer producer;
filechunkSlicer slicer;
Future<Integer> producerResult;
Future<Integer> slicerResult;
byte[] nextRecord;
public kelondroChunkIterator(final File file, final int recordsize, final int chunksize) throws FileNotFoundException {
assert (file.exists());
assert file.length() % recordsize == 0;
this.chunksize = chunksize;
service = Executors.newFixedThreadPool(2);
// buffer size and count calculation is critical, because wrong values
// will cause blocking of the concurrent consumer/producer threads
int filebuffersize = 1024 * 16;
int chunkbuffercountmin = filebuffersize / recordsize + 1; // minimum
int filebuffercount = 1024 * 1024 / filebuffersize; // max 1 MB
int chunkbuffercount = chunkbuffercountmin * filebuffercount + 1;
producer = new filechunkProducer(file, filebuffersize, filebuffercount);
slicer = new filechunkSlicer(producer, recordsize, chunksize, chunkbuffercount);
producerResult = service.submit(producer);
slicerResult = service.submit(slicer);
service.shutdown();
nextRecord = slicer.consume();
}
public boolean hasNext() {
return nextRecord != null;
}
public byte[] next() {
if (nextRecord == null) return null;
byte[] n = nextRecord;
nextRecord = slicer.consume();
return n;
}
public void remove() {
throw new UnsupportedOperationException();
}
*/
/**
 * Slices the raw byte chunks delivered by a filechunkProducer into records
 * of a fixed size. Of every record only the first {@code head} bytes are
 * copied into the output; completed records are handed to consumers through
 * a bounded blocking queue. A zero-length array is used as poison to signal
 * the end of the stream.
 */
private static class filechunkSlicer implements Callable<Integer> {

    private filechunkProducer producer;
    private static byte[] poison = new byte[0]; // end-of-stream marker
    private BlockingQueue<byte[]> slices;
    private int slicesize, head;

    /**
     * @param producer  source of raw file chunks; must not be null
     * @param slicesize length in bytes of one record in the input stream
     * @param head      number of leading bytes of each record to keep
     * @param stacksize capacity of the output queue (bounds memory usage)
     */
    public filechunkSlicer(filechunkProducer producer, final int slicesize, int head, int stacksize) throws FileNotFoundException {
        assert producer != null;
        this.producer = producer;
        this.slices = new ArrayBlockingQueue<byte[]>(stacksize);
        this.slicesize = slicesize;
        this.head = head;
    }

    /**
     * Takes the next record from the queue, blocking if necessary.
     * @return the next record, or null when the stream has ended or the
     *         waiting thread was interrupted
     */
    public byte[] consume() {
        try {
            byte[] b = slices.take();
            if (b == poison) return null; else return b;
        } catch (InterruptedException e) {
            // restore the interrupt status so callers can observe it
            Thread.currentThread().interrupt();
            return null;
        }
    }

    // copy len bytes into 'to', clipping the copy so that it never writes
    // beyond the first 'head' bytes of the target record
    private void slice(byte[] from, int startfrom, byte[] to, int startto, int len) {
        if (startto >= head) return;
        if (startto + len > head) len = head - startto;
        assert to.length == head;
        System.arraycopy(from, startfrom, to, startto, len);
    }

    public Integer call() {
        filechunk c;
        int p; // read position within the current chunk
        try {
            byte[] slice = new byte[head];
            int slicec = 0; // number of record bytes already consumed for 'slice'
            consumer: while (true) {
                c = producer.consume();
                if (c == null) {
                    // producer finished; signal our own consumers
                    slices.put(poison);
                    break consumer;
                }
                p = 0;
                // distribute the chunk over as many slices as it can fill
                slicefiller: while (true) {
                    assert slicesize > slicec;
                    if (c.n - p >= slicesize - slicec) {
                        // enough data left in the chunk to complete this slice
                        slice(c.b, p, slice, slicec, slicesize - slicec);
                        p += slicesize - slicec;
                        slices.put(slice);
                        slice = new byte[head];
                        slicec = 0;
                        continue slicefiller;
                    } else {
                        // chunk exhausted: keep the partial slice for the next
                        // chunk and hand the buffer back to the producer pool
                        slice(c.b, p, slice, slicec, c.n - p);
                        producer.recycle(c);
                        slicec += c.n - p;
                        continue consumer;
                    }
                }
            }
        } catch (InterruptedException e) {
            // restore the interrupt status instead of swallowing it
            Thread.currentThread().interrupt();
        }

        return Integer.valueOf(0);
    }

}

/** A reusable read buffer: a byte array plus the count of valid bytes in it. */
private static class filechunk {
    public byte[] b; // backing buffer
    public int n;    // number of valid bytes currently held in b

    public filechunk(int len) {
        this.b = new byte[len];
        this.n = 0;
    }
}

/**
 * The filechunkProducer reads an input file and stores chunks of the results
 * into a buffer. All elements stored in the buffer must be recycled.
 * The class does not allocate more memory than a given chunk size multiplied with a
 * number of chunks that shall be stored in a queue for processing.
 */
private static class filechunkProducer implements Callable<Integer> {

    private BlockingQueue<filechunk> empty; // recycled buffers, ready to be filled
    private BlockingQueue<filechunk> filed; // filled buffers, ready to be consumed
    private static filechunk poison = new filechunk(0); // end-of-stream marker
    private FileInputStream fis;

    /**
     * @param in          the file to read
     * @param bufferSize  size in bytes of a single chunk buffer
     * @param bufferCount number of chunk buffers to pre-allocate; total memory
     *                    use is bounded by bufferSize * bufferCount
     * @throws FileNotFoundException if the file cannot be opened
     */
    public filechunkProducer(File in, int bufferSize, int bufferCount) throws FileNotFoundException {
        empty = new ArrayBlockingQueue<filechunk>(bufferCount);
        filed = new ArrayBlockingQueue<filechunk>(bufferCount);
        fis = new FileInputStream(in);
        // pre-allocate all buffers; they circulate between the two queues
        for (int i = 0; i < bufferCount; i++) empty.add(new filechunk(bufferSize));
    }

    /**
     * Returns a consumed chunk to the pool so its buffer can be filled again.
     */
    public void recycle(filechunk c) {
        try {
            empty.put(c);
        } catch (InterruptedException e) {
            // restore the interrupt status so callers can observe it
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Takes the next filled chunk, blocking if necessary.
     * @return the next chunk, or null when the file has been fully read or
     *         the waiting thread was interrupted
     */
    public filechunk consume() {
        try {
            filechunk f = filed.take(); // blocks until a chunk is available
            if (f == poison) return null; else return f;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            closeQuietly();
            return null;
        }
    }

    // close the input stream, suppressing secondary errors during cleanup
    private void closeQuietly() {
        try {
            this.fis.close();
        } catch (IOException e1) {
            // nothing more we can do during cleanup
        }
    }

    public Integer call() {
        try {
            filechunk c;
            while (true) {
                c = empty.take(); // blocks until a free buffer is available
                c.n = fis.read(c.b);
                if (c.n <= 0) break; // end of file reached
                filed.put(c);
            }
            // put poison into consumer queue so he can stop consuming
            filed.put(poison);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e.getMessage(), e); // preserve the cause
        } catch (IOException e) {
            throw new RuntimeException(e.getMessage(), e); // preserve the cause
        } finally {
            // close the stream on every exit path; previously it was leaked
            // after a normal end-of-file completion
            closeQuietly();
        }
        return Integer.valueOf(0);
    }

}

}
8 changes: 0 additions & 8 deletions source/de/anomic/kelondro/text/BufferedIndex.java
Expand Up @@ -87,14 +87,6 @@ public interface BufferedIndex<ReferenceType extends Reference> extends Index<Re
*/
public long getBufferSizeBytes();

/**
* clean the buffer for a given time. The buffer may need operations
* for flushing, cleaning, etc. The buffer performs this cleanup by itself,
* but may perform better if in spare time this method is called
* @param time the number of milliseconds that the operation may take
*/
public void cleanupBuffer(int time);

/**
* get the size of the buffer backend
* @return number of word references
Expand Down
4 changes: 0 additions & 4 deletions source/de/anomic/kelondro/text/IndexCell.java
Expand Up @@ -313,10 +313,6 @@ public void mountBLOBFile(File blobFile) throws IOException {
// for migration of cache files
this.array.mountBLOBFile(blobFile);
}

/**
 * No-op: this implementation needs no explicit buffer cleanup.
 * @param time maximum number of milliseconds the operation may take (unused)
 */
public void cleanupBuffer(int time) {
// do nothing
}

public int getBackendSize() {
return this.array.size();
Expand Down
4 changes: 2 additions & 2 deletions source/de/anomic/kelondro/text/Segment.java
Expand Up @@ -63,9 +63,9 @@ public final class Segment {
public static final int writeBufferSize = 4 * 1024 * 1024;

// the reference factory
public static final ReferenceFactory<WordReference> wordReferenceFactory = new WordReferenceFactory();
public static final ReferenceFactory<WordReference> wordReferenceFactory = new WordReferenceFactory();
public static final ReferenceFactory<NavigationReference> navigationReferenceFactory = new NavigationReferenceFactory();
public static final ByteOrder wordOrder = Base64Order.enhancedCoder;
public static final ByteOrder wordOrder = Base64Order.enhancedCoder;

private final Log log;
private final IndexCell<WordReference> termIndex;
Expand Down
5 changes: 1 addition & 4 deletions source/de/anomic/plasma/plasmaSwitchboard.java
Expand Up @@ -991,8 +991,7 @@ public boolean isInMyCluster(final yacySeed seed) {
return false;
}
}



public String urlExists(final String hash) {
// tests if hash occurs in any database
// if it exists, the name of the database is returned,
Expand Down Expand Up @@ -1217,8 +1216,6 @@ public void enQueue(final IndexingStack.QueueEntry job) {
}

public void deQueueFreeMem() {
// flush some entries from the RAM cache
indexSegment.termIndex().cleanupBuffer(5000);
// empty some caches
indexSegment.urlMetadata().clearCache();
plasmaSearchEvent.cleanupEvents(true);
Expand Down

0 comments on commit 1c69d9b

Please sign in to comment.