Skip to content

Commit

Permalink
finatra/inject: Simpler customizing timeout for inMemoryStats
Browse files Browse the repository at this point in the history
Problem:

The only way to set a custom timeout for a metric is with a predicate
`T => Boolean`. If users want to check whether a metric equals a specific
value, it is simpler to pass the expected value instead of wrapping it in a
predicate like `(_ = 1.0f)`.

Solution:

Add a new, non-predicate version of waitFor that waits for an expected
value within a specific timeout. This non-dynamic case keeps waitFor consistent
with other methods that assert or validate expected values.

JIRA Issues: CSL-9951

Differential Revision: https://phabricator.twitter.biz/D576147
  • Loading branch information
tigerlily-he authored and jenkins committed Nov 20, 2020
1 parent 9667845 commit 306b719
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 29 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -10,6 +10,9 @@ Unreleased
Added
~~~~~

* inject-core: Add ability to call `InMemoryStats#waitFor` with a fixed timeout
``PHAB_ID=D576147``

* finatra: Enables cross-build for 2.13.0 for httpclient, http, and jackson. ``PHAB_ID=D574391``

Changed
Expand Down
Expand Up @@ -86,7 +86,7 @@ trait InMemoryStats[T] {
/**
* Waits for the metric represented by the given name to
* equal the given expected value. If a metric value is not found
* or does not match the expected a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* or does not match the expected value, a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* will be thrown.
*
* @note The default timeout is 150ms. To specify a different timeout, users should use
Expand All @@ -98,13 +98,14 @@ trait InMemoryStats[T] {
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
* @see [[waitFor(name: String, predicate: T => Boolean, timeout: Duration)]]
* @see [[waitFor(name: String, expected: T, timeout: Duration)]]
*/
def waitFor(name: String, expected: T): Unit

/**
* Waits for the metric represented by the given name to
* equal the given expected value. If a metric value is not found
* or does not match the expected a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* or does not match the expected value, a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* will be thrown.
*
* @param name the identifier of the metric to lookup
Expand All @@ -118,4 +119,19 @@ trait InMemoryStats[T] {
*/
def waitFor(name: String, timeout: Duration)(predicate: T => Boolean): Unit

/**
* Waits for the metric represented by the given name to
* equal the given expected value. If a metric value is not found
* or does not match the expected value, a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* will be thrown.
*
* @param name the identifier of the metric to lookup
* from the underlying [[com.twitter.finagle.stats.InMemoryStatsReceiver]].
* @param expected the expected value of the metric
* @param timeout the [[com.twitter.util.Duration]] to wait for the retrieval of the counter and
* the predicate to return `True`.
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
*/
def waitFor(name: String, expected: T, timeout: Duration): Unit
}
Expand Up @@ -145,26 +145,27 @@ class InMemoryStatsReceiverUtility(inMemoryStatsReceiver: InMemoryStatsReceiver)
/**
* Waits for the [[com.twitter.finagle.stats.Counter]] represented by the given name to
* equal the given expected value. If a [[com.twitter.finagle.stats.Counter]] value is not found
* or does not match the expected a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* will be thrown.
* or does not match the expected value, a
* [[org.scalatest.exceptions.TestFailedDueToTimeoutException]] will be thrown.
*
* @note The default timeout is 150ms. To specify a different timeout, users should use
* [[waitFor(name: String, timeout: Duration)(predicate: Long => Boolean)]].
* [[waitFor(name: String, expected: Long, timeout: Duration)]].
*
* @param name the identifier of the [[com.twitter.finagle.stats.Counter]] to lookup
* from the underlying [[InMemoryStatsReceiver]].
* @param expected the expected value of the [[com.twitter.finagle.stats.Counter]].
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
* @see [[waitFor(name: String, predicate: Long => Boolean, timeout: Duration)]]
* @see [[waitFor(name: String, expected: Long, timeout: Duration)]]
*/
def waitFor(name: String, expected: Long): Unit = this.assertCounter(name, expected)
def waitFor(name: String, expected: Long): Unit = this.assertCounter(name, expected, 150.millis)

/**
* Waits for the [[com.twitter.finagle.stats.Counter]] represented by the given name to
* equal the given expected value. If a [[com.twitter.finagle.stats.Counter]] value is not found
* or does not match the expected a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* will be thrown.
* or does not match the expected value, a
* [[org.scalatest.exceptions.TestFailedDueToTimeoutException]] will be thrown.
*
* @param name the identifier of the [[com.twitter.finagle.stats.Counter]] to lookup
* from the underlying [[InMemoryStatsReceiver]].
Expand All @@ -174,10 +175,31 @@ class InMemoryStatsReceiverUtility(inMemoryStatsReceiver: InMemoryStatsReceiver)
* which is expected to return `True`.
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
* @see [[waitFor(name: String, expected: Long)]]
* @see [[waitFor(name: String, expected: Long, timeout: Duration)]]
*/
def waitFor(name: String, timeout: Duration = 150.millis)(predicate: Long => Boolean): Unit =
this.assertCounter(name, predicate, timeout)

