Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

add a build() variant that takes a ClientConnection. add connected() …

…callback for Service.
  • Loading branch information...
commit d1f2b4f9ce466b4bc05a11d906498e26e65a1137 1 parent 02d9f5d
Robey Pointer authored
View
20 finagle-core/src/main/scala/com/twitter/finagle/Service.scala
@@ -3,6 +3,8 @@ package com.twitter.finagle
import com.twitter.util.Future
import com.twitter.finagle.service.RefcountedService
+import java.net.SocketAddress
+import org.jboss.netty.channel.Channel
/**
* A Service is an asynchronous function from Request to Future[Response]. It is the
@@ -15,6 +17,7 @@ abstract class Service[-Req, +Rep] extends (Req => Future[Rep]) {
def map[Req1](f: Req1 => Req) = new Service[Req1, Rep] {
def apply(req1: Req1) = Service.this.apply(f(req1))
override def release() = Service.this.release()
+ override def connected() = Service.this.connected()
}
/**
@@ -28,6 +31,8 @@ abstract class Service[-Req, +Rep] extends (Req => Future[Rep]) {
*/
def release() = ()
+ def connected() = ()
+
/**
* Determines whether this service is available (can accept requests
* with a reasonable likelihood of success).
@@ -35,6 +40,19 @@ abstract class Service[-Req, +Rep] extends (Req => Future[Rep]) {
def isAvailable: Boolean = true
}
+class ClientConnection {
+ // tricky. we can't fill in the Channel till the first event call from netty.
+ private[finagle] var channel: Channel = null
+
+ def remoteAddress: SocketAddress = channel.getRemoteAddress
+
+ def localAddress: SocketAddress = channel.getLocalAddress
+
+ def close() {
+ channel.disconnect()
+ }
+}
+
abstract class ServiceFactory[-Req, +Rep] {
/**
* Reserve the use of a given service instance. This pins the
@@ -114,6 +132,7 @@ abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
Filter.this.apply(request, new Service[ReqOut, RepIn] {
def apply(request: ReqOut): Future[RepIn] = next(request, service)
override def release() = service.release()
+ override def connected() = service.connected()
override def isAvailable = service.isAvailable
})
}
@@ -132,6 +151,7 @@ abstract class Filter[-ReqIn, +RepOut, +ReqOut, -RepIn]
def apply(request: ReqIn) = Filter.this.apply(request, refcounted)
override def release() = refcounted.release()
+ override def connected() = refcounted.connected()
override def isAvailable = refcounted.isAvailable
}
View
12 finagle-core/src/main/scala/com/twitter/finagle/builder/ServerBuilder.scala
@@ -242,11 +242,16 @@ class ServerBuilder[Req, Rep](val config: ServerConfig[Req, Rep]) {
def build(service: Service[Req, Rep]): Server = build(() => service)
/**
+ * Construct the Server, given the provided Service factory.
+ */
+ def build(serviceFactory: () => Service[Req, Rep]): Server = build(_ => serviceFactory())
+
+ /**
* Construct the Server, given the provided ServiceFactory. This
* is useful if the protocol is stateful (e.g., requires authentication
* or supports transactions).
*/
- def build(serviceFactory: () => Service[Req, Rep]): Server = {
+ def build(serviceFactory: ClientConnection => Service[Req, Rep]): Server = {
config.assertValid()
val scopedStatsReceiver =
@@ -363,8 +368,9 @@ class ServerBuilder[Req, Rep](val config: ServerConfig[Req, Rep]) {
queueingChannelHandler foreach { pipeline.addLast("queue", _) }
// Compose the service stack.
+ val clientConnection = new ClientConnection()
var service: Service[Req, Rep] = {
- val underlying = serviceFactory()
+ val underlying = serviceFactory(clientConnection)
val prepared = codec.prepareService(underlying)
new ProxyService(prepared)
}
@@ -411,7 +417,7 @@ class ServerBuilder[Req, Rep](val config: ServerConfig[Req, Rep]) {
// complete (to drain them individually.) Note: this would be
// complicated by the presence of pipelining.
val channelHandler = new ServiceToChannelHandler(
- service, scopedStatsReceiver getOrElse NullStatsReceiver)
+ service, scopedStatsReceiver getOrElse NullStatsReceiver, Some(clientConnection))
val handle = new ChannelHandle {
def close() =
View
16 finagle-core/src/main/scala/com/twitter/finagle/channel/ServiceToChannelHandler.scala
@@ -11,7 +11,7 @@ import com.twitter.util.{Future, Promise, Return, Throw}
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.stats.{StatsReceiver, NullStatsReceiver}
-import com.twitter.finagle.{CodecException, Service, WriteTimedOutException}
+import com.twitter.finagle.{ClientConnection, CodecException, Service, WriteTimedOutException}
private[finagle] object ServiceToChannelHandler {
// valid transitions are:
@@ -27,16 +27,19 @@ private[finagle] object ServiceToChannelHandler {
private[finagle] class ServiceToChannelHandler[Req, Rep](
service: Service[Req, Rep],
statsReceiver: StatsReceiver,
- log: Logger)
+ log: Logger,
+ clientConnection: Option[ClientConnection])
extends ChannelClosingHandler
{
import ServiceToChannelHandler._
import State._
+ def this(service: Service[Req, Rep], statsReceiver: StatsReceiver, clientConnection: Option[ClientConnection]) =
+ this(service, statsReceiver, Logger.getLogger(getClass.getName), clientConnection)
def this(service: Service[Req, Rep], statsReceiver: StatsReceiver) =
- this(service, statsReceiver, Logger.getLogger(getClass.getName))
+ this(service, statsReceiver, Logger.getLogger(getClass.getName), None)
def this(service: Service[Req, Rep]) =
- this(service, NullStatsReceiver)
+ this(service, NullStatsReceiver, None)
private[this] val state = new AtomicReference[State](Idle)
private[this] val onShutdownPromise = new Promise[Unit]
@@ -109,6 +112,11 @@ private[finagle] class ServiceToChannelHandler[Req, Rep](
}
}
+ override def channelConnected(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
+ clientConnection.foreach { _.channel = ctx.getChannel }
+ service.connected()
+ }
+
override def channelClosed(ctx: ChannelHandlerContext, e: ChannelStateEvent) {
shutdown()
}
View
1  finagle-core/src/main/scala/com/twitter/finagle/service/ProxyService.scala
@@ -62,5 +62,6 @@ class ProxyService[Req, Rep](underlyingFuture: Future[Service[Req, Rep]])
def apply(request: Req): Future[Rep] = proxy(request)
override def release() = proxy.release()
+ override def connected() = proxy.connected()
override def isAvailable = proxy.isAvailable
}
View
1  finagle-core/src/main/scala/com/twitter/finagle/service/RefcountedService.scala
@@ -15,6 +15,7 @@ private[finagle] class RefcountedService[Req, Rep](underlying: Service[Req, Rep]
override def isAvailable = underlying.isAvailable
override final def release() = replyLatch.await { doRelease() }
+ override def connected() = underlying.connected()
protected def doRelease() = underlying.release()
}
Please sign in to comment.
Something went wrong with that request. Please try again.