A JVM Kestrel client that aggregates queues from multiple servers. Implemented in Scala with Java bindings. In use at Twitter for all JVM Search and Streaming Kestrel interactions.



v1.0 2010/05/27

A JVM Kestrel client optimized for low-latency consumption of message
queues hosted across a cluster of Kestrel servers.  Connections are
opened to each Kestrel for each requested queue and each outstanding
read or write transaction.  Messages are read via Kestrel blocking
read transactions. Implemented in Scala, but callable from Java.

Robustness, visibility, efficiency, and low latency.

A complete implementation of the memcache protocol.

Scala Actor dependency.

A deep or Scala-specific dependency chain.

A fair or uniform write distribution across a cluster.

Strict order preservation.

Low latency end-to-end read results by using Kestrel blocking
read transactions.

Highly concurrent. Each transaction is concurrently hosted in its
own thread.

Multiple outstanding read or write transaction support.

Multiple Kestrel clusters can be supported by creating additional
instances of GrabbyHands.

Arbitrary user-defined read and write timeout behaviors are possible
by interacting directly with an internal Java Blocking Queue.

Internal queues provide additional, but bounded, liveness.

Writes may be cancelled while still waiting in local queues.

Writes may be made at-least-once transactional by calling

If at least one Kestrel is available, read and write progress can
be made.

Immutable configuration. The entire configuration is passed in at
instantiation and cannot be modified. Safety and simplicity over
flexibility and complexity.

Counters for everything.
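
The timeout and cancellation points above rest on ordinary
java.util.concurrent.BlockingQueue semantics. The following is a
minimal sketch of that idea, using plain JDK queues as stand-ins
rather than GrabbyHands objects. The class QueueTimeouts and its
helper methods are hypothetical; only offer(), poll(), and remove()
are real BlockingQueue calls.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: plain JDK queues stand in for the send and
// receive queues handed out by the client.
public class QueueTimeouts {

    // Try to enqueue a write, giving up after timeoutMs if the bounded
    // queue stays full.
    public static <T> boolean sendWithTimeout(BlockingQueue<T> q, T item, long timeoutMs) {
        try {
            return q.offer(item, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Wait up to timeoutMs for a message; return fallback if none arrives.
    public static <T> T receiveOrDefault(BlockingQueue<T> q, long timeoutMs, T fallback) {
        try {
            T item = q.poll(timeoutMs, TimeUnit.MILLISECONDS);
            return item != null ? item : fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        }
    }

    // Cancel a write that is still waiting locally. A false result means
    // the item is no longer in the queue.
    public static <T> boolean cancel(BlockingQueue<T> q, T item) {
        return q.remove(item);
    }

    public static void main(String[] args) {
        BlockingQueue<String> send = new ArrayBlockingQueue<>(1);
        System.out.println(sendWithTimeout(send, "a", 10)); // queue has room
        System.out.println(sendWithTimeout(send, "b", 10)); // queue is full
        System.out.println(cancel(send, "a"));              // still queued locally
        System.out.println(receiveOrDefault(send, 10, "none"));
    }
}
```

The caller chooses what a timeout means (drop, retry, fall back),
which is the flexibility the direct queue access is meant to allow.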

See the Scala unit tests in src/test/scala/com/twitter/grabbyhands/*Spec.scala
for more detailed examples.

  import java.util.concurrent.TimeUnit

  val queue = "example"
  val grabby = {
    val config = new Config()
    config.addServer(host + ":" + port)
    new GrabbyHands(config)
  }
  val send = "payload"
  // Enqueue the payload for delivery to Kestrel.
  grabby.getSendQueue(queue).put(new Write(send))
  // Wait up to one second for the message to come back.
  val receive = new String(grabby.getRecvQueue(queue).poll(1, TimeUnit.SECONDS).array)
  receive must be_==(send)

See the Java unit test in src/test/java/com/twitter/grabbyhands/JavaTest.java
for a more detailed example.

  String queue = "example";
  Config config = new Config();
  config.addServer(host + ":" + port);
  GrabbyHands grabbyHands = new GrabbyHands(config);
  String send = "payload";
  grabbyHands.getSendQueue(queue).put(new Write(send));
  // poll() yields a buffer, as in the Scala example; array() exposes its bytes.
  String receive = new String(grabbyHands.getRecvQueue(queue).poll(1, TimeUnit.SECONDS).array());
  assertEquals(send, receive);

Some options can be overridden on a per-queue basis by setting
values in the QueueConfig object returned by addQueue().

Scala users can generally assign values:
  val config = new Config()
  config.recvNumConnections = 2
  val queueConfigs = config.addQueues(Array(queue1, queue2))
  // override queue1, leaving queue2 at default of 2
  queueConfigs(queue1).recvNumConnections = 4

Java users should probably use the bean-style accessors:
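
As a rough sketch only, bean-style access replaces each Scala
assignment with a setter call. This assumes the configuration fields
are exposed with JavaBean naming; setRecvNumConnections and
getRecvNumConnections are assumed names, and QueueSettings is a
hypothetical stand-in class so the example compiles without the
library. Check the real accessors against the Java unit test.

```java
// Hypothetical stand-in for a per-queue configuration object. The real
// class and its accessor names may differ.
public class QueueSettings {
    // The starting value here is arbitrary, not the library's default.
    private int recvNumConnections = 1;

    public int getRecvNumConnections() {
        return recvNumConnections;
    }

    public void setRecvNumConnections(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("need at least one connection");
        }
        recvNumConnections = n;
    }

    public static void main(String[] args) {
        QueueSettings queue1 = new QueueSettings();
        // Scala form: queue1.recvNumConnections = 4
        queue1.setRecvNumConnections(4);
        System.out.println(queue1.getRecvNumConnections());
    }
}
```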

Generally one transaction per direction (send or receive) provides
sufficient throughput and latency, especially when reading from a
large Kestrel cluster. Demanding applications, or those reading
from a small Kestrel cluster, may wish to add a second or third
concurrent transaction in a given direction by setting sendNumConnections
and recvNumConnections.

Set maxMessageBytes to be larger than your largest expected message.
An excessively large value wastes some memory and adds garbage
collection pressure.

Set kestrelReadTimeoutMs to be large enough to cover your typical,
if not maximal, arrival gap, but not so large as to cause shutdown
latency issues. Larger is almost always better.
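
The shutdown latency trade-off can be illustrated with a local
analogy. This sketch is not the client's actual read loop: a JDK
queue stands in for a Kestrel blocking read, and BlockingReader and
measureShutdownMs are hypothetical names. A reader blocked in a timed
read only rechecks its stop flag when the read returns, so an idle
reader may take up to roughly one timeout to stop.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reader: a local queue stands in for a blocking read.
public class BlockingReader implements Runnable {
    private final BlockingQueue<String> source;
    private final long readTimeoutMs;
    private final AtomicBoolean running = new AtomicBoolean(true);

    public BlockingReader(BlockingQueue<String> source, long readTimeoutMs) {
        this.source = source;
        this.readTimeoutMs = readTimeoutMs;
    }

    @Override
    public void run() {
        while (running.get()) {
            try {
                // Blocks for up to readTimeoutMs. The stop flag is only
                // rechecked once this call returns.
                String msg = source.poll(readTimeoutMs, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    System.out.println("got " + msg);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void stop() {
        running.set(false);
    }

    // Starts an idle reader, asks it to stop, and returns how many
    // milliseconds passed before its thread exited.
    public static long measureShutdownMs(long readTimeoutMs) {
        BlockingReader reader = new BlockingReader(new LinkedBlockingQueue<>(), readTimeoutMs);
        Thread t = new Thread(reader);
        t.start();
        long start = System.nanoTime();
        reader.stop();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reader", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("timeout 50ms:  stopped after ~" + measureShutdownMs(50) + "ms");
        System.out.println("timeout 500ms: stopped after ~" + measureShutdownMs(500) + "ms");
    }
}
```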

connectTimeoutMs is largely irrelevant, due to the threading model.
Excessive values may actually increase reconnection latency to a
recently recovered Kestrel server.

readTimeoutMs and writeTimeoutMs, when tuned correctly, may reduce
the time needed to detect a down or struggling Kestrel server.  Again,
due to the threading model, this may be irrelevant, as traffic will
tend to naturally flow around a slow server.  Excessively small
values may cause needless transaction failures and retries.

reconnectHoldDownMs can reduce connection thrashing, and the log
noise it produces, when a Kestrel server is down. Excessively large
values will increase reconnection latency, but will not otherwise
improve throughput around an unavailable Kestrel. Excessively small
values will burn local CPU and network resources.
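
A hold-down can be pictured as a simple gate on reconnect attempts.
The sketch below is an illustration of the general technique, not
the client's implementation; the class HoldDown and its method
tryAcquire are hypothetical names. An attempt is allowed only if at
least holdDownMs has passed since the previous one.

```java
// Hypothetical reconnect gate illustrating a hold-down interval.
public class HoldDown {
    private final long holdDownMs;
    private long lastAttemptMs;
    private boolean attempted = false;

    public HoldDown(long holdDownMs) {
        this.holdDownMs = holdDownMs;
    }

    // Returns true and records the attempt if this is the first attempt
    // or at least holdDownMs has elapsed since the last allowed one.
    public synchronized boolean tryAcquire(long nowMs) {
        if (attempted && nowMs - lastAttemptMs < holdDownMs) {
            return false;
        }
        attempted = true;
        lastAttemptMs = nowMs;
        return true;
    }

    public static void main(String[] args) {
        HoldDown gate = new HoldDown(1000);
        long[] times = {0, 10, 999, 1000, 1500, 2000};
        for (long t : times) {
            System.out.println(t + "ms: " + (gate.tryAcquire(t) ? "connect" : "wait"));
        }
    }
}
```

With a 1000 ms hold-down, the attempts at 0, 1000, and 2000 ms are
allowed and the others are suppressed. A larger interval suppresses
more attempts, but also delays the first successful reconnect after
the server returns.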

sendQueueDepth can increase write liveness by decoupling your application
from transaction completion latency. Excessive queue depth, when not
coupled with Write.awaitWrite() checking, may lose uncommitted writes
when the local process exits.
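
The pattern of queueing writes through a bounded queue and confirming
them before exit might look like the following. This is a sketch with
stand-ins only: PendingWrite, markCommitted, await, and sendAll are
hypothetical, a CountDownLatch plays the role of the completion
signal, and a local thread pretends to be the server acknowledging
each write.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitBeforeExit {

    // Hypothetical stand-in for a queued write with a completion signal.
    public static class PendingWrite {
        final String payload;
        private final CountDownLatch done = new CountDownLatch(1);

        PendingWrite(String payload) {
            this.payload = payload;
        }

        void markCommitted() {
            done.countDown();
        }

        // Returns true if the write was committed within timeoutMs.
        boolean await(long timeoutMs) {
            try {
                return done.await(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Sends count writes through a bounded queue of the given depth, then
    // waits for each to be committed. Returns the number confirmed.
    public static int sendAll(int count, int queueDepth) {
        BlockingQueue<PendingWrite> sendQueue = new ArrayBlockingQueue<>(queueDepth);
        Thread sender = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sendQueue.take().markCommitted(); // pretend the server acknowledged it
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sender.setDaemon(true);
        sender.start();

        List<PendingWrite> issued = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                PendingWrite w = new PendingWrite("msg-" + i);
                sendQueue.put(w); // blocks only while queueDepth writes are already waiting
                issued.add(w);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        int confirmed = 0;
        for (PendingWrite w : issued) {
            if (w.await(1000)) {
                confirmed++;
            }
        }
        return confirmed;
    }

    public static void main(String[] args) {
        System.out.println(sendAll(10, 4) + " of 10 writes confirmed before exit");
    }
}
```

Without the final await loop, anything still sitting in the local
queue when the process exits would simply be lost.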

recvQueueDepth can increase read liveness by pipelining read messages
locally.  Excessive queue depth may lose uncommitted reads when the
local process exits.

All tests assume a Kestrel process running on localhost:22133.

% ant test runs the normal tests that are run during ant package.
% ant stress will run a small stress test suite and give some performance numbers.
% ant javatest will run the Java example.

Apache 2.0 License. See included license file.