/**
* Waits for the [[com.twitter.finagle.stats.Counter]] represented by the given name to
* equal the given expected value. If a [[com.twitter.finagle.stats.Counter]] value is not found
* or does not match the expected value, a
* [[org.scalatest.exceptions.TestFailedDueToTimeoutException]] will be thrown.
*
* @param name the identifier of the [[com.twitter.finagle.stats.Counter]] to lookup
* from the underlying [[InMemoryStatsReceiver]].
* @param expected the expected value of the [[com.twitter.finagle.stats.Counter]].
* @param timeout the [[com.twitter.util.Duration]] to wait for the retrieval of the counter and
* the predicate to return `True`.
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
* @see [[waitFor(name: String, predicate: Long => Boolean, timeout: Duration)]]
* @see [[waitFor(name: String, expected: Long)]]
*/
def waitFor(name: String, expected: Long, timeout: Duration): Unit =
this.assertCounter(name, expected, timeout)

private def assertCounter(
name: String,
predicate: Long => Boolean,
Expand All @@ -192,7 +214,7 @@ class InMemoryStatsReceiverUtility(inMemoryStatsReceiver: InMemoryStatsReceiver)
private def assertCounter(
name: String,
expectedValue: Long,
timeout: Duration = 150.millis
timeout: Duration
): Assertion = eventually(timeout) {
val actualValue: Long = this.apply(name, verbose = false)
withClue(s"""Expected "$expectedValue" for counter "$name" but got "$actualValue"""") {
Expand Down Expand Up @@ -314,24 +336,26 @@ class InMemoryStatsReceiverUtility(inMemoryStatsReceiver: InMemoryStatsReceiver)
/**
* Waits for the [[com.twitter.finagle.stats.Stat]] represented by the given name to
* equal the given expected value. If a metric value is not found
* or does not match the expected a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* will be thrown.
* or does not match the expected value, a
* [[org.scalatest.exceptions.TestFailedDueToTimeoutException]] will be thrown.
*
* @note The default timeout is 150ms. To specify a different timeout, users should use
* [[waitFor(name: String, timeout: Duration)(predicate: T => Boolean)]].
* [[waitFor(name: String, expected: Seq[Float], timeout: Duration)]].
* @param name the identifier of the metric to lookup
* from the underlying [[InMemoryStatsReceiver]].
* @param expected the expected value of the metric.
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
* @see [[waitFor(name: String, predicate: Seq[Float] => Boolean, timeout: Duration)]]
* @see [[waitFor(name: String, expected: Seq[Float], timeout: Duration)]]
*/
override def waitFor(name: String, expected: Seq[Float]): Unit = this.assertStat(name, expected)
override def waitFor(name: String, expected: Seq[Float]): Unit =
this.assertStat(name, expected, 150.millis)

/**
* Waits for the metric represented by the given name to
* equal the given expected value. If a metric value is not found
* or does not match the expected a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* or does not match the expected value, a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* will be thrown.
*
* @param name the identifier of the metric to lookup
Expand All @@ -342,6 +366,8 @@ class InMemoryStatsReceiverUtility(inMemoryStatsReceiver: InMemoryStatsReceiver)
* which is expected to return `True`.
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
* @see [[waitFor(name: String, expected: Seq[Float])]]
* @see [[waitFor(name: String, expected: Seq[Float], timeout: Duration)]]
*/
override def waitFor(
name: String,
Expand All @@ -350,6 +376,23 @@ class InMemoryStatsReceiverUtility(inMemoryStatsReceiver: InMemoryStatsReceiver)
predicate: Seq[Float] => Boolean
): Unit = this.assertStat(name, predicate, timeout)

/**
* Waits for the metric represented by the given name to equal the given expected value. If a
* metric value is not found or does not match the expected value, a
* [[org.scalatest.exceptions.TestFailedDueToTimeoutException]] will be thrown.
* @param name the identifier of the metric to lookup
* from the underlying [[InMemoryStatsReceiver]].
* @param expected the expected value of the metric.
* @param timeout the [[com.twitter.util.Duration]] to wait for the retrieval of the counter and
* the predicate to return `True`.
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
* @see [[waitFor(name: String, predicate: Seq[Float] => Boolean, timeout: Duration)]]
* @see [[waitFor(name: String, expected: Seq[Float])]]
*/
override def waitFor(name: String, expected: Seq[Float], timeout: Duration): Unit =
this.assertStat(name, expected, timeout)

private def assertStat(
name: String,
predicate: Seq[Float] => Boolean,
Expand All @@ -364,7 +407,7 @@ class InMemoryStatsReceiverUtility(inMemoryStatsReceiver: InMemoryStatsReceiver)
private def assertStat(
name: String,
expectedValue: Seq[Float],
timeout: Duration = 150.millis
timeout: Duration
): Assertion = eventually(timeout) {
val actualValue: Seq[Float] = this.apply(name, verbose = false)
withClue(s"""Expected "$expectedValue" for stat "$name" but got "$actualValue":""") {
Expand Down Expand Up @@ -482,26 +525,27 @@ class InMemoryStatsReceiverUtility(inMemoryStatsReceiver: InMemoryStatsReceiver)
/**
* Waits for the [[com.twitter.finagle.stats.Gauge]] represented by the given name to
* equal the given expected value. If a [[com.twitter.finagle.stats.Gauge]] value is not found
* or does not match the expected a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* will be thrown.
* or does not match the expected value, a
* [[org.scalatest.exceptions.TestFailedDueToTimeoutException]] will be thrown.
*
* @note The default timeout is 150ms. To specify a different timeout, users should use
* [[waitFor(name: String, timeout: Duration)(predicate: Long => Boolean)]].
* [[waitFor(name: String, expected: Float, timeout: Duration)]].
*
* @param name the identifier of the [[com.twitter.finagle.stats.Gauge]] to lookup
* from the underlying [[InMemoryStatsReceiver]].
* @param expected the expected value of the [[com.twitter.finagle.stats.Gauge]].
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
* @see [[waitFor(name: String, predicate: Long => Boolean, timeout: Duration)]]
* @see [[waitFor(name: String, predicate: Float => Boolean, timeout: Duration)]]
* @see [[waitFor(name: String, expected: Float, timeout: Duration)]]
*/
def waitFor(name: String, expected: Float): Unit = this.assertGauge(name, expected)
def waitFor(name: String, expected: Float): Unit = this.assertGauge(name, expected, 150.millis)

/**
* Waits for the [[com.twitter.finagle.stats.Gauge]] represented by the given name to
* equal the given expected value. If a [[com.twitter.finagle.stats.Gauge]] value is not found
* or does not match the expected a [[org.scalatest.exceptions.TestFailedDueToTimeoutException]]
* will be thrown.
* or does not match the expected value, a
* [[org.scalatest.exceptions.TestFailedDueToTimeoutException]] will be thrown.
*
* @param name the identifier of the [[com.twitter.finagle.stats.Gauge]] to lookup
* from the underlying [[InMemoryStatsReceiver]].
Expand All @@ -511,10 +555,31 @@ class InMemoryStatsReceiverUtility(inMemoryStatsReceiver: InMemoryStatsReceiver)
* which is expected to return `True`.
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
* @see [[waitFor(name: String, expected: Float)]]
* @see [[waitFor(name: String, expected: Float, timeout: Duration)]]
*/
def waitFor(name: String, timeout: Duration = 150.millis)(predicate: Float => Boolean): Unit =
this.assertGauge(name, predicate, timeout)

/**
* Waits for the [[com.twitter.finagle.stats.Gauge]] represented by the given name to
* equal the given expected value. If a [[com.twitter.finagle.stats.Gauge]] value is not found
* or does not match the expected value, a
* [[org.scalatest.exceptions.TestFailedDueToTimeoutException]] will be thrown.
*
* @param name the identifier of the [[com.twitter.finagle.stats.Gauge]] to lookup
* from the underlying [[InMemoryStatsReceiver]].
* @param expected the expected value of the [[com.twitter.finagle.stats.Gauge]].
* @param timeout the [[com.twitter.util.Duration]] to wait for the retrieval of the counter and
* the predicate to return `True`.
*
* @see [[org.scalatest.concurrent.Eventually.eventually]]
* @see [[waitFor(name: String, predicate: Float => Boolean, timeout: Duration)]]
* @see [[waitFor(name: String, expected: Float)]]
*/
def waitFor(name: String, expected: Float, timeout: Duration): Unit =
this.assertGauge(name, expected, timeout)

private def assertGauge(
name: String,
predicate: Float => Boolean,
Expand All @@ -529,7 +594,7 @@ class InMemoryStatsReceiverUtility(inMemoryStatsReceiver: InMemoryStatsReceiver)
private def assertGauge(
name: String,
expectedValue: Float,
timeout: Duration = 150.millis
timeout: Duration
): Assertion = eventually(timeout) {
val actualValue: Float = this.apply(name, verbose = false)
withClue(s"""Expected "$expectedValue" for gauge "$name" but got "$actualValue":""") {
Expand Down
@@ -0,0 +1,99 @@
package com.twitter.inject.tests

import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.inject.{InMemoryStatsReceiverUtility, Test}
import com.twitter.conversions.DurationOps._
import com.twitter.util.FuturePool
import org.scalatest.exceptions.TestFailedDueToTimeoutException

class InMemoryStatsReceiverUtilityTest extends Test {
test("InMemoryStatsReceiverUtility#waitFor will fail if the metric is the incorrect value") {
val inMemoryStatsReceiver = new InMemoryStatsReceiver
val statUtils = new InMemoryStatsReceiverUtility(inMemoryStatsReceiver)
// A short timeout is set because the metric will never reach the expected value
val timeout = 10.millis

inMemoryStatsReceiver.counter("counter").incr()
intercept[TestFailedDueToTimeoutException] {
statUtils.counters.waitFor("counter", 2, timeout)
}

inMemoryStatsReceiver.stats(Seq("stats")) = Seq(1.0f, 2.0f, 3.0f)
intercept[TestFailedDueToTimeoutException] {
statUtils.stats.waitFor("stats", Seq(1.0f), timeout)
}

inMemoryStatsReceiver.addGauge("gauge") { 1.0f }
intercept[TestFailedDueToTimeoutException] {
statUtils.gauges.waitFor("gauge", 2.0f, timeout)
}
}

test("InMemoryStatsReceiverUtility#waitFor will fail if metric does not have a declared value") {
val inMemoryStatsReceiver = new InMemoryStatsReceiver
val statUtils = new InMemoryStatsReceiverUtility(inMemoryStatsReceiver)
// A short timeout is set because the metric does not have a declared value
val timeout = 20.millis

inMemoryStatsReceiver.stats(Seq("stats")) = Seq()
intercept[TestFailedDueToTimeoutException] {
statUtils.stats.waitFor("stats", Seq(1.0f), timeout)
}
}

test(
"InMemoryStatsReceiverUtility#waitFor will accept metrics that are already the expected value") {
val inMemoryStatsReceiver = new InMemoryStatsReceiver
val statUtils = new InMemoryStatsReceiverUtility(inMemoryStatsReceiver)
val timeout = 20.millis

inMemoryStatsReceiver.counter("counter").incr(2)
statUtils.counters.waitFor("counter", 2, timeout)

inMemoryStatsReceiver.stats(Seq("stat")) = Seq(1.0f, 2.0f, 3.0f)
statUtils.stats.waitFor("stat", Seq(1.0f, 2.0f, 3.0f), timeout)

inMemoryStatsReceiver.addGauge("gauge") { 2.0f }
statUtils.gauges.waitFor("gauge", 2.0f, timeout)
}

test(
"InMemoryStatsReceiverUtility#waitFor will succeed when metric changes to the expected value within the timeout window") {
val inMemoryStatsReceiver = new InMemoryStatsReceiver
val sr = new InMemoryStatsReceiverUtility(inMemoryStatsReceiver)
val timeout = 100.millis

val counter = inMemoryStatsReceiver.counter("counter")
val stat = inMemoryStatsReceiver.stat("stat")
var gaugeValue: Float = 0.0f
val gauge = inMemoryStatsReceiver.addGauge("gauge") { gaugeValue }

// Change metrics every 20 millis
val increaseCounter = FuturePool.unboundedPool {
for (_ <- 0 to 2) {
Thread.sleep(20)
// counter values: 0L, 1L, 2L, 3L
counter.incr()
}
}
sr.counters.waitFor("counter", 1L, timeout)

val addToHistogram = FuturePool.unboundedPool {
for (s <- 0 to 2) {
Thread.sleep(20)
// histogram values: Seq(0.0f, 1.0f, 2.0f)
stat.add(s)
}
}
sr.stats.waitFor("stat", Seq(0.0f, 1.0f), timeout)

val increaseGauge = FuturePool.unboundedPool {
for (g <- 0 to 2) {
Thread.sleep(20)
// gauge values: 0.0f, 1.0f, 2.0f
gaugeValue = g
}
}
sr.gauges.waitFor("gauge", 1.0f, timeout)
}
}
Expand Up @@ -107,7 +107,7 @@ class WordLookupAsyncServerFeatureTestBase extends KafkaStreamsFeatureTest {
headers = recordHeaders3
)

server.inMemoryStats.gauges.waitFor("kafka/stream/outstandingFutures", 5.seconds)(_ == 3)
server.inMemoryStats.gauges.waitFor("kafka/stream/outstandingFutures", 3, 5.seconds)
server.inMemoryStats.gauges.assert("kafka/stream/outstandingResults", 0)

assertOutputRecordsAndHeaders(
Expand Down

0 comments on commit 306b719

Please sign in to comment.