Skip to content

Commit

Permalink
Merge pull request #1 from orrsella/exceeded-timeout-params
Browse files Browse the repository at this point in the history
Add exceededTimeoutParams function to execution parameters to allow custom values reporting
  • Loading branch information
electricmonk committed Feb 11, 2014
2 parents 9bba35d + f13c93b commit d8e4ef5
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 31 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,8 @@ project/plugins/project/
.scala_dependencies

# OS X
.DS_Store
.DS_Store

# IntelliJ specific
.idea/
.idea_modules/
29 changes: 17 additions & 12 deletions src/main/scala/com/wix/async/FuturePerfect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ trait FuturePerfect extends Reporting[Event] {
timeout: Duration,
retryPolicy: RetryPolicy,
onTimeout: TimeoutHandler,
executionName: String) {
executionName: String,
paramsExtractor: (T) => Map[String, String]) {

private[this] def submitToAsyncExecution(f: => T) = pool(f)
protected[this] lazy val pool = FuturePool(executorService)

private var started = false

def apply(blockingExecution: => T): Future[T] = execute(retryPolicy)(blockingExecution)
Expand All @@ -46,7 +47,7 @@ trait FuturePerfect extends Reporting[Event] {
val res: T = nested
val duration = elapsedInBlockingCall()
if (duration > timeout) {
report(ExceededTimeout(duration, executionName))
report(ExceededTimeout(duration, executionName, paramsExtractor(res)))
}

res
Expand Down Expand Up @@ -74,25 +75,29 @@ trait FuturePerfect extends Reporting[Event] {

throw onTimeout.applyOrElse(e, (cause: TimeoutException) => TimeoutGaveUpException(cause, executionName, timeout))

}.onSuccess { _ =>
report(Successful(submittedToQueue(), executionName))
}.onSuccess { t: T =>
report(Successful(submittedToQueue(), executionName, paramsExtractor(t)))
}.onFailure { error =>
report(Failed(submittedToQueue(), error, executionName))
}
}
}

// for some reason default parameters don't work for a curried function so I had to supply all permutations
def execution[T](retry: RetryPolicy)(f: => T): Future[T] = execution(Duration.Zero, retry)(f)
def execution[T](timeout: Duration)(f: => T): Future[T] = execution(timeout, NoRetries)(f)
def execution[T](f: => T): Future[T] = execution(Duration.Zero, NoRetries)(f)
def execution[T](retryPolicy: RetryPolicy)(f: => T): Future[T] = execution(Duration.Zero, retryPolicy, paramsExtractor = emptyParams[T])(f)
def execution[T](timeout: Duration)(f: => T): Future[T] = execution(timeout, NoRetries, paramsExtractor = emptyParams[T])(f)
def execution[T](timeout: Duration, retryPolicy: RetryPolicy)(f: => T): Future[T] = execution(timeout, retryPolicy, paramsExtractor = emptyParams[T])(f)
def execution[T](f: => T): Future[T] = execution(Duration.Zero, NoRetries, paramsExtractor = emptyParams[T])(f)
def execution[T](timeout: Duration = Duration.Zero,
retryPolicy: RetryPolicy = NoRetries,
onTimeout: TimeoutHandler = PartialFunction.empty,
name: String = defaultName)(blockingExecution: => T): Future[T] =
new AsyncExecution[T](executorService, timeout, retryPolicy, onTimeout, name).apply(blockingExecution)
name: String = defaultName,
paramsExtractor: (T) => Map[String, String] = emptyParams)(blockingExecution: => T): Future[T] =

new AsyncExecution[T](executorService, timeout, retryPolicy, onTimeout, name, paramsExtractor).apply(blockingExecution)

private def defaultName = "async"
private def emptyParams[T] = (t: T) => Map[String, String]()
}

