Permalink
Browse files

Merge branch 'master' of github.com:twitter/finagle

  • Loading branch information...
2 parents f39a52b + 068ec55 commit 81fdcd4fa0ee9531398b270338d4ad7c01cfd5d6 @mariusae mariusae committed Feb 18, 2011
Showing with 108 additions and 18 deletions.
  1. +108 −18 README.md
View
126 README.md
@@ -1,8 +1,8 @@
# Finagle
-## What is it?
+Finagle is an library for building *asynchronous* RPC servers and clients in Java, Scala, or any JVM language. Built atop [Netty](http://www.jboss.org/netty), Finagle provides a rich set of tools that are protocol independent.
-Finagle is an library for building *asynchronous* RPC servers and clients in Java, Scala, or any JVM language. Built atop [Netty](http://www.jboss.org/netty), Finagle provides a rich set of tools that are protocol independent:
+Finagle is flexible enough to support a variety of RPC styles, including request-response, streaming, and pipelining (e.g., HTTP pipelining and Redis pipelining). It also makes it easy to work with stateful RPC styles (e.g., those requiring authentication and those that support transactions).
### Client Features
@@ -28,11 +28,9 @@ Finagle is an library for building *asynchronous* RPC servers and clients in Jav
* Memcached/Kestrel
* More to come!
-Finagle is flexible enough to support a variety of RPC styles, including request-response, streaming, and pipelining (e.g., HTTP pipelining and Redis pipelining). It also makes it easy to work with stateful RPC styles (e.g., those requiring authentication and those that support transactions).
-
## How do I start?
-Here is a simple HTTP server and client:
+Here is a simple HTTP server and client. The server returns a simple HTTP 200 response:
### Scala
@@ -47,6 +45,8 @@ Here is a simple HTTP server and client:
.bindTo(address)
.build(service)
+The client connects to the server, and issues a simple HTTP GET request:
+
val client: Service[HttpRequest, HttpResponse] = ClientBuilder()
.codec(Http)
.hosts(address)
@@ -56,7 +56,9 @@ Here is a simple HTTP server and client:
val request: HttpRequest = new DefaultHttpRequest(HTTP_1_1, GET, "/")
val responseFuture: Future[HttpResponse] = client(request)
-### Java
+ // all done!
+ client.close()
+ server.close()
Note that the variable `responseFuture` in this example is of type `Future[HttpResponse]`, which represents an asynchronous HTTP response (i.e., a response that will arrive sometime later). With a `Future` object, you can express your program in either a synchronous or asynchronous style: the program can either 1) block, awaiting the response, or 2) provide a callback to be invoked when a response is available. For example,
@@ -70,11 +72,9 @@ Note that the variable `responseFuture` in this example is of type `Future[HttpR
println(response)
}
-### Java
-
`Futures` allow the programmer to easily express a number of powerful idioms such as pipelining, scatter-gather, timeouts, and error handling. See the section "Using Futures" for more information.
-## Services and Filters
+## Services
In Finagle, RPC Servers are built out of `Services` and `Filters`. A `Service` is a simply a function that receives a request and returns a `Future` of a response. For example, here is a service that increments a number by one.
@@ -84,18 +84,18 @@ In Finagle, RPC Servers are built out of `Services` and `Filters`. A `Service` i
def apply(request: Int) = Future { request + 1 } // (1)
}
-### Java
-
Note that `plusOneService` acts as if it were asynchronous despite that it is not doing any asynchronous work. It adheres to the `Service[Req, Rep]` contract by wrapping the synchronously computed response in a "constant" `Future` of type `Future[Int]`.
-More sophisticated `Services` than `plusOneService` might make truly asynchronous calls (e.g., by making further RPC calls or by scheduling work in a queue), and so the return type of `Future[Rep]` is the more general.
+More sophisticated `Services` than `plusOneService` might make truly asynchronous calls (e.g., by making further RPC calls or by scheduling work in a queue), and `apply` having a return type of `Future[Rep]` is general enough to handle most cases.
Once you have defined your `Service`, it can be bound to a SocketAddress, thus becoming an RPC Server:
ServerBuilder()
.bindTo(address)
.build(plusOneService)
+### Filters
+
However, an RPC Server must speak a specific protocol. One nice way to design an RPC Server is to decouple the protocol handling code from the implimentation of the business service. A `Filter` provides a easy way to do this.
Here is a `Filter` that adapts the `HttpRequest => HttpResponse` protocol to the `Int => Int` `plusOneService`:
@@ -116,8 +116,6 @@ Here is a `Filter` that adapts the `HttpRequest => HttpResponse` protocol to the
val httpPlusOneService: Service[HttpRequest, HttpResponse] =
httpToIntFilter.andThen(plusOneService) // (3)
-### Java
-
This example illustrates three important concepts:
1. A `Filter` wraps a `Service` and (potentially) converts the input and output types of the service to other types. Here `Int => Int` is mapped to `HttpRequest => HttpResponse`.
@@ -248,11 +246,103 @@ In the example below, we define a function `f` that takes an `Int` and returns a
case Throw(e) => ...
}
-### Java
+In addition to waiting for results to return, `Futures` can be transformed in interesting ways. For instance, it is possible to convert a `Future[String]` to a `Future[Int]` by using `map`:
+
+ val stringFuture: Future[String] = Future("1")
+ val intFuture: Future[Int] = stringFuture map (_.toInt)
-`Futures` employ a powerful set of combinators including `map`, `flatMap`, and `foreach`.
+Similar to `map`, there is `flatMap`. This allows you to easily "pipeline" a sequence of `Futures`:
+
+ val authenticateUser: Future[User] = User.authenticate(email, password)
+ val lookupTweets: Future[Seq[Tweet]] = authenticateUser flatMap { user =>
+ Tweet.findAllByUser(user)
+ }
+
+In this example, `Tweet.findAllByUser(user)` is a function of type `User => Future[Seq[Tweet]]`.
+
+As a final example of `Futures` let's consider the problem of scatter/gather patterns. The challenge is to issue a series of requests in parallel and wait for all of them to arrive. Suppose for example, we have a sequence of `Futures`. To wait for all to return, we do like so:
+
+ val myFutures: Seq[Future[Int]] = ...
+
+ val waitTillAllComplete: Future[Seq[Int]] = Future.join(myFutures)
+
+In this example, `waitTillAllComplete` is a `Future[Unit]`. `Futures` of this type are often used to indicate when something has completed, but there is no value to return (or the value is available elsewhere).
+
+A more complex variation of scatter/gather is to perform a sequence of asynchronous operations and harvest only those that return within a certain time -- using a default value for those that don't return. A concrete example might be to issue a set of parallel requests to N partitions of a search index; those that don't return in time we consider to have returned the empty set.
+
+ import com.twitter.finagle.util.Timer._
+
+ val results: Seq[Future[Result]] = partitions.map { partition =>
+ partition.get(query).within(1.second) handle {
+ case _: TimeoutException => EmptyResult
+ }
+ }
+ val allResults: Future[Seq[Result]] = Future.join(timedResults)
+
+ // Process the results asynchronously.
+ // Note: this takes no longer than 1 second.
+ allResults onSuccess { results =>
+ println(results)
+ }
## Streaming Protocols
-Some incomplete API documentation is available:
-See [scaladoc](http://twitter.github.com/finagle/).
+Finagle makes streaming and pubsub-like RPCs easy. Streams rely on a generalization of `Futures` called `Channels`. `Channels` represent a stream of events that can be listened to. To publish and subscribe to a `Channel`, do the following:
+
+ // a ChannelSource is a readable-writable stream of messages, whereas a Channel is only readable.
+ val source = new ChannelSource[Int]
+ val sink: Channel = source
+
+ // start listening for messages:
+ val observer1 = sink receive { message =>
+ Future { println("1: " + message) } // (1)
+ }
+ val observer2 = sink receive { message => // (2)
+ Future { println("2: " + message) }
+ }
+
+ // send some messages on the channel
+ source send(1)
+ source send(2)
+ // etc.
+
+ // stop listening for messages:
+ observer1.dispose()
+
+### Notes
+
+1. Subscribers must return a `Future` indicating when processing of the received message is complete. This allows consumers to exhibit backpressure to producers.
+1. Just as with `Futures` there can be any number of receivers to a `Channel`.
+
+`Channels` have some of the usual sequence operations such as `map` and `filter`:
+
+ val channel = new ChannelSource[Int]
+ val evenChannel = channel filter (_ % 2 == 0)
+ val stringChannel = channel map (_.toString)
+
+Most of the tricky issues concerning `Channels` involve data-loss and backpressure. In order to ensure that no data is lost, a typical pattern is not to broadcast on a `Channel` until there is at least one subscriber. This is easily accomplished:
+
+ channel.receives.first { _ => // (1) (2)
+ // the first receiver has arrived, start sending!
+ // ...
+ }
+
+### Notes
+
+1. `channel.receives` is a `Channel` itself. In other words, every `Channel` has a sub-channel indicating when subscribers subscribe (and unsubscribe!).
+1. `channel.first` is a `Future` indicating when the first message arrives on that `Channel`.
+
+Backpressure (that is, slowing down production if consumers get backed up) is also easily accomplished. The `channel.send` method returns a `Seq[Future[Unit]]` -- a sequence of `Futures` indicating when all consumers have processed the message. You can use `Future.join` to slow down production in the following way:
+
+ Future.join(channel.send(1)) onSuccess { _ =>
+ Future.join(channel.send(2)) onSuccess { ... }
+ }
+
+With `Channels`, building a streaming RPC service is straightforward. You will need a codec that supports streaming (such as HTTP with chunked encoding). Then, build a `Service` that returns a `Channel`:
+
+ val channel = new ChannelSource[ChannelBuffer]
+ val myService = new Service[HttpRequest, Channel[ChannelBuffer]] {
+ def apply(request: HttpRequest) = Future.value(channel)
+ }
+
+Some incomplete API documentation is available: See [scaladoc](http://twitter.github.com/finagle/).

0 comments on commit 81fdcd4

Please sign in to comment.