Permalink
Browse files

[split] Add Array[Byte] and String interface to finagle-memcache clie…

…nt. Add flakestore project. - Add Array[Byte] and String interface to finagle-memcache Client, using withBytes and withStrings methods. - Add a MockClient backed by a map - Add a YamlMemcacheConfig (Config[Client]) to twitter-config (copied from gluebird) - Breakout flakestore project from limiter. - ChannelBuffersUtils: Add methods to convert ChannelBuffers to string, byte array.
  • Loading branch information...
1 parent 5b9ae0d commit af504702fde3f437612dbc6c6c55e45c86794aab @hjz hjz committed Nov 3, 2011
@@ -12,7 +12,7 @@ import com.twitter.finagle.memcached.protocol._
import com.twitter.finagle.memcached.util.ChannelBufferUtils._
import com.twitter.finagle.Service
import com.twitter.hashing._
-import com.twitter.util.{Time, Future}
+import com.twitter.util.{Time, Future, Bijection}
import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.util.CharsetUtil
@@ -73,39 +73,41 @@ object GetResult {
/**
* A friendly client to talk to a Memcached server.
*/
-trait Client {
+trait BaseClient[T] {
+ def channelBufferToType(a: ChannelBuffer): T
+
/**
* Store a key. Override an existing value.
* @return true
*/
- def set(key: String, flags: Int, expiry: Time, value: ChannelBuffer): Future[Unit]
+ def set(key: String, flags: Int, expiry: Time, value: T): Future[Unit]
/**
* Store a key but only if it doesn't already exist on the server.
* @return true if stored, false if not stored
*/
- def add(key: String, flags: Int, expiry: Time, value: ChannelBuffer): Future[Boolean]
+ def add(key: String, flags: Int, expiry: Time, value: T): Future[Boolean]
/**
* Append bytes to the end of an existing key. If the key doesn't exist, the
* operation has no effect.
* @return true if stored, false if not stored
*/
- def append(key: String, flags: Int, expiry: Time, value: ChannelBuffer): Future[Boolean]
+ def append(key: String, flags: Int, expiry: Time, value: T): Future[Boolean]
/**
* Prepend bytes to the beginning of an existing key. If the key doesn't
* exist, the operation has no effect.
* @return true if stored, false if not stored
*/
- def prepend(key: String, flags: Int, expiry: Time, value: ChannelBuffer): Future[Boolean]
+ def prepend(key: String, flags: Int, expiry: Time, value: T): Future[Boolean]
/**
* Replace bytes on an existing key. If the key doesn't exist, the
* operation has no effect.
* @return true if stored, false if not stored
*/
- def replace(key: String, flags: Int, expiry: Time, value: ChannelBuffer): Future[Boolean]
+ def replace(key: String, flags: Int, expiry: Time, value: T): Future[Boolean]
/**
* Perform a CAS operation on the key, only if the value has not
@@ -116,33 +118,32 @@ trait Client {
* @return true if replaced, false if not
*/
def cas(
- key: String, flags: Int, expiry: Time, value: ChannelBuffer, casUnique: ChannelBuffer
+ key: String, flags: Int, expiry: Time, value: T, casUnique: ChannelBuffer
): Future[Boolean]
-
/**
* Get a key from the server.
*/
- def get(key: String): Future[Option[ChannelBuffer]] = get(Seq(key)) map { _.values.headOption }
+ def get(key: String): Future[Option[T]] = get(Seq(key)) map { _.values.headOption }
/**
* Get a key from the server, with a "cas unique" token. The token
* is treated opaquely by the memcache client but is in reality a
* string-encoded u64.
*/
- def gets(key: String): Future[Option[(ChannelBuffer, ChannelBuffer)]] =
+ def gets(key: String): Future[Option[(T, ChannelBuffer)]] =
gets(Seq(key)) map { _.values.headOption }
/**
* Get a set of keys from the server.
- * @return a Map[String, ChannelBuffer] of all of the keys that the server had.
+ * @return a Map[String, T] of all of the keys that the server had.
*/
- def get(keys: Iterable[String]): Future[Map[String, ChannelBuffer]] = {
+ def get(keys: Iterable[String]): Future[Map[String, T]] = {
getResult(keys) flatMap { result =>
if (result.failures.nonEmpty) {
Future.exception(result.failures.values.head)
} else {
- Future.value(result.values)
+ Future.value(result.values mapValues { channelBufferToType(_) })
}
}
}
@@ -152,20 +153,21 @@ trait Client {
* token. The token is treated opaquely by the memcache client but
* is in reality a string-encoded u64.
*
- * @return a Map[String, (ChannelBuffer, ChannelBuffer)] of all the
+ * @return a Map[String, (T, ChannelBuffer)] of all the
* keys the server had, together with their "cas unique" token
*/
- def gets(keys: Iterable[String]): Future[Map[String, (ChannelBuffer, ChannelBuffer)]] = {
+ def gets(keys: Iterable[String]): Future[Map[String, (T, ChannelBuffer)]] = {
getsResult(keys) flatMap { result =>
if (result.failures.nonEmpty) {
Future.exception(result.failures.values.head)
} else {
- Future.value(result.valuesWithTokens)
+ Future.value(result.valuesWithTokens mapValues {
+ case (v, u) => (channelBufferToType(v), u)
+ })
}
}
}
-
/**
* Get a set of keys from the server. Returns a Future[GetResult] that
* encapsulates hits, misses and failures.
@@ -189,52 +191,52 @@ trait Client {
* Increment a key. Interpret the value as an Long if it is parsable.
* This operation has no effect if there is no value there already.
*/
- def incr(key: String): Future[Option[Long]]
def incr(key: String, delta: Long): Future[Option[Long]]
+ def incr(key: String): Future[Option[Long]] = incr(key, 1L)
/**
* Decrement a key. Interpret the value as an Long if it is parsable.
* This operation has no effect if there is no value there already.
*/
- def decr(key: String): Future[Option[Long]]
def decr(key: String, delta: Long): Future[Option[Long]]
+ def decr(key: String): Future[Option[Long]] = decr(key, 1L)
/**
* Store a key. Override an existing values.
* @return true
*/
- def set(key: String, value: ChannelBuffer): Future[Unit] =
+ def set(key: String, value: T): Future[Unit] =
set(key, 0, Time.epoch, value)
/**
* Store a key but only if it doesn't already exist on the server.
* @return true if stored, false if not stored
*/
- def add(key: String, value: ChannelBuffer): Future[Boolean] =
+ def add(key: String, value: T): Future[Boolean] =
add(key, 0, Time.epoch, value)
/**
* Append a set of bytes to the end of an existing key. If the key doesn't
* exist, the operation has no effect.
* @return true if stored, false if not stored
*/
- def append(key: String, value: ChannelBuffer): Future[Boolean] =
+ def append(key: String, value: T): Future[Boolean] =
append(key, 0, Time.epoch, value)
/**
* Prepend a set of bytes to the beginning of an existing key. If the key
* doesn't exist, the operation has no effect.
* @return true if stored, false if not stored
*/
- def prepend(key: String, value: ChannelBuffer): Future[Boolean] =
+ def prepend(key: String, value: T): Future[Boolean] =
prepend(key, 0, Time.epoch, value)
/**
* Replace an item if it exists. If it doesn't exist, the operation has no
* effect.
* @return true if stored, false if not stored
*/
- def replace(key: String, value: ChannelBuffer): Future[Boolean] = replace(key, 0, Time.epoch, value)
+ def replace(key: String, value: T): Future[Boolean] = replace(key, 0, Time.epoch, value)
/**
* Perform a CAS operation on the key, only if the value has not
@@ -244,7 +246,7 @@ trait Client {
*
* @return true if replaced, false if not
*/
- def cas(key: String, value: ChannelBuffer, casUnique: ChannelBuffer): Future[Boolean] =
+ def cas(key: String, value: T, casUnique: ChannelBuffer): Future[Boolean] =
cas(key, 0, Time.epoch, value, casUnique)
/**
@@ -260,6 +262,29 @@ trait Client {
def release(): Unit
}
+trait Client extends BaseClient[ChannelBuffer] {
+ def channelBufferToType(v: ChannelBuffer) = v
+
+ def adapt[T](bijection: Bijection[ChannelBuffer, T]): BaseClient[T] =
+ new ClientAdaptor[T](this, bijection)
+
+ /** Adaptor to use String as values */
+ def withStrings: BaseClient[String] = adapt(
+ new Bijection[ChannelBuffer, String] {
+ def apply(a: ChannelBuffer): String = channelBufferToString(a)
+ def invert(b: String): ChannelBuffer = stringToChannelBuffer(b)
+ }
+ )
+
+ /** Adaptor to use Array[Byte] as values */
+ def withBytes: BaseClient[Array[Byte]] = adapt(
+ new Bijection[ChannelBuffer, Array[Byte]] {
+ def apply(a: ChannelBuffer): Array[Byte] = channelBufferToBytes(a)
+ def invert(b: Array[Byte]): ChannelBuffer = bytesToChannelBuffer(b)
+ }
+ )
+}
+
/**
* A Client connected to an individual Memcached server.
*
@@ -345,10 +370,6 @@ protected class ConnectedClient(service: Service[Command, Response]) extends Cli
case _ => throw new IllegalStateException
}
- def incr(key: String) = incr(key, 1L)
-
- def decr(key: String) = decr(key, 1L)
-
def incr(key: String, delta: Long): Future[Option[Long]] = {
service(Incr(key, delta)) map {
case Number(value) => Some(value)
@@ -422,9 +443,7 @@ trait PartitionedClient extends Client {
def delete(key: String) = clientOf(key).delete(key)
- def incr(key: String) = clientOf(key).incr(key)
def incr(key: String, delta: Long) = clientOf(key).incr(key, delta)
- def decr(key: String) = clientOf(key).decr(key)
def decr(key: String, delta: Long) = clientOf(key).decr(key, delta)
}
@@ -0,0 +1,33 @@
+package com.twitter.finagle.memcached
+
+import com.twitter.util.{Time, Future, Bijection}
+import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
+
+class ClientAdaptor[T](
+ val self: Client,
+ bijection: Bijection[ChannelBuffer, T]
+) extends BaseClient[T] with Proxy {
+ def channelBufferToType(a: ChannelBuffer): T = bijection(a)
+
+ def set(key: String, flags: Int, expiry: Time, value: T): Future[Unit] =
+ self.set(key, flags, expiry, bijection.inverse(value))
+ def add(key: String, flags: Int, expiry: Time, value: T): Future[Boolean] =
+ self.add(key, flags, expiry, bijection.inverse(value))
+ def append(key: String, flags: Int, expiry: Time, value: T): Future[Boolean] =
+ self.append(key, flags, expiry, bijection.inverse(value))
+ def prepend(key: String, flags: Int, expiry: Time, value: T): Future[Boolean] =
+ self.prepend(key, flags, expiry, bijection.inverse(value))
+ def replace(key: String, flags: Int, expiry: Time, value: T): Future[Boolean] =
+ self.replace(key, flags, expiry, bijection.inverse(value))
+ def cas(key: String, flags: Int, expiry: Time, value: T, casUnique: ChannelBuffer): Future[Boolean] =
+ self.cas(key, flags, expiry, bijection.inverse(value), casUnique)
+
+ def getResult(keys: Iterable[String]): Future[GetResult] = self.getResult(keys)
+ def getsResult(keys: Iterable[String]): Future[GetsResult] = self.getsResult(keys)
+
+ def delete(key: String): Future[Boolean] = self.delete(key)
+ def incr(key: String, delta: Long): Future[Option[Long]] = self.incr(key, delta)
+ def decr(key: String, delta: Long): Future[Option[Long]] = self.decr(key, delta)
+
+ def release(): Unit = self.release()
+}
Oops, something went wrong.

0 comments on commit af50470

Please sign in to comment.