object FuturePerfect {
Expand All @@ -105,8 +110,8 @@ object FuturePerfect {
case class TimeSpentInQueue(time: Duration, executionName: String) extends Event
case class Retrying(timeout: Duration, remainingRetries: Long, executionName: String) extends Event
case class GaveUp(timeout: Duration, e: TimeoutException, executionName: String) extends Event
case class ExceededTimeout(actual: Duration, executionName: String) extends Event
case class Successful(elapsed: Duration, executionName: String) extends Event
case class ExceededTimeout(actual: Duration, executionName: String, params: Map[String, String]) extends Event
case class Successful(elapsed: Duration, executionName: String, params: Map[String, String]) extends Event
case class Failed(elapsed: Duration, error: Throwable, executionName: String) extends Event
}

Expand Down
36 changes: 27 additions & 9 deletions src/main/scala/com/wix/async/LoggerReporting.scala
Original file line number Diff line number Diff line change
@@ -1,24 +1,42 @@
package com.wix.async

import org.slf4j.LoggerFactory
import com.twitter.util.TimeoutException
import com.wix.async.FuturePerfect._
import org.slf4j.LoggerFactory
import scala.concurrent.duration.Duration

/**
* @author shaiyallin
* @since 12/6/13
*/

trait LoggerReporting { this: Reporting[Event] =>
trait LoggerReporting extends LoggerReportingMessages { this: Reporting[Event] =>

private val log = LoggerFactory.getLogger(getClass)

listenFor {
case Retrying(timeout, remainingRetries, executionName) => log.warn(s"Execution [$executionName] timed out after ${timeout.toMillis} ms, retrying $remainingRetries more times.")
case GaveUp(timeout, _, executionName) => log.error(s"Execution [$executionName] timed out after ${timeout.toMillis} ms, giving up.")
case ExceededTimeout(actual, executionName) => log.error(s"Execution [$executionName] timed out, actual duration was ${actual.toMillis} ms.")
case TimeSpentInQueue(time, executionName) => log.info(s"Execution [$executionName] started after spending ${time.toMillis} ms in queue.")
case Successful(time, executionName) => log.info(s"Execution [$executionName] succeeded after ${time.toMillis} ms.")
case Failed(time, error, executionName) => log.error(s"Execution [$executionName] failed after ${time.toMillis} ms.", error)
case TimeoutWhileInQueue(time, error, executionName) => log.error(s"Execution [$executionName] timed out after waiting ${time.toMillis} in queue.", error)
case Retrying(timeout, remainingRetries, executionName) => log.warn(retrying(timeout, remainingRetries, executionName))
case GaveUp(timeout, error, executionName) => log.error(gaveUp(timeout, error, executionName))
case ExceededTimeout(actual, executionName, params) => log.error(exceededTimeout(actual, executionName, params))
case TimeSpentInQueue(time, executionName) => log.info(timeSpentInQueue(time, executionName))
case Successful(elapsed, executionName, params) => log.info(successful(elapsed, executionName, params))
case Failed(elapsed, error, executionName) => log.error(failed(elapsed, error, executionName), error)
case TimeoutWhileInQueue(timeInQueue, error, executionName) => log.error(timeoutWhileInQueue(timeInQueue, error, executionName), error)
}
}

trait LoggerReportingMessages {
def retrying(timeout: Duration, remainingRetries: Long, executionName: String) = s"Execution [$executionName] timed out after ${timeout.toMillis} ms, retrying $remainingRetries more times."
def gaveUp(timeout: Duration, e: TimeoutException, executionName: String) = s"Execution [$executionName] timed out after ${timeout.toMillis} ms, giving up."
def exceededTimeout(actual: Duration, executionName: String, params: Map[String, String]) = s"Execution [$executionName] timed out, actual duration was ${actual.toMillis} ms.${formatParams(params)}"
def timeSpentInQueue(time: Duration, executionName: String) = s"Execution [$executionName] started after spending ${time.toMillis} ms in queue."
def successful(elapsed: Duration, executionName: String, params: Map[String, String]) = s"Execution [$executionName] succeeded after ${elapsed.toMillis} ms.${formatParams(params)}"
def failed(elapsed: Duration, error: Throwable, executionName: String) = s"Execution [$executionName] failed after ${elapsed.toMillis} ms."
def timeoutWhileInQueue(timeInQueue: Duration, e: TimeoutException, executionName: String) = s"Execution [$executionName] timed out after waiting ${timeInQueue.toMillis} in queue."

private def formatParams(params: Map[String, String]) =
if (params.nonEmpty) {
val p = params.map(p => s"${p._1}=[${p._2}]")
s" Additional params: ${p.mkString(", ")}"
} else ""
}
35 changes: 26 additions & 9 deletions src/test/scala/com/wix/async/FuturePerfectTest.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
package com.wix.async

import com.twitter.util.{Future, CountDownLatch, TimeoutException, Await}
import FuturePerfect._
import java.util.concurrent.Executors
import org.specs2.mutable.SpecificationWithJUnit
import org.specs2.time.NoTimeConversions
import org.jmock.lib.concurrent.DeterministicScheduler
import org.specs2.matcher._
import org.specs2.mock.Mockito
import org.specs2.mutable.SpecificationWithJUnit
import org.specs2.specification.Scope
import scala.concurrent.duration.Duration
import org.specs2.time.NoTimeConversions
import scala.concurrent.duration._
import FuturePerfect._
import org.specs2.matcher._
import org.jmock.lib.concurrent.DeterministicScheduler
import com.twitter.util.{CountDownLatch, TimeoutException, Await}

/**
* @author shaiyallin
Expand All @@ -32,7 +31,7 @@ class FuturePerfectTest extends SpecificationWithJUnit with Mockito with NoTimeC

Thread.currentThread().getId must_!= asyncThreadId
}

"succeed when blocking function succeeds" in new AsyncScope {
val f = execution {
bar.succeed()
Expand All @@ -53,6 +52,13 @@ class FuturePerfectTest extends SpecificationWithJUnit with Mockito with NoTimeC
there was one(reporter).report(matchA[Successful].executionName("foo"))
}

"report success with additional params" in new AsyncScope {
val someParams = (u: Unit) => Map("foo" -> "bar")
Await.result(execution(timeout = timeout, name = "foo", paramsExtractor = someParams) { /* do nothing on purpose */ })

