Skip to content

Commit

Permalink
Checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bierbaum committed Nov 12, 2010
1 parent 8caa347 commit 83facc3
Show file tree
Hide file tree
Showing 7 changed files with 188 additions and 34 deletions.
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package com.twitter.finagle.channel

import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.bootstrap._
import org.jboss.netty.channel._

class BrokerClientBootstrap(channelFactory: ChannelFactory)
extends ClientBootstrap
{
def this() = this(null)
class BrokerServerBootstrap(channelFactory: ChannelFactory)
extends ServerBootstrap(channelFactory) with BrokerBootstrap

if (channelFactory ne null)
setFactory(channelFactory)
class BrokerClientBootstrap(channelFactory: ChannelFactory)
extends ClientBootstrap(channelFactory) with BrokerBootstrap

abstract sealed trait BrokerBootstrap <: Bootstrap
{
override def getPipelineFactory = {
val outerFactory = super.getPipelineFactory
new ChannelPipelineFactory {
Expand Down
37 changes: 12 additions & 25 deletions src/main/scala/com/twitter/finagle/client/Builder.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,7 @@ import com.twitter.finagle.channel._
import com.twitter.finagle.http.RequestLifecycleSpy
import com.twitter.finagle.thrift.ThriftClientCodec
import com.twitter.finagle.util._

sealed abstract class Codec {
val pipelineFactory: ChannelPipelineFactory
}
import com.twitter.finagle._

object Http extends Codec {
val pipelineFactory =
Expand Down Expand Up @@ -51,9 +48,6 @@ object Codec {
val thrift = Thrift
}

sealed abstract class StatsReceiver
case class Ostrich(provider: ostrich.StatsProvider) extends StatsReceiver

object Builder {
def apply() = new Builder
def get() = apply()
Expand All @@ -63,28 +57,21 @@ object Builder {
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())

case class Timeout(value: Long, unit: TimeUnit) {
def duration = Duration.fromTimeUnit(value, unit)
}

def parseHosts(hosts: String): java.util.List[InetSocketAddress] = {
val hostPorts = hosts split Array(' ', ',') filter (_ != "") map (_.split(":"))
hostPorts map { hp => new InetSocketAddress(hp(0), hp(1).toInt) } toList
}
}

class IncompleteClientSpecification(message: String)
extends Exception(message)

// We're nice to java.
case class Builder(
_hosts: Option[Seq[InetSocketAddress]],
_codec: Option[Codec],
_connectionTimeout: Builder.Timeout,
_requestTimeout: Builder.Timeout,
_connectionTimeout: Timeout,
_requestTimeout: Timeout,
_statsReceiver: Option[StatsReceiver],
_sampleWindow: Builder.Timeout,
_sampleGranularity: Builder.Timeout,
_sampleWindow: Timeout,
_sampleGranularity: Timeout,
_name: Option[String],
_hostConnectionLimit: Option[Int],
_sendBufferSize: Option[Int],
Expand All @@ -94,11 +81,11 @@ case class Builder(
def this() = this(
None, // hosts
None, // codec
Builder.Timeout(Long.MaxValue, TimeUnit.MILLISECONDS), // connectionTimeout
Builder.Timeout(Long.MaxValue, TimeUnit.MILLISECONDS), // requestTimeout
Timeout(Long.MaxValue, TimeUnit.MILLISECONDS), // connectionTimeout
Timeout(Long.MaxValue, TimeUnit.MILLISECONDS), // requestTimeout
None, // statsReceiver
Builder.Timeout(10, TimeUnit.MINUTES), // sampleWindow
Builder.Timeout(10, TimeUnit.SECONDS), // sampleGranularity
Timeout(10, TimeUnit.MINUTES), // sampleWindow
Timeout(10, TimeUnit.SECONDS), // sampleGranularity
None, // name
None, // hostConnectionLimit
None, // sendBufferSize
Expand Down Expand Up @@ -140,9 +127,9 @@ case class Builder(
def build() = {
val (hosts, codec) = (_hosts, _codec) match {
case (None, _) =>
throw new IncompleteClientSpecification("No hosts were specified")
throw new IncompleteConfiguration("No hosts were specified")
case (_, None) =>
throw new IncompleteClientSpecification("No codec was specified")
throw new IncompleteConfiguration("No codec was specified")
case (Some(hosts), Some(codec)) =>
(hosts, codec)
}
Expand Down Expand Up @@ -176,7 +163,7 @@ case class Builder(
val granularity = _sampleGranularity.duration
val window = _sampleWindow.duration
if (window < granularity) {
throw new IncompleteClientSpecification(
throw new IncompleteConfiguration(
"window smaller than granularity!")
}
val numBuckets = math.max(1, window.inMilliseconds / granularity.inMilliseconds)
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/com/twitter/finagle/test/Client.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import org.jboss.netty.handler.codec.http._

import net.lag.configgy.{Configgy, RuntimeEnvironment}
import com.twitter.ostrich
import com.twitter.finagle.client.{Client, Builder, Http, Ostrich}
import com.twitter.finagle._
import com.twitter.finagle.client.{Client, Builder, Http}

import com.twitter.util.{Return, Throw}

Expand Down
23 changes: 23 additions & 0 deletions src/main/scala/com/twitter/finagle/thrift/Common.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.twitter.finagle

import java.util.concurrent.TimeUnit

import org.jboss.netty.channel.ChannelPipelineFactory

import com.twitter.ostrich
import com.twitter.util.TimeConversions._
import com.twitter.util.Duration

abstract class Codec {
val pipelineFactory: ChannelPipelineFactory
}

sealed abstract class StatsReceiver
case class Ostrich(provider: ostrich.StatsProvider) extends StatsReceiver

class IncompleteConfiguration(message: String)
extends Exception(message)

case class Timeout(value: Long, unit: TimeUnit) {
def duration = Duration.fromTimeUnit(value, unit)
}
139 changes: 139 additions & 0 deletions src/main/scala/com/twitter/finagle/thrift/server/Builder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package com.twitter.finagle.server

import java.net.InetSocketAddress
import java.util.concurrent.{TimeUnit, Executors}

import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http._
import org.jboss.netty.channel.socket.nio._

import com.twitter.finagle._
import com.twitter.finagle.util._
import com.twitter.finagle.thrift._

import com.twitter.ostrich
import com.twitter.util.TimeConversions._
import com.twitter.util.Duration

object Http extends Codec {
val pipelineFactory =
new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()
pipeline.addLast("httpCodec", new HttpServerCodec)
pipeline
}
}
}

object Thrift extends Codec {
val pipelineFactory =
new ChannelPipelineFactory {
def getPipeline() = {
val pipeline = Channels.pipeline()
pipeline.addLast("thriftCodec", new ThriftServerCodec)
pipeline
}
}
}

object Codec {
val http = Http
val thrift = Thrift
}

object Builder {
def apply() = new Builder()
def get() = apply()

val channelFactory =
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool())

}

case class Builder(
_port: Int,
_codec: Option[Codec],
_connectionTimeout: Timeout,
_requestTimeout: Timeout,
_statsReceiver: Option[StatsReceiver],
_sampleWindow: Timeout,
_sampleGranularity: Timeout,
_name: Option[String],
_sendBufferSize: Option[Int],
_recvBufferSize: Option[Int])
{
import Builder._

def this() = this(
0, // port (default, ephemeral)
None, // codec
Timeout(Long.MaxValue, TimeUnit.MILLISECONDS), // connectionTimeout
Timeout(Long.MaxValue, TimeUnit.MILLISECONDS), // requestTimeout
None, // statsReceiver
Timeout(10, TimeUnit.MINUTES), // sampleWindow
Timeout(10, TimeUnit.SECONDS), // sampleGranularity
None, // name
None, // sendBufferSize
None // recvBufferSize
)

def codec(codec: Codec) =
copy(_codec = Some(codec))

def connectionTimeout(value: Long, unit: TimeUnit) =
copy(_connectionTimeout = Timeout(value, unit))

def requestTimeout(value: Long, unit: TimeUnit) =
copy(_requestTimeout = Timeout(value, unit))

def reportTo(receiver: StatsReceiver) =
copy(_statsReceiver = Some(receiver))

def sampleWindow(value: Long, unit: TimeUnit) =
copy(_sampleWindow = Timeout(value, unit))

def sampleGranularity(value: Long, unit: TimeUnit) =
copy(_sampleGranularity = Timeout(value, unit))

def name(value: String) = copy(_name = Some(value))

def sendBufferSize(value: Int) = copy(_sendBufferSize = Some(value))
def recvBufferSize(value: Int) = copy(_recvBufferSize = Some(value))

def build() = {
if (!_codec.isDefined) throw new IncompleteConfiguration("No codec was specified")

val bs = new ServerBootstrap(channelFactory)
bs.setOption("tcpNoDelay", true) // XXX: right?
// bs.setOption("soLinger", 0) // XXX: (TODO)
bs.setOption("reuseAddress", true)
_sendBufferSize foreach { s => bs.setOption("sendBufferSize", s) }
_recvBufferSize foreach { s => bs.setOption("receiveBufferSize", s) }

val sampleRepository = new SampleRepository

// Construct sample stats.
val granularity = _sampleGranularity.duration
val window = _sampleWindow.duration
if (window < granularity) {
throw new IncompleteConfiguration(
"window smaller than granularity!")
}
val numBuckets = math.max(1, window.inMilliseconds / granularity.inMilliseconds)
val statsMaker = () => new TimeWindowedSample[ScalarSample](numBuckets.toInt, granularity)
val namePrefix = _name map ("%s_".format(_)) getOrElse ""

// new LoadBalancedBroker(statsBrokers)
}

// def buildClient[Request, Reply]() =
// new Server[HttpRequest, HttpResponse](build())

}

class Server(bootstrap: ServerBootstrap) {
}
4 changes: 4 additions & 0 deletions src/main/scala/com/twitter/finagle/thrift/server/Server.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
package com.twitter.finagle.serfver

class Server {
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ object BootstrapBrokerSpec extends Specification with Mockito {
Executors.newCachedThreadPool())

"throw an exception if the bootstrap has no remoteAddress option" in {
val bs = new BrokerClientBootstrap
val bs = new BrokerClientBootstrap(cf)
new BootstrapBroker(bs) must throwAn [IllegalArgumentException]

bs.setOption("remoteAddress", address)
Expand Down

0 comments on commit 83facc3

Please sign in to comment.