Permalink
Browse files

Make Value an object, to make code more robust

  • Loading branch information...
justinsb committed Dec 16, 2013
1 parent 3da392f commit d69d9244faa780b6e8d5dabbf56c4c1c98d31e06
Showing with 238 additions and 200 deletions.
  1. +4 −3 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/KeyValueStateMachine.java
  2. +8 −9 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/KeyValueStore.java
  3. +2 −4 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/BranchPage.java
  4. +3 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/EntryListener.java
  5. +23 −16 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/LeafPage.java
  6. +2 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/Page.java
  7. +1 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/TransactionPage.java
  8. +2 −2 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/WriteTransaction.java
  9. +4 −6 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/operation/AppendOperation.java
  10. +1 −3 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/operation/DeleteOperation.java
  11. +3 −5 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/operation/IncrementOperation.java
  12. +1 −3 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/operation/KeyOperation.java
  13. +4 −6 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/operation/SetOperation.java
  14. +143 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/operation/Value.java
  15. +0 −102 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/operation/Values.java
  16. +1 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/freemap/FreeSpaceMap.java
  17. +1 −3 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/RedisEndpoint.java
  18. +3 −4 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/RedisServer.java
  19. +2 −3 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/commands/ExistsCommand.java
  20. +20 −17 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/commands/GetCommand.java
  21. +2 −2 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/commands/SetCommand.java
  22. +5 −5 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/web/KeyValueEndpoint.java
  23. +3 −3 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/web/KeyValueQueryBodyWriter.java