there was one(reporter).report(matchA[Successful].params(someParams()))
}

"report time spent in queue" in new AsyncScope {
Await.result(execution(timeout) { /* do nothing on purpose */ })

Expand Down Expand Up @@ -94,6 +100,17 @@ class FuturePerfectTest extends SpecificationWithJUnit with Mockito with NoTimeC
there was one(reporter).report(matchA[GaveUp])
}

"timeout when blocking function stalls with expected params" in new AsyncScope {
val someParams = (b: Boolean) => Map("Value" -> b.toString, "StaticValue" -> "foo")
val f = execution(timeout = timeout, paramsExtractor = someParams) {
bar.await()
}
Await.result(f) must throwA[TimeoutGaveUpException]
bar.release()

there was one(reporter).report(matchA[ExceededTimeout].params(someParams(true)))
}

"retry on timeout" in new AsyncScope {
val f = execution(timeout, RetryPolicy(retries = 1)) {
bar.sleepDecreasing(150)
Expand Down Expand Up @@ -158,7 +175,7 @@ class FuturePerfectTest extends SpecificationWithJUnit with Mockito with NoTimeC

sc.Await.result(execution {true}, 100 millis) must beTrue
}

}

class CustomExecption(cause: Throwable) extends RuntimeException(cause)
Expand Down
28 changes: 28 additions & 0 deletions src/test/scala/com/wix/async/LoggerReportingMessagesTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.wix.async

import org.specs2.mutable.SpecificationWithJUnit
import org.specs2.specification.Scope
import org.specs2.time.NoTimeConversions
import scala.concurrent.duration._

class LoggerReportingMessagesTest extends SpecificationWithJUnit with NoTimeConversions {

trait Context extends Scope with LoggerReportingMessages

"logger reporting messages" should {
"correctly format params in ExceededTimeout message with params" in new Context {
val duration = 1 second
val name = "exec name"
val params = Map("Value1" -> "10.0", "Value2" -> "example.com")
val message = exceededTimeout(duration, name, params)
message must_== s"Execution [$name] timed out, actual duration was ${duration.toMillis} ms. Additional params: Value1=[10.0], Value2=[example.com]"
}

"correctly format params in ExceededTimeout message without params" in new Context {
val duration = 1 second
val name = "exec name"
val message = exceededTimeout(duration, name, Map())
message must_== s"Execution [$name] timed out, actual duration was ${duration.toMillis} ms."
}
}
}

0 comments on commit d8e4ef5

Please sign in to comment.