Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Updated Krati to use new Indexed store...

  • Loading branch information...
commit e3385cc352c64b5e50c9a3ab2c084e591cd14b92 1 parent a24eb46
@rsumbaly authored
View
2  .classpath
@@ -49,7 +49,7 @@
<classpathentry kind="lib" path="lib/jackson-core-asl-1.4.0.jar"/>
<classpathentry kind="lib" path="lib/avro-modified-jdk5-1.3.0.jar"/>
<classpathentry kind="lib" path="contrib/hadoop/lib/pig-0.7.1-dev-core.jar"/>
- <classpathentry kind="lib" path="contrib/krati/lib/krati-0.3.4.jar"/>
<classpathentry kind="lib" path="lib/jna.jar"/>
+ <classpathentry kind="lib" path="contrib/krati/lib/krati-0.3.5.jar"/>
<classpathentry kind="output" path="classes"/>
</classpath>
View
7 contrib/krati/config/single_node_cluster/config/server.properties
@@ -21,8 +21,11 @@ mysql.database=test
# Krati
krati.segment.filesize.mb=256
-krati.load.factor=0.75
-krati.initLevel=2
+krati.init.level=2
+krati.lock.stripes=50
+krati.index.segment.mb=32
+krati.store.segment.mb=256
+krati.batch.size=10000
#ReadOnly
enable.readonly.engine=true
View
BIN  contrib/krati/lib/krati-0.3.4.jar
Binary file not shown
View
BIN  contrib/krati/lib/krati-0.3.5.jar
Binary file not shown
View
30 contrib/krati/src/java/voldemort/store/krati/KratiStorageConfiguration.java
@@ -2,9 +2,6 @@
import java.io.File;
-import krati.cds.impl.segment.MappedSegmentFactory;
-import krati.cds.impl.segment.SegmentFactory;
-
import org.apache.log4j.Logger;
import voldemort.server.VoldemortConfig;
@@ -12,7 +9,6 @@
import voldemort.store.StorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.Props;
-import voldemort.utils.ReflectUtils;
public class KratiStorageConfiguration implements StorageConfiguration {
@@ -21,24 +17,21 @@
private static Logger logger = Logger.getLogger(KratiStorageConfiguration.class);
private final String dataDirectory;
- private final int lockStripes;
- private final int segmentFileSizeMb;
- private final int initLevel;
- private final double hashLoadFactor;
+ private final int lockStripes, storeSegmentFileSizeMb, indexSegmentFileSizeMb, initLevel,
+ batchSize;
private final Object lock = new Object();
- private final Class<?> factoryClass;
public KratiStorageConfiguration(VoldemortConfig config) {
Props props = config.getAllProps();
File kratiDir = new File(config.getDataDirectory(), "krati");
kratiDir.mkdirs();
this.dataDirectory = kratiDir.getAbsolutePath();
- this.segmentFileSizeMb = props.getInt("krati.segment.filesize.mb", 256);
- this.hashLoadFactor = props.getDouble("krati.load.factor", 0.75);
- this.initLevel = props.getInt("krati.initlevel", 2);
+ this.batchSize = props.getInt("krati.batch.size", 10000);
+ this.indexSegmentFileSizeMb = props.getInt("krati.index.segment.mb", 32);
+ this.storeSegmentFileSizeMb = props.getInt("krati.store.segment.mb", 256);
+ this.initLevel = props.getInt("krati.init.level", 2);
this.lockStripes = props.getInt("krati.lock.stripes", 50);
- this.factoryClass = ReflectUtils.loadClass(props.getString("krati.segment.factory.class",
- MappedSegmentFactory.class.getName()));
+
}
public void close() {}
@@ -51,12 +44,11 @@ public void close() {}
storeDir.mkdirs();
}
- SegmentFactory segmentFactory = (SegmentFactory) ReflectUtils.callConstructor(factoryClass);
return new KratiStorageEngine(storeName,
- segmentFactory,
- segmentFileSizeMb,
+ batchSize,
+ indexSegmentFileSizeMb,
+ storeSegmentFileSizeMb,
lockStripes,
- hashLoadFactor,
initLevel,
storeDir);
}
@@ -66,4 +58,4 @@ public String getType() {
return TYPE_NAME;
}
-}
+}
View
150 contrib/krati/src/java/voldemort/store/krati/KratiStorageEngine.java
@@ -6,16 +6,15 @@
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
-import krati.cds.array.DataArray;
-import krati.cds.impl.segment.SegmentFactory;
-import krati.cds.impl.store.DynamicDataStore;
-import krati.util.FnvHashFunction;
+import krati.core.segment.MemorySegmentFactory;
+import krati.core.segment.WriteBufferSegmentFactory;
+import krati.store.IndexedDataStore;
import org.apache.log4j.Logger;
@@ -39,24 +38,27 @@
private static final Logger logger = Logger.getLogger(KratiStorageEngine.class);
private final String name;
- private final DynamicDataStore datastore;
+ private final IndexedDataStore datastore;
private final StripedLock locks;
public KratiStorageEngine(String name,
- SegmentFactory segmentFactory,
- int segmentFileSizeMB,
+ int batchSize,
+ int indexSegmentFileSizeMB,
+ int storeSegmentFileSizeMB,
int lockStripes,
- double hashLoadFactor,
int initLevel,
File dataDirectory) {
this.name = Utils.notNull(name);
try {
- this.datastore = new DynamicDataStore(dataDirectory,
+ this.datastore = new IndexedDataStore(dataDirectory,
+ batchSize,
+ 5,
initLevel,
- segmentFileSizeMB,
- segmentFactory,
- hashLoadFactor,
- new FnvHashFunction());
+ indexSegmentFileSizeMB,
+ new MemorySegmentFactory(),
+ initLevel,
+ storeSegmentFileSizeMB,
+ new WriteBufferSegmentFactory(storeSegmentFileSizeMB));
this.locks = new StripedLock(lockStripes);
} catch(Exception e) {
throw new VoldemortException("Failure initializing store.", e);
@@ -104,46 +106,11 @@ public void truncate() {
}
public ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries() {
- List<Pair<ByteArray, Versioned<byte[]>>> returnedList = new ArrayList<Pair<ByteArray, Versioned<byte[]>>>();
- DataArray array = datastore.getDataArray();
- for(int index = 0; index < array.length(); index++) {
- byte[] returnedBytes = array.getData(index);
- if(returnedBytes != null) {
- // Extract the key value pair from this
- // TODO: Move to DynamicDataStore code
- ByteBuffer bb = ByteBuffer.wrap(returnedBytes);
- int cnt = bb.getInt();
- if(cnt > 0) {
- int keyLen = bb.getInt();
- byte[] key = new byte[keyLen];
- bb.get(key);
-
- int valueLen = bb.getInt();
- byte[] value = new byte[valueLen];
- bb.get(value);
-
- List<Versioned<byte[]>> versions;
- try {
- versions = disassembleValues(value);
- } catch(IOException e) {
- versions = null;
- }
-
- if(versions != null) {
- Iterator<Versioned<byte[]>> iterVersions = versions.iterator();
- while(iterVersions.hasNext()) {
- Versioned<byte[]> currentVersion = iterVersions.next();
- returnedList.add(Pair.create(new ByteArray(key), currentVersion));
- }
- }
- }
- }
- }
- return new KratiClosableIterator(returnedList);
+ return new KratiEntriesIterator(datastore.iterator());
}
public ClosableIterator<ByteArray> keys() {
- return StoreUtils.keys(entries());
+ return new KratiKeysIterator(datastore.keyIterator());
}
public boolean delete(ByteArray key, Version maxVersion) throws VoldemortException {
@@ -279,31 +246,94 @@ else if(occured == Occured.AFTER)
return returnList;
}
- private class KratiClosableIterator implements
+ private class KratiEntriesIterator implements
ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> {
- private Iterator<Pair<ByteArray, Versioned<byte[]>>> iter;
+ private Iterator<Entry<byte[], byte[]>> iterator;
+ private ByteArray currentKey;
+ private Iterator<Versioned<byte[]>> currentValues;
- public KratiClosableIterator(List<Pair<ByteArray, Versioned<byte[]>>> list) {
- iter = list.iterator();
+ public KratiEntriesIterator(Iterator<Entry<byte[], byte[]>> iterator) {
+ this.iterator = iterator;
}
public void close() {
// Nothing to close here
}
+ private boolean hasNextInCurrentValues() {
+ return currentValues != null && currentValues.hasNext();
+ }
+
public boolean hasNext() {
- return iter.hasNext();
+ return hasNextInCurrentValues() || iterator.hasNext();
+ }
+
+ private Pair<ByteArray, Versioned<byte[]>> nextInCurrentValues() {
+ Versioned<byte[]> item = currentValues.next();
+ return Pair.create(currentKey, item);
}
public Pair<ByteArray, Versioned<byte[]>> next() {
- return iter.next();
+ if(hasNextInCurrentValues()) {
+ return nextInCurrentValues();
+ } else {
+ // keep trying to get a next, until we find one (they could get
+ // removed)
+ while(true) {
+ Entry<byte[], byte[]> entry = iterator.next();
+
+ List<Versioned<byte[]>> list;
+ try {
+ list = disassembleValues(entry.getValue());
+ } catch(IOException e) {
+ list = new ArrayList<Versioned<byte[]>>();
+ }
+ synchronized(list) {
+ // okay we may have gotten an empty list, if so try
+ // again
+ if(list.size() == 0)
+ continue;
+
+ // grab a snapshot of the list while we have exclusive
+ // access
+ currentValues = new ArrayList<Versioned<byte[]>>(list).iterator();
+ }
+ currentKey = new ByteArray(entry.getKey());
+ return nextInCurrentValues();
+ }
+ }
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ }
+
+ private class KratiKeysIterator implements ClosableIterator<ByteArray> {
+
+ private Iterator<byte[]> iter;
+
+ public KratiKeysIterator(Iterator<byte[]> iterator) {
+ iter = iterator;
+ }
+
+ public void close() {
+ // Nothing to close here
+ }
+
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ public ByteArray next() {
+ return new ByteArray(iter.next());
}
public void remove() {
- Pair<ByteArray, Versioned<byte[]>> currentPair = next();
- delete(currentPair.getFirst(), currentPair.getSecond().getVersion());
+ throw new UnsupportedOperationException();
}
}
-}
+}
View
9 contrib/krati/test/voldemort/store/krati/KratiStorageEngineTest.java
@@ -2,7 +2,6 @@
import java.io.File;
-import krati.cds.impl.segment.MappedSegmentFactory;
import voldemort.TestUtils;
import voldemort.store.AbstractStorageEngineTest;
import voldemort.store.StorageEngine;
@@ -18,13 +17,7 @@ protected void setUp() throws Exception {
File storeDir = TestUtils.createTempDir();
storeDir.mkdirs();
storeDir.deleteOnExit();
- this.store = new KratiStorageEngine("storeName",
- new MappedSegmentFactory(),
- 10,
- 10,
- 0.75,
- 0,
- storeDir);
+ this.store = new KratiStorageEngine("storeName", 1000, 8, 8, 50, 2, storeDir);
}
@Override
Please sign in to comment.
Something went wrong with that request. Please try again.