Permalink
Browse files

Support keyspaces with redis select command

  • Loading branch information...
justinsb committed Dec 16, 2013
1 parent 96e26b1 commit 0b727bcc3c5674512489d2a99a6f4e938becd970
@@ -21,6 +21,7 @@
import com.cloudata.keyvalue.redis.commands.PingCommand;
import com.cloudata.keyvalue.redis.commands.QuitCommand;
import com.cloudata.keyvalue.redis.commands.RedisCommand;
import com.cloudata.keyvalue.redis.commands.SelectCommand;
import com.cloudata.keyvalue.redis.commands.SetCommand;
import com.cloudata.keyvalue.redis.response.ErrorRedisReponse;
import com.cloudata.keyvalue.redis.response.InlineRedisResponse;
@@ -44,6 +45,8 @@
addMethod("ping", new PingCommand());
addMethod("quit", new QuitCommand());
addMethod("select", new SelectCommand());
addMethod("get", new GetCommand());
addMethod("exists", new ExistsCommand());
@@ -28,8 +28,8 @@ public Object doAction(ByteString key, KeyOperation<?> operation) throws RedisEx
}
}
public ByteBuffer get(ByteBuffer key) {
return stateMachine.get(storeId, key);
public ByteBuffer get(ByteString key) {
return stateMachine.get(storeId, key.asReadOnlyByteBuffer());
}
}
@@ -4,14 +4,26 @@
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import java.io.IOException;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedOutputStream;
public class RedisSession {
private static final AttributeKey<RedisSession> KEY = AttributeKey.valueOf(RedisSession.class.getSimpleName());
private final RedisServer server;
private int keyspaceId;
private ByteString keyspaceIdPrefix;
public RedisSession(RedisServer server) {
this.server = server;
setKeyspaceId(0);
}
public static RedisSession get(RedisServer server, ChannelHandlerContext ctx) {
@@ -29,4 +41,35 @@ public static RedisSession get(RedisServer server, ChannelHandlerContext ctx) {
}
}
public void setKeyspaceId(int keyspaceId) {
Preconditions.checkArgument(keyspaceId >= 0);
this.keyspaceId = keyspaceId;
if (keyspaceId < 128) {
assert 1 == CodedOutputStream.computeInt32SizeNoTag(keyspaceId);
byte[] data = new byte[1];
data[0] = (byte) keyspaceId;
this.keyspaceIdPrefix = ByteString.copyFrom(data);
} else {
try {
byte[] data = new byte[CodedOutputStream.computeInt32SizeNoTag(keyspaceId)];
CodedOutputStream c = CodedOutputStream.newInstance(data);
c.writeInt32NoTag(keyspaceId);
c.flush();
this.keyspaceIdPrefix = ByteString.copyFrom(data);
} catch (IOException e) {
throw Throwables.propagate(e);
}
}
}
public int getKeyspaceId() {
return keyspaceId;
}
public ByteString mapToKey(ByteString key) {
return keyspaceIdPrefix.concat(key);
}
}
@@ -20,7 +20,7 @@ public RedisResponse execute(RedisServer server, RedisSession session, RedisRequ
return ErrorRedisReponse.NOT_IMPLEMENTED;
}
ByteString key = command.getByteString(1);
ByteString key = session.mapToKey(command.getByteString(1));
byte[] value = command.get(2);
AppendOperation operation = new AppendOperation(ByteBuffer.wrap(value));
@@ -16,7 +16,7 @@
public RedisResponse execute(RedisServer server, RedisSession session, RedisRequest command) throws RedisException {
long delta = command.getLong(2);
delta = -delta;
return execute(server, command, delta);
return execute(server, session, command, delta);
}
}
@@ -14,6 +14,6 @@
@Override
public RedisResponse execute(RedisServer server, RedisSession session, RedisRequest command) throws RedisException {
return execute(server, command, -1);
return execute(server, session, command, -1);
}
}
@@ -16,7 +16,7 @@ public RedisResponse execute(RedisServer server, RedisSession session, RedisRequ
// TODO: Put into one transaction
for (int i = 1; i < command.getArgc(); i++) {
ByteString key = command.getByteString(i);
ByteString key = session.mapToKey(command.getByteString(i));
DeleteOperation operation = new DeleteOperation();
Number deleted = (Number) server.doAction(key, operation);
@@ -11,15 +11,16 @@
import com.cloudata.keyvalue.redis.RedisSession;
import com.cloudata.keyvalue.redis.response.IntegerRedisResponse;
import com.cloudata.keyvalue.redis.response.RedisResponse;
import com.google.protobuf.ByteString;
public class ExistsCommand implements RedisCommand {
private static final Logger log = LoggerFactory.getLogger(ExistsCommand.class);
@Override
public RedisResponse execute(RedisServer server, RedisSession session, RedisRequest command) throws RedisException {
byte[] key = command.get(1);
ByteString key = session.mapToKey(command.getByteString(1));
ByteBuffer value = server.get(ByteBuffer.wrap(key));
ByteBuffer value = server.get(key);
if (value == null) {
return IntegerRedisResponse.ZERO;
@@ -11,15 +11,16 @@
import com.cloudata.keyvalue.redis.RedisSession;
import com.cloudata.keyvalue.redis.response.BulkRedisResponse;
import com.cloudata.keyvalue.redis.response.RedisResponse;
import com.google.protobuf.ByteString;
public class GetCommand implements RedisCommand {
private static final Logger log = LoggerFactory.getLogger(GetCommand.class);
@Override
public RedisResponse execute(RedisServer server, RedisSession session, RedisRequest command) throws RedisException {
byte[] key = command.get(1);
ByteString key = session.mapToKey(command.getByteString(1));
ByteBuffer value = server.get(ByteBuffer.wrap(key));
ByteBuffer value = server.get(key);
if (value == null) {
return BulkRedisResponse.NIL_REPLY;
@@ -22,12 +22,12 @@
public RedisResponse execute(RedisServer server, RedisSession session, RedisRequest command) throws RedisException {
long delta = command.getLong(2);
return execute(server, command, delta);
return execute(server, session, command, delta);
}
protected IntegerRedisResponse execute(RedisServer server, RedisRequest command, long deltaLong)
throws RedisException {
ByteString key = command.getByteString(1);
protected IntegerRedisResponse execute(RedisServer server, RedisSession session, RedisRequest command,
long deltaLong) throws RedisException {
ByteString key = session.mapToKey(command.getByteString(1));
IncrementOperation operation = new IncrementOperation(deltaLong);
ByteBuffer value = (ByteBuffer) server.doAction(key, operation);
@@ -14,6 +14,6 @@
@Override
public RedisResponse execute(RedisServer server, RedisSession session, RedisRequest command) throws RedisException {
return execute(server, command, 1);
return execute(server, session, command, 1);
}
}
@@ -0,0 +1,29 @@
package com.cloudata.keyvalue.redis.commands;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.cloudata.keyvalue.redis.RedisException;
import com.cloudata.keyvalue.redis.RedisRequest;
import com.cloudata.keyvalue.redis.RedisServer;
import com.cloudata.keyvalue.redis.RedisSession;
import com.cloudata.keyvalue.redis.response.ErrorRedisReponse;
import com.cloudata.keyvalue.redis.response.RedisResponse;
import com.cloudata.keyvalue.redis.response.StatusRedisResponse;
import com.google.common.primitives.Ints;
public class SelectCommand implements RedisCommand {
private static final Logger log = LoggerFactory.getLogger(SelectCommand.class);
@Override
public RedisResponse execute(RedisServer server, RedisSession session, RedisRequest command) throws RedisException {
long keyspaceId = command.getLong(1);
if (keyspaceId < 0 || keyspaceId >= Integer.MAX_VALUE) {
return new ErrorRedisReponse("invalid DB index");
}
session.setKeyspaceId(Ints.checkedCast(keyspaceId));
return StatusRedisResponse.OK;
}
}
@@ -20,7 +20,7 @@ public RedisResponse execute(RedisServer server, RedisSession session, RedisRequ
return ErrorRedisReponse.NOT_IMPLEMENTED;
}
ByteString key = command.getByteString(1);
ByteString key = session.mapToKey(command.getByteString(1));
byte[] value = command.get(2);
SetOperation operation = new SetOperation(ByteBuffer.wrap(value));

0 comments on commit 0b727bc

Please sign in to comment.