Permalink
Browse files

Merge branch 'master' of github.com:twitter/finagle

  • Loading branch information...
2 parents 9cb4448 + 21121ad commit 0a3ceae8d52e835f8bc911604e9048c0cf8ce7ad @mariusae mariusae committed Jan 29, 2011
@@ -0,0 +1,125 @@
+package com.twitter.finagle.kestrel.java;
+
+import com.twitter.concurrent.Channel;
+import com.twitter.finagle.Service;
+import com.twitter.finagle.kestrel.protocol.Command;
+import com.twitter.finagle.kestrel.protocol.Response;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
+import com.twitter.util.Time;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A Java-friendly Client for interacting with Kestrel.
+ */
+public abstract class Client {
+ public static Client newInstance(Service<Command, Response> finagleClient) {
+ com.twitter.finagle.kestrel.Client kestrelClient =
+ com.twitter.finagle.kestrel.Client$.MODULE$.apply(finagleClient);
+ return new com.twitter.finagle.kestrel.java.ClientBase(kestrelClient);
+ }
+
+ /**
+ * Dequeue an item.
+ *
+ * @param key the name of the queue.
+ * @param waitFor if the queue is empty, wait up to this duration for an item to arrive.
+ * @return A Future<ChannelBuffer>
+ */
+ abstract public Future<ChannelBuffer> get(String key, Duration waitFor);
+
+ /**
+ * Enqueue an item.
+ *
+ * @param key the queue to enqueue into
+ * @param value the value to enqueue
+ * @param expiry indicates to Kestrel to discard the item if it isn't dequeued in time.
+ * @return a Future<Response> indicating success for failure.
+ */
+ abstract public Future<Response> set(String key, ChannelBuffer value, Time expiry);
+
+ /**
+ * Delete a queue. Kestrel will actually delete the queue's journal file and all items in the queue.
+ *
+ * @param key the queue name.
+ * @return a Future<Response> indicating success if the queue already exists
+ */
+ abstract public Future<Response> delete(String key);
+
+ /**
+ * Flush/empty a queue. The journal file is preserved
+ *
+ * @param key the queue name
+ * @return a Future<Response> indicating success if the queue already exists
+ */
+ abstract public Future<Response> flush(String key);
+
+ /**
+ * A friendly Channel object for Dequeueing items from a queue as they arrive.
+ * @param key queue name
+ * @param waitFor if the queue is empty, wait up to this duration for something to arrive before explicitly calling dequeueing again. A sensible value for this is infinity.
+ * @return
+ */
+ abstract public Channel channel(String key, Duration waitFor);
+
+ /**
+ * Dequeue an item
+ *
+ * @param key the queue name
+ * @return a Channel buffer if the item exists, null otherwise.
+ */
+ public Future<ChannelBuffer> get(String key) {
+ return this.get(key, Duration.apply(0, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Get a channel for the given queue.
+ *
+ * @param key the queue name
+ * @return
+ */
+ public Channel channel(String key) {
+ return this.channel(key, Duration.apply(10, TimeUnit.SECONDS));
+ }
+
+ /**
+ * Enqueue an item.
+ *
+ * @param key the queue
+ * @param value the item as a ChannelBuffer
+ * @return a Future<Reponse> indicating success or failure.
+ */
+ public Future<Response> set(String key, ChannelBuffer value) {
+ return this.set(key, value);
+ }
+
+ /**
+ * Enqueue an item.
+ *
+ * @param key the queue
+ * @param value the item as a String. The bytes behind the String are stored in Kestrel
+ * @return a Future<Response> indicating success or failutre.
+ */
+ public Future<Response> set(String key, String value) {
+ return this.set(key, toChannelBuffer(value));
+ }
+
+ /**
+ * Enqueue an item.
+ *
+ * @param key the queue
+ * @param value the item as a string. The bytes behind the string are stored in Kestrel.
+ * @param expiry indicates to Kestrel to delete the item if it is not enqueued in time.
+ * @return a Future<Response> indicating success or failure.
+ */
+ public Future<Response> set(String key, String value, Time expiry) {
+ return this.set(key, toChannelBuffer(value), expiry);
+ }
+
+ private ChannelBuffer toChannelBuffer(String value) {
+ return ChannelBuffers.wrappedBuffer(value.getBytes());
+ }
+}
@@ -0,0 +1,47 @@
+package com.twitter.finagle.kestrel.java;
+
+import com.twitter.concurrent.Channel;
+import com.twitter.finagle.kestrel.protocol.Response;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.Time;
+import org.jboss.netty.buffer.ChannelBuffer;
+import scala.Option;
+
+public class ClientBase extends com.twitter.finagle.kestrel.java.Client {
+ com.twitter.finagle.kestrel.Client underlying;
+
+ public ClientBase(com.twitter.finagle.kestrel.Client underlying) {
+ this.underlying = underlying;
+ }
+
+ public Future<ChannelBuffer> get(String key, Duration waitFor) {
+ Future<Option<ChannelBuffer>> result = underlying.get(key, waitFor);
+ return result.map(new Function<Option<ChannelBuffer>, ChannelBuffer>() {
+ public ChannelBuffer apply(Option<ChannelBuffer> value) {
+ if (value.isDefined()) {
+ return (ChannelBuffer)value.get();
+ } else {
+ return null;
+ }
+ }
+ });
+ }
+
+ public Future<Response> set(String key, ChannelBuffer value, Time expiry) {
+ return underlying.set(key, value, expiry);
+ }
+
+ public Future<Response> delete(String key) {
+ return underlying.delete(key);
+ }
+
+ public Future<Response> flush(String key) {
+ return underlying.delete(key);
+ }
+
+ public Channel channel(String key, Duration waitFor) {
+ return underlying.channel(key, waitFor);
+ }
+}
@@ -14,6 +14,9 @@ object Client {
}
}
+/**
+ * A friendly Kestrel client Interface.
+ */
trait Client {
def set(queueName: String, value: ChannelBuffer, expiry: Time = Time.epoch): Future[Response]
def get(queueName: String, waitUpTo: Duration = 0.seconds): Future[Option[ChannelBuffer]]
@@ -22,6 +25,11 @@ trait Client {
def channel(queueName: String, waitUpTo: Duration = 0.seconds): Channel[ChannelBuffer]
}
+/**
+ * A Client representing a single TCP connection to a single server.
+ *
+ * @param underlying a Service[Command, Response]. '''Note:''' underlying MUST not use a connection pool or load-balance!
+ */
protected class ConnectedClient(underlying: Service[Command, Response]) extends Client {
def flush(queueName: String) = {
underlying(Flush(queueName))
@@ -31,17 +39,32 @@ protected class ConnectedClient(underlying: Service[Command, Response]) extends
underlying(Delete(queueName))
}
+ /**
+ * Enqueue an item.
+ *
+ * @param expiry how long the item is valid for (Kestrel will delete the item if it isn't dequeued in time.
+ */
def set(queueName: String, value: ChannelBuffer, expiry: Time = Time.epoch) = {
underlying(Set(queueName, expiry, value))
}
+ /**
+ * Dequeue an item.
+ *
+ * @param waitUpTo if the queue is empty, indicate to the Kestrel server how long to block the operation, waiting for something to arrive, before returning None
+ */
def get(queueName: String, waitUpTo: Duration = 0.seconds) = {
underlying(Get(queueName, collection.Set(Timeout(waitUpTo)))) map {
case Values(Seq()) => None
case Values(Seq(Value(key, value))) => Some(value)
}
}
+ /**
+ * Get a channel for the given queue
+ *
+ * @return A Channel object that you can receive items from as they arrive.
+ */
def channel(queueName: String, waitUpTo: Duration = 10.seconds): Channel[ChannelBuffer] = {
val channel = new Topic[ChannelBuffer]
channel.onReceive {
@@ -10,21 +10,69 @@
import java.util.List;
import java.util.Map;
+/**
+ * A Java-friendly memcached client.
+ */
public abstract class Client {
+ /**
+ * Construct a Client from a single Service<Command, Response>
+ *
+ * @param finagleClient a Service<Command, Response>
+ * @return a Client.
+ */
public static Client newInstance(Service<Command, Response> finagleClient) {
- com.twitter.finagle.memcached.Client schmemcachedClient = com.twitter.finagle.memcached.Client$.MODULE$.apply(finagleClient);
+ com.twitter.finagle.memcached.Client schmemcachedClient =
+ com.twitter.finagle.memcached.Client$.MODULE$.apply(finagleClient);
return new com.twitter.finagle.memcached.java.ClientBase(schmemcachedClient);
}
+ /**
+ * Get a key from the server.
+ */
abstract public Future<ChannelBuffer> get(String key);
+
+
+ /**
+ * Get a set of keys from the server.
+ * @return a Map[String, ChannelBuffer] of all of the keys that the server had.
+ */
abstract public Future<Map<String, ChannelBuffer>> get(List<String> keys);
+
+ /**
+ * Store a key. Override an existing values.
+ */
abstract public Future<Response> set(String key, ChannelBuffer value);
+
+ /**
+ * Store a key but only if it doesn't already exist on the server.
+ */
abstract public Future<Response> add(String key, ChannelBuffer value);
+
+ /**
+ * Append a set of bytes to the end of an existing key. If the key doesn't
+ * exist, the operation has no effect.
+ */
abstract public Future<Response> append(String key, ChannelBuffer value);
+
+ /**
+ * Prepend a set of bytes to the beginning of an existing key. If the key
+ * doesn't exist, the operation has no effect.
+ */
abstract public Future<Response> prepend(String key, ChannelBuffer value);
abstract public Future<Response> delete(String key);
+
+ /**
+ * Increment a key. Interpret the key as an integer if it is parsable.
+ * This operation has no effect if there is no value there already.
+ * A common idiom is to set(key, ""), incr(key).
+ */
abstract public Future<Integer> incr(String key);
abstract public Future<Integer> incr(String key, int delta);
+
+ /**
+ * Decrement a key. Interpret the key as an integer if it is parsable.
+ * This operation has no effect if there is no value there already.
+ */
abstract public Future<Integer> decr(String key);
abstract public Future<Integer> decr(String key, int delta);
@@ -33,15 +81,15 @@ public static Client newInstance(Service<Command, Response> finagleClient) {
}
public Future<Response> add(String key, String value) {
- return this.set(key, toChannelBuffer(value));
+ return this.add(key, toChannelBuffer(value));
}
public Future<Response> append(String key, String value) {
- return this.set(key, toChannelBuffer(value));
+ return this.append(key, toChannelBuffer(value));
}
public Future<Response> prepend(String key, String value) {
- return this.set(key, toChannelBuffer(value));
+ return this.prepend(key, toChannelBuffer(value));
}
private ChannelBuffer toChannelBuffer(String value) {
Oops, something went wrong.

0 comments on commit 0a3ceae

Please sign in to comment.