Permalink
Browse files

more client functionality

  • Loading branch information...
1 parent 9fd91db commit 69e0659dc1e01aaed2b9656472ca4f320ee260a4 Nick Kallen committed Dec 23, 2010
@@ -0,0 +1,45 @@
+package com.twitter.twemcached
+
+import com.twitter.finagle.service
+import protocol._
+import com.twitter.twemcached.util.ChannelBufferUtils._
+import com.twitter.util.Future
+import org.jboss.netty.util.CharsetUtil
+
+class Client(underlying: service.Client[Command, Response]) {
+ def get(key: String) = {
+ underlying(Get(Seq(key))) map { case Values(values) =>
+ if (values.size > 0) Some(values.head.value)
+ else None
+ }
+ }
+
+ def get(keys: String*) = {
+ underlying(Get(keys)) map { case Values(values) =>
+ val tuples = values.map { case Value(key, value) =>
+ (key.toString(CharsetUtil.UTF_8), value)
+ }
+ Map(tuples: _*)
+ }
+ }
+
+ def set(key: String, value: String) = underlying(Set(key, value))
+ def add(key: String, value: String) = underlying(Add(key, value))
+ def append(key: String, value: String) = underlying(Append(key, value))
+ def prepend(key: String, value: String) = underlying(Prepend(key, value))
+ def delete(key: String) = underlying(Delete(key))
+
+ def incr(key: String): Future[Int] = incr(key, 1)
+ def incr(key: String, delta: Int): Future[Int] = {
+ underlying(Incr(key, delta)) map { case Number(value) =>
+ value
+ }
+ }
+
+ def decr(key: String): Future[Int] = decr(key, 1)
+ def decr(key: String, delta: Int): Future[Int] = {
+ underlying(Decr(key, delta)) map { case Number(value) =>
+ value
+ }
+ }
+}
@@ -76,22 +76,26 @@ class Interpreter(map: AtomicMap[ChannelBuffer, ChannelBuffer]) {
if (data.remove(key).isDefined)
Deleted
else
- NotStored
+ NotFound
}
- case Incr(key, value) =>
+ case Incr(key, delta) =>
map.lock(key) { data =>
val existing = data.get(key)
if (existing.isDefined) {
- data(key) = {
- val existingString = existing.get.toString(CharsetUtil.US_ASCII)
- if (existingString.matches(DIGITS))
- (existingString.toInt + value).toString
- else
- value.toString
- }
- Stored
+ val existingString = existing.get.toString(CharsetUtil.US_ASCII)
+ if (!existingString.isEmpty && !existingString.matches(DIGITS))
+ throw new ClientError("cannot increment or decrement non-numeric value")
+
+ val existingValue =
+ if (existingString.isEmpty) 0
+ else existingString.toInt
+
+ val result = existingValue + delta
+ data(key) = result.toString
+
+ Number(result)
} else {
- NotStored
+ NotFound
}
}
case Decr(key, value) =>
@@ -4,7 +4,7 @@ import java.net.InetSocketAddress
object Main {
def main(args: Array[String]) {
- val server = new MemcachedServer(new InetSocketAddress(11214))
+ val server = new Server(new InetSocketAddress(11214))
server.start()
}
}
@@ -1,3 +0,0 @@
-package com.twitter.twemcached
-
-class MemcachedClient
@@ -8,7 +8,7 @@ import java.net.SocketAddress
import com.twitter.util.SynchronizedLruMap
import util.AtomicMap
-class MemcachedServer(address: SocketAddress) {
+class Server(address: SocketAddress) {
val concurrencyLevel = 16
val slots = 500000
val slotsPerLru = slots / concurrencyLevel
@@ -13,6 +13,7 @@ object ParseResponse extends Parser[Response] {
}
private[this] val VALUE = "VALUE": ChannelBuffer
private[this] val STORED = "STORED": ChannelBuffer
+ private[this] val NOT_FOUND = "NOT_FOUND": ChannelBuffer
private[this] val NOT_STORED = "NOT_STORED": ChannelBuffer
private[this] val DELETED = "DELETED": ChannelBuffer
private[this] val END = "END": ChannelBuffer
@@ -31,9 +32,11 @@ object ParseResponse extends Parser[Response] {
def apply(tokens: Seq[ChannelBuffer]) = {
tokens.head match {
+ case NOT_FOUND => NotFound
case STORED => Stored
case NOT_STORED => NotStored
case DELETED => Deleted
+ case ds => Number(ds.toInt)
}
}
@@ -3,10 +3,12 @@ package com.twitter.twemcached.protocol
import org.jboss.netty.buffer.ChannelBuffer
sealed abstract class Response
+case object NotFound extends Response
case object Stored extends Response
case object NotStored extends Response
case object Deleted extends Response
case class Values(values: Seq[Value]) extends Response
+case class Number(value: Int) extends Response
case class Value(key: ChannelBuffer, value: ChannelBuffer)
@@ -36,6 +36,10 @@ object Show {
case Stored => STORED
case NotStored => NOT_STORED
case Deleted => DELETED
+ case Number(value) =>
+ val buffer = ChannelBuffers.dynamicBuffer(10)
+ buffer.writeBytes(value.toString.getBytes)
+ buffer
case Values(values) =>
val buffer = ChannelBuffers.dynamicBuffer(100 * values.size)
val shown = values map { case Value(key, value) =>
@@ -85,14 +89,20 @@ object Show {
val buffer = ChannelBuffers.dynamicBuffer(50)
buffer.writeBytes(INCR)
buffer.writeBytes(SPACE)
+ buffer.writeBytes(key)
+ buffer.writeBytes(SPACE)
buffer.writeBytes(amount.toString.getBytes)
+ buffer.writeBytes(SPACE)
buffer.writeBytes(DELIMETER)
buffer
case Decr(key, amount) =>
val buffer = ChannelBuffers.dynamicBuffer(30)
buffer.writeBytes(DECR)
buffer.writeBytes(SPACE)
+ buffer.writeBytes(key)
+ buffer.writeBytes(SPACE)
buffer.writeBytes(amount.toString.getBytes)
+ buffer.writeBytes(SPACE)
buffer.writeBytes(DELIMETER)
buffer
case Delete(key) =>
@@ -16,7 +16,11 @@ class Decoder extends AbstractDecoder[Response] with StateMachine {
state match {
case AwaitingResponse() =>
decodeLine(buffer, ParseResponse.needsData(_)) { tokens =>
- ParseResponse(tokens)
+ if (ParseResponse.isEnd(tokens)) {
+ ParseResponse.parseValues(Seq())
+ } else {
+ ParseResponse(tokens)
+ }
}
case AwaitingData(valuesSoFar, tokens, bytesNeeded) =>
decodeData(bytesNeeded, buffer) { data =>
@@ -14,6 +14,9 @@ object ChannelBufferUtils {
implicit def stringToChannelBuffer(string: String) =
ChannelBuffers.wrappedBuffer(string.getBytes)
+ implicit def seqOfStringToSeqOfChannelBuffer(strings: Seq[String]) =
+ strings.map { string => ChannelBuffers.wrappedBuffer(string.getBytes) }
+
implicit def stringToByteArray(string: String) =
string.getBytes
}
@@ -0,0 +1,47 @@
+package com.twitter.twemcached.integration
+
+import org.specs.Specification
+import com.twitter.finagle.builder.ServerBuilder
+import com.twitter.twemcached.protocol._
+import com.twitter.twemcached.protocol.text.Memcached
+import com.twitter.finagle.builder.ClientBuilder
+import com.twitter.twemcached.Client
+import org.jboss.netty.util.CharsetUtil
+
+object ClientSpec extends Specification {
+ /**
+ * Note: This test needs a real Memcached server running on 11211 to work!!
+ */
+ "Client" should {
+ "work" in {
+ val service = ClientBuilder().hosts("localhost:11211").codec(Memcached).buildService[Command, Response]()
+ val client = new Client(service)
+
+ client.delete("foo")()
+
+ "set & get" in {
+ client.get("foo")() mustEqual None
+ client.set("foo", "bar")()
+ client.get("foo")().get.toString(CharsetUtil.UTF_8) mustEqual "bar"
+ }
+
+ "gets" in {
+ client.set("foo", "bar")()
+ client.set("baz", "boing")()
+ val result = client.get("foo", "baz", "notthere")()
+ .map { case (key, value) => (key, value.toString(CharsetUtil.UTF_8)) }
+ result mustEqual Map(
+ "foo" -> "bar",
+ "baz" -> "boing"
+ )
+ }
+
+ "incr & decr" in {
+ client.set("foo", "")()
+ client.incr("foo")() mustEqual 1
+ client.incr("foo", 2)() mustEqual 3
+ client.decr("foo")() mustEqual 2
+ }
+ }
+ }
+}
@@ -1,7 +1,7 @@
package com.twitter.twemcached.integration
import org.specs.Specification
-import com.twitter.twemcached.MemcachedServer
+import com.twitter.twemcached.Server
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.twemcached.protocol._
import org.jboss.netty.buffer.ChannelBuffers
@@ -14,12 +14,12 @@ import com.twitter.twemcached.util.ChannelBufferUtils._
object InterpreterServiceSpec extends Specification {
"InterpreterService" should {
- var server: MemcachedServer = null
+ var server: Server = null
var client: Service[Command, Response] = null
doBefore {
val address = RandomSocket()
- server = new MemcachedServer(address)
+ server = new Server(address)
server.start()
client = ClientBuilder()
.hosts("localhost:" + address.getPort)
@@ -1,7 +1,7 @@
package com.twitter.twemcached.stress
import org.specs.Specification
-import com.twitter.twemcached.MemcachedServer
+import com.twitter.twemcached.Server
import com.twitter.finagle.builder.ClientBuilder
import com.twitter.twemcached.protocol._
import com.twitter.util.RandomSocket
@@ -11,12 +11,12 @@ import com.twitter.twemcached.util.ChannelBufferUtils._
object InterpreterServiceSpec extends Specification {
"InterpreterService" should {
- var server: MemcachedServer = null
+ var server: Server = null
var client: Service[Command, Response] = null
doBefore {
val address = RandomSocket()
- server = new MemcachedServer(address)
+ server = new Server(address)
server.start()
client = ClientBuilder()
.hosts("localhost:" + address.getPort)
@@ -34,8 +34,8 @@ object InterpreterServiceSpec extends Specification {
val start = System.currentTimeMillis
(0 until 100) map { i =>
val key = _key + "i"
- client(Set(key, value))
- client(Get(Seq(key)))
+ client(Set(key, value))()
+ client(Get(Seq(key)))()
}
val end = System.currentTimeMillis
}

0 comments on commit 69e0659

Please sign in to comment.