diff --git a/finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala b/finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala index 43ab164669..9ed16d8424 100644 --- a/finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala +++ b/finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala @@ -5,7 +5,8 @@ import com.twitter.util.{Time, Duration, TimerTask} import org.jboss.netty.util.{HashedWheelTimer, Timeout} object Timer { - lazy val default = new Timer(new HashedWheelTimer(10, TimeUnit.MILLISECONDS)) + implicit lazy val default = + new Timer(new HashedWheelTimer(10, TimeUnit.MILLISECONDS)) } class Timer(underlying: org.jboss.netty.util.Timer) extends com.twitter.util.Timer { diff --git a/finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/Interpreter.scala b/finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/Interpreter.scala new file mode 100644 index 0000000000..1918d432ae --- /dev/null +++ b/finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/Interpreter.scala @@ -0,0 +1,50 @@ +package com.twitter.finagle.kestrel + +import com.twitter.finagle.kestrel.protocol._ +import org.jboss.netty.buffer.ChannelBuffer +import com.twitter.conversions.time._ +import java.util.concurrent.{TimeUnit, BlockingQueue} +import com.twitter.util.{Future, MapMaker} + +class Interpreter(Queue: () => BlockingQueue[ChannelBuffer]) { + private[this] val queues = MapMaker[ChannelBuffer, BlockingQueue[ChannelBuffer]] { config => + config.compute { key => + Queue() + } + } + + def apply(command: Command): Response = { + command match { + case Get(queueName, options) => + val timeoutOption = options.find(_.isInstanceOf[Timeout]).asInstanceOf[Option[Timeout]] + val wait = timeoutOption.map(_.duration).getOrElse(0.seconds) + val item = queues(queueName).poll(wait.inMilliseconds, TimeUnit.MILLISECONDS) + if (item eq null) + Values(Seq.empty) + else + Values(Seq(Value(queueName, item))) + case Set(queueName, flags, expiry, value) => + queues(queueName).add(value) + Stored + case Delete(queueName) => + queues.remove(queueName) + Deleted + case Flush(queueName) => + queues.remove(queueName) + Deleted + case FlushAll() => + queues.clear() + Deleted + case Version() => + NotFound + case ShutDown() => + NotFound + case DumpConfig() => + NotFound + case Stats() => + NotFound + case DumpStats() => + NotFound + } + } +} \ No newline at end of file diff --git a/finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/Command.scala b/finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/Command.scala new file mode 100644 index 0000000000..5f6862c599 --- /dev/null +++ b/finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/Command.scala @@ -0,0 +1,28 @@ +package com.twitter.finagle.kestrel.protocol + +import org.jboss.netty.buffer.ChannelBuffer +import com.twitter.util.Duration + +sealed abstract class Command + +case class Get(queueName: ChannelBuffer, options: collection.Set[GetOption]) extends Command +case class Set(queueName: ChannelBuffer, flags: Int, expiry: Duration, value: ChannelBuffer) extends Command + +case class Delete(queueName: ChannelBuffer) extends Command +case class Flush(queueName: ChannelBuffer) extends Command +case class FlushAll() extends Command + +case class Version() extends Command +case class ShutDown() extends Command +case class Reload() extends Command +case class DumpConfig() extends Command +case class Stats() extends Command +case class DumpStats() extends Command + +sealed abstract class GetOption + +case class Timeout(duration: Duration) extends GetOption +case class Open() extends GetOption +case class Close() extends GetOption +case class Abort() extends GetOption +case class Peek() extends GetOption \ No newline at end of file diff --git a/finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/Response.scala b/finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/Response.scala new file mode 100644 index 0000000000..dcb5c79ac8 --- /dev/null +++ b/finagle-kestrel/src/main/scala/com/twitter/finagle/kestrel/protocol/Response.scala @@ -0,0 +1,14 @@ +package com.twitter.finagle.kestrel.protocol + +import org.jboss.netty.buffer.ChannelBuffer + +// fixme remove case objects +sealed abstract class Response +case object NotFound extends Response +case object Stored extends Response +case object NotStored extends Response +case object Deleted extends Response + +case class Values(values: Seq[Value]) extends Response + +case class Value(key: ChannelBuffer, value: ChannelBuffer) \ No newline at end of file diff --git a/finagle-kestrel/src/test/scala/com/twitter/finagle/kestrel/InterpreterSpec.scala b/finagle-kestrel/src/test/scala/com/twitter/finagle/kestrel/InterpreterSpec.scala new file mode 100644 index 0000000000..d177f44806 --- /dev/null +++ b/finagle-kestrel/src/test/scala/com/twitter/finagle/kestrel/InterpreterSpec.scala @@ -0,0 +1,60 @@ +package com.twitter.finagle.kestrel + +import org.specs.Specification +import com.twitter.finagle.memcached.util.ChannelBufferUtils._ +import com.twitter.finagle.kestrel.protocol._ +import com.twitter.conversions.time._ +import org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer +import org.jboss.netty.buffer.ChannelBuffer +import java.util.concurrent.LinkedBlockingQueue + +object InterpreterSpec extends Specification { + "Interpreter" should { + val interpreter = new Interpreter(() => new LinkedBlockingQueue[ChannelBuffer]) + + "set & get" in { + interpreter(Set("name", 0, 0.seconds, "rawr")) + interpreter(Get("name", collection.Set.empty)) mustEqual + Values(Seq(Value("name", "rawr"))) + } + + "delete" in { + interpreter(Set("name", 0, 0.seconds, "rawr")) + interpreter(Delete("name")) + interpreter(Get("name", collection.Set.empty)) mustEqual Values(Seq.empty) + } + + "flush" in { + interpreter(Set("name", 0, 0.seconds, "rawr")) + interpreter(Flush("name")) + interpreter(Get("name", collection.Set.empty)) mustEqual Values(Seq.empty) + } + + "flushAll" in { + interpreter(Set("name", 0, 0.seconds, "rawr")) + interpreter(FlushAll()) + interpreter(Get("name", collection.Set.empty)) //mustEqual Values(Seq.empty) + interpreter(Get("name", collection.Set.empty)) mustEqual Values(Seq.empty) + } + + "version" in { + + } + + "shutDown" in { + + } + + "dumpConfig" in { + + } + + "stats" in { + + } + + "dumpStats" in { + + } + } +} \ No newline at end of file diff --git a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/Interpreter.scala b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/Interpreter.scala index cab92d1982..bacb63c19f 100644 --- a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/Interpreter.scala +++ b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/Interpreter.scala @@ -1,14 +1,13 @@ package com.twitter.finagle.memcached import com.twitter.finagle.memcached.protocol._ -import scala.collection.mutable +import com.twitter.finagle.memcached.protocol.text.Parser import org.jboss.netty.buffer.ChannelBuffer import org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer import org.jboss.netty.util.CharsetUtil -import text.Parser import com.twitter.finagle.memcached.util.ChannelBufferUtils._ import util.AtomicMap -import com.twitter.util.{Future, SynchronizedLruMap} +import com.twitter.util.Future import com.twitter.finagle.Service /** diff --git a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/Command.scala b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/Command.scala index 3c9a7c2c59..d297c68e03 100644 --- a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/Command.scala +++ b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/Command.scala @@ -4,6 +4,7 @@ import org.jboss.netty.buffer.ChannelBuffer sealed abstract class Command +// FIXME make expiry a duration abstract class StorageCommand(key: ChannelBuffer, flags: Int, expiry: Int, value: ChannelBuffer) extends Command abstract class ArithmeticCommand(key: ChannelBuffer, delta: Int) extends Command abstract class RetrievalCommand(keys: Seq[ChannelBuffer]) extends Command diff --git a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/Response.scala b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/Response.scala index 8051c71ca0..b8e713a374 100644 --- a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/Response.scala +++ b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/protocol/Response.scala @@ -2,6 +2,8 @@ package com.twitter.finagle.memcached.protocol import org.jboss.netty.buffer.ChannelBuffer +// FIXME remove case objects + sealed abstract class Response case object NotFound extends Response case object Stored extends Response @@ -9,6 +11,6 @@ case object NotStored extends Response case object Deleted extends Response case class Values(values: Seq[Value]) extends Response -case class Number(value: Int) extends Response +case class Number(value: Int) extends Response case class Value(key: ChannelBuffer, value: ChannelBuffer) \ No newline at end of file diff --git a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/util/AtomicMap.scala b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/util/AtomicMap.scala index 5644755715..3deeaa2964 100644 --- a/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/util/AtomicMap.scala +++ b/finagle-memcached/src/main/scala/com/twitter/finagle/memcached/util/AtomicMap.scala @@ -1,7 +1,6 @@ package com.twitter.finagle.memcached.util import scala.collection.mutable -import mutable.ArrayBuffer /** * Improve concurrency with fine-grained locking. A hash of synchronized hash diff --git a/project/build/Project.scala b/project/build/Project.scala index 60aa7dbfab..c70c277847 100644 --- a/project/build/Project.scala +++ b/project/build/Project.scala @@ -39,6 +39,14 @@ class Project(info: ProjectInfo) extends StandardParentProject(info) "finagle-memcached", "finagle-memcached", new MemcachedProject(_), coreProject) + /** + * finagle-kestrel contains the kestrel codec and Java and Scala + * friendly clients. + */ + val kestrelProject = project( + "finagle-kestrel", "finagle-kestrel", + new KestrelProject(_), coreProject, memcachedProject) + /** * finagle-stress has stress/integration test suites & tools for * development. @@ -80,6 +88,10 @@ class Project(info: ProjectInfo) extends StandardParentProject(info) val junit = "junit" % "junit" % "3.8.2" % "test" } + class KestrelProject(info: ProjectInfo) extends StandardProject(info) + with SubversionPublisher with AdhocInlines + { } + class OstrichProject(info: ProjectInfo) extends StandardProject(info) with SubversionPublisher with AdhocInlines {