From 631ec9ac57e6facd2a691a367742ff280b238554 Mon Sep 17 00:00:00 2001 From: Nadav Leshem Date: Wed, 23 Apr 2014 12:13:02 +0300 Subject: [PATCH 1/2] updated twitter utils version --- build.sbt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/build.sbt b/build.sbt index e776a29..4a1aea7 100644 --- a/build.sbt +++ b/build.sbt @@ -4,8 +4,6 @@ import sbtrelease._ import ReleaseStateTransformations._ import ReleasePlugin._ import ReleaseKeys._ -import Utilities._ -import com.typesafe.sbt.SbtPgp.PgpKeys._ name := "future-perfect" @@ -19,7 +17,7 @@ resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repos libraryDependencies ++= Seq( "org.slf4j" % "slf4j-api" % "1.7.5", - "com.twitter" %% "util-core" % "6.3.6", + "com.twitter" %% "util-core" % "6.12.1", "org.specs2" %% "specs2" % "2.3.7" % "test", "org.jmock" % "jmock" % "2.6.0" % "test" ) From 5a07738764fd9d7bf48e9f0dd172ecbe3c01770c Mon Sep 17 00:00:00 2001 From: Nadav Leshem Date: Wed, 23 Apr 2014 12:14:39 +0300 Subject: [PATCH 2/2] changed Timer to use the original Twitter timer class --- .../scala/com/wix/async/FuturePerfect.scala | 9 +-- .../async/ScheduledExecutorServiceTimer.scala | 30 --------- .../com/wix/async/CustomReportingTest.scala | 62 +++++++++++-------- .../com/wix/async/FuturePerfectTest.scala | 1 - 4 files changed, 41 insertions(+), 61 deletions(-) delete mode 100644 src/main/scala/com/wix/async/ScheduledExecutorServiceTimer.scala diff --git a/src/main/scala/com/wix/async/FuturePerfect.scala b/src/main/scala/com/wix/async/FuturePerfect.scala index d64949a..24aeac7 100644 --- a/src/main/scala/com/wix/async/FuturePerfect.scala +++ b/src/main/scala/com/wix/async/FuturePerfect.scala @@ -1,11 +1,11 @@ package com.wix.async import scala.concurrent.duration.Duration -import com.twitter.util._ -import java.util.concurrent.{Executors, ExecutorService, ScheduledExecutorService} +import java.util.concurrent.ExecutorService import FuturePerfect._ import Implicits._ +import com.twitter.util.{Future, FuturePool, ScheduledThreadPoolTimer, Timer, Stopwatch, TimeoutException} /** * @author shaiyallin @@ -15,7 +15,8 @@ import Implicits._ trait FuturePerfect extends Reporting[Event] { def executorService: ExecutorService - implicit lazy val timer: Timer = new ScheduledExecutorServiceTimer(Executors.newScheduledThreadPool(10)) + + private lazy val timer: Timer = new ScheduledThreadPoolTimer() class AsyncExecution[T](executorService: ExecutorService, timeout: Duration, @@ -59,7 +60,7 @@ trait FuturePerfect extends Reporting[Event] { } if (timeout != Duration.Zero) - future = future.within(timeout) + future = future.within(timer, timeout) future.rescue { case e: Throwable if retryPolicy.shouldRetryFor(e) => diff --git a/src/main/scala/com/wix/async/ScheduledExecutorServiceTimer.scala b/src/main/scala/com/wix/async/ScheduledExecutorServiceTimer.scala deleted file mode 100644 index fd17e23..0000000 --- a/src/main/scala/com/wix/async/ScheduledExecutorServiceTimer.scala +++ /dev/null @@ -1,30 +0,0 @@ -package com.wix.async - -import com.twitter.util._ -import java.util.concurrent._ - -/** - * @author shaiyallin - * @since 1/13/13 - */ - -class ScheduledExecutorServiceTimer(underlying: ScheduledExecutorService) extends Timer { - - def schedule(when: Time)(f: => Unit): TimerTask = { - val runnable = new Runnable { def run = f } - val javaFuture = underlying.schedule(runnable, when.sinceNow.inMillis, TimeUnit.MILLISECONDS) - new TimerTask { def cancel() { javaFuture.cancel(true) } } - } - - def schedule(when: Time, period: Duration)(f: => Unit): TimerTask = - schedule(when.sinceNow, period)(f) - - def schedule(wait: Duration, period: Duration)(f: => Unit): TimerTask = { - val runnable = new Runnable { def run = f } - val javaFuture = underlying.scheduleAtFixedRate(runnable, - wait.inMillis, period.inMillis, TimeUnit.MILLISECONDS) - new TimerTask { def cancel() { javaFuture.cancel(true) } } - } - - def stop() = underlying.shutdown() -} \ No newline at end of file diff --git a/src/test/scala/com/wix/async/CustomReportingTest.scala b/src/test/scala/com/wix/async/CustomReportingTest.scala index da3d1de..a7a44bd 100644 --- a/src/test/scala/com/wix/async/CustomReportingTest.scala +++ b/src/test/scala/com/wix/async/CustomReportingTest.scala @@ -1,6 +1,6 @@ package com.wix.async -import com.twitter.util.{CountDownLatch, Await} +import com.twitter.util.CountDownLatch import com.wix.async.FuturePerfect.{Successful, ExceededTimeout, Event} import java.util.concurrent.{ExecutorService, Executors} import org.specs2.mock.Mockito @@ -15,46 +15,56 @@ import scala.concurrent.duration._ */ class CustomReportingTest extends SpecificationWithJUnit with NoTimeConversions with Mockito { - trait CustomReporting { - def reportExceeded(result: String) - def reportSuccessful(result: String) - } - - trait StringReporting { this: Reporting[Event] => - def customReporting: CustomReporting - - listenFor { - case ExceededTimeout(actual, name, result: String) => customReporting.reportExceeded(result) - case Successful(actual, name, result: String) => customReporting.reportSuccessful(result) - } - } - - trait Context extends Scope with FuturePerfect with StringReporting { - def executorService: ExecutorService = Executors.newFixedThreadPool(4) - val customReporting: CustomReporting = mock[CustomReporting] - val latch = new CountDownLatch(1) - } - "an execution with custom reporting" should { "correctly report result for exceeded timeout event by passing the calculated value to the reporter" in new Context { - val result = "some result" - val future = execution(timeout = 10 millis) { + val future = execution(timeout = 1 millis) { latch.await() result + }.onFailure { + case e: TimeoutGaveUpException => latch.countDown() } - latch.countDown() + waitForExceededReport there was one(customReporting).reportExceeded(result) } "correctly report result for successful event by passing the calculated value to the reporter" in new Context { - val result = "some result" val future = execution { result } - Await.result(future) must_== result + waitForSuccessfulReport there was one(customReporting).reportSuccessful(result) } } + + trait Context extends Scope with FuturePerfect with StringReporting { + def executorService: ExecutorService = Executors.newFixedThreadPool(4) + val customReporting: CustomReporting = mock[CustomReporting] + customReporting.reportExceeded(any) answers {_ => customReportingExceededLatch.countDown()} + customReporting.reportSuccessful(any) answers {_ => customReportingSuccessLatch.countDown()} + + val latch = new CountDownLatch(1) + private val customReportingExceededLatch = new CountDownLatch(1) + private val customReportingSuccessLatch = new CountDownLatch(1) + + def waitForExceededReport = customReportingExceededLatch.await() + def waitForSuccessfulReport = customReportingSuccessLatch.await() + + val result = "some result" + } + + trait CustomReporting { + def reportExceeded(result: String) + def reportSuccessful(result: String) + } + + trait StringReporting { this: Reporting[Event] => + def customReporting: CustomReporting + + listenFor { + case ExceededTimeout(actual, name, result: String) => customReporting.reportExceeded(result) + case Successful(actual, name, result: String) => customReporting.reportSuccessful(result) + } + } } diff --git a/src/test/scala/com/wix/async/FuturePerfectTest.scala b/src/test/scala/com/wix/async/FuturePerfectTest.scala index e5f1288..c998fcb 100644 --- a/src/test/scala/com/wix/async/FuturePerfectTest.scala +++ b/src/test/scala/com/wix/async/FuturePerfectTest.scala @@ -61,7 +61,6 @@ class FuturePerfectTest extends SpecificationWithJUnit with Mockito with NoTimeC "report when timed out while in queue" in new AsyncScope { override val executorService = new DeterministicScheduler - override implicit lazy val timer = new ScheduledExecutorServiceTimer(Executors.newScheduledThreadPool(10)) val f = execution(timeout) { /* do nothing on purpose */ }