Permalink
Browse files

Wired up redis implementation and fixed lots of bugs

  • Loading branch information...
justinsb committed Dec 13, 2013
1 parent 653c4e3 commit b1aa3130744ca0d4ef377c71b72b473cea6e6f99
Showing with 338 additions and 102 deletions.
  1. +6 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/KeyValueServer.java
  2. +1 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/KeyValueStateMachine.java
  3. +6 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/KeyValueStore.java
  4. +8 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/ByteBuffers.java
  5. +1 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/PageStore.java
  6. +1 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/btree/TransactionPage.java
  7. +1 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/Codec.java
  8. +0 −10 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/PingCommand.java
  9. +0 −5 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/RedisCommand.java
  10. +9 −2 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/RedisEndpoint.java
  11. +30 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/RedisRequest.java
  12. +71 −66 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/RedisRequestDecoder.java
  13. +28 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/RedisRequestHandler.java
  14. +13 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/RedisResponseEncoder.java
  15. +11 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/RedisServer.java
  16. +15 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/commands/EchoCommand.java
  17. +30 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/commands/GetCommand.java
  18. +16 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/commands/PingCommand.java
  19. +16 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/commands/QuitCommand.java
  20. +10 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/commands/RedisCommand.java
  21. +28 −0 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/commands/SetCommand.java
  22. +24 −6 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/{ → response}/BulkRedisResponse.java
  23. +3 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/{ → response}/ErrorRedisReponse.java
  24. +1 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/{ → response}/InlineRedisResponse.java
  25. +3 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/{ → response}/IntegerRedisResponse.java
  26. +1 −1 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/{ → response}/RedisResponse.java
  27. +5 −2 cloudata-keyvalue/src/main/java/com/cloudata/keyvalue/redis/{ → response}/StatusRedisResponse.java
