Permalink
Browse files

[split] finagle redis client from Tumblr

it's currently unpublished, but fully functional.
  • Loading branch information...
mariusae committed Jan 6, 2012
1 parent ce6fd00 commit c30ba58acfc19b96807f72162dcdd913365e2de2
@@ -0,0 +1,4 @@
+package com.twitter.finagle.redis
+
+case class ServerError(message: String) extends Exception(message)
+case class ClientError(message: String) extends Exception(message)
@@ -0,0 +1,59 @@
+package com.twitter.finagle.redis
+
+import protocol.{Command, CommandCodec, Reply, ReplyCodec}
+
+import com.twitter.finagle.{Codec, CodecFactory, Service}
+import com.twitter.finagle.tracing.ClientRequestTracingFilter
+import com.twitter.naggati.{Codec => NaggatiCodec}
+import com.twitter.util.Future
+import org.jboss.netty.channel.{ChannelPipelineFactory, Channels}
+
+object Redis {
+ def apply() = new Redis
+ def get() = apply()
+}
+
+class Redis extends CodecFactory[Command, Reply] {
+ def server = Function.const {
+ new Codec[Command, Reply] {
+ def pipelineFactory = new ChannelPipelineFactory {
+ def getPipeline() = {
+ val pipeline = Channels.pipeline()
+ val commandCodec = new CommandCodec
+ val replyCodec = new ReplyCodec
+
+ pipeline.addLast("codec", new NaggatiCodec(commandCodec.decode, replyCodec.encode))
+
+ pipeline
+ }
+ }
+ }
+ }
+
+ def client = Function.const {
+ new Codec[Command, Reply] {
+
+ def pipelineFactory = new ChannelPipelineFactory {
+ def getPipeline() = {
+ val pipeline = Channels.pipeline()
+ val commandCodec = new CommandCodec
+ val replyCodec = new ReplyCodec
+
+ pipeline.addLast("codec", new NaggatiCodec(replyCodec.decode, commandCodec.encode))
+
+ pipeline
+ }
+ }
+
+ override def prepareService(underlying: Service[Command, Reply]) = {
+ Future.value((new RedisTracingFilter()) andThen underlying)
+ }
+
+ }
+ }
+}
+
+private class RedisTracingFilter extends ClientRequestTracingFilter[Command, Reply] {
+ val serviceName = "redis"
+ def methodName(req: Command): String = req.getClass().getSimpleName()
+}
@@ -0,0 +1,56 @@
+package com.twitter.finagle.redis
+package protocol
+
+import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
+import util.StringToChannelBuffer
+import scala.collection.immutable.WrappedString
+
+private[redis] object RedisCodec {
+ object NilValue extends WrappedString("nil") {
+ def getBytes(charset: String = "UTF-8") = Array[Byte]()
+ def getBytes = Array[Byte]()
+ }
+
+ val STATUS_REPLY = '+'
+ val ERROR_REPLY = '-'
+ val INTEGER_REPLY = ':'
+ val BULK_REPLY = '$'
+ val MBULK_REPLY = '*'
+
+ val ARG_COUNT_MARKER = '*'
+ val ARG_SIZE_MARKER = '$'
+
+ val TOKEN_DELIMITER = ' '
+ val EOL_DELIMITER = "\r\n"
+
+ val NIL_VALUE = NilValue
+ val NIL_VALUE_BA = NilValue.getBytes
+
+ def toUnifiedFormat(args: List[Array[Byte]], includeHeader: Boolean = true) = {
+ val buffer = ChannelBuffers.dynamicBuffer()
+ includeHeader match {
+ case true =>
+ val argHeader = "%c%d%s".format(ARG_COUNT_MARKER, args.length, EOL_DELIMITER)
+ buffer.writeBytes(argHeader.getBytes)
+ case false =>
+ }
+ args.foreach { arg =>
+ if (arg.length == 0) {
+ buffer.writeBytes("%c-1%s".format(ARG_SIZE_MARKER, EOL_DELIMITER).getBytes)
+ } else {
+ val sizeHeader = "%c%d%s".format(ARG_SIZE_MARKER, arg.length, EOL_DELIMITER)
+ buffer.writeBytes(sizeHeader.getBytes)
+ buffer.writeBytes(arg)
+ buffer.writeBytes(EOL_DELIMITER.getBytes)
+ }
+ }
+ buffer
+ }
+ def toInlineFormat(args: List[String]) = {
+ StringToChannelBuffer(args.mkString(TOKEN_DELIMITER.toString) + EOL_DELIMITER)
+ }
+}
+abstract class RedisMessage {
+ def toChannelBuffer: ChannelBuffer
+ def toByteArray: Array[Byte] = toChannelBuffer.array
+}
@@ -0,0 +1,176 @@
+package com.twitter.finagle.redis
+package protocol
+
+import util._
+
+object RequireClientProtocol extends ErrorConversion {
+ override def getException(msg: String) = new ClientError(msg)
+}
+
+abstract class Command extends RedisMessage
+
+object Commands {
+ // Key Commands
+ val DEL = "DEL"
+ val EXISTS = "EXISTS"
+ val EXPIRE = "EXPIRE"
+ val EXPIREAT = "EXPIREAT"
+ val KEYS = "KEYS"
+ val PERSIST = "PERSIST"
+ val RANDOMKEY = "RANDOMKEY"
+ val RENAME = "RENAME"
+ val RENAMENX = "RENAMENX"
+ val TTL = "TTL"
+ val TYPE = "TYPE"
+
+ // String Commands
+ val APPEND = "APPEND"
+ val DECR = "DECR"
+ val DECRBY = "DECRBY"
+ val GET = "GET"
+ val GETBIT = "GETBIT"
+ val GETRANGE = "GETRANGE"
+ val GETSET = "GETSET"
+ val INCR = "INCR"
+ val INCRBY = "INCRBY"
+ val MGET = "MGET"
+ val MSET = "MSET"
+ val MSETNX = "MSETNX"
+ val SET = "SET"
+ val SETBIT = "SETBIT"
+ val SETEX = "SETEX"
+ val SETNX = "SETNX"
+ val SETRANGE = "SETRANGE"
+ val STRLEN = "STRLEN"
+
+ // Sorted Sets
+ val ZADD = "ZADD"
+ val ZCARD = "ZCARD"
+ val ZCOUNT = "ZCOUNT"
+ val ZINCRBY = "ZINCRBY"
+ val ZINTERSTORE = "ZINTERSTORE"
+ val ZRANGE = "ZRANGE"
+ val ZRANGEBYSCORE = "ZRANGEBYSCORE"
+ val ZRANK = "ZRANK"
+ val ZREM = "ZREM"
+ val ZREMRANGEBYRANK = "ZREMRANGEBYRANK"
+ val ZREMRANGEBYSCORE = "ZREMRANGEBYSCORE"
+ val ZREVRANGE = "ZREVRANGE"
+ val ZREVRANGEBYSCORE = "ZREVRANGEBYSCORE"
+ val ZREVRANK = "ZREVRANK"
+ val ZSCORE = "ZSCORE"
+ val ZUNIONSTORE = "ZUNIONSTORE"
+
+ val commandMap: Map[String,Function1[List[Array[Byte]],Command]] = Map(
+ // key commands
+ DEL -> {args => Del(BytesToString.fromList(args))},
+ EXISTS -> {Exists(_)},
+ EXPIRE -> {Expire(_)},
+ EXPIREAT -> {ExpireAt(_)},
+ KEYS -> {Keys(_)},
+ PERSIST -> {Persist(_)},
+ RANDOMKEY -> {args => Randomkey()},
+ RENAME -> {Rename(_)},
+ RENAMENX -> {RenameNx(_)},
+ TTL -> {Ttl(_)},
+ TYPE -> {Type(_)},
+
+ // string commands
+ APPEND -> {Append(_)},
+ DECR -> {Decr(_)},
+ DECRBY -> {DecrBy(_)},
+ GET -> {Get(_)},
+ GETBIT -> {GetBit(_)},
+ GETRANGE -> {GetRange(_)},
+ GETSET -> {GetSet(_)},
+ INCR -> {Incr(_)},
+ INCRBY -> {IncrBy(_)},
+ MGET -> {args => MGet(BytesToString.fromList(args))},
+ MSET -> {MSet(_)},
+ MSETNX -> {MSetNx(_)},
+ SET -> {Set(_)},
+ SETBIT -> {SetBit(_)},
+ SETEX -> {SetEx(_)},
+ SETNX -> {SetNx(_)},
+ SETRANGE -> {SetRange(_)},
+ STRLEN -> {Strlen(_)},
+
+ // sorted sets
+ ZADD -> {ZAdd(_)},
+ ZCARD -> {ZCard(_)},
+ ZCOUNT -> {ZCount(_)},
+ ZINCRBY -> {ZIncrBy(_)},
+ ZINTERSTORE -> {ZInterStore(_)},
+ ZRANGE -> {ZRange(_)},
+ ZRANGEBYSCORE -> {ZRangeByScore(_)},
+ ZRANK -> {ZRank(_)},
+ ZREM -> {ZRem(_)},
+ ZREMRANGEBYRANK -> {ZRemRangeByRank(_)},
+ ZREMRANGEBYSCORE -> {ZRemRangeByScore(_)},
+ ZREVRANGE -> {ZRevRange(_)},
+ ZREVRANGEBYSCORE -> {ZRevRangeByScore(_)},
+ ZREVRANK -> {ZRevRank(_)},
+ ZSCORE -> {ZScore(_)},
+ ZUNIONSTORE -> {ZUnionStore(_)}
+ )
+
+ def doMatch(cmd: String, args: List[Array[Byte]]) = commandMap.get(cmd).map {
+ _(args)
+ }.getOrElse(throw ClientError("Unsupported command: " + cmd))
+
+ def trimList(list: List[Array[Byte]], count: Int, from: String = "") = {
+ RequireClientProtocol(list != null, "%s Empty list found".format(from))
+ RequireClientProtocol(
+ list.length == count,
+ "%s Expected %d elements, found %d".format(from, count, list.length))
+ val newList = list.take(count)
+ newList.foreach { item => RequireClientProtocol(item != null, "Found empty item in list") }
+ newList
+ }
+}
+
+class CommandCodec extends UnifiedProtocolCodec {
+ import com.twitter.naggati.{Emit, Encoder, NextStep}
+ import com.twitter.naggati.Stages._
+ import RedisCodec._
+ import com.twitter.logging.Logger
+
+ val log = Logger(getClass)
+
+ val decode = readBytes(1) { bytes =>
+ bytes(0) match {
+ case ARG_COUNT_MARKER =>
+ val doneFn = { lines => commandDecode(lines) }
+ RequireClientProtocol.safe {
+ readLine { line => decodeUnifiedFormat(NumberFormat.toLong(line), doneFn) }
+ }
+ case b: Byte =>
+ decodeInlineRequest(b.asInstanceOf[Char])
+ }
+ }
+
+ val encode = new Encoder[Command] {
+ def encode(obj: Command) = Some(obj.toChannelBuffer)
+ }
+
+ def decodeInlineRequest(c: Char) = readLine { line =>
+ val listOfArrays = (c + line).split(' ').toList.map { args => args.getBytes("UTF-8") }
+ val cmd = commandDecode(listOfArrays)
+ emit(cmd)
+ }
+
+ def commandDecode(lines: List[Array[Byte]]): Command = {
+ RequireClientProtocol(lines != null && lines.length > 0, "Invalid client command protocol")
+ val cmd = new String(lines.head)
+ val args = lines.tail
+ try {
+ Commands.doMatch(cmd, args)
+ } catch {
+ case e: ClientError => throw e
+ case t: Throwable =>
+ log.warning(t, "Unhandled exception %s(%s)".format(t.getClass.toString, t.getMessage))
+ throw new ClientError(t.getMessage)
+ }
+ }
+
+}
@@ -0,0 +1,51 @@
+package com.twitter.finagle.redis
+package protocol
+
+import util._
+import RedisCodec._
+
+import com.twitter.naggati.{Emit, Encoder, NextStep, ProtocolError}
+import com.twitter.naggati.Stages._
+
+trait UnifiedProtocolCodec {
+
+ type ByteArrays = List[Array[Byte]]
+
+ def decodeUnifiedFormat[T <: AnyRef](argCount: Long, doneFn: ByteArrays => T) =
+ argCount match {
+ case n if n < 0 => throw new ProtocolError("Invalid argument count specified")
+ case n => decodeRequestLines(n, Nil, { lines => doneFn(lines) } )
+ }
+
+ def decodeRequestLines[T <: AnyRef](
+ i: Long,
+ lines: ByteArrays,
+ doneFn: ByteArrays => T): NextStep =
+ {
+ if (i <= 0) {
+ emit(doneFn(lines.reverse))
+ } else {
+ readLine { line =>
+ val header = line(0)
+ header match {
+ case ARG_SIZE_MARKER =>
+ val size = NumberFormat.toInt(line.drop(1))
+ if (size < 1) {
+ decodeRequestLines(i - 1, lines.+:(RedisCodec.NIL_VALUE_BA), doneFn)
+ } else {
+ readBytes(size) { byteArray =>
+ readBytes(2) { eol =>
+ if (eol(0) != '\r' || eol(1) != '\n') {
+ throw new ProtocolError("Expected EOL after line data and didn't find it")
+ }
+ decodeRequestLines(i - 1, lines.+:(byteArray), doneFn)
+ }
+ }
+ }
+ case b: Char =>
+ throw new ProtocolError("Expected size marker $, got " + b)
+ } // header match
+ } // readLine
+ } // else
+ } // decodeRequestLines
+}
Oops, something went wrong.

0 comments on commit c30ba58

Please sign in to comment.