Skip to content

Commit

Permalink
Basic Kestrel interpreter
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Kallen committed Jan 20, 2011
1 parent 1048abb commit bf9d1f6
Show file tree
Hide file tree
Showing 10 changed files with 172 additions and 6 deletions.
Expand Up @@ -5,7 +5,8 @@ import com.twitter.util.{Time, Duration, TimerTask}
import org.jboss.netty.util.{HashedWheelTimer, Timeout}

object Timer {
lazy val default = new Timer(new HashedWheelTimer(10, TimeUnit.MILLISECONDS))
implicit lazy val default =
new Timer(new HashedWheelTimer(10, TimeUnit.MILLISECONDS))
}

class Timer(underlying: org.jboss.netty.util.Timer) extends com.twitter.util.Timer {
Expand Down
@@ -0,0 +1,50 @@
package com.twitter.finagle.kestrel

import com.twitter.finagle.kestrel.protocol._
import org.jboss.netty.buffer.ChannelBuffer
import com.twitter.conversions.time._
import java.util.concurrent.{TimeUnit, BlockingQueue}
import com.twitter.util.{Future, MapMaker}

class Interpreter(Queue: () => BlockingQueue[ChannelBuffer]) {
private[this] val queues = MapMaker[ChannelBuffer, BlockingQueue[ChannelBuffer]] { config =>
config.compute { key =>
Queue()
}
}

def apply(command: Command): Response = {
command match {
case Get(queueName, options) =>
val timeoutOption = options.find(_.isInstanceOf[Timeout]).asInstanceOf[Option[Timeout]]
val wait = timeoutOption.map(_.duration).getOrElse(0.seconds)
val item = queues(queueName).poll(wait.inMilliseconds, TimeUnit.MILLISECONDS)
if (item eq null)
Values(Seq.empty)
else
Values(Seq(Value(queueName, item)))
case Set(queueName, flags, expiry, value) =>
queues(queueName).add(value)
Stored
case Delete(queueName) =>
queues.remove(queueName)
Deleted
case Flush(queueName) =>
queues.remove(queueName)
Deleted
case FlushAll() =>
queues.clear()
Deleted
case Version() =>
NotFound
case ShutDown() =>
NotFound
case DumpConfig() =>
NotFound
case Stats() =>
NotFound
case DumpStats() =>
NotFound
}
}
}
@@ -0,0 +1,28 @@
package com.twitter.finagle.kestrel.protocol

import org.jboss.netty.buffer.ChannelBuffer
import com.twitter.util.Duration

sealed abstract class Command

case class Get(queueName: ChannelBuffer, options: collection.Set[GetOption]) extends Command
case class Set(queueName: ChannelBuffer, flags: Int, expiry: Duration, value: ChannelBuffer) extends Command

case class Delete(queueName: ChannelBuffer) extends Command
case class Flush(queueName: ChannelBuffer) extends Command
case class FlushAll() extends Command

case class Version() extends Command
case class ShutDown() extends Command
case class Reload() extends Command
case class DumpConfig() extends Command
case class Stats() extends Command
case class DumpStats() extends Command

sealed abstract class GetOption

case class Timeout(duration: Duration) extends GetOption
case class Open() extends GetOption
case class Close() extends GetOption
case class Abort() extends GetOption
case class Peek() extends GetOption
@@ -0,0 +1,14 @@
package com.twitter.finagle.kestrel.protocol

import org.jboss.netty.buffer.ChannelBuffer

// fixme remove case objects
sealed abstract class Response
case object NotFound extends Response
case object Stored extends Response
case object NotStored extends Response
case object Deleted extends Response

case class Values(values: Seq[Value]) extends Response

case class Value(key: ChannelBuffer, value: ChannelBuffer)
@@ -0,0 +1,60 @@
package com.twitter.finagle.kestrel

import org.specs.Specification
import com.twitter.finagle.memcached.util.ChannelBufferUtils._
import com.twitter.finagle.kestrel.protocol._
import com.twitter.conversions.time._
import org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer
import org.jboss.netty.buffer.ChannelBuffer
import java.util.concurrent.LinkedBlockingQueue

object InterpreterSpec extends Specification {
"Interpreter" should {
val interpreter = new Interpreter(() => new LinkedBlockingQueue[ChannelBuffer])

"set & get" in {
interpreter(Set("name", 0, 0.seconds, "rawr"))
interpreter(Get("name", collection.Set.empty)) mustEqual
Values(Seq(Value("name", "rawr")))
}

"delete" in {
interpreter(Set("name", 0, 0.seconds, "rawr"))
interpreter(Delete("name"))
interpreter(Get("name", collection.Set.empty)) mustEqual Values(Seq.empty)
}

"flush" in {
interpreter(Set("name", 0, 0.seconds, "rawr"))
interpreter(Flush("name"))
interpreter(Get("name", collection.Set.empty)) mustEqual Values(Seq.empty)
}

"flushAll" in {
interpreter(Set("name", 0, 0.seconds, "rawr"))
interpreter(FlushAll())
interpreter(Get("name", collection.Set.empty)) //mustEqual Values(Seq.empty)
interpreter(Get("name", collection.Set.empty)) mustEqual Values(Seq.empty)
}

"version" in {

}

"shutDown" in {

}

"dumpConfig" in {

}

"stats" in {

}

"dumpStats" in {

}
}
}
@@ -1,14 +1,13 @@
package com.twitter.finagle.memcached

import com.twitter.finagle.memcached.protocol._
import scala.collection.mutable
import com.twitter.finagle.memcached.protocol.text.Parser
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer
import org.jboss.netty.util.CharsetUtil
import text.Parser
import com.twitter.finagle.memcached.util.ChannelBufferUtils._
import util.AtomicMap
import com.twitter.util.{Future, SynchronizedLruMap}
import com.twitter.util.Future
import com.twitter.finagle.Service

/**
Expand Down
Expand Up @@ -4,6 +4,7 @@ import org.jboss.netty.buffer.ChannelBuffer

sealed abstract class Command

// FIXME make expiry a duration
abstract class StorageCommand(key: ChannelBuffer, flags: Int, expiry: Int, value: ChannelBuffer) extends Command
abstract class ArithmeticCommand(key: ChannelBuffer, delta: Int) extends Command
abstract class RetrievalCommand(keys: Seq[ChannelBuffer]) extends Command
Expand Down
Expand Up @@ -2,13 +2,15 @@ package com.twitter.finagle.memcached.protocol

import org.jboss.netty.buffer.ChannelBuffer

// FIXME remove case objects

sealed abstract class Response
case object NotFound extends Response
case object Stored extends Response
case object NotStored extends Response
case object Deleted extends Response

case class Values(values: Seq[Value]) extends Response
case class Number(value: Int) extends Response
case class Number(value: Int) extends Response

case class Value(key: ChannelBuffer, value: ChannelBuffer)
@@ -1,7 +1,6 @@
package com.twitter.finagle.memcached.util

import scala.collection.mutable
import mutable.ArrayBuffer

/**
* Improve concurrency with fine-grained locking. A hash of synchronized hash
Expand Down
12 changes: 12 additions & 0 deletions project/build/Project.scala
Expand Up @@ -39,6 +39,14 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)
"finagle-memcached", "finagle-memcached",
new MemcachedProject(_), coreProject)

/**
* finagle-kestrel contains the kestrel codec and Java and Scala
* friendly clients.
*/
val kestrelProject = project(
"finagle-kestrel", "finagle-kestrel",
new KestrelProject(_), coreProject, memcachedProject)

/**
* finagle-stress has stress/integration test suites & tools for
* development.
Expand Down Expand Up @@ -80,6 +88,10 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)
val junit = "junit" % "junit" % "3.8.2" % "test"
}

class KestrelProject(info: ProjectInfo) extends StandardProject(info)
with SubversionPublisher with AdhocInlines
{ }

class OstrichProject(info: ProjectInfo) extends StandardProject(info)
with SubversionPublisher with AdhocInlines
{
Expand Down

0 comments on commit bf9d1f6

Please sign in to comment.