Skip to content

Commit

Permalink
Basic redis endpoint (only supports ping)
Browse files Browse the repository at this point in the history
Heavily based on the (Apache) redis-protocol project:
https://github.com/spullara/redis-protocol
  • Loading branch information
justinsb committed Dec 12, 2013
1 parent 4366420 commit 653c4e3
Show file tree
Hide file tree
Showing 18 changed files with 714 additions and 4 deletions.
@@ -1,12 +1,15 @@
package com.cloudata.keyvalue;

import java.io.File;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.List;

import org.robotninjas.barge.ClusterConfig;
import org.robotninjas.barge.RaftService;
import org.robotninjas.barge.Replica;

import com.cloudata.keyvalue.redis.RedisEndpoint;
import com.cloudata.keyvalue.web.WebModule;
import com.google.common.collect.Lists;
import com.google.inject.Guice;
Expand All @@ -26,13 +29,16 @@ public class KeyValueServer {
private final List<Replica> peers;
private RaftService raft;
private SelectorThread selector;
private final SocketAddress redisSocketAddress;
private RedisEndpoint redisEndpoint;

public KeyValueServer(File baseDir, Replica local, List<Replica> peers, int httpPort) {
super();
public KeyValueServer(File baseDir, Replica local, List<Replica> peers, int httpPort,
SocketAddress redisSocketAddress) {
this.baseDir = baseDir;
this.local = local;
this.peers = peers;
this.httpPort = httpPort;
this.redisSocketAddress = redisSocketAddress;
}

public synchronized void start() throws Exception {
Expand Down Expand Up @@ -63,6 +69,11 @@ public synchronized void start() throws Exception {
IoCComponentProviderFactory ioc = new GuiceComponentProviderFactory(rc, injector);

this.selector = GrizzlyServerFactory.create(baseUri, rc, ioc);

if (redisSocketAddress != null) {
this.redisEndpoint = new RedisEndpoint(redisSocketAddress);
this.redisEndpoint.start();
}
}

public String getHttpUrl() {
Expand All @@ -79,7 +90,10 @@ public static void main(String... args) throws Exception {

File baseDir = new File(args[0]);
int httpPort = (9990 + port);
final KeyValueServer server = new KeyValueServer(baseDir, local, members, httpPort);
int redisPort = 6379 + port - 1;

SocketAddress redisSocketAddress = new InetSocketAddress(redisPort);
final KeyValueServer server = new KeyValueServer(baseDir, local, members, httpPort, redisSocketAddress);
server.start();

Runtime.getRuntime().addShutdownHook(new Thread() {
Expand Down
@@ -0,0 +1,83 @@
package com.cloudata.keyvalue.redis;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.nio.charset.Charset;

import com.google.common.base.Charsets;

public class BulkRedisResponse extends RedisResponse {
public static final BulkRedisResponse NIL_REPLY = new BulkRedisResponse();

public static final char MARKER = '$';
private final ByteBuf data;
private final int length;

private BulkRedisResponse() {
this.data = null;
this.length = -1;
}

public BulkRedisResponse(byte[] bytes) {
this.data = Unpooled.wrappedBuffer(bytes);
this.length = bytes.length;
}

public BulkRedisResponse(ByteBuf bytes) {
this.data = bytes;
this.length = bytes.capacity();
}

// @Override
// public ByteBuf data() {
// return data;
// }

public String asAsciiString() {
if (data == null) {
return null;
}
return data.toString(Charsets.US_ASCII);
}

public String asUTF8String() {
if (data == null) {
return null;
}
return data.toString(Charsets.UTF_8);
}

public String asString(Charset charset) {
if (data == null) {
return null;
}
return data.toString(charset);
}

@Override
public void encode(ByteBuf os) {
os.writeByte(MARKER);
Codec.writeLong(os, length);
if (length > 0) {
os.writeBytes(data);

// Special case: null response does not have a CRLF
os.writeBytes(CRLF);
}
}

@Override
public void encodeInline(ByteBuf os) {
if (length > 0) {
os.writeByte('+');
os.writeBytes(data);
}
os.writeBytes(CRLF);
}

@Override
public String toString() {
return asUTF8String();
}
}
@@ -0,0 +1,69 @@
package com.cloudata.keyvalue.redis;

import io.netty.buffer.ByteBuf;

import java.io.IOException;

public class Codec {
public static final char CR = '\r';
public static final char LF = '\n';

static class Cache {
private static final int cacheLow = -255;
private static final int cacheHigh = 255;
private static byte[][] cache;

static {
cache = new byte[1 + cacheHigh - cacheLow][];
for (int i = 0; i < cache.length; i++) {
long v = cacheLow + i;
cache[i] = Long.toString(v).getBytes();
}
}
}

public static void writeLong(ByteBuf out, long value) {
if (value >= Cache.cacheLow && value <= Cache.cacheHigh) {
out.writeBytes(Cache.cache[(int) (value - Cache.cacheLow)]);
} else {
// TODO: Can we do much better (avoid the array)? I don't think we can...
StringBuilder sb = new StringBuilder(20);
sb.append(value);
for (int i = 0; i < sb.length(); i++) {
char c = sb.charAt(i);
assert c == '-' || (c >= '0' && c <= '9');
out.writeByte(c);
}
}
}

public static long readLong(ByteBuf buf) throws IOException {
long v = 0;
boolean negative = false;
int read = buf.readByte();
if (read == '-') {
negative = true;
read = buf.readByte();
}
do {
if (read == CR) {
if (buf.readByte() == LF) {
break;
}
}
int value = read - '0';
if (value >= 0 && value < 10) {
v *= 10;
v += value;
} else {
throw new IOException("Invalid character in integer");
}
read = buf.readByte();
} while (true);

if (negative) {
v = -v;
}
return v;
}
}
@@ -0,0 +1,44 @@
package com.cloudata.keyvalue.redis;

import io.netty.buffer.ByteBuf;

import com.google.common.base.Charsets;

public class ErrorRedisReponse extends RedisResponse {
public static final byte MARKER = '-';

public static final ErrorRedisReponse NOT_IMPLEMENTED = new ErrorRedisReponse("Not yet implemented");

private final String message;
private final byte[] messageBytes;

public ErrorRedisReponse(String message) {
this.message = message;
this.messageBytes = message.getBytes(Charsets.UTF_8);
}

// @Override
// public String data() {
// return message;
// }

@Override
public void encode(ByteBuf os) {
os.writeByte(MARKER);
os.writeBytes(messageBytes);
os.writeBytes(CRLF);
}

@Override
public void encodeInline(ByteBuf os) {
os.writeByte('-');
os.writeBytes(messageBytes);
os.writeBytes(CRLF);
}

@Override
public String toString() {
return "ERROR:" + message;
}

}
@@ -0,0 +1,26 @@
package com.cloudata.keyvalue.redis;

import io.netty.buffer.ByteBuf;

public class InlineRedisResponse extends RedisResponse {
final RedisResponse inner;

public InlineRedisResponse(RedisResponse inner) {
this.inner = inner;
}

@Override
public void encode(ByteBuf out) {
if (inner == null) {
out.writeBytes(CRLF);
} else {
inner.encodeInline(out);
}
}

@Override
public void encodeInline(ByteBuf out) {
throw new IllegalStateException();
}

}
@@ -0,0 +1,54 @@
package com.cloudata.keyvalue.redis;

import io.netty.buffer.ByteBuf;

public class IntegerRedisResponse extends RedisResponse {
public static final char MARKER = ':';

private final long value;

private static final int cacheLow = -255;
private static final int cacheHigh = 255;
private static IntegerRedisResponse[] cache;

static {
cache = new IntegerRedisResponse[1 + cacheHigh - cacheLow];
for (int i = 0; i < cache.length; i++) {
cache[i + cacheLow] = new IntegerRedisResponse(i);
}
}

public static IntegerRedisResponse valueOf(long v) {
if (v >= cacheLow && v <= cacheHigh) {
return cache[(int) (v - cacheLow)];
} else {
return new IntegerRedisResponse(v);
}
}

public IntegerRedisResponse(long value) {
this.value = value;
}

// @Override
// public Long data() {
// return value;
// }

@Override
public void encode(ByteBuf os) {
os.writeByte(MARKER);
Codec.writeLong(os, value);
os.writeBytes(CRLF);
}

@Override
public String toString() {
return Long.toString(value);
}

@Override
public void encodeInline(ByteBuf out) {
encode(out);
}
}
@@ -0,0 +1,10 @@
package com.cloudata.keyvalue.redis;

public class PingCommand implements RedisCommand {

@Override
public RedisResponse execute(RedisServer server, RedisRequest command) throws RedisException {
return StatusRedisResponse.PONG;
}

}
@@ -0,0 +1,5 @@
package com.cloudata.keyvalue.redis;

interface RedisCommand {
RedisResponse execute(RedisServer server, RedisRequest command) throws RedisException;
}

0 comments on commit 653c4e3

Please sign in to comment.