@@ -10,6 +10,7 @@
import org.robotninjas.barge.Replica;
import com.cloudata.keyvalue.redis.RedisEndpoint;
import com.cloudata.keyvalue.redis.RedisServer;
import com.cloudata.keyvalue.web.WebModule;
import com.google.common.collect.Lists;
import com.google.inject.Guice;
@@ -71,7 +72,11 @@ public synchronized void start() throws Exception {
this.selector = GrizzlyServerFactory.create(baseUri, rc, ioc);
if (redisSocketAddress != null) {
this.redisEndpoint = new RedisEndpoint(redisSocketAddress);
long storeId = 1;
KeyValueStore keyValueStore = stateMachine.getKeyValueStore(storeId);
RedisServer redisServer = new RedisServer(keyValueStore);
this.redisEndpoint = new RedisEndpoint(redisSocketAddress, redisServer);
this.redisEndpoint.start();
}
}
@@ -96,7 +96,7 @@ public Object applyOperation(@Nonnull ByteBuffer op) {
}
}
private KeyValueStore getKeyValueStore(long id) {
KeyValueStore getKeyValueStore(long id) {
try {
return keyValueStoreCache.get(id);
} catch (ExecutionException e) {
@@ -9,6 +9,7 @@
import com.cloudata.keyvalue.KeyValueProto.KvAction;
import com.cloudata.keyvalue.btree.Btree;
import com.cloudata.keyvalue.btree.ByteBuffers;
import com.cloudata.keyvalue.btree.EntryListener;
import com.cloudata.keyvalue.btree.MmapPageStore;
import com.cloudata.keyvalue.btree.PageStore;
@@ -63,7 +64,12 @@ public ByteBuffer get(final ByteBuffer key) {
txn.walk(btree, key, listener);
ByteBuffer value = listener.foundValue;
// log.debug("Value for {}: {}", key, value);
if (value != null) {
// Once the transaction goes away the values may be invalid
value = ByteBuffers.clone(value);
}
// log.debug("Value for {}: {}", key, value);
return value;
@@ -79,4 +79,12 @@ public static long parseLong(ByteBuffer buff) {
}
return v;
}
public static ByteBuffer clone(ByteBuffer b) {
int n = b.remaining();
ByteBuffer buff = ByteBuffer.allocate(n);
buff.put(b.duplicate());
buff.flip();
return buff;
}
}
@@ -71,7 +71,7 @@ public long assignTransactionId() {
public ReadOnlyTransaction beginReadOnlyTransaction() {
synchronized (this) {
log.info("Starting new read-write transaction with root page: {}", currentRootPage);
log.info("Starting new read-only transaction with root page: {}", currentRootPage);
ReadOnlyTransaction txn = new ReadOnlyTransaction(this, currentRootPage, currentTransactionId);
readTransactions.add(txn);
return txn;
@@ -213,7 +213,7 @@ public byte getPageType() {
@Override
public void dump(PrintStream os) {
throw new UnsupportedOperationException();
os.println("TransactionPage: " + getTransactionId());
}
public void setRootPageId(int newRootPageId) {
@@ -37,7 +37,7 @@ public static void writeLong(ByteBuf out, long value) {
}
}
public static long readLong(ByteBuf buf) throws IOException {
public static long readLongWithCrlf(ByteBuf buf) throws IOException {
long v = 0;
boolean negative = false;
int read = buf.readByte();

This file was deleted.

Oops, something went wrong.

This file was deleted.

Oops, something went wrong.
@@ -8,21 +8,28 @@
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import java.net.SocketAddress;
;
public class RedisEndpoint {
DefaultEventExecutorGroup group;
final SocketAddress localAddress;
public RedisEndpoint(SocketAddress localAddress) {
final RedisServer redisServer;
public RedisEndpoint(SocketAddress localAddress, RedisServer redisServer) {
this.localAddress = localAddress;
this.redisServer = redisServer;
}
public void start() throws InterruptedException {
final RedisRequestHandler commandHandler = new RedisRequestHandler(new RedisServer());
final RedisRequestHandler commandHandler = new RedisRequestHandler(redisServer);
ServerBootstrap b = new ServerBootstrap();
int nThreads = 1;
@@ -25,4 +25,34 @@ public boolean isInline() {
return command[0];
}
public byte[] get(int i) {
return command[i];
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(getClass().getSimpleName());
sb.append(" [");
for (int i = 0; i < command.length; i++) {
if (i != 0) {
sb.append(",");
}
byte[] a = command[i];
if (a == null) {
sb.append("(null)");
}
sb.append(new String(a));
}
if (inline) {
sb.append(" (inline)");
}
sb.append("]");
return sb.toString();
}
public int getArgc() {
return command.length;
}
}
@@ -35,84 +35,89 @@ private static int findCrLf(final ByteBuf buffer) {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state) {
case ARGUMENT_COUNT: {
int crlf = findCrLf(in);
if (crlf == -1) {
return;
}
byte b = in.getByte(in.readerIndex());
if (b == '*') {
in.skipBytes(1);
long l = Codec.readLong(in);
if (l > Integer.MAX_VALUE) {
throw new IllegalArgumentException();
}
int numArgs = (int) l;
if (numArgs < 0) {
throw new RedisException("Invalid size: " + numArgs);
while (true) {
switch (state) {
case ARGUMENT_COUNT: {
int crlf = findCrLf(in);
if (crlf == -1) {
return;
}
readCrLf(in);
byte b = in.getByte(in.readerIndex());
if (b == '*') {
in.skipBytes(1);
long l = Codec.readLongWithCrlf(in);
if (l > Integer.MAX_VALUE) {
throw new IllegalArgumentException();
}
int numArgs = (int) l;
if (numArgs < 0) {
throw new RedisException("Invalid size: " + numArgs);
}
this.arguments = new byte[numArgs][];
this.argumentsPos = 0;
state = State.ARGUMENT_LENGTH;
} else {
// Inline command
byte[] command = in.readBytes(crlf - in.readerIndex()).array();
readCrLf(in);
out.add(new RedisRequest(command, true));
assert state == State.ARGUMENT_COUNT;
}
break;
}
this.arguments = new byte[numArgs][];
this.argumentsPos = 0;
state = State.ARGUMENT_LENGTH;
} else {
// Inline command
byte[] command = in.readBytes(crlf - in.readerIndex()).array();
readCrLf(in);
case ARGUMENT_LENGTH: {
int crlf = findCrLf(in);
if (crlf == -1) {
return;
}
out.add(new RedisRequest(command, true));
byte b = in.readByte();
if (b == '$') {
long l = Codec.readLongWithCrlf(in);
if (l > Integer.MAX_VALUE) {
throw new IllegalArgumentException();
}
int length = (int) l;
if (length < 0) {
throw new RedisException("Invalid length: " + length);
}
this.argumentLength = length;
state = State.ARGUMENT_DATA;
} else {
throw new IOException("Expected $ character");
}
break;
}
break;
}
case ARGUMENT_LENGTH: {
int crlf = findCrLf(in);
if (crlf == -1) {
return;
}
case ARGUMENT_DATA: {
if (in.readableBytes() < (argumentLength + 2)) {
return;
}
byte b = in.readByte();
if (b == '$') {
long l = Codec.readLong(in);
ByteBuf buf = in.readBytes(argumentLength);
assert buf.arrayOffset() == 0;
assert buf.readableBytes() == buf.array().length;
byte[] data = buf.array();
readCrLf(in);
if (l > Integer.MAX_VALUE) {
throw new IllegalArgumentException();
}
int length = (int) l;
if (length < 0) {
throw new RedisException("Invalid length: " + length);
arguments[argumentsPos] = data;
argumentsPos++;
if (argumentsPos >= arguments.length) {
out.add(new RedisRequest(arguments));
arguments = null;
state = State.ARGUMENT_COUNT;
} else {
state = State.ARGUMENT_LENGTH;
}
this.argumentLength = length;
state = State.ARGUMENT_DATA;
} else {
throw new IOException("Expected $ character");
}
break;
}
case ARGUMENT_DATA: {
if (in.readableBytes() < (argumentLength + 2)) {
return;
break;
}
byte[] data = in.readBytes(argumentLength).array();
readCrLf(in);
arguments[argumentsPos] = data;
argumentsPos++;
if (argumentsPos >= arguments.length) {
out.add(new RedisRequest(arguments));
arguments = null;
state = State.ARGUMENT_COUNT;
}
break;
}
}
}
@@ -6,6 +6,19 @@
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudata.keyvalue.redis.commands.EchoCommand;
import com.cloudata.keyvalue.redis.commands.GetCommand;
import com.cloudata.keyvalue.redis.commands.PingCommand;
import com.cloudata.keyvalue.redis.commands.QuitCommand;
import com.cloudata.keyvalue.redis.commands.RedisCommand;
import com.cloudata.keyvalue.redis.commands.SetCommand;
import com.cloudata.keyvalue.redis.response.ErrorRedisReponse;
import com.cloudata.keyvalue.redis.response.InlineRedisResponse;
import com.cloudata.keyvalue.redis.response.RedisResponse;
import com.cloudata.keyvalue.redis.response.StatusRedisResponse;
import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
@@ -15,11 +28,17 @@
*/
@ChannelHandler.Sharable
public class RedisRequestHandler extends SimpleChannelInboundHandler<RedisRequest> {
private static final Logger log = LoggerFactory.getLogger(RedisRequestHandler.class);
static final Map<ByteString, RedisCommand> methods = Maps.newHashMap();
static {
addMethod("echo", new EchoCommand());
addMethod("ping", new PingCommand());
addMethod("quit", new QuitCommand());
addMethod("set", new SetCommand());
addMethod("get", new GetCommand());
}
static void addMethod(String name, RedisCommand action) {
@@ -75,14 +94,22 @@ protected void channelRead0(ChannelHandlerContext ctx, RedisRequest msg) throws
}
}
log.debug("Executing command: {}", msg);
RedisCommand command = methods.get(ByteString.copyFrom(name));
RedisResponse reply;
if (command == null) {
reply = new ErrorRedisReponse("unknown command '" + new String(name, Charsets.US_ASCII) + "'");
} else {
reply = command.execute(server, msg);
try {
reply = command.execute(server, msg);
} catch (Throwable t) {
log.warn("Error executing command", t);
reply = ErrorRedisReponse.INTERNAL_ERROR;
}
}
if (reply == StatusRedisResponse.QUIT) {
// TODO: Pipelined responses? Do we need to flush?
ctx.close();
} else {
if (msg.isInline()) {
Oops, something went wrong.

0 comments on commit b1aa313

Please sign in to comment.