Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge branch 'master' of github.com:twitter/schmemcached

Conflicts:
	src/main/scala/com/twitter/schmemcached/protocol/text/Show.scala
  • Loading branch information...
commit 50e46a3ad00231ffa29ccc751d22133eed2242d4 2 parents 6a524f0 + 1a56b45
Nick Kallen authored
View
4 project/build.properties
@@ -1,8 +1,8 @@
#Project properties
-#Mon Dec 13 19:08:50 PST 2010
+#Fri Jan 07 13:38:50 PST 2011
project.organization=com.twitter
project.name=schmemcached
sbt.version=0.7.4
-project.version=1.0.2-SNAPSHOT
+project.version=1.0.6-SNAPSHOT
build.scala.versions=2.8.1
project.initialize=false
View
8 project/build/Project.scala
@@ -6,7 +6,7 @@ class Project(info: ProjectInfo)
with AdhocInlines
with SubversionPublisher
{
- override def subversionRepository = Some("http://svn.local.twitter.com/maven-public")
+ override def subversionRepository = Some("http://svn.local.twitter.com/maven-public")
override def compileOrder = CompileOrder.ScalaThenJava
override def managedStyle = ManagedStyle.Maven
@@ -25,12 +25,12 @@ class Project(info: ProjectInfo)
override def filterScalaJars = false
val netty = "org.jboss.netty" % "netty" % "3.2.2.Final"
- val finagle = "com.twitter" % "finagle" % "1.0.11-SNAPSHOT"
- val util = "com.twitter" % "util" % "1.4.4-SNAPSHOT"
+ val finagle = "com.twitter" % "finagle-core" % "1.0.20"
+ val util = "com.twitter" % "util" % "1.4.8"
val junit = "junit" % "junit" % "3.8.2" % "test"
override def distZipName = "%s.zip".format(name)
val mockito = "org.mockito" % "mockito-all" % "1.8.5" % "test" withSources()
- val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test" withSources()
+ val specs = "org.scala-tools.testing" % "specs_2.8.1" % "1.6.6" % "test" withSources()
}
View
50 src/main/scala/com/twitter/schmemcached/Client.scala
@@ -28,17 +28,25 @@ object Client {
}
trait Client {
+ def set(key: String, flags: Int, expiry: Int, value: ChannelBuffer): Future[Response]
+ def add(key: String, flags: Int, expiry: Int, value: ChannelBuffer): Future[Response]
+ def append(key: String, flags: Int, expiry: Int, value: ChannelBuffer): Future[Response]
+ def prepend(key: String, flags: Int, expiry: Int, value: ChannelBuffer): Future[Response]
+ def replace(key: String, flags: Int, expiry: Int, value: ChannelBuffer): Future[Response]
+
def get(key: String): Future[Option[ChannelBuffer]]
def get(keys: Iterable[String]): Future[Map[String, ChannelBuffer]]
- def set(key: String, value: ChannelBuffer): Future[Response]
- def add(key: String, value: ChannelBuffer): Future[Response]
- def append(key: String, value: ChannelBuffer): Future[Response]
- def prepend(key: String, value: ChannelBuffer): Future[Response]
def delete(key: String): Future[Response]
def incr(key: String): Future[Int]
def incr(key: String, delta: Int): Future[Int]
def decr(key: String): Future[Int]
def decr(key: String, delta: Int): Future[Int]
+
+ def set(key: String, value: ChannelBuffer): Future[Response]= set(key, 0, 0, value)
+ def add(key: String, value: ChannelBuffer): Future[Response] = add(key, 0, 0, value)
+ def append(key: String, value: ChannelBuffer): Future[Response] = append(key, 0, 0, value)
+ def prepend(key: String, value: ChannelBuffer): Future[Response] = prepend(key, 0, 0, value)
+ def replace(key: String, value: ChannelBuffer): Future[Response] = replace(key, 0, 0, value)
}
protected class ConnectedClient(underlying: service.Client[Command, Response]) extends Client {
@@ -61,10 +69,17 @@ protected class ConnectedClient(underlying: service.Client[Command, Response]) e
}
}
- def set(key: String, value: ChannelBuffer) = underlying(Set(key, value))
- def add(key: String, value: ChannelBuffer) = underlying(Add(key, value))
- def append(key: String, value: ChannelBuffer) = underlying(Append(key, value))
- def prepend(key: String, value: ChannelBuffer) = underlying(Prepend(key, value))
+ def set(key: String, flags: Int, expiry: Int, value: ChannelBuffer) =
+ underlying(Set(key, flags, expiry, value))
+ def add(key: String, flags: Int, expiry: Int, value: ChannelBuffer) =
+ underlying(Add(key, flags, expiry, value))
+ def append(key: String, flags: Int, expiry: Int, value: ChannelBuffer) =
+ underlying(Append(key, flags, expiry, value))
+ def prepend(key: String, flags: Int, expiry: Int, value: ChannelBuffer) =
+ underlying(Prepend(key, flags, expiry, value))
+ def replace(key: String, flags: Int, expiry: Int, value: ChannelBuffer) =
+ underlying(Replace(key, flags, expiry, value))
+
def delete(key: String) = underlying(Delete(key))
def incr(key: String): Future[Int] = incr(key, 1)
def decr(key: String): Future[Int] = decr(key, 1)
@@ -87,7 +102,7 @@ protected class ConnectedClient(underlying: service.Client[Command, Response]) e
override def toString = hashCode.toString // FIXME this incompatible with Ketama
}
-protected class PartitionedClient(clients: Seq[Client], hash: String => Long) extends Client {
+class PartitionedClient(clients: Seq[Client], hash: String => Long) extends Client {
require(clients.size > 0, "At least one client must be provided")
private[this] val circle = {
@@ -116,10 +131,17 @@ protected class PartitionedClient(clients: Seq[Client], hash: String => Long) ex
}
}
- def set(key: String, value: ChannelBuffer) = idx(key).set(key, value)
- def add(key: String, value: ChannelBuffer) = idx(key).add(key, value)
- def append(key: String, value: ChannelBuffer) = idx(key).append(key, value)
- def prepend(key: String, value: ChannelBuffer) = idx(key).prepend(key, value)
+ def set(key: String, flags: Int, expiry: Int, value: ChannelBuffer) =
+ idx(key).set(key, flags, expiry, value)
+ def add(key: String, flags: Int, expiry: Int, value: ChannelBuffer) =
+ idx(key).add(key, flags, expiry, value)
+ def append(key: String, flags: Int, expiry: Int, value: ChannelBuffer) =
+ idx(key).append(key, flags, expiry, value)
+ def prepend(key: String, flags: Int, expiry: Int, value: ChannelBuffer) =
+ idx(key).prepend(key, flags, expiry, value)
+ def replace(key: String, flags: Int, expiry: Int, value: ChannelBuffer) =
+ idx(key).replace(key, flags, expiry, value)
+
def delete(key: String) = idx(key).delete(key)
def incr(key: String) = idx(key).incr(key)
def incr(key: String, delta: Int) = idx(key).incr(key, delta)
@@ -132,4 +154,4 @@ protected class PartitionedClient(clients: Seq[Client], hash: String => Long) ex
else circle.firstEntry.getValue
client
}
-}
+}
View
18 src/main/scala/com/twitter/schmemcached/Hash.scala
@@ -0,0 +1,18 @@
+package com.twitter.schmemcached
+
+object Hash {
+
+ val FNV1_32_PRIME = 16777619
+ def fnv1_32(key: String) : Long = {
+ var i = 0
+ val len = key.length
+ var rv: Long = 0x811c9dc5L
+ val keyBytes = key.getBytes("UTF-8")
+ while (i < len) {
+ rv = (rv * FNV1_32_PRIME) ^ (keyBytes(i) & 0xff)
+ i += 1
+ }
+
+ rv & 0xffffffffL
+ }
+}
View
10 src/main/scala/com/twitter/schmemcached/Interpreter.scala
@@ -18,12 +18,12 @@ class Interpreter(map: AtomicMap[ChannelBuffer, ChannelBuffer]) {
def apply(command: Command): Response = {
command match {
- case Set(key, value) =>
+ case Set(key, flags, expiry, value) =>
map.lock(key) { data =>
data(key) = value
Stored
}
- case Add(key, value) =>
+ case Add(key, flags, expiry, value) =>
map.lock(key) { data =>
val existing = data.get(key)
if (existing.isDefined)
@@ -33,7 +33,7 @@ class Interpreter(map: AtomicMap[ChannelBuffer, ChannelBuffer]) {
Stored
}
}
- case Replace(key, value) =>
+ case Replace(key, flags, expiry, value) =>
map.lock(key) { data =>
val existing = data.get(key)
if (existing.isDefined) {
@@ -43,7 +43,7 @@ class Interpreter(map: AtomicMap[ChannelBuffer, ChannelBuffer]) {
NotStored
}
}
- case Append(key, value) =>
+ case Append(key, flags, expiry, value) =>
map.lock(key) { data =>
val existing = data.get(key)
if (existing.isDefined) {
@@ -53,7 +53,7 @@ class Interpreter(map: AtomicMap[ChannelBuffer, ChannelBuffer]) {
NotStored
}
}
- case Prepend(key, value) =>
+ case Prepend(key, flags, expiry, value) =>
map.lock(key) { data =>
val existing = data.get(key)
if (existing.isDefined) {
View
26 src/main/scala/com/twitter/schmemcached/protocol/Command.scala
@@ -4,19 +4,19 @@ import org.jboss.netty.buffer.ChannelBuffer
sealed abstract class Command
-abstract class StorageCommand(key: ChannelBuffer, value: ChannelBuffer) extends Command
-abstract class ArithmeticCommand(key: ChannelBuffer, delta: Int) extends Command
-abstract class RetrievalCommand(keys: Seq[ChannelBuffer]) extends Command
+abstract class StorageCommand(key: ChannelBuffer, flags: Int, expiry: Int, value: ChannelBuffer) extends Command
+abstract class ArithmeticCommand(key: ChannelBuffer, delta: Int) extends Command
+abstract class RetrievalCommand(keys: Seq[ChannelBuffer]) extends Command
-case class Set(key: ChannelBuffer, value: ChannelBuffer) extends StorageCommand(key, value)
-case class Add(key: ChannelBuffer, value: ChannelBuffer) extends StorageCommand(key, value)
-case class Replace(key: ChannelBuffer, value: ChannelBuffer) extends StorageCommand(key, value)
-case class Append(key: ChannelBuffer, value: ChannelBuffer) extends StorageCommand(key, value)
-case class Prepend(key: ChannelBuffer, value: ChannelBuffer) extends StorageCommand(key, value)
+case class Set(key: ChannelBuffer, flags: Int, expiry: Int, value: ChannelBuffer) extends StorageCommand(key, flags, expiry, value)
+case class Add(key: ChannelBuffer, flags: Int, expiry: Int, value: ChannelBuffer) extends StorageCommand(key, flags, expiry, value)
+case class Replace(key: ChannelBuffer, flags: Int, expiry: Int, value: ChannelBuffer) extends StorageCommand(key, flags, expiry, value)
+case class Append(key: ChannelBuffer, flags: Int, expiry: Int, value: ChannelBuffer) extends StorageCommand(key, flags, expiry, value)
+case class Prepend(key: ChannelBuffer, flags: Int, expiry: Int, value: ChannelBuffer) extends StorageCommand(key, flags, expiry, value)
-case class Get(keys: Seq[ChannelBuffer]) extends RetrievalCommand(keys)
-case class Gets(keys: Seq[ChannelBuffer]) extends RetrievalCommand(keys)
+case class Get(keys: Seq[ChannelBuffer]) extends RetrievalCommand(keys)
+case class Gets(keys: Seq[ChannelBuffer]) extends RetrievalCommand(keys)
-case class Delete(key: ChannelBuffer) extends Command
-case class Incr(key: ChannelBuffer, value: Int) extends ArithmeticCommand(key, value)
-case class Decr(key: ChannelBuffer, value: Int) extends ArithmeticCommand(key, -value)
+case class Delete(key: ChannelBuffer) extends Command
+case class Incr(key: ChannelBuffer, value: Int) extends ArithmeticCommand(key, value)
+case class Decr(key: ChannelBuffer, value: Int) extends ArithmeticCommand(key, -value)
View
16 src/main/scala/com/twitter/schmemcached/protocol/text/ParseCommand.scala
@@ -27,7 +27,7 @@ object ParseCommand extends Parser[Command] {
val commandName = tokens.head
val args = tokens.tail
if (storageCommands.contains(commandName)) {
- validateStorageCommand(args)
+ validateStorageCommand(args, null)
Some(tokens(4).toInt)
} else None
}
@@ -36,11 +36,11 @@ object ParseCommand extends Parser[Command] {
val commandName = tokens.head
val args = tokens.tail
commandName match {
- case SET => Set(validateStorageCommand(args), data)
- case ADD => Add(validateStorageCommand(args), data)
- case REPLACE => Replace(validateStorageCommand(args), data)
- case APPEND => Append(validateStorageCommand(args), data)
- case PREPEND => Prepend(validateStorageCommand(args), data)
+ case SET => tupled(Set)(validateStorageCommand(args, data))
+ case ADD => tupled(Add)(validateStorageCommand(args, data))
+ case REPLACE => tupled(Replace)(validateStorageCommand(args, data))
+ case APPEND => tupled(Append)(validateStorageCommand(args, data))
+ case PREPEND => tupled(Prepend)(validateStorageCommand(args, data))
case _ => throw new NonexistentCommand(commandName.toString)
}
}
@@ -58,13 +58,13 @@ object ParseCommand extends Parser[Command] {
}
}
- private[this] def validateStorageCommand(tokens: Seq[ChannelBuffer]) = {
+ private[this] def validateStorageCommand(tokens: Seq[ChannelBuffer], data: ChannelBuffer) = {
if (tokens.size < 4) throw new ClientError("Too few arguments")
if (tokens.size == 5 && tokens(4) != NOREPLY) throw new ClientError("Too many arguments")
if (tokens.size > 5) throw new ClientError("Too many arguments")
if (!tokens(3).matches(DIGITS)) throw new ClientError("Bad frame length")
- tokens.head
+ (tokens.head, tokens(1).toInt, tokens(2).toInt, data)
}
private[this] def validateArithmeticCommand(tokens: Seq[ChannelBuffer]) = {
View
2  src/main/scala/com/twitter/schmemcached/protocol/text/Parser.scala
@@ -1,6 +1,6 @@
package com.twitter.schmemcached.protocol.text
-import org.jboss.netty.buffer.{ChannelBufferIndexFinder, ChannelBuffer}
+import org.jboss.netty.buffer.ChannelBufferIndexFinder
import collection.mutable.ArrayBuffer
import org.jboss.netty.buffer.ChannelBuffer
View
28 src/main/scala/com/twitter/schmemcached/protocol/text/Show.scala
@@ -36,6 +36,7 @@ object Show {
case Stored => STORED
case NotStored => NOT_STORED
case Deleted => DELETED
+ case NotFound => NOT_FOUND
case Number(value) =>
val buffer = ChannelBuffers.dynamicBuffer(10)
buffer.writeBytes(value.toString.getBytes)
@@ -63,16 +64,16 @@ object Show {
def apply(command: Command): ChannelBuffer = {
command match {
- case Add(key, value) =>
- showStorageCommand(ADD, key, value)
- case Set(key, value) =>
- showStorageCommand(SET, key, value)
- case Replace(key, value) =>
- showStorageCommand(REPLACE, key, value)
- case Append(key, value) =>
- showStorageCommand(APPEND, key, value)
- case Prepend(key, value) =>
- showStorageCommand(PREPEND, key, value)
+ case Add(key, flags, expiry, value) =>
+ showStorageCommand(ADD, key, flags, expiry, value)
+ case Set(key, flags, expiry, value) =>
+ showStorageCommand(SET, key, flags, expiry, value)
+ case Replace(key, flags, expiry, value) =>
+ showStorageCommand(REPLACE, key, flags, expiry, value)
+ case Append(key, flags, expiry, value) =>
+ showStorageCommand(APPEND, key, flags, expiry, value)
+ case Prepend(key, flags, expiry, value) =>
+ showStorageCommand(PREPEND, key, flags, expiry, value)
case Get(keys) =>
apply(Gets(keys))
case Gets(keys) =>
@@ -122,15 +123,16 @@ object Show {
case _ => throw throwable
}
- @inline private[this] def showStorageCommand(name: Array[Byte], key: ChannelBuffer, value: ChannelBuffer) = {
+ @inline private[this] def showStorageCommand(
+ name: Array[Byte], key: ChannelBuffer, flags: Int, expiry: Int, value: ChannelBuffer) = {
val buffer = ChannelBuffers.dynamicBuffer(50 + value.readableBytes)
buffer.writeBytes(name)
buffer.writeBytes(SPACE)
buffer.writeBytes(key)
buffer.writeBytes(SPACE)
- buffer.writeBytes(ZERO)
+ buffer.writeBytes(flags.toString.getBytes)
buffer.writeBytes(SPACE)
- buffer.writeBytes(ZERO)
+ buffer.writeBytes(expiry.toString.getBytes)
buffer.writeBytes(SPACE)
buffer.writeBytes(value.readableBytes.toString.getBytes)
buffer.writeBytes(DELIMETER)
View
39 src/test/scala/com/twitter/schmemcached/integration/ClientSpec.scala
@@ -1,19 +1,23 @@
package com.twitter.schmemcached.integration
import org.specs.Specification
+import com.twitter.finagle.builder.ClientBuilder
import com.twitter.finagle.builder.ServerBuilder
+import com.twitter.finagle.service.Service
+import com.twitter.schmemcached.Client
+import com.twitter.schmemcached.Server
import com.twitter.schmemcached.protocol._
import com.twitter.schmemcached.protocol.text.Memcached
-import com.twitter.finagle.builder.ClientBuilder
-import com.twitter.schmemcached.Client
-import org.jboss.netty.util.CharsetUtil
import com.twitter.schmemcached.util.ChannelBufferUtils._
+import com.twitter.util.RandomSocket
+import org.jboss.netty.util.CharsetUtil
object ClientSpec extends Specification {
/**
* Note: This test needs a real Memcached server running on 11211 to work!!
*/
"ConnectedClient" should {
+
"simple client" in {
val service = ClientBuilder()
.hosts("localhost:11211")
@@ -57,22 +61,41 @@ object ClientSpec extends Specification {
}
"partitioned client" in {
+ // we already proved above that we can hit a real memcache server,
+ // so we can use our own for the partitioned client test.
+ var server1: Server = null
+ var server2: Server = null
+ val address1 = RandomSocket()
+ val address2 = RandomSocket()
+
+ doBefore {
+ server1 = new Server(address1)
+ server1.start()
+ server2 = new Server(address2)
+ server2.start()
+ }
+
+ doAfter {
+ server1.stop()
+ server2.stop()
+ }
+
val service1 = ClientBuilder()
.name("service1")
- .hosts("localhost:11211")
+ .hosts("localhost:" + address1.getPort)
.codec(new Memcached)
.buildService[Command, Response]()
+
val service2 = ClientBuilder()
.name("service2")
- .hosts("localhost:11212")
+ .hosts("localhost:" + address2.getPort)
.codec(new Memcached)
.buildService[Command, Response]()
- val client = Client(Seq(service1, service2))
-
- client.delete("foo")()
+ val client = Client(Seq(service1, service2))
"doesn't blow up" in {
+ client.delete("foo")()
client.get("foo")() mustEqual None
client.set("foo", "bar")()
client.get("foo")().get.toString(CharsetUtil.UTF_8) mustEqual "bar"
View
3  src/test/scala/com/twitter/schmemcached/integration/InterpreterServiceSpec.scala
@@ -35,7 +35,8 @@ object InterpreterServiceSpec extends Specification {
val key = "key"
val value = "value"
val result = for {
- _ <- client(Set(key, value))
+ _ <- client(Delete(key))
+ _ <- client(Set(key, 0, 0, value))
r <- client(Get(Seq(key)))
} yield r
result(1.second) mustEqual Values(Seq(Value(key, value)))
View
8 src/test/scala/com/twitter/schmemcached/stress/InterpreterServiceSpec.scala
@@ -33,11 +33,13 @@ object InterpreterServiceSpec extends Specification {
val value = "value"
val start = System.currentTimeMillis
(0 until 100) map { i =>
- val key = _key + "i"
- client(Set(key, value))()
- client(Get(Seq(key)))()
+ val key = _key + i
+ client(Delete(key))()
+ client(Set(key, 0, 0, value))()
+ client(Get(Seq(key)))() mustEqual Values(Seq(Value(key, value)))
}
val end = System.currentTimeMillis
+ println("%d ms".format(end - start))
}
}
}
View
3  src/test/scala/com/twitter/schmemcached/unit/InterpreterSpec.scala
@@ -16,7 +16,8 @@ class InterpreterSpec extends Specification {
"set & get" in {
val key = "foo"
val value = "bar"
- interpreter(Set(key, value))
+ interpreter(Delete(key))
+ interpreter(Set(key, 0, 0, value))
interpreter(Get(Seq(key))) mustEqual Values(Seq(Value(key, value)))
}
}
View
16 src/test/scala/com/twitter/schmemcached/unit/protocol/text/ParseSpec.scala
@@ -24,11 +24,11 @@ class ParserSpec extends Specification {
"parse storage commands" in {
val buffer = "bar"
- ParseCommand(Seq("add", "foo", "0", "0", "3"), buffer) mustEqual Add("foo", buffer)
- ParseCommand(Seq("set", "foo", "0", "0", "3"), buffer) mustEqual Set("foo", buffer)
- ParseCommand(Seq("replace", "foo", "0", "0", "3"), buffer) mustEqual Replace("foo", buffer)
- ParseCommand(Seq("append", "foo", "0", "0", "3"), buffer) mustEqual Append("foo", buffer)
- ParseCommand(Seq("prepend", "foo", "0", "0", "3"), buffer) mustEqual Prepend("foo", buffer)
+ ParseCommand(Seq("add", "foo", "1", "2", "3"), buffer) mustEqual Add("foo", 1, 2, buffer)
+ ParseCommand(Seq("set", "foo", "1", "2", "3"), buffer) mustEqual Set("foo", 1, 2, buffer)
+ ParseCommand(Seq("replace", "foo", "1", "2", "3"), buffer) mustEqual Replace("foo", 1, 2, buffer)
+ ParseCommand(Seq("append", "foo", "1", "2", "3"), buffer) mustEqual Append("foo", 1, 2, buffer)
+ ParseCommand(Seq("prepend", "foo", "1", "2", "3"), buffer) mustEqual Prepend("foo", 1, 2, buffer)
}
}
@@ -43,9 +43,9 @@ class ParserSpec extends Specification {
}
"parse simple responses" in {
- ParseResponse(Seq("STORED")) mustEqual Stored()
- ParseResponse(Seq("NOT_STORED")) mustEqual NotStored()
- ParseResponse(Seq("DELETED")) mustEqual Deleted()
+ ParseResponse(Seq("STORED")) mustEqual Stored
+ ParseResponse(Seq("NOT_STORED")) mustEqual NotStored
+ ParseResponse(Seq("DELETED")) mustEqual Deleted
}
"parse values" in {
View
2  src/test/scala/com/twitter/schmemcached/unit/protocol/text/ShowSpec.scala
@@ -10,7 +10,7 @@ object ShowSpec extends Specification {
"Show" should {
"show commands" in {
val value = "value"
- Show(Add("key", value)).toString(CharsetUtil.UTF_8) mustEqual "add key 0 0 5\r\nvalue\r\n"
+ Show(Add("key", 1, 2, value)).toString(CharsetUtil.UTF_8) mustEqual "add key 1 2 5\r\nvalue\r\n"
}
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.