Permalink
Browse files

Support increment, return useful values from actions

  • Loading branch information...
justinsb committed Dec 11, 2013
1 parent 4995a04 commit 436642052996373a87f9d926c88d8a8731de8364
@@ -52,7 +52,30 @@ public void put(long storeId, ByteString key, ByteString value) throws Exception
} finally {
response.close();
}
}
public KeyValueEntry increment(long storeId, ByteString key) throws Exception {
ClientResponse response = CLIENT.resource(url).path(toUrlPath(storeId, key)).queryParam("action", "increment")
.post(ClientResponse.class);
try {
int status = response.getStatus();
switch (status) {
case 200:
break;
default:
throw new IllegalStateException("Unexpected status: " + status);
}
InputStream is = response.getEntityInputStream();
ByteString value = ByteString.readFrom(is);
return new KeyValueEntry(key, value);
} finally {
response.close();
}
}
public static class KeyValueEntry {

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.
@@ -46,10 +46,16 @@ public void init(RaftService raft, File stateDir) {
// return logFileSet.readLog(logId, begin, max);
// }
public Object put(long storeId, byte[] key, byte[] value) throws InterruptedException, RaftException {
KvEntry entry = KvEntry.newBuilder().setStoreId(storeId).setKey(ByteString.copyFrom(key))
.setAction(KvAction.SET).setValue(ByteString.copyFrom(value)).build();
// public Object put(long storeId, byte[] key, byte[] value) throws InterruptedException, RaftException {
// KvEntry entry = KvEntry.newBuilder().setStoreId(storeId).setKey(ByteString.copyFrom(key))
// .setAction(KvAction.SET).setValue(ByteString.copyFrom(value)).build();
//
// log.debug("Proposing operation {}", entry.getAction());
//
// return raft.commit(entry.toByteArray());
// }
public Object doAction(KvEntry entry) throws InterruptedException, RaftException {
log.debug("Proposing operation {}", entry.getAction());
return raft.commit(entry.toByteArray());
@@ -77,10 +83,10 @@ public Object applyOperation(@Nonnull ByteBuffer op) {
KeyValueStore keyValueStore = getKeyValueStore(storeId);
keyValueStore.doAction(entry.getAction(), key != null ? key.asReadOnlyByteBuffer() : null,
Object ret = keyValueStore.doAction(entry.getAction(), key != null ? key.asReadOnlyByteBuffer() : null,
value != null ? value.asReadOnlyByteBuffer() : null);
return null;
return ret;
} catch (InvalidProtocolBufferException e) {
log.error("Error deserializing operation", e);
throw new IllegalArgumentException("Error deserializing key value operation", e);
@@ -30,10 +30,11 @@ public KeyValueStore(File dir, boolean uniqueKeys) throws IOException {
this.btree = new Btree(pageStore, uniqueKeys);
}
public void doAction(KvAction action, ByteBuffer key, ByteBuffer value) {
public Object doAction(KvAction action, ByteBuffer key, ByteBuffer value) {
try (WriteTransaction txn = btree.beginReadWrite()) {
txn.doAction(btree, action, key, value);
Object ret = txn.doAction(btree, action, key, value);
txn.commit();
return ret;
}
}
@@ -412,7 +412,7 @@ Mutable getMutable() {
}
@Override
public void doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffer value) {
public Object doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffer value) {
int pos = findPos(key);
if (pos < 0) {
pos = 0;
@@ -424,13 +424,15 @@ public void doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffe
ByteBuffer oldLbound = childPage.getKeyLbound();
childPage.doAction(txn, action, key, value);
Object ret = childPage.doAction(txn, action, key, value);
ByteBuffer newLbound = childPage.getKeyLbound();
if (!oldLbound.equals(newLbound)) {
getMutable().updateLbound(txn, pageNumber);
}
return ret;
}
@Override
@@ -54,4 +54,29 @@ private final static int compare(byte l, byte r) {
return Integer.compare(il, ir);
}
public static long parseLong(ByteBuffer buff) {
long v = 0;
boolean negative = false;
for (int i = buff.position(); i < buff.limit(); i++) {
byte b = buff.get(i);
if (b >= '0' && b <= '9') {
v = (v * 10) + (b - '0');
continue;
} else if (b == '-') {
if (i == 0) {
negative = true;
continue;
}
}
throw new IllegalArgumentException();
}
if (negative) {
v = -v;
}
return v;
}
}
@@ -91,13 +91,14 @@ int firstGTE(ByteBuffer find) {
return pos;
}
boolean doAction(KvAction action, ByteBuffer key, ByteBuffer value) {
boolean changed = false;
Object doAction(KvAction action, ByteBuffer key, ByteBuffer value) {
Object ret = null;
int position = firstGTE(key);
switch (action) {
case SET:
case SET: {
boolean done = false;
Entry newEntry = new Entry(key, value);
if (uniqueKeys) {
if (position < entries.size()) {
@@ -109,21 +110,62 @@ boolean doAction(KvAction action, ByteBuffer key, ByteBuffer value) {
entries.set(position, newEntry);
totalValueSize += value.remaining() - oldValue.remaining();
changed = true;
done = true;
}
}
}
if (!changed) {
if (!done) {
entries.add(position, newEntry);
totalKeySize += key.remaining();
totalValueSize += value.remaining();
}
break;
}
case INCREMENT: {
boolean done = false;
if (uniqueKeys) {
if (position < entries.size()) {
ByteBuffer midKey = getKey(position);
int comparison = ByteBuffers.compare(midKey, key);
if (comparison == 0) {
ByteBuffer oldValue = getValue(position);
long oldValueLong = ByteBuffers.parseLong(oldValue);
oldValueLong++;
ByteBuffer newValue = ByteBuffer.wrap(Long.toString(oldValueLong).getBytes());
Entry newEntry = new Entry(key, newValue);
entries.set(position, newEntry);
totalValueSize += newValue.remaining() - oldValue.remaining();
done = true;
ret = newValue;
}
}
if (!done) {
ByteBuffer newValue = ByteBuffer.wrap(Long.toString(1).getBytes());
Entry newEntry = new Entry(key, newValue);
entries.add(position, newEntry);
totalKeySize += key.remaining();
totalValueSize += newValue.remaining();
ret = newValue;
}
} else {
throw new UnsupportedOperationException();
}
break;
}
case DELETE:
case DELETE: {
if (uniqueKeys) {
if (position < entries.size()) {
ByteBuffer midKey = getKey(position);
@@ -135,10 +177,11 @@ boolean doAction(KvAction action, ByteBuffer key, ByteBuffer value) {
totalKeySize -= key.remaining();
totalValueSize -= oldValue.remaining();
changed = true;
ret = true;
log.info("Deleted entry @{}", position);
} else {
log.info("Key not found in delete");
ret = false;
}
}
} else {
@@ -147,11 +190,13 @@ boolean doAction(KvAction action, ByteBuffer key, ByteBuffer value) {
}
break;
}
default:
throw new UnsupportedOperationException();
}
return changed;
return ret;
}
Mutable(LeafPage page) {
@@ -474,8 +519,8 @@ Mutable getMutable() {
}
@Override
public void doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffer value) {
getMutable().doAction(action, key, value);
public Object doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffer value) {
return getMutable().doAction(action, key, value);
}
@Override
@@ -22,7 +22,7 @@ protected Page(Page parent, int pageNumber, ByteBuffer buffer) {
public abstract boolean walk(Transaction txn, ByteBuffer from, EntryListener listener);
public abstract void doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffer value);
public abstract Object doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffer value);
public abstract ByteBuffer getKeyLbound();
@@ -182,7 +182,7 @@ public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) {
}
@Override
public void doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffer value) {
public Object doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffer value) {
throw new UnsupportedOperationException();
}
@@ -175,8 +175,8 @@ public void commit() {
this.transactionId = transactionId;
}
public void doAction(Btree btree, KvAction action, ByteBuffer key, ByteBuffer value) {
getRootPage(btree, true).doAction(this, action, key, value);
public Object doAction(Btree btree, KvAction action, ByteBuffer key, ByteBuffer value) {
return getRootPage(btree, true).doAction(this, action, key, value);
}
int createdPageCount;
@@ -138,7 +138,7 @@ public boolean walk(Transaction txn, ByteBuffer from, EntryListener listener) {
}
@Override
public void doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffer value) {
public Object doAction(Transaction txn, KvAction action, ByteBuffer key, ByteBuffer value) {
throw new IllegalStateException();
}
Oops, something went wrong.

0 comments on commit 4366420

Please sign in to comment.