@@ -21,6 +21,7 @@
import com.cloudata.keyvalue.btree.operation.IncrementOperation;
import com.cloudata.keyvalue.btree.operation.KeyOperation;
import com.cloudata.keyvalue.btree.operation.SetOperation;
import com.cloudata.keyvalue.btree.operation.Value;
import com.cloudata.keyvalue.web.KeyValueQuery;
import com.google.common.base.Throwables;
import com.google.common.cache.CacheBuilder;
@@ -108,7 +109,7 @@ public Object applyOperation(@Nonnull ByteBuffer op) {
}
case SET:
operation = new SetOperation(value.asReadOnlyByteBuffer());
operation = new SetOperation(Value.deserialize(entry.getValue().asReadOnlyByteBuffer()));
break;
default:
@@ -158,9 +159,9 @@ public KeyValueStore load(@Nonnull Long id) throws Exception {
}
}
public ByteBuffer get(long storeId, ByteBuffer key) {
public Value get(long storeId, ByteString key) {
KeyValueStore keyValueStore = getKeyValueStore(storeId);
return keyValueStore.get(key);
return keyValueStore.get(key.asReadOnlyByteBuffer());
}
public KeyValueQuery scan(long storeId) {
@@ -8,13 +8,13 @@
import org.slf4j.LoggerFactory;
import com.cloudata.keyvalue.btree.Btree;
import com.cloudata.keyvalue.btree.ByteBuffers;
import com.cloudata.keyvalue.btree.EntryListener;
import com.cloudata.keyvalue.btree.MmapPageStore;
import com.cloudata.keyvalue.btree.PageStore;
import com.cloudata.keyvalue.btree.ReadOnlyTransaction;
import com.cloudata.keyvalue.btree.WriteTransaction;
import com.cloudata.keyvalue.btree.operation.KeyOperation;
import com.cloudata.keyvalue.btree.operation.Value;
import com.cloudata.keyvalue.web.KeyValueQuery;
public class KeyValueStore {
@@ -32,24 +32,23 @@ public KeyValueStore(File dir, boolean uniqueKeys) throws IOException {
this.btree = new Btree(pageStore, uniqueKeys);
}
public Object doAction(ByteBuffer key, KeyOperation operation) {
public void doAction(ByteBuffer key, KeyOperation<?> operation) {
try (WriteTransaction txn = btree.beginReadWrite()) {
Object ret = txn.doAction(btree, key, operation);
txn.doAction(btree, key, operation);
txn.commit();
return ret;
}
}
static class GetEntryListener implements EntryListener {
final ByteBuffer findKey;
ByteBuffer foundValue;
Value foundValue;
public GetEntryListener(ByteBuffer findKey) {
this.findKey = findKey;
}
@Override
public boolean found(ByteBuffer key, ByteBuffer value) {
public boolean found(ByteBuffer key, Value value) {
// log.debug("Found {}={}", ByteBuffers.toHex(key), ByteBuffers.toHex(value));
if (key.equals(findKey)) {
foundValue = value;
@@ -59,17 +58,17 @@ public boolean found(ByteBuffer key, ByteBuffer value) {
}
};
public ByteBuffer get(final ByteBuffer key) {
public Value get(final ByteBuffer key) {
try (ReadOnlyTransaction txn = btree.beginReadOnly()) {
GetEntryListener listener = new GetEntryListener(key);
txn.walk(btree, key, listener);
ByteBuffer value = listener.foundValue;
Value value = listener.foundValue;
// log.debug("Value for {}: {}", key, value);
if (value != null) {
// Once the transaction goes away the values may be invalid
value = ByteBuffers.clone(value);
value = value.clone();
}
// log.debug("Value for {}: {}", key, value);
@@ -412,7 +412,7 @@ Mutable getMutable() {
}
@Override
public Object doAction(Transaction txn, ByteBuffer key, KeyOperation operation) {
public <V> void doAction(Transaction txn, ByteBuffer key, KeyOperation<V> operation) {
int pos = findPos(key);
if (pos < 0) {
pos = 0;
@@ -424,15 +424,13 @@ public Object doAction(Transaction txn, ByteBuffer key, KeyOperation operation)
ByteBuffer oldLbound = childPage.getKeyLbound();
Object ret = childPage.doAction(txn, key, operation);
childPage.doAction(txn, key, operation);
ByteBuffer newLbound = childPage.getKeyLbound();
if (!oldLbound.equals(newLbound)) {
getMutable().updateLbound(txn, pageNumber);
}
return ret;
}
@Override
@@ -2,7 +2,9 @@
import java.nio.ByteBuffer;
import com.cloudata.keyvalue.btree.operation.Value;
public interface EntryListener {
public boolean found(ByteBuffer key, ByteBuffer value);
public boolean found(ByteBuffer key, Value value);
}
@@ -9,6 +9,7 @@
import org.slf4j.LoggerFactory;
import com.cloudata.keyvalue.btree.operation.KeyOperation;
import com.cloudata.keyvalue.btree.operation.Value;
import com.cloudata.util.Hex;
import com.google.common.collect.Lists;
import com.google.common.primitives.Shorts;
@@ -91,24 +92,25 @@ int firstGTE(ByteBuffer find) {
return pos;
}
Object doAction(ByteBuffer key, KeyOperation operation) {
<V> void doAction(ByteBuffer key, KeyOperation<V> operation) {
int position = firstGTE(key);
ByteBuffer oldValue = null;
Value oldValue = null;
ByteBuffer oldValueBuffer = null;
if (uniqueKeys) {
if (position < entries.size()) {
ByteBuffer positionKey = getKey(position);
int positionComparison = ByteBuffers.compare(positionKey, key);
if (positionComparison == 0) {
oldValue = getValue(position);
oldValueBuffer = getValue(position);
oldValue = Value.deserialize(oldValueBuffer);
}
}
} else {
throw new UnsupportedOperationException();
}
ByteBuffer newValue = operation.doAction(oldValue);
Value newValue = operation.doAction(oldValue);
if (newValue == oldValue) {
// No change (either both null or no value change)
@@ -117,27 +119,30 @@ Object doAction(ByteBuffer key, KeyOperation operation) {
entries.remove(position);
totalKeySize -= key.remaining();
totalValueSize -= oldValue.remaining();
totalValueSize -= oldValueBuffer.remaining();
log.info("Deleted entry @{}", position);
} else if (newValue != null && oldValue == null) {
// Insert new entry
Entry newEntry = new Entry(key, newValue);
ByteBuffer newValueBuffer = newValue.serialize();
Entry newEntry = new Entry(key, newValueBuffer);
entries.add(position, newEntry);
totalKeySize += key.remaining();
totalValueSize += newValue.remaining();
totalValueSize += newValueBuffer.remaining();
} else {
// Update value
assert newValue != null;
assert oldValue != null;
Entry newEntry = new Entry(key, newValue);
ByteBuffer newValueBuffer = newValue.serialize();
Entry newEntry = new Entry(key, newValueBuffer);
entries.set(position, newEntry);
totalValueSize += newValue.remaining() - oldValue.remaining();
totalValueSize += newValueBuffer.remaining() - oldValueBuffer.remaining();
}
return newValue;
// return newValue;
}
Mutable(LeafPage page) {
@@ -283,9 +288,11 @@ public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) {
int pos = from != null ? firstGTE(from) : 0;
while (pos < n) {
ByteBuffer key = getKey(pos);
ByteBuffer value = getValue(pos);
ByteBuffer valueBuffer = getValue(pos);
Value value = Value.deserialize(valueBuffer);
boolean keepGoing = listener.found(key.duplicate(), value.duplicate());
boolean keepGoing = listener.found(key.duplicate(), value);
if (!keepGoing) {
return false;
}
@@ -386,7 +393,7 @@ public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) {
ByteBuffer key = getKey(pos);
ByteBuffer value = getValue(pos);
boolean keepGoing = listener.found(key, value);
boolean keepGoing = listener.found(key, Value.deserialize(value));
if (!keepGoing) {
return false;
}
@@ -460,8 +467,8 @@ Mutable getMutable() {
}
@Override
public Object doAction(Transaction txn, ByteBuffer key, KeyOperation operation) {
return getMutable().doAction(key, operation);
public <V> void doAction(Transaction txn, ByteBuffer key, KeyOperation<V> operation) {
getMutable().doAction(key, operation);
}
@Override
@@ -22,7 +22,7 @@ protected Page(Page parent, int pageNumber, ByteBuffer buffer) {
public abstract boolean walk(Transaction txn, ByteBuffer from, EntryListener listener);
public abstract Object doAction(Transaction txn, ByteBuffer key, KeyOperation operation);
public abstract <V> void doAction(Transaction txn, ByteBuffer key, KeyOperation<V> operation);
public abstract ByteBuffer getKeyLbound();
@@ -54,4 +54,5 @@ public String dump() {
dump(ps);
return new String(baos.toByteArray());
}
}
@@ -182,7 +182,7 @@ public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) {
}
@Override
public Object doAction(Transaction txn, ByteBuffer key, KeyOperation operation) {
public <V> void doAction(Transaction txn, ByteBuffer key, KeyOperation<V> operation) {
throw new UnsupportedOperationException();
}
@@ -175,8 +175,8 @@ public void commit() {
this.transactionId = transactionId;
}
public Object doAction(Btree btree, ByteBuffer key, KeyOperation operation) {
return getRootPage(btree, true).doAction(this, key, operation);
public <V> void doAction(Btree btree, ByteBuffer key, KeyOperation<V> operation) {
getRootPage(btree, true).doAction(this, key, operation);
}
int createdPageCount;
@@ -1,7 +1,5 @@
package com.cloudata.keyvalue.btree.operation;
import java.nio.ByteBuffer;
import com.cloudata.keyvalue.KeyValueProto.KvAction;
import com.cloudata.keyvalue.KeyValueProto.KvEntry;
import com.google.protobuf.ByteString;
@@ -16,14 +14,14 @@ public AppendOperation(ByteString appendValue) {
}
@Override
public ByteBuffer doAction(ByteBuffer oldValue) {
public Value doAction(Value oldValue) {
if (oldValue == null) {
this.newLength = appendValue.size();
return Values.fromRawBytes(appendValue);
return Value.fromRawBytes(appendValue);
} else {
ByteBuffer appended = Values.concat(oldValue, appendValue);
Value appended = oldValue.concat(appendValue);
this.newLength = Values.sizeAsBytes(appended);
this.newLength = appended.sizeAsBytes();
return appended;
}
}
@@ -1,7 +1,5 @@
package com.cloudata.keyvalue.btree.operation;
import java.nio.ByteBuffer;
import com.cloudata.keyvalue.KeyValueProto.KvAction;
import com.cloudata.keyvalue.KeyValueProto.KvEntry;
@@ -10,7 +8,7 @@
private int deleteCount;
@Override
public ByteBuffer doAction(ByteBuffer oldValue) {
public Value doAction(Value oldValue) {
if (oldValue != null) {
deleteCount++;
}
@@ -1,7 +1,5 @@
package com.cloudata.keyvalue.btree.operation;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -20,16 +18,16 @@ public IncrementOperation(long delta) {
}
@Override
public ByteBuffer doAction(ByteBuffer oldValue) {
public Value doAction(Value oldValue) {
long oldValueLong = 0;
if (oldValue != null) {
oldValueLong = Values.asLong(oldValue);
oldValueLong = oldValue.asLong();
}
long newValueLong = oldValueLong + delta;
ByteBuffer newValue = Values.fromLong(newValueLong);
Value newValue = Value.fromLong(newValueLong);
log.debug("Increment: {} -> {}", oldValueLong, newValueLong);
@@ -1,11 +1,9 @@
package com.cloudata.keyvalue.btree.operation;
import java.nio.ByteBuffer;
import com.cloudata.keyvalue.KeyValueProto.KvEntry;
public abstract class KeyOperation<V> {
public abstract ByteBuffer doAction(ByteBuffer oldValue);
public abstract Value doAction(Value oldValue);
public abstract KvEntry.Builder serialize();
@@ -1,29 +1,27 @@
package com.cloudata.keyvalue.btree.operation;
import java.nio.ByteBuffer;
import com.cloudata.keyvalue.KeyValueProto.KvAction;
import com.cloudata.keyvalue.KeyValueProto.KvEntry;
import com.google.protobuf.ByteString;
public class SetOperation extends KeyOperation<Void> {
final ByteBuffer newValue;
final Value newValue;
public SetOperation(ByteBuffer newValue) {
public SetOperation(Value newValue) {
this.newValue = newValue;
}
@Override
public ByteBuffer doAction(ByteBuffer oldValue) {
public Value doAction(Value oldValue) {
return newValue;
}
@Override
public KvEntry.Builder serialize() {
KvEntry.Builder b = KvEntry.newBuilder();
b.setAction(KvAction.SET);
b.setValue(ByteString.copyFrom(newValue));
b.setValue(ByteString.copyFrom(newValue.serialize()));
return b;
}
Oops, something went wrong.

0 comments on commit d69d924

Please sign in to comment.