Skip to content

Commit

Permalink
Merge branch 'master' of github.com:twitter/finagle
Browse files Browse the repository at this point in the history
  • Loading branch information
mariusae committed Jan 9, 2011
2 parents 77ebff7 + 936d953 commit 1312c6c
Show file tree
Hide file tree
Showing 30 changed files with 194 additions and 245 deletions.
@@ -1,9 +1,9 @@
package com.twitter.finagle.builder; package com.twitter.finagle.builder;


import com.twitter.ostrich.Stats$; import com.twitter.finagle.stats.JavaLoggerStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;


public class Stats4J { public class Stats4J {
public static StatsReceiver Ostrich = new Ostrich(Stats$.MODULE$);
public static StatsReceiver Logger = public static StatsReceiver Logger =
new JavaLogger(java.util.logging.Logger.getLogger("Finagle")); new JavaLoggerStatsReceiver(java.util.logging.Logger.getLogger("Finagle"));
} }
Expand Up @@ -10,13 +10,13 @@ import java.util.concurrent.Executors
import org.jboss.netty.channel._ import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio._ import org.jboss.netty.channel.socket.nio._


import com.twitter.ostrich
import com.twitter.util.Duration import com.twitter.util.Duration
import com.twitter.util.TimeConversions._ import com.twitter.util.TimeConversions._


import com.twitter.finagle.channel._ import com.twitter.finagle.channel._
import com.twitter.finagle.util._ import com.twitter.finagle.util._
import com.twitter.finagle.service import com.twitter.finagle.service
import com.twitter.finagle.stats.StatsReceiver


object ClientBuilder { object ClientBuilder {
def apply() = new ClientBuilder def apply() = new ClientBuilder
Expand Down Expand Up @@ -53,7 +53,6 @@ case class ClientBuilder(
_hostConnectionLimit: Option[Int], _hostConnectionLimit: Option[Int],
_sendBufferSize: Option[Int], _sendBufferSize: Option[Int],
_recvBufferSize: Option[Int], _recvBufferSize: Option[Int],
_exportLoadsToOstrich: Boolean,
_failureAccrualWindow: Duration, _failureAccrualWindow: Duration,
_retries: Option[Int], _retries: Option[Int],
_initialBackoff: Option[Duration], _initialBackoff: Option[Duration],
Expand All @@ -75,7 +74,6 @@ case class ClientBuilder(
None, // hostConnectionLimit None, // hostConnectionLimit
None, // sendBufferSize None, // sendBufferSize
None, // recvBufferSize None, // recvBufferSize
false, // exportLoadsToOstrich
10.seconds, // failureAccrualWindow 10.seconds, // failureAccrualWindow
None, // retries None, // retries
None, // initialBackoff None, // initialBackoff
Expand Down Expand Up @@ -129,8 +127,6 @@ case class ClientBuilder(
def sendBufferSize(value: Int): ClientBuilder = copy(_sendBufferSize = Some(value)) def sendBufferSize(value: Int): ClientBuilder = copy(_sendBufferSize = Some(value))
def recvBufferSize(value: Int): ClientBuilder = copy(_recvBufferSize = Some(value)) def recvBufferSize(value: Int): ClientBuilder = copy(_recvBufferSize = Some(value))


def exportLoadsToOstrich(): ClientBuilder = copy(_exportLoadsToOstrich = true)

def failureAccrualWindow(window: Duration): ClientBuilder = def failureAccrualWindow(window: Duration): ClientBuilder =
copy(_failureAccrualWindow = window) copy(_failureAccrualWindow = window)


Expand Down Expand Up @@ -252,11 +248,11 @@ case class ClientBuilder(


val broker = makeBroker(codec, statsRepo)(host) val broker = makeBroker(codec, statsRepo)(host)


if (_exportLoadsToOstrich) { _statsReceiver.foreach { statsReceiver =>
val hostString = host.toString val hostString = host.toString
ostrich.Stats.makeGauge(hostString + "_load") { broker.load } statsReceiver.makeGauge(hostString + "_load", broker.load)
ostrich.Stats.makeGauge(hostString + "_weight") { broker.weight } statsReceiver.makeGauge(hostString + "_weight", broker.weight)
ostrich.Stats.makeGauge(hostString + "_available") { if (broker.isAvailable) 1 else 0 } statsReceiver.makeGauge(hostString + "_available", if (broker.isAvailable) 1 else 0)
} }


broker broker
Expand Down
Expand Up @@ -13,11 +13,6 @@ trait Codec {
val serverPipelineFactory: ChannelPipelineFactory val serverPipelineFactory: ChannelPipelineFactory
} }


object StatsReporter4J {
val ostrich = Ostrich()
val logger = JavaLogger()
}


trait StatsReceiver {
def observer(prefix: String, label: String): (Seq[String], Int, Int) => Unit
}

This file was deleted.

This file was deleted.

Expand Up @@ -5,24 +5,21 @@ import scala.collection.JavaConversions._
import java.net.SocketAddress import java.net.SocketAddress
import java.util.concurrent.{Executors, LinkedBlockingQueue} import java.util.concurrent.{Executors, LinkedBlockingQueue}
import java.util.logging.Logger import java.util.logging.Logger
import javax.net.ssl.{KeyManager, SSLContext} import javax.net.ssl.SSLContext


import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.buffer._
import org.jboss.netty.channel._ import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http._
import org.jboss.netty.handler.ssl._ import org.jboss.netty.handler.ssl._
import org.jboss.netty.channel.socket.nio._ import org.jboss.netty.channel.socket.nio._


import com.twitter.ostrich
import com.twitter.util.TimeConversions._ import com.twitter.util.TimeConversions._
import com.twitter.util.{Duration, Time} import com.twitter.util.{Duration, Time}


import com.twitter.finagle._ import com.twitter.finagle._
import channel.{Job, QueueingChannelHandler} import channel.{Job, QueueingChannelHandler}
import com.twitter.finagle.util._ import com.twitter.finagle.util._
import com.twitter.finagle.service.{Service, ServicePipelineFactory} import com.twitter.finagle.service.{Service, ServicePipelineFactory}
import org.jboss.netty.util.HashedWheelTimer import stats.StatsReceiver


object ServerBuilder { object ServerBuilder {
def apply() = new ServerBuilder() def apply() = new ServerBuilder()
Expand Down
@@ -1,15 +1,9 @@
package com.twitter.finagle.channel package com.twitter.finagle.channel


import scala.util.Random import scala.util.Random
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._


import java.util.concurrent.atomic.AtomicInteger import com.twitter.util.{Time, Return, Throw, Future}
import java.util.concurrent.ConcurrentHashMap

import org.jboss.netty.channel.MessageEvent

import com.twitter.util.{Time, Duration, Return, Throw, Future}
import com.twitter.util.TimeConversions._ import com.twitter.util.TimeConversions._
import com.twitter.finagle.util._ import com.twitter.finagle.util._
import com.twitter.finagle.util.Conversions._ import com.twitter.finagle.util.Conversions._
Expand Down
@@ -1,13 +1,8 @@
package com.twitter.finagle.channel package com.twitter.finagle.channel


import java.net.SocketAddress import java.net.SocketAddress

import org.jboss.netty.util.HashedWheelTimer
import org.jboss.netty.util.{HashedWheelTimer, TimerTask, Timeout}

import com.twitter.finagle.util.{Cancelled, Error, Ok, TimerFuture}
import com.twitter.finagle.util.Conversions._ import com.twitter.finagle.util.Conversions._

import com.twitter.util.TimeConversions._
import com.twitter.util.{Duration, Future, Promise, Throw, Return} import com.twitter.util.{Duration, Future, Promise, Throw, Return}


object RetryingBroker { object RetryingBroker {
Expand Down Expand Up @@ -62,7 +57,7 @@ class NumTriesRetryStrategy(numTries: Int) extends RetryStrategy {
// number of tries need to be bumped by one. // number of tries need to be bumped by one.
if (numTries > 1) if (numTries > 1)
Future.value(new NumTriesRetryStrategy(numTries - 1)) Future.value(new NumTriesRetryStrategy(numTries - 1))
else else
Future.exception(new Exception) Future.exception(new Exception)
} }
} }
Expand All @@ -79,7 +74,7 @@ class ExponentialBackoffRetryStrategy(delay: Duration, multiplier: Int)


def apply() = { def apply() = {
val future = new Promise[RetryStrategy] val future = new Promise[RetryStrategy]

timer(delay) { timer(delay) {
future() = Return( future() = Return(
new ExponentialBackoffRetryStrategy(delay * multiplier, multiplier)) new ExponentialBackoffRetryStrategy(delay * multiplier, multiplier))
Expand Down
@@ -1,11 +1,8 @@
package com.twitter.finagle.channel package com.twitter.finagle.channel


import org.jboss.netty.util.{TimerTask, Timeout, Timer} import org.jboss.netty.util.Timer
import org.jboss.netty.channel.MessageEvent import com.twitter.util.{Duration, Throw}


import com.twitter.util.{Future, Duration, Promise, Return, Throw}

import com.twitter.finagle.util.TimerFuture
import com.twitter.finagle.util.Conversions._ import com.twitter.finagle.util.Conversions._


class TimeoutBroker(timer: Timer, val underlying: Broker, timeout: Duration) class TimeoutBroker(timer: Timer, val underlying: Broker, timeout: Duration)
Expand Down
@@ -0,0 +1,29 @@
package com.twitter.finagle.stats

import java.util.logging.Logger
import org.jboss.netty.util.HashedWheelTimer
import com.twitter.conversions.time._
import com.twitter.finagle.util.Conversions._

case class JavaLoggerStatsReceiver(logger: Logger) extends StatsReceiver {
val timer = new HashedWheelTimer()

def observer(prefix: String, label: String) = {
val suffix = "_%s".format(label)

(path: Seq[String], value: Int, count: Int) => {
val pathString = path mkString "__"
logger.info(List(prefix, pathString, suffix, count) mkString " ")
}
}

def makeGauge(name: String, f: => Float) {
timer(10.seconds) {
logger.info("%s %2f".format(name, f))
}
}
}

object JavaLoggerStatsReceiver {
def apply(): JavaLoggerStatsReceiver = JavaLoggerStatsReceiver(Logger.getLogger(getClass.getName))
}
@@ -0,0 +1,6 @@
package com.twitter.finagle.stats

trait StatsReceiver {
def observer(prefix: String, label: String): (Seq[String], Int, Int) => Unit
def makeGauge(name: String, f: => Float)
}
@@ -0,0 +1,6 @@
package com.twitter.finagle.stats





Expand Up @@ -3,30 +3,16 @@ package com.twitter.finagle.test
import java.util.logging.Logger import java.util.logging.Logger
import org.jboss.netty.handler.codec.http._ import org.jboss.netty.handler.codec.http._


import com.twitter.ostrich import com.twitter.finagle.builder.{ClientBuilder, Http}
import com.twitter.finagle.builder.{ClientBuilder, Http, Ostrich}
import com.twitter.finagle.service.Service import com.twitter.finagle.service.Service
import com.twitter.ostrich.RuntimeEnvironment


object HttpClient extends ostrich.Service { object HttpClient {
def main(args: Array[String]) { def main(args: Array[String]) {
val runtime = new RuntimeEnvironment(getClass)
ostrich.ServiceTracker.register(this)
val config = new ostrich.Config {
def telnetPort = 0
def httpBacklog = 0
def httpPort = 8890
def jmxPackage = None
}
ostrich.ServiceTracker.startAdmin(config, runtime)

val client = val client =
ClientBuilder() ClientBuilder()
.name("http") .name("http")
.hosts("localhost:10000,localhost:10001,localhost:10003") .hosts("localhost:10000,localhost:10001,localhost:10003")
.codec(Http) .codec(Http)
.exportLoadsToOstrich()
.reportTo(Ostrich())
.retries(2) .retries(2)
.logger(Logger.getLogger("http")) .logger(Logger.getLogger("http"))
.buildService[HttpRequest, HttpResponse]() .buildService[HttpRequest, HttpResponse]()
Expand Down
Expand Up @@ -3,30 +3,15 @@ package com.twitter.finagle.test
import java.net.InetSocketAddress import java.net.InetSocketAddress


import org.jboss.netty.buffer._ import org.jboss.netty.buffer._
import org.jboss.netty.channel._
import org.jboss.netty.handler.codec.http._ import org.jboss.netty.handler.codec.http._


import com.twitter.ostrich
import com.twitter.finagle.builder._ import com.twitter.finagle.builder._
import com.twitter.finagle.service._ import com.twitter.finagle.service._


import com.twitter.util.Future import com.twitter.util.Future
import com.twitter.ostrich.RuntimeEnvironment


object HttpServer extends ostrich.Service { object HttpServer {
def main(args: Array[String]) { def main(args: Array[String]) {
val runtime = new RuntimeEnvironment(getClass)

val config = new ostrich.Config {
def telnetPort = 0
def httpBacklog = 0
def httpPort = 8889
def jmxPackage = None
}

ostrich.ServiceTracker.register(this)
ostrich.ServiceTracker.startAdmin(config, runtime)

val server = new Service[HttpRequest, HttpResponse] { val server = new Service[HttpRequest, HttpResponse] {
def apply(request: HttpRequest) = Future { def apply(request: HttpRequest) = Future {
val response = new DefaultHttpResponse( val response = new DefaultHttpResponse(
Expand All @@ -38,7 +23,6 @@ object HttpServer extends ostrich.Service {


ServerBuilder() ServerBuilder()
.codec(Http) .codec(Http)
.reportTo(Ostrich())
.service(server) .service(server)
.bindTo(new InetSocketAddress(10000)) .bindTo(new InetSocketAddress(10000))
.build .build
Expand Down
@@ -1,13 +1,11 @@
package com.twitter.finagle.util package com.twitter.finagle.util


import scala.annotation.tailrec
import scala.collection.mutable.Queue import scala.collection.mutable.Queue
import scala.collection.JavaConversions._ import scala.collection.JavaConversions._


import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap


import com.twitter.util.{Duration, Time} import com.twitter.util.Duration
import com.twitter.util.TimeConversions._ import com.twitter.util.TimeConversions._


// TODO: do we want a decaying stat? // TODO: do we want a decaying stat?
Expand Down
16 changes: 1 addition & 15 deletions finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala
@@ -1,22 +1,8 @@
package com.twitter.finagle.util package com.twitter.finagle.util


import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit

import org.jboss.netty.util.{Timer, TimerTask, Timeout} import org.jboss.netty.util.{Timer, TimerTask, Timeout}

import com.twitter.util.Duration
import com.twitter.util.{Duration, Try, Promise}

object TimerFuture {
import Conversions._

def apply[A](timer: Timer, after: Duration, tryValue: => Try[A]) = {
val future = new Promise[A]
timer(after) {
future() = tryValue
}
future
}
}


class RichTimer(val self: Timer) { class RichTimer(val self: Timer) {
def apply(after: Duration)(f: => Unit): Timeout = { def apply(after: Duration)(f: => Unit): Timeout = {
Expand Down
Expand Up @@ -14,10 +14,10 @@ import org.jboss.netty.handler.codec.http._
import org.jboss.netty.util.HashedWheelTimer import org.jboss.netty.util.HashedWheelTimer


import com.twitter.conversions.time._ import com.twitter.conversions.time._
import com.twitter.ostrich.StatsCollection


import com.twitter.finagle.util.Conversions._ import com.twitter.finagle.util.Conversions._
import com.twitter.util.{RandomSocket, Duration} import com.twitter.util.{RandomSocket, Duration}
import com.twitter.ostrich.StatsCollection


object EmbeddedServer { object EmbeddedServer {
def apply() = new EmbeddedServer(RandomSocket()) def apply() = new EmbeddedServer(RandomSocket())
Expand Down Expand Up @@ -47,8 +47,10 @@ class EmbeddedServer(val addr: SocketAddress) {
val pipeline = Channels.pipeline() val pipeline = Channels.pipeline()
pipeline.addLast("transposer", new SimpleChannelDownstreamHandler { pipeline.addLast("transposer", new SimpleChannelDownstreamHandler {
override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) { override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
if (!isBelligerent) if (!isBelligerent) {
return super.writeRequested(ctx, e) super.writeRequested(ctx, e)
return
}


// Garble the message a bit. // Garble the message a bit.
val buffer = e.getMessage.asInstanceOf[ChannelBuffer] val buffer = e.getMessage.asInstanceOf[ChannelBuffer]
Expand Down

0 comments on commit 1312c6c

Please sign in to comment.