Skip to content

Commit

Permalink
channel-based kestrel client
Browse files Browse the repository at this point in the history
  • Loading branch information
Nick Kallen committed Jan 27, 2011
1 parent 6ecd5d1 commit 8f4d3ef
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ object ClientSpec extends Specification {
withServer(closingHandler) { clientBuilder =>
val client = clientBuilder.build()
val future = client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))
val resolved = future within(1.second)
val resolved = future get(1.second)
resolved.isThrow must beTrue
val Throw(cause) = resolved
cause must haveClass[ChannelClosedException]
Expand All @@ -67,7 +67,7 @@ object ClientSpec extends Specification {
.retries(10)
.build()
val future = client(new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"))
val resolved = future within(1.second)
val resolved = future get(1.second)
resolved.isThrow must beTrue
val Throw(cause) = resolved
cause must haveClass[ChannelClosedException]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.twitter.conversions.time._
import com.twitter.finagle.Service
import com.twitter.finagle.kestrel.protocol._
import com.twitter.finagle.memcached.util.ChannelBufferUtils._
import com.twitter.concurrent.{Channel, Topic}

object Client {
def apply(raw: Service[Command, Response]): Client = {
Expand All @@ -18,17 +19,7 @@ trait Client {
def get(queueName: String, waitUpTo: Duration = 0.seconds): Future[Option[ChannelBuffer]]
def delete(queueName: String): Future[Response]
def flush(queueName: String): Future[Response]
def receive(queueName: String, waitUpTo: Duration = 0.seconds)(f: ChannelBuffer => Unit): Task
}

class Task {
private[this] var _isCancelled = false

def cancel() {
_isCancelled = true
}

def isCancelled = _isCancelled
def channel(queueName: String, waitUpTo: Duration = 0.seconds): Channel[ChannelBuffer]
}

protected class ConnectedClient(underlying: Service[Command, Response]) extends Client {
Expand All @@ -51,29 +42,30 @@ protected class ConnectedClient(underlying: Service[Command, Response]) extends
}
}

def receive(queueName: String, waitUpTo: Duration = 0.seconds)(f: ChannelBuffer => Unit): Task = {
val task = new Task
receive0(queueName: String, task, waitUpTo, collection.Set(Open()), f)
task
def channel(queueName: String, waitUpTo: Duration = 10.seconds): Channel[ChannelBuffer] = {
val channel = new Topic[ChannelBuffer]
channel.onReceive {
receive(queueName, channel, waitUpTo, collection.Set(Open()))
}
channel
}

private[this] def receive0(queueName: String, task: Task, waitUpTo: Duration, options: collection.Set[GetOption], f: ChannelBuffer => Unit) {
if (!task.isCancelled) {
private[this] def receive(queueName: String, channel: Topic[ChannelBuffer], waitUpTo: Duration, options: collection.Set[GetOption]) {
if (channel.isOpen) {
underlying(Get(queueName, collection.Set(Timeout(waitUpTo)) ++ options)) respond {
case Return(Values(Seq(Value(key, item)))) =>
val options = collection.mutable.Set[GetOption]()
options += Open()
try {
f(item)
options += Close()
channel.send(item)
receive(queueName, channel, waitUpTo, collection.Set(Close(), Open()))
} catch {
case e => options += Abort()
case e =>
underlying(Get(queueName, collection.Set(Abort())))
channel.close()
}
receive0(queueName, task, waitUpTo, options, f)
case Return(Values(Seq())) =>
receive0(queueName, task, waitUpTo, collection.Set(Open()), f)
receive(queueName, channel, waitUpTo, collection.Set(Open()))
case Throw(e) =>
// unsure -- FIXME!
channel.close()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@ import com.twitter.finagle.kestrel.protocol.Kestrel
import com.twitter.finagle.kestrel.Client
import org.jboss.netty.util.CharsetUtil
import com.twitter.finagle.memcached.util.ChannelBufferUtils._
import collection.mutable.ListBuffer
import com.twitter.util.CountDownLatch
import com.twitter.concurrent.Value
import com.twitter.conversions.time._
import org.jboss.netty.buffer.ChannelBuffer


object ClientSpec extends Specification {
"ConnectedClient" should {
Expand All @@ -18,12 +24,53 @@ object ClientSpec extends Specification {

client.delete("foo")()


"set & get" in {
skip("yarr")
client.get("foo")() mustEqual None
client.set("foo", "bar")()
client.get("foo")().get.toString(CharsetUtil.UTF_8) mustEqual "bar"
}

"receive" in {
"no errors" in {
skip("yar")
val result = new ListBuffer[String]
client.set("foo", "bar")()
client.set("foo", "baz")()
client.set("foo", "boing")()

val channel = client.channel("foo")
val latch = new CountDownLatch(3)
channel.foreach {
case item =>
result += item.toString(CharsetUtil.UTF_8)
latch.countDown()
}
latch.await(1.second)
channel.close ()
result mustEqual List("bar", "baz", "boing")
}

"transactionality in the presence of errors" in {
client.set("foo", "bar")()

var result: ChannelBuffer = null
var channel = client.channel("foo")
val latch = new CountDownLatch(1)
channel.foreach {
case item => throw new Exception
}
channel = client.channel("foo")
channel.foreach {
case item =>
result = item
latch.countDown()
}
latch.await(1.second)
channel.close()
result.toString(CharsetUtil.UTF_8) mustEqual "bar"
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Server(address: SocketAddress) {

private[this] val serverSpec =
ServerBuilder()
.name("schmemcached")
.name("finagle")
.codec(new Memcached)
.bindTo(address)

Expand Down
2 changes: 1 addition & 1 deletion project/build/Project.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)
val nettyRepo =
"repository.jboss.org" at "http://repository.jboss.org/nexus/content/groups/public/"
val netty = "org.jboss.netty" % "netty" % "3.2.3.Final"
val util = "com.twitter" % "util" % "1.4.13"
val util = "com.twitter" % "util" % "1.5.0"

val mockito = "org.mockito" % "mockito-all" % "1.8.5" % "test" withSources()
val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test" withSources()
Expand Down

0 comments on commit 8f4d3ef

Please sign in to comment.