Permalink
Browse files

Support variant value types

Currently we don't do anything particularly interesting with it...
  • Loading branch information...
justinsb committed Dec 16, 2013
1 parent 0b727bc commit 3da392f59434987765f2db48d69830fbfa048b28
@@ -60,15 +60,15 @@ public void init(RaftService raft, File stateDir) {
// return raft.commit(entry.toByteArray());
// }
public Object doAction(long storeId, ByteString key, KeyOperation operation) throws InterruptedException,
public <V> V doAction(long storeId, ByteString key, KeyOperation<V> operation) throws InterruptedException,
RaftException {
KvEntry.Builder entry = operation.serialize();
entry.setKey(key);
entry.setStoreId(storeId);
log.debug("Proposing operation {}", entry.getAction());
return raft.commit(entry.build().toByteArray());
return (V) raft.commit(entry.build().toByteArray());
}
@Override
@@ -91,7 +91,7 @@ public Object applyOperation(@Nonnull ByteBuffer op) {
switch (entry.getAction()) {
case APPEND:
operation = new AppendOperation(value.asReadOnlyByteBuffer());
operation = new AppendOperation(value);
break;
case DELETE:
@@ -8,25 +8,22 @@
public class AppendOperation extends KeyOperation<Integer> {
final ByteBuffer appendValue;
final ByteString appendValue;
private int newLength;
public AppendOperation(ByteBuffer appendValue) {
public AppendOperation(ByteString appendValue) {
this.appendValue = appendValue;
}
@Override
public ByteBuffer doAction(ByteBuffer oldValue) {
if (oldValue == null) {
this.newLength = appendValue.remaining();
return appendValue;
this.newLength = appendValue.size();
return Values.fromRawBytes(appendValue);
} else {
int n = oldValue.remaining() + appendValue.remaining();
ByteBuffer appended = ByteBuffer.allocate(n);
appended.put(oldValue.duplicate());
appended.put(appendValue.duplicate());
appended.flip();
this.newLength = n;
ByteBuffer appended = Values.concat(oldValue, appendValue);
this.newLength = Values.sizeAsBytes(appended);
return appended;
}
}
@@ -35,7 +32,7 @@ public ByteBuffer doAction(ByteBuffer oldValue) {
public KvEntry.Builder serialize() {
KvEntry.Builder b = KvEntry.newBuilder();
b.setAction(KvAction.APPEND);
b.setValue(ByteString.copyFrom(appendValue));
b.setValue(appendValue);
return b;
}
@@ -2,14 +2,18 @@
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudata.keyvalue.KeyValueProto.KvAction;
import com.cloudata.keyvalue.KeyValueProto.KvEntry;
import com.cloudata.keyvalue.btree.ByteBuffers;
public class IncrementOperation extends KeyOperation<ByteBuffer> {
public class IncrementOperation extends KeyOperation<Long> {
private static final Logger log = LoggerFactory.getLogger(IncrementOperation.class);
final long delta;
private ByteBuffer newValue;
private Long result;
public IncrementOperation(long delta) {
this.delta = delta;
@@ -20,16 +24,18 @@ public ByteBuffer doAction(ByteBuffer oldValue) {
long oldValueLong = 0;
if (oldValue != null) {
oldValueLong = ByteBuffers.parseLong(oldValue);
oldValueLong = Values.asLong(oldValue);
}
long newValueLong = oldValueLong + delta;
ByteBuffer newValue = ByteBuffer.wrap(Long.toString(newValueLong).getBytes());
ByteBuffer newValue = Values.fromLong(newValueLong);
this.newValue = newValue.duplicate();
log.debug("Increment: {} -> {}", oldValueLong, newValueLong);
// this.newValueLong = newValueLong;
// this.newValue = newValue.duplicate();
this.result = newValueLong;
return newValue;
}
@@ -43,8 +49,8 @@ public ByteBuffer doAction(ByteBuffer oldValue) {
}
@Override
public ByteBuffer getResult() {
return newValue;
public Long getResult() {
return result;
}
}
@@ -0,0 +1,102 @@
package com.cloudata.keyvalue.btree.operation;
import java.nio.ByteBuffer;
import com.cloudata.keyvalue.btree.ByteBuffers;
import com.google.protobuf.ByteString;
public class Values {
public static final byte FORMAT_RAW = 0;
public static final byte FORMAT_INT64 = 1;
static final ByteString PREFIX_RAW = ByteString.copyFrom(new byte[] { FORMAT_RAW });
static final ByteString PREFIX_INT64 = ByteString.copyFrom(new byte[] { FORMAT_INT64 });
public static ByteBuffer fromRawBytes(byte[] data) {
return fromRawBytes(ByteString.copyFrom(data));
}
public static ByteBuffer fromRawBytes(ByteString b) {
return PREFIX_RAW.concat(b).asReadOnlyByteBuffer();
}
// TODO: Fix this...
public static ByteBuffer concat(ByteBuffer oldValue, ByteString appendValue) {
ByteBuffer old = Values.asBytes(oldValue);
int n = old.remaining() + appendValue.size();
ByteBuffer appended = ByteBuffer.allocate(1 + n);
appended.put(FORMAT_RAW);
appended.put(old.duplicate());
appended.put(appendValue.asReadOnlyByteBuffer());
appended.flip();
return appended;
}
public static ByteBuffer fromLong(long v) {
ByteBuffer buffer = ByteBuffer.allocate(9);
buffer.put(FORMAT_INT64);
buffer.putLong(v);
buffer.flip();
return buffer;
}
public static byte getType(ByteBuffer data) {
return data.get(data.position());
}
public static ByteBuffer asBytes(ByteBuffer data) {
switch (getType(data)) {
case FORMAT_RAW: {
ByteBuffer b = data.duplicate();
b.position(b.position() + 1);
return b;
}
case FORMAT_INT64: {
long v = data.getLong(data.position() + 1);
return ByteBuffer.wrap(Long.toString(v).getBytes());
}
default:
throw new IllegalStateException();
}
}
public static long asLong(ByteBuffer data) {
switch (getType(data)) {
case FORMAT_RAW: {
ByteBuffer b = data.duplicate();
b.position(b.position() + 1);
long v = ByteBuffers.parseLong(b);
return v;
}
case FORMAT_INT64:
return data.getLong(data.position() + 1);
default:
throw new IllegalStateException();
}
}
public static int sizeAsBytes(ByteBuffer data) {
switch (getType(data)) {
case FORMAT_RAW: {
return data.remaining() - 1;
}
case FORMAT_INT64: {
// This isn't fast...
throw new UnsupportedOperationException();
}
default:
throw new IllegalStateException();
}
}
}
@@ -17,7 +17,7 @@ public RedisServer(KeyValueStateMachine stateMachine, long storeId) {
this.storeId = storeId;
}
public Object doAction(ByteString key, KeyOperation<?> operation) throws RedisException {
public <V> V doAction(ByteString key, KeyOperation<V> operation) throws RedisException {
try {
return stateMachine.doAction(storeId, key, operation);
} catch (InterruptedException e) {
@@ -1,7 +1,5 @@
package com.cloudata.keyvalue.redis.commands;
import java.nio.ByteBuffer;
import com.cloudata.keyvalue.btree.operation.AppendOperation;
import com.cloudata.keyvalue.redis.RedisException;
import com.cloudata.keyvalue.redis.RedisRequest;
@@ -21,10 +19,10 @@ public RedisResponse execute(RedisServer server, RedisSession session, RedisRequ
}
ByteString key = session.mapToKey(command.getByteString(1));
byte[] value = command.get(2);
ByteString value = command.getByteString(2);
AppendOperation operation = new AppendOperation(ByteBuffer.wrap(value));
Number ret = (Number) server.doAction(key, operation);
AppendOperation operation = new AppendOperation(value);
Number ret = server.doAction(key, operation);
return IntegerRedisResponse.valueOf(ret.longValue());
}
@@ -5,6 +5,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudata.keyvalue.btree.operation.Values;
import com.cloudata.keyvalue.redis.RedisException;
import com.cloudata.keyvalue.redis.RedisRequest;
import com.cloudata.keyvalue.redis.RedisServer;
@@ -21,11 +22,24 @@ public RedisResponse execute(RedisServer server, RedisSession session, RedisRequ
ByteString key = session.mapToKey(command.getByteString(1));
ByteBuffer value = server.get(key);
if (value == null) {
return BulkRedisResponse.NIL_REPLY;
}
return new BulkRedisResponse(value);
switch (Values.getType(value)) {
case Values.FORMAT_INT64: {
long v = Values.asLong(value);
ByteBuffer valueBuffer = ByteBuffer.wrap(Long.toString(v).getBytes());
return new BulkRedisResponse(valueBuffer);
}
case Values.FORMAT_RAW: {
ByteBuffer bytes = Values.asBytes(value);
return new BulkRedisResponse(bytes);
}
default:
throw new IllegalStateException();
}
}
}
@@ -1,11 +1,8 @@
package com.cloudata.keyvalue.redis.commands;
import java.nio.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudata.keyvalue.btree.ByteBuffers;
import com.cloudata.keyvalue.btree.operation.IncrementOperation;
import com.cloudata.keyvalue.redis.RedisException;
import com.cloudata.keyvalue.redis.RedisRequest;
@@ -30,9 +27,7 @@ protected IntegerRedisResponse execute(RedisServer server, RedisSession session,
ByteString key = session.mapToKey(command.getByteString(1));
IncrementOperation operation = new IncrementOperation(deltaLong);
ByteBuffer value = (ByteBuffer) server.doAction(key, operation);
long v = ByteBuffers.parseLong(value);
Long v = server.doAction(key, operation);
return new IntegerRedisResponse(v);
}
}
@@ -1,8 +1,7 @@
package com.cloudata.keyvalue.redis.commands;
import java.nio.ByteBuffer;
import com.cloudata.keyvalue.btree.operation.SetOperation;
import com.cloudata.keyvalue.btree.operation.Values;
import com.cloudata.keyvalue.redis.RedisException;
import com.cloudata.keyvalue.redis.RedisRequest;
import com.cloudata.keyvalue.redis.RedisServer;
@@ -21,9 +20,9 @@ public RedisResponse execute(RedisServer server, RedisSession session, RedisRequ
}
ByteString key = session.mapToKey(command.getByteString(1));
byte[] value = command.get(2);
ByteString value = command.getByteString(2);
SetOperation operation = new SetOperation(ByteBuffer.wrap(value));
SetOperation operation = new SetOperation(Values.fromRawBytes(value));
server.doAction(key, operation);
return StatusRedisResponse.OK;
Oops, something went wrong.

0 comments on commit 3da392f

Please sign in to comment.