Skip to content

Commit

Permalink
Make additional parameters for reporting apply to Successful event in…
Browse files Browse the repository at this point in the history
… addition to ExceededTimeout
  • Loading branch information
orrsella committed Feb 11, 2014
1 parent 76cc01e commit f13c93b
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 26 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ project/plugins/project/
.scala_dependencies

# OS X
.DS_Store
.DS_Store

# IntelliJ specific
.idea/
.idea_modules/
22 changes: 11 additions & 11 deletions src/main/scala/com/wix/async/FuturePerfect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ trait FuturePerfect extends Reporting[Event] {
retryPolicy: RetryPolicy,
onTimeout: TimeoutHandler,
executionName: String,
exceededTimeoutParams: (T) => Map[String, String]) {
paramsExtractor: (T) => Map[String, String]) {

private[this] def submitToAsyncExecution(f: => T) = pool(f)
protected[this] lazy val pool = FuturePool(executorService)
Expand All @@ -47,7 +47,7 @@ trait FuturePerfect extends Reporting[Event] {
val res: T = nested
val duration = elapsedInBlockingCall()
if (duration > timeout) {
report(ExceededTimeout(duration, executionName, exceededTimeoutParams(res)))
report(ExceededTimeout(duration, executionName, paramsExtractor(res)))
}

res
Expand Down Expand Up @@ -75,26 +75,26 @@ trait FuturePerfect extends Reporting[Event] {

throw onTimeout.applyOrElse(e, (cause: TimeoutException) => TimeoutGaveUpException(cause, executionName, timeout))

}.onSuccess { _ =>
report(Successful(submittedToQueue(), executionName))
}.onSuccess { t: T =>
report(Successful(submittedToQueue(), executionName, paramsExtractor(t)))
}.onFailure { error =>
report(Failed(submittedToQueue(), error, executionName))
}
}
}

// for some reason default parameters don't work for a curried function so I had to supply all permutations
def execution[T](retryPolicy: RetryPolicy)(f: => T): Future[T] = execution(Duration.Zero, retryPolicy, exceededTimeoutParams = emptyParams[T])(f)
def execution[T](timeout: Duration)(f: => T): Future[T] = execution(timeout, NoRetries, exceededTimeoutParams = emptyParams[T])(f)
def execution[T](timeout: Duration, retryPolicy: RetryPolicy)(f: => T): Future[T] = execution(timeout, retryPolicy, exceededTimeoutParams = emptyParams[T])(f)
def execution[T](f: => T): Future[T] = execution(Duration.Zero, NoRetries, exceededTimeoutParams = emptyParams[T])(f)
def execution[T](retryPolicy: RetryPolicy)(f: => T): Future[T] = execution(Duration.Zero, retryPolicy, paramsExtractor = emptyParams[T])(f)
def execution[T](timeout: Duration)(f: => T): Future[T] = execution(timeout, NoRetries, paramsExtractor = emptyParams[T])(f)
def execution[T](timeout: Duration, retryPolicy: RetryPolicy)(f: => T): Future[T] = execution(timeout, retryPolicy, paramsExtractor = emptyParams[T])(f)
def execution[T](f: => T): Future[T] = execution(Duration.Zero, NoRetries, paramsExtractor = emptyParams[T])(f)
def execution[T](timeout: Duration = Duration.Zero,
retryPolicy: RetryPolicy = NoRetries,
onTimeout: TimeoutHandler = PartialFunction.empty,
name: String = defaultName,
exceededTimeoutParams: (T) => Map[String, String] = emptyParams)(blockingExecution: => T): Future[T] =
paramsExtractor: (T) => Map[String, String] = emptyParams)(blockingExecution: => T): Future[T] =

new AsyncExecution[T](executorService, timeout, retryPolicy, onTimeout, name, exceededTimeoutParams).apply(blockingExecution)
new AsyncExecution[T](executorService, timeout, retryPolicy, onTimeout, name, paramsExtractor).apply(blockingExecution)

private def defaultName = "async"
private def emptyParams[T] = (t: T) => Map[String, String]()
Expand All @@ -111,7 +111,7 @@ object FuturePerfect {
case class Retrying(timeout: Duration, remainingRetries: Long, executionName: String) extends Event
case class GaveUp(timeout: Duration, e: TimeoutException, executionName: String) extends Event
case class ExceededTimeout(actual: Duration, executionName: String, params: Map[String, String]) extends Event
case class Successful(elapsed: Duration, executionName: String) extends Event
case class Successful(elapsed: Duration, executionName: String, params: Map[String, String]) extends Event
case class Failed(elapsed: Duration, error: Throwable, executionName: String) extends Event
}

