Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Tests compile using new Timer style

  • Loading branch information...
commit 662f708132630a6e07b1acefc5d8adb439b47752 1 parent acc12c5
Nick Kallen authored
Showing with 112 additions and 188 deletions.
  1. +1 −9 finagle-core/src/main/scala/com/twitter/finagle/channel/Broker.scala
  2. +4 −3 finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelPool.scala
  3. +7 −11 finagle-core/src/main/scala/com/twitter/finagle/channel/RetryingBroker.scala
  4. +2 −6 finagle-core/src/main/scala/com/twitter/finagle/channel/TimeoutBroker.scala
  5. +3 −16 finagle-core/src/main/scala/com/twitter/finagle/service/TimeoutFilter.scala
  6. +6 −5 finagle-core/src/main/scala/com/twitter/finagle/stats/JavaLoggerStatsReceiver.scala
  7. +0 −2  finagle-core/src/main/scala/com/twitter/finagle/util/Conversions.scala
  8. +3 −5 finagle-core/src/main/scala/com/twitter/finagle/util/{Future.scala → RichFuture.scala}
  9. +27 −9 finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala
  10. +2 −2 finagle-core/src/test/scala/com/twitter/finagle/channel/{TimeoutBroker.scala → TimeoutBrokerSpec.scala}
  11. +4 −4 finagle-core/src/test/scala/com/twitter/finagle/integration/EmbeddedServer.scala
  12. +10 −35 finagle-core/src/test/scala/com/twitter/finagle/service/TimeoutFilterSpec.scala
  13. +0 −50 finagle-core/src/test/scala/com/twitter/finagle/util/Future.scala
  14. +27 −0 finagle-core/src/test/scala/com/twitter/finagle/util/RichFutureSpec.scala
  15. +15 −30 finagle-core/src/test/scala/com/twitter/finagle/util/TimerSpec.scala
  16. +1 −1  project/build/Project.scala
View
10 finagle-core/src/main/scala/com/twitter/finagle/channel/Broker.scala
@@ -1,9 +1,5 @@
package com.twitter.finagle.channel
-import java.util.concurrent.TimeUnit
-
-import org.jboss.netty.util.HashedWheelTimer
-
import com.twitter.finagle.service.Service
// TODO: variables.
@@ -16,8 +12,4 @@ trait WrappingBroker extends Broker {
def apply(request: AnyRef) = underlying(request)
override def isAvailable = underlying.isAvailable
-}
-
-object Broker {
- val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
-}
+}
View
7 finagle-core/src/main/scala/com/twitter/finagle/channel/ChannelPool.scala
@@ -14,8 +14,9 @@ import com.twitter.finagle.util.Conversions._
import com.twitter.finagle.util._
class ChannelPool(
- clientBootstrap: BrokerClientBootstrap,
- connectRetryPeriod: Option[Duration] = None)
+ clientBootstrap: BrokerClientBootstrap,
+ connectRetryPeriod: Option[Duration] = None,
+ timer: Timer = Timer.default)
extends Serialized
{
@volatile private[this] var _isAvailable = false
@@ -39,7 +40,7 @@ class ChannelPool(
val timeSinceLastConnectAttempt = lastConnectAttempt.untilNow
if (timeSinceLastConnectAttempt < period) {
- Broker.timer(period - timeSinceLastConnectAttempt) {
+ timer.schedule(period - timeSinceLastConnectAttempt) {
tryToConnect(period)
}
} else {
View
18 finagle-core/src/main/scala/com/twitter/finagle/channel/RetryingBroker.scala
@@ -1,9 +1,9 @@
package com.twitter.finagle.channel
-import java.net.SocketAddress
import org.jboss.netty.util.HashedWheelTimer
import com.twitter.finagle.util.Conversions._
-import com.twitter.util.{Duration, Future, Promise, Throw, Return}
+import com.twitter.util._
+import com.twitter.finagle.util.Timer
object RetryingBroker {
def tries(underlying: Broker, numTries: Int) =
@@ -62,20 +62,16 @@ class NumTriesRetryStrategy(numTries: Int) extends RetryStrategy {
}
}
-object ExponentialBackoffRetryStrategy {
- // the default tick is 100ms
- val timer = new HashedWheelTimer()
-}
-
-class ExponentialBackoffRetryStrategy(delay: Duration, multiplier: Int)
+class ExponentialBackoffRetryStrategy(
+ delay: Duration,
+ multiplier: Int,
+ timer: Timer = Timer.default)
extends RetryStrategy
{
- import ExponentialBackoffRetryStrategy._
-
def apply() = {
val future = new Promise[RetryStrategy]
- timer(delay) {
+ timer.schedule(delay) {
future() = Return(
new ExponentialBackoffRetryStrategy(delay * multiplier, multiplier))
}
View
8 finagle-core/src/main/scala/com/twitter/finagle/channel/TimeoutBroker.scala
@@ -1,16 +1,12 @@
package com.twitter.finagle.channel
-import org.jboss.netty.util.Timer
import com.twitter.util.{Duration, Throw}
-
import com.twitter.finagle.util.Conversions._
+import com.twitter.finagle.util.Timer
-class TimeoutBroker(timer: Timer, val underlying: Broker, timeout: Duration)
+class TimeoutBroker(val underlying: Broker, timeout: Duration, timer: Timer = Timer.default)
extends WrappingBroker
{
- def this(underlying: Broker, timeout: Duration) =
- this(Broker.timer, underlying, timeout)
-
override def apply(req: AnyRef) =
underlying(req).timeout(
timer, timeout, Throw(new TimedoutRequestException))
View
19 finagle-core/src/main/scala/com/twitter/finagle/service/TimeoutFilter.scala
@@ -1,11 +1,9 @@
package com.twitter.finagle.service
-import org.jboss.netty.util.Timer
-
-import com.twitter.util.{Future, Promise, Duration, Throw}
-
import com.twitter.finagle.channel.TimedoutRequestException
import com.twitter.finagle.util.Conversions._
+import com.twitter.finagle.util.Timer
+import com.twitter.util.{Future, Duration, Throw}
/**
* A filter to apply a global timeout to the request. This allows,
@@ -16,17 +14,6 @@ class TimeoutFilter[Req <: AnyRef, Rep <: AnyRef](
extends Filter[Req, Rep, Req, Rep]
{
def apply(request: Req, service: Service[Req, Rep]): Future[Rep] = {
- val result = new Promise[Rep]
-
- // Dispatch to the service, but throw an exception if it takes too
- // long.
- val timeoutHandle =
- timer(timeout) { result.updateIfEmpty(Throw(new TimedoutRequestException)) }
- service(request) respond { response =>
- timeoutHandle.cancel()
- result.updateIfEmpty(response)
- }
-
- result
+ service(request).timeout(timer, timeout, Throw(new TimedoutRequestException))
}
}
View
11 finagle-core/src/main/scala/com/twitter/finagle/stats/JavaLoggerStatsReceiver.scala
@@ -1,12 +1,12 @@
package com.twitter.finagle.stats
import java.util.logging.Logger
-import org.jboss.netty.util.HashedWheelTimer
import com.twitter.conversions.time._
import com.twitter.finagle.util.Conversions._
+import com.twitter.finagle.util.Timer
-case class JavaLoggerStatsReceiver(logger: Logger) extends StatsReceiver {
- val timer = new HashedWheelTimer()
+class JavaLoggerStatsReceiver(logger: Logger, timer: Timer) extends StatsReceiver {
+ def this(logger: Logger) = this(logger, Timer.default)
def gauge(description: (String, String)*) = new Gauge {
def measure(value: Float) {
@@ -21,7 +21,7 @@ case class JavaLoggerStatsReceiver(logger: Logger) extends StatsReceiver {
}
def mkGauge(name: Seq[(String, String)], f: => Float) {
- timer(10.seconds) {
+ timer.schedule(10.seconds) {
logger.info("%s %2f".format(name, f))
}
}
@@ -34,5 +34,6 @@ case class JavaLoggerStatsReceiver(logger: Logger) extends StatsReceiver {
}
object JavaLoggerStatsReceiver {
- def apply(): JavaLoggerStatsReceiver = JavaLoggerStatsReceiver(Logger.getLogger(getClass.getName))
+ def apply(): JavaLoggerStatsReceiver =
+ new JavaLoggerStatsReceiver(Logger.getLogger("Finagle"))
}
View
2  finagle-core/src/main/scala/com/twitter/finagle/util/Conversions.scala
@@ -1,12 +1,10 @@
package com.twitter.finagle.util
-import org.jboss.netty.util.Timer
import org.jboss.netty.channel.ChannelFuture
import com.twitter.util.Future
object Conversions {
implicit def channelFutureToRichChannelFuture(f: ChannelFuture) = new RichChannelFuture(f)
- implicit def timerToRichTimer(t: Timer) = new RichTimer(t)
implicit def futureToRichFuture[A](f: Future[A]) = new RichFuture(f)
}
View
8 .../main/scala/com/twitter/finagle/util/Future.scala → ...n/scala/com/twitter/finagle/util/RichFuture.scala
@@ -1,18 +1,16 @@
package com.twitter.finagle.util
-import org.jboss.netty.util.Timer
-
import com.twitter.util.{Duration, Future, Promise, Try}
import Conversions._
-class RichFuture[A](val self: Future[A]) {
+class RichFuture[A](self: Future[A]) {
def timeout(timer: Timer, howlong: Duration, orElse: => Try[A]) = {
val promise = new Promise[A]
- val timeout = timer(howlong) { promise.updateIfEmpty(orElse) }
+ val timeout = timer.schedule(howlong.fromNow) { promise.updateIfEmpty(orElse) }
self respond { r =>
- promise.updateIfEmpty(r)
timeout.cancel()
+ promise.updateIfEmpty(r)
}
promise
}
View
36 finagle-core/src/main/scala/com/twitter/finagle/util/Timer.scala
@@ -1,16 +1,34 @@
package com.twitter.finagle.util
import java.util.concurrent.TimeUnit
-import org.jboss.netty.util.{Timer, TimerTask, Timeout}
-import com.twitter.util.Duration
+import com.twitter.util.{Time, Duration, TimerTask}
+import org.jboss.netty.util.{HashedWheelTimer, Timeout}
-class RichTimer(val self: Timer) {
- def apply(after: Duration)(f: => Unit): Timeout = {
- self.newTimeout(new TimerTask {
+object Timer {
+ lazy val default = new Timer(new HashedWheelTimer(10, TimeUnit.MILLISECONDS))
+}
+
+class Timer(underlying: org.jboss.netty.util.Timer) extends com.twitter.util.Timer {
+ def schedule(when: Time)(f: => Unit): TimerTask = {
+ val timeout = underlying.newTimeout(new org.jboss.netty.util.TimerTask {
def run(to: Timeout) {
- if (!to.isCancelled())
- f
+ if (!to.isCancelled) f
}
- }, after.inMilliseconds, TimeUnit.MILLISECONDS)
+ }, when.inMilliseconds, TimeUnit.MILLISECONDS)
+ toTimerTask(timeout)
}
-}
+
+ def schedule(when: Time, period: Duration)(f: => Unit): TimerTask = {
+ val task = schedule(when) {
+ f
+ schedule(period)(f)
+ }
+ task
+ }
+
+ def stop() { underlying.stop() }
+
+ private[this] def toTimerTask(task: Timeout) = new TimerTask {
+ def cancel() { task.cancel() }
+ }
+}
View
4 ...a/com/twitter/finagle/channel/TimeoutBroker.scala → ...m/twitter/finagle/channel/TimeoutBrokerSpec.scala
@@ -7,7 +7,7 @@ import org.specs.mock.Mockito
import org.mockito.Matchers
import org.jboss.netty.util.{Timer, Timeout, TimerTask}
-import org.jboss.netty.channel.{MessageEvent, Channels}
+import org.jboss.netty.channel.{MessageEvent}
import com.twitter.util.{Future, Promise, Return, Throw}
@@ -34,7 +34,7 @@ object TimeoutBrokerSpec extends Specification with Mockito {
timeout
}
- val tob = new TimeoutBroker(timer, b, 100.milliseconds)
+ val tob = new TimeoutBroker(b, 100.milliseconds)
val f = tob(e)
timerTask must notBeNull
View
8 finagle-core/src/test/scala/com/twitter/finagle/integration/EmbeddedServer.scala
@@ -3,7 +3,7 @@ package com.twitter.finagle.integration
import scala.collection.JavaConversions._
import java.net.SocketAddress
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.{Executors}
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
@@ -11,18 +11,18 @@ import org.jboss.netty.channel._
import org.jboss.netty.buffer._
import org.jboss.netty.channel.group.DefaultChannelGroup
import org.jboss.netty.handler.codec.http._
-import org.jboss.netty.util.HashedWheelTimer
import com.twitter.conversions.time._
import com.twitter.finagle.util.Conversions._
import com.twitter.util.{RandomSocket, Duration}
import com.twitter.ostrich.StatsCollection
+import com.twitter.finagle.util.Timer
object EmbeddedServer {
def apply() = new EmbeddedServer(RandomSocket())
val executor = Executors.newCachedThreadPool()
- val timer = new HashedWheelTimer(10, TimeUnit.MILLISECONDS)
+ val timer = Timer.default
}
class EmbeddedServer(val addr: SocketAddress) {
@@ -91,7 +91,7 @@ class EmbeddedServer(val addr: SocketAddress) {
pipeline.addLast("latency", new SimpleChannelDownstreamHandler {
override def writeRequested(ctx: ChannelHandlerContext, e: MessageEvent) {
if (latency != 0.seconds)
- timer(latency) { super.writeRequested(ctx, e) }
+ timer.schedule(latency) { super.writeRequested(ctx, e) }
else
super.writeRequested(ctx, e)
}
View
45 finagle-core/src/test/scala/com/twitter/finagle/service/TimeoutFilterSpec.scala
@@ -1,53 +1,28 @@
package com.twitter.finagle.service
-import java.util.concurrent.TimeUnit
-
import org.specs.Specification
import org.specs.mock.Mockito
-import org.mockito.{ArgumentCaptor, Matchers}
+import com.twitter.conversions.time._
-import org.jboss.netty.util.{Timer, TimerTask, Timeout}
+import com.twitter.finagle.util.Timer
+import com.twitter.finagle.util.Conversions._
-import com.twitter.util.{Future, Promise, Return, Throw}
import com.twitter.conversions.time._
-
-import com.twitter.finagle.channel.TimedoutRequestException
+import com.twitter.util.{Try, Promise}
object TimeoutFilterSpec extends Specification with Mockito {
"TimeoutFilter" should {
- val timer = mock[Timer]
- val timeout = mock[Timeout]
- val duration = 50.milliseconds
- val (timeoutValue, timeoutUnit) = duration.inTimeUnit
- val request = mock[Object]
- val service = mock[Service[AnyRef, AnyRef]]
- val responseFuture = new Promise[AnyRef]
- val response = mock[Object]
- val taskCaptor = ArgumentCaptor.forClass(classOf[TimerTask])
- timer.newTimeout(
- taskCaptor.capture,
- Matchers.eq(timeoutValue),
- Matchers.eq(timeoutUnit)) returns timeout
- service(request) returns responseFuture
- val timeoutFilter = new TimeoutFilter[AnyRef, AnyRef](timer, duration)
- val future = timeoutFilter(request, service)
+ val timer = Timer.default
+ val alternative = Try(2)
+ val promise = new Promise[Int].timeout(timer, 1.second, alternative)
"cancels the request when the service succeeds" in {
- responseFuture() = Return(response)
- there was one(timeout).cancel
- future() must be_==(response)
+ promise.setValue(1)
+ promise(2.seconds) mustBe 1
}
"times out a request that is not successful" in {
- // Time out:
- val task = taskCaptor.getValue()
- task.run(timeout)
-
- future.isDefined must beTrue
- future.isThrow must beTrue
-
- val Throw(exc) = future.within(0.seconds)
- exc must haveClass[TimedoutRequestException]
+ promise(2.seconds) mustBe alternative.get
}
}
}
View
50 finagle-core/src/test/scala/com/twitter/finagle/util/Future.scala
@@ -1,50 +0,0 @@
-package com.twitter.finagle.util
-
-import java.util.concurrent.TimeUnit
-
-import org.specs.Specification
-import org.specs.mock.Mockito
-import org.mockito.{ArgumentCaptor, Matchers}
-
-import org.jboss.netty.util.{Timer, Timeout, TimerTask}
-
-import com.twitter.conversions.time._
-import com.twitter.util.{Promise, Throw, Return}
-
-object FutureSpec extends Specification with Mockito {
- "Future.timeout" should {
- val timer = mock[Timer]
- val richTimer = new RichTimer(timer)
- val timeout = mock[Timeout]
- val taskCaptor = ArgumentCaptor.forClass(classOf[TimerTask])
- timer.newTimeout(
- taskCaptor.capture,
- Matchers.eq(10000L),
- Matchers.eq(TimeUnit.MILLISECONDS)) returns timeout
-
- val promise = new Promise[Unit]
- val f = new RichFuture(promise)
- .timeout(timer, 10.seconds, Throw(new Exception("timed out")))
- there was one(timer).newTimeout(
- any[TimerTask], Matchers.eq(10000L), Matchers.eq(TimeUnit.MILLISECONDS))
- val timerTask = taskCaptor.getValue
-
- "on success: propagate & cancel the timer" in {
- f.isDefined must beFalse
- promise() = Return(())
- f() must be_==(())
- there was one(timeout).cancel()
- }
-
- "on failure: propagate" in {
- f.isDefined must beFalse
- timerTask.run(timeout)
- f() must throwA(new Exception("timed out"))
-
- // Make sure that setting the promise afterwards doesn't do
- // anything crazy:
- promise() = Return(())
- }
-
- }
-}
View
27 finagle-core/src/test/scala/com/twitter/finagle/util/RichFutureSpec.scala
@@ -0,0 +1,27 @@
+package com.twitter.finagle.util
+
+import org.specs.Specification
+import org.specs.mock.Mockito
+import com.twitter.conversions.time._
+
+import com.twitter.finagle.util.Conversions._
+import com.twitter.util.{Try, Promise}
+
+object RichFutureSpec extends Specification with Mockito {
+ "RichFuture" should {
+ "timeout" in {
+ val timer = Timer.default
+ val alternative = Try(2)
+ val richFuture = new Promise[Int].timeout(timer, 1.second, alternative)
+
+ "on success: propagate & cancel the timer" in {
+ richFuture.setValue(1)
+ richFuture(2.seconds) mustNot throwA[Exception]
+ }
+
+ "on failure: propagate" in {
+ richFuture(2.seconds) mustBe alternative.get
+ }
+ }
+ }
+}
View
45 finagle-core/src/test/scala/com/twitter/finagle/util/TimerSpec.scala
@@ -1,46 +1,31 @@
package com.twitter.finagle.util
-import java.util.concurrent.TimeUnit
-
import org.specs.Specification
import org.specs.mock.Mockito
-import org.mockito.{ArgumentCaptor, Matchers}
-
-import org.jboss.netty.util.{Timer, TimerTask, Timeout}
-import com.twitter.util.{Throw, Future, Promise, Return}
+import com.twitter.util.{CountDownLatch, Time}
import com.twitter.conversions.time._
object TimerSpec extends Specification with Mockito {
- "RichTimer" should {
- val timer = mock[Timer]
- val richTimer = new RichTimer(timer)
- val timeout = mock[Timeout]
- val taskCaptor = ArgumentCaptor.forClass(classOf[TimerTask])
- timer.newTimeout(
- taskCaptor.capture,
- Matchers.eq(10000L),
- Matchers.eq(TimeUnit.MILLISECONDS)) returns timeout
-
- var wasInvoked = false
- richTimer(10.seconds) { wasInvoked = true }
- there was one(timer).newTimeout(
- any[TimerTask],
- Matchers.eq(10000L),
- Matchers.eq(TimeUnit.MILLISECONDS))
- val timeoutTask = taskCaptor.getValue
- wasInvoked must beFalse
+ "Timer" should {
+ val timer = Timer.default
+
+ val start = Time.now
+ var end = Time.now
+ val latch = new CountDownLatch(1)
+ val task = timer.schedule(1.second.fromNow) {
+ latch.countDown()
+ end = Time.now
+ }
"not execute the task until it has timed out" in {
- timeout.isCancelled returns false
- timeoutTask.run(timeout)
- wasInvoked must beTrue
+ latch.await(1.second) must beTrue
+ (end - start).moreOrLessEquals(1.second, 10.milliseconds)
}
"not execute the task if it has been cancelled" in {
- timeout.isCancelled returns true
- timeoutTask.run(timeout)
- wasInvoked must beFalse
+ task.cancel()
+ latch.await(1.second) must beFalse
}
}
}
View
2  project/build/Project.scala
@@ -25,7 +25,7 @@ class Project(info: ProjectInfo) extends StandardParentProject(info)
val nettyRepo =
"repository.jboss.org" at "http://repository.jboss.org/nexus/content/groups/public/"
val netty = "org.jboss.netty" % "netty" % "3.2.3.Final"
- val util = "com.twitter" % "util" % "1.4.8"
+ val util = "com.twitter" % "util" % "1.4.9"
val mockito = "org.mockito" % "mockito-all" % "1.8.5" % "test" withSources()
val specs = "org.scala-tools.testing" % "specs_2.8.0" % "1.6.5" % "test" withSources()
Please sign in to comment.
Something went wrong with that request. Please try again.