Permalink
Browse files

Merge branch 'master' of github.com:twitter/finagle

  • Loading branch information...
Nick Kallen
Nick Kallen committed Feb 22, 2011
2 parents 52d00c8 + 89ac034 commit a86f2f63535a171cef097624532e5996a72ae5e4
@@ -15,7 +15,9 @@ class ConnectionFailedException extends ChannelException
class ChannelClosedException extends ChannelException
class SpuriousMessageException extends ChannelException
class IllegalMessageException extends ChannelException
-class UnknownChannelException(e: Throwable) extends ChannelException
+class UnknownChannelException(e: Throwable) extends ChannelException {
+ override def toString = "%s: %s".format(super.toString, e.toString)
+}
class WriteException(e: Throwable) extends ChannelException {
override def toString = "%s: %s".format(super.toString, e.toString)
}
@@ -80,28 +80,28 @@ case class ClientBuilder[Req, Rep](
false // startTls
)
- override def toString() = {
- val options = Seq(
- "name" -> _name,
- "cluster" -> _cluster,
- "protocol" -> _protocol,
- "connectionTimeout" -> Some(_connectionTimeout),
- "requestTimeout" -> Some(_requestTimeout),
- "statsReceiver" -> _statsReceiver,
- "loadStatistics" -> _loadStatistics,
- "hostConnectionLimit" -> Some(_hostConnectionLimit),
- "hostConnectionCoresize" -> Some(_hostConnectionCoresize),
- "hostConnectionIdleTime" -> Some(_hostConnectionIdleTime),
- "hostConnectionMaxIdleTime" -> Some(_hostConnectionMaxIdleTime),
- "sendBufferSize" -> _sendBufferSize,
- "recvBufferSize" -> _recvBufferSize,
- "retries" -> _retries,
- "logger" -> _logger,
- "channelFactory" -> _channelFactory,
- "tls" -> _tls,
- "startTls" -> _startTls
- )
+ private[this] def options = Seq(
+ "name" -> _name,
+ "cluster" -> _cluster,
+ "protocol" -> _protocol,
+ "connectionTimeout" -> Some(_connectionTimeout),
+ "requestTimeout" -> Some(_requestTimeout),
+ "statsReceiver" -> _statsReceiver,
+ "loadStatistics" -> _loadStatistics,
+ "hostConnectionLimit" -> Some(_hostConnectionLimit),
+ "hostConnectionCoresize" -> Some(_hostConnectionCoresize),
+ "hostConnectionIdleTime" -> Some(_hostConnectionIdleTime),
+ "hostConnectionMaxIdleTime" -> Some(_hostConnectionMaxIdleTime),
+ "sendBufferSize" -> _sendBufferSize,
+ "recvBufferSize" -> _recvBufferSize,
+ "retries" -> _retries,
+ "logger" -> _logger,
+ "channelFactory" -> _channelFactory,
+ "tls" -> _tls,
+ "startTls" -> _startTls
+ )
+ override def toString() = {
"ClientBuilder(%s)".format(
options flatMap {
case (k, Some(v)) => Some("%s=%s".format(k, v))
@@ -18,7 +18,7 @@ import com.twitter.util.Duration
import com.twitter.conversions.time._
import com.twitter.finagle._
-import com.twitter.finagle.tracing.{TraceReceiver, TracingFilter}
+import com.twitter.finagle.tracing.{TraceReceiver, TracingFilter, NullTraceReceiver}
import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.util._
import com.twitter.finagle.util.Timer._
@@ -60,33 +60,58 @@ case class ServerBuilder[Req, Rep](
_recvBufferSize: Option[Int],
_bindTo: Option[SocketAddress],
_logger: Option[Logger],
- _tls: Option[SSLContext],
+ _tls: Option[(String, String)],
_startTls: Boolean,
_channelFactory: Option[ReferenceCountedChannelFactory],
_maxConcurrentRequests: Option[Int],
_hostConnectionMaxIdleTime: Option[Duration],
_requestTimeout: Option[Duration],
- _traceReceiver: Option[TraceReceiver])
+ _traceReceiver: TraceReceiver)
{
import ServerBuilder._
def this() = this(
- None, // codec
- None, // statsReceiver
- None, // name
- None, // sendBufferSize
- None, // recvBufferSize
- None, // bindTo
- None, // logger
- None, // tls
- false, // startTls
- None, // channelFactory
- None, // maxConcurrentRequests
- None, // hostConnectionMaxIdleTime
- None, // requestTimeout
- None // traceReceiver
+ None, // codec
+ None, // statsReceiver
+ None, // name
+ None, // sendBufferSize
+ None, // recvBufferSize
+ None, // bindTo
+ None, // logger
+ None, // tls
+ false, // startTls
+ None, // channelFactory
+ None, // maxConcurrentRequests
+ None, // hostConnectionMaxIdleTime
+ None, // requestTimeout
+ new NullTraceReceiver // traceReceiver
)
+ private[this] def options = Seq(
+ "codec" -> _codec,
+ "statsReceiver" -> _statsReceiver,
+ "name" -> _name,
+ "sendBufferSize" -> _sendBufferSize,
+ "recvBufferSize" -> _recvBufferSize,
+ "bindTo" -> _bindTo,
+ "logger" -> _logger,
+ "tls" -> _tls,
+ "startTls" -> Some(_startTls),
+ "channelFactory" -> _channelFactory,
+ "maxConcurrentRequests" -> _maxConcurrentRequests,
+ "hostConnectionMaxIdleTime" -> _hostConnectionMaxIdleTime,
+ "requestTimeout" -> _requestTimeout,
+ "traceReceiver" -> Some(_traceReceiver)
+ )
+
+ override def toString() = {
+ "ServerBuilder(%s)".format(
+ options flatMap {
+ case (k, Some(v)) => Some("%s=%s".format(k, v))
+ case _ => None
+ } mkString(", "))
+ }
+
def codec[Req1, Rep1](codec: Codec[Req1, Rep1]) =
copy(_codec = Some(codec))
@@ -107,7 +132,7 @@ case class ServerBuilder[Req, Rep](
def logger(logger: Logger) = copy(_logger = Some(logger))
def tls(certificatePath: String, keyPath: String) =
- copy(_tls = Some(Ssl.server(certificatePath, keyPath)))
+ copy(_tls = Some((certificatePath, keyPath)))
def startTls(value: Boolean) =
copy(_startTls = true)
@@ -122,7 +147,7 @@ case class ServerBuilder[Req, Rep](
copy(_requestTimeout = Some(howlong))
def traceReceiver(receiver: TraceReceiver) =
- copy(_traceReceiver = Some(receiver))
+ copy(_traceReceiver = receiver)
private[this] def scopedStatsReceiver =
_statsReceiver map { sr => _name map (sr.scope(_)) getOrElse sr }
@@ -173,8 +198,8 @@ case class ServerBuilder[Req, Rep](
}
// SSL comes first so that ChannelSnooper gets plaintext
- _tls foreach { ctx =>
- val sslEngine = ctx.createSSLEngine()
+ _tls foreach { case (certificatePath, keyPath) =>
+ val sslEngine = Ssl.server(certificatePath, keyPath).createSSLEngine()
sslEngine.setUseClientMode(false)
sslEngine.setEnableSessionCreation(true)
@@ -222,9 +247,7 @@ case class ServerBuilder[Req, Rep](
// This has to go last (ie. first in the stack) so that
// protocol-specific trace support can override our generic
// one here.
- _traceReceiver foreach { traceReceiver =>
- service = (new TracingFilter(traceReceiver)) andThen service
- }
+ service = (new TracingFilter(_traceReceiver)) andThen service
// Register the channel so we can wait for them for a
// drain. We close the socket but wait for all handlers to
@@ -291,6 +314,14 @@ case class ServerBuilder[Req, Rep](
bs.releaseExternalResources()
Timer.default.stop()
}
+
+ override def toString = {
+ "Server(%s)".format(
+ options flatMap {
+ case (k, Some(v)) => Some("%s=%s".format(k, v))
+ case _ => None
+ } mkString(", "))
+ }
}
}
}
@@ -213,7 +213,7 @@ object Ssl {
class NoSuitableSslProvider(message: String) extends Exception(message: String)
- def fileMustExist(path: String) =
+ private[this] def fileMustExist(path: String) =
require(new File(path).exists(), "File '%s' does not exist.".format(path))
/**
@@ -55,6 +55,15 @@ case class Span(
Span.timeFormat.format(startTime),
(endTime - startTime).inMilliseconds)
}
+
+ def print() {
+ transcript foreach { record =>
+ val atMs = (record.timestamp - startTime).inMilliseconds
+ record.message.split("\n") foreach { line =>
+ println("%s %03dms: %s".format(traceID, atMs, line))
+ }
+ }
+ }
}
object Trace {
@@ -80,7 +89,7 @@ object Trace {
def clear() {
current.clear()
}
-
+
def startSpan() {
this() = newSpan()
}
@@ -13,8 +13,15 @@ trait TraceReceiver {
def receiveSpan(span: Span): Unit
}
+class NullTraceReceiver extends TraceReceiver {
+ def receiveSpan(span: Span) {/*ignore*/}
+}
+
+/**
+ * Pretty-print the span together with the transcript on the console.
+ */
class ConsoleTraceReceiver extends TraceReceiver {
def receiveSpan(span: Span) {
- println(span)
+ span.print()
}
}
@@ -0,0 +1,29 @@
+package com.twitter.finagle.builder
+
+/**
+ * Make a com.twitter.admin.Service from a finagle ServerBuilder.
+ */
+
+import com.twitter.admin
+import com.twitter.util.Duration
+import com.twitter.conversions.time._
+
+import com.twitter.finagle.Service
+
+class ServerBuildertoTwitterService[Req, Rep](
+ builder: ServerBuilder[Req, Rep],
+ service: Service[Req, Rep],
+ gracePeriod: Duration = 10.seconds)
+ extends admin.Service
+{
+ private[this] var server: Option[Server] = None
+
+ def start() {
+ if (!server.isDefined)
+ server = Some(builder.build(service))
+ }
+
+ def shutdown() {
+ server foreach { _.close(gracePeriod) }
+ }
+}
@@ -13,8 +13,6 @@ class OstrichStatsReceiver extends StatsReceiver {
private[this] val name_ = variableName(name)
def add(value: Float) {
- // TODO: can't really implement this properly... should we take
- // the average over `count' and add `count' samples?
Stats.addMetric(name_, value.toInt)
}
}

0 comments on commit a86f2f6

Please sign in to comment.