Expand Down
22 changes: 9 additions & 13 deletions src/main/scala/com/wix/async/LoggerReporting.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ trait LoggerReporting extends LoggerReportingMessages { this: Reporting[Event] =
case GaveUp(timeout, error, executionName) => log.error(gaveUp(timeout, error, executionName))
case ExceededTimeout(actual, executionName, params) => log.error(exceededTimeout(actual, executionName, params))
case TimeSpentInQueue(time, executionName) => log.info(timeSpentInQueue(time, executionName))
case Successful(elapsed, executionName) => log.info(successful(elapsed, executionName))
case Successful(elapsed, executionName, params) => log.info(successful(elapsed, executionName, params))
case Failed(elapsed, error, executionName) => log.error(failed(elapsed, error, executionName), error)
case TimeoutWhileInQueue(timeInQueue, error, executionName) => log.error(timeoutWhileInQueue(timeInQueue, error, executionName), error)
}
Expand All @@ -28,19 +28,15 @@ trait LoggerReporting extends LoggerReportingMessages { this: Reporting[Event] =
trait LoggerReportingMessages {
def retrying(timeout: Duration, remainingRetries: Long, executionName: String) = s"Execution [$executionName] timed out after ${timeout.toMillis} ms, retrying $remainingRetries more times."
def gaveUp(timeout: Duration, e: TimeoutException, executionName: String) = s"Execution [$executionName] timed out after ${timeout.toMillis} ms, giving up."

def exceededTimeout(actual: Duration, executionName: String, params: Map[String, String]) = {
val paramsStr =
if (params.nonEmpty) {
val p = params.map(p => s"${p._1}=[${p._2}]")
s" Additional params: ${p.mkString(", ")}"
} else ""

s"Execution [$executionName] timed out, actual duration was ${actual.toMillis} ms.$paramsStr"
}

def exceededTimeout(actual: Duration, executionName: String, params: Map[String, String]) = s"Execution [$executionName] timed out, actual duration was ${actual.toMillis} ms.${formatParams(params)}"
def timeSpentInQueue(time: Duration, executionName: String) = s"Execution [$executionName] started after spending ${time.toMillis} ms in queue."
def successful(elapsed: Duration, executionName: String) = s"Execution [$executionName] succeeded after ${elapsed.toMillis} ms."
def successful(elapsed: Duration, executionName: String, params: Map[String, String]) = s"Execution [$executionName] succeeded after ${elapsed.toMillis} ms.${formatParams(params)}"
def failed(elapsed: Duration, error: Throwable, executionName: String) = s"Execution [$executionName] failed after ${elapsed.toMillis} ms."
def timeoutWhileInQueue(timeInQueue: Duration, e: TimeoutException, executionName: String) = s"Execution [$executionName] timed out after waiting ${timeInQueue.toMillis} in queue."

private def formatParams(params: Map[String, String]) =
if (params.nonEmpty) {
val p = params.map(p => s"${p._1}=[${p._2}]")
s" Additional params: ${p.mkString(", ")}"
} else ""
}
9 changes: 8 additions & 1 deletion src/test/scala/com/wix/async/FuturePerfectTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ class FuturePerfectTest extends SpecificationWithJUnit with Mockito with NoTimeC
there was one(reporter).report(matchA[Successful].executionName("foo"))
}

"report success with additional params" in new AsyncScope {
val someParams = (u: Unit) => Map("foo" -> "bar")
Await.result(execution(timeout = timeout, name = "foo", paramsExtractor = someParams) { /* do nothing on purpose */ })

there was one(reporter).report(matchA[Successful].params(someParams()))
}

"report time spent in queue" in new AsyncScope {
Await.result(execution(timeout) { /* do nothing on purpose */ })

Expand Down Expand Up @@ -95,7 +102,7 @@ class FuturePerfectTest extends SpecificationWithJUnit with Mockito with NoTimeC

"timeout when blocking function stalls with expected params" in new AsyncScope {
val someParams = (b: Boolean) => Map("Value" -> b.toString, "StaticValue" -> "foo")
val f = execution(timeout = timeout, exceededTimeoutParams = someParams) {
val f = execution(timeout = timeout, paramsExtractor = someParams) {
bar.await()
}
Await.result(f) must throwA[TimeoutGaveUpException]
Expand Down

0 comments on commit f13c93b

Please sign in to comment.