Skip to content

Commit

Permalink
Merge pull request #3 from nadav-dav/master
Browse files Browse the repository at this point in the history
Changed Timer object to use Twitter timer, update twitter-util version
  • Loading branch information
orrsella committed Apr 23, 2014
2 parents 74333fc + 5a07738 commit b33cbe7
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 64 deletions.
4 changes: 1 addition & 3 deletions build.sbt
Expand Up @@ -4,8 +4,6 @@ import sbtrelease._
import ReleaseStateTransformations._
import ReleasePlugin._
import ReleaseKeys._
import Utilities._
import com.typesafe.sbt.SbtPgp.PgpKeys._

name := "future-perfect"

Expand All @@ -19,7 +17,7 @@ resolvers += "Sonatype OSS Snapshots" at "https://oss.sonatype.org/content/repos

libraryDependencies ++= Seq(
"org.slf4j" % "slf4j-api" % "1.7.5",
"com.twitter" %% "util-core" % "6.3.6",
"com.twitter" %% "util-core" % "6.12.1",
"org.specs2" %% "specs2" % "2.3.7" % "test",
"org.jmock" % "jmock" % "2.6.0" % "test"
)
Expand Down
9 changes: 5 additions & 4 deletions src/main/scala/com/wix/async/FuturePerfect.scala
@@ -1,11 +1,11 @@
package com.wix.async

import scala.concurrent.duration.Duration
import com.twitter.util._

import java.util.concurrent.{Executors, ExecutorService, ScheduledExecutorService}
import java.util.concurrent.ExecutorService
import FuturePerfect._
import Implicits._
import com.twitter.util.{Future, FuturePool, ScheduledThreadPoolTimer, Timer, Stopwatch, TimeoutException}

/**
* @author shaiyallin
Expand All @@ -15,7 +15,8 @@ import Implicits._
trait FuturePerfect extends Reporting[Event] {

def executorService: ExecutorService
implicit lazy val timer: Timer = new ScheduledExecutorServiceTimer(Executors.newScheduledThreadPool(10))

private lazy val timer: Timer = new ScheduledThreadPoolTimer()

class AsyncExecution[T](executorService: ExecutorService,
timeout: Duration,
Expand Down Expand Up @@ -59,7 +60,7 @@ trait FuturePerfect extends Reporting[Event] {
}

if (timeout != Duration.Zero)
future = future.within(timeout)
future = future.within(timer, timeout)

future.rescue {
case e: Throwable if retryPolicy.shouldRetryFor(e) =>
Expand Down
30 changes: 0 additions & 30 deletions src/main/scala/com/wix/async/ScheduledExecutorServiceTimer.scala

This file was deleted.

62 changes: 36 additions & 26 deletions src/test/scala/com/wix/async/CustomReportingTest.scala
@@ -1,6 +1,6 @@
package com.wix.async

import com.twitter.util.{CountDownLatch, Await}
import com.twitter.util.CountDownLatch
import com.wix.async.FuturePerfect.{Successful, ExceededTimeout, Event}
import java.util.concurrent.{ExecutorService, Executors}
import org.specs2.mock.Mockito
Expand All @@ -15,46 +15,56 @@ import scala.concurrent.duration._
*/
class CustomReportingTest extends SpecificationWithJUnit with NoTimeConversions with Mockito {

trait CustomReporting {
def reportExceeded(result: String)
def reportSuccessful(result: String)
}

trait StringReporting { this: Reporting[Event] =>
def customReporting: CustomReporting

listenFor {
case ExceededTimeout(actual, name, result: String) => customReporting.reportExceeded(result)
case Successful(actual, name, result: String) => customReporting.reportSuccessful(result)
}
}

trait Context extends Scope with FuturePerfect with StringReporting {
def executorService: ExecutorService = Executors.newFixedThreadPool(4)
val customReporting: CustomReporting = mock[CustomReporting]
val latch = new CountDownLatch(1)
}

"an execution with custom reporting" should {
"correctly report result for exceeded timeout event by passing the calculated value to the reporter" in new Context {
val result = "some result"
val future = execution(timeout = 10 millis) {
val future = execution(timeout = 1 millis) {
latch.await()
result
}.onFailure {
case e: TimeoutGaveUpException => latch.countDown()
}

latch.countDown()
waitForExceededReport
there was one(customReporting).reportExceeded(result)
}

"correctly report result for successful event by passing the calculated value to the reporter" in new Context {
val result = "some result"
val future = execution {
result
}

Await.result(future) must_== result
waitForSuccessfulReport
there was one(customReporting).reportSuccessful(result)
}
}

trait Context extends Scope with FuturePerfect with StringReporting {
def executorService: ExecutorService = Executors.newFixedThreadPool(4)
val customReporting: CustomReporting = mock[CustomReporting]
customReporting.reportExceeded(any) answers {_ => customReportingExceededLatch.countDown()}
customReporting.reportSuccessful(any) answers {_ => customReportingSuccessLatch.countDown()}

val latch = new CountDownLatch(1)
private val customReportingExceededLatch = new CountDownLatch(1)
private val customReportingSuccessLatch = new CountDownLatch(1)

def waitForExceededReport = customReportingExceededLatch.await()
def waitForSuccessfulReport = customReportingSuccessLatch.await()

val result = "some result"
}

trait CustomReporting {
def reportExceeded(result: String)
def reportSuccessful(result: String)
}

trait StringReporting { this: Reporting[Event] =>
def customReporting: CustomReporting

listenFor {
case ExceededTimeout(actual, name, result: String) => customReporting.reportExceeded(result)
case Successful(actual, name, result: String) => customReporting.reportSuccessful(result)
}
}
}
1 change: 0 additions & 1 deletion src/test/scala/com/wix/async/FuturePerfectTest.scala
Expand Up @@ -61,7 +61,6 @@ class FuturePerfectTest extends SpecificationWithJUnit with Mockito with NoTimeC
"report when timed out while in queue" in new AsyncScope {

override val executorService = new DeterministicScheduler
override implicit lazy val timer = new ScheduledExecutorServiceTimer(Executors.newScheduledThreadPool(10))

val f = execution(timeout) { /* do nothing on purpose */ }

Expand Down

0 comments on commit b33cbe7

Please sign in to comment.