Skip to content

Commit

Permalink
improve test suite performance by ~70%
Browse files Browse the repository at this point in the history
  • Loading branch information
levkhomich committed Jan 8, 2015
1 parent 8996e3f commit 340952b
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ trait MockCollector { this: Specification =>
var collector: TServer = startCollector()
val results = new ConcurrentLinkedQueue[thrift.LogEntry]()

val MaxAwaitTimeout = 14000
val AwaitTimeout = 4000
val AwaitStep = 20

def startCollector(): TServer = {
val handler = new thrift.Scribe.Iface {
override def Log(messages: util.List[LogEntry]): ResultCode = {
Expand All @@ -64,10 +68,18 @@ trait MockCollector { this: Specification =>
println("collector: stopped")
}
}).start()
Thread.sleep(3000)
Thread.sleep(100)
collector
}

def awaitSpanSubmission(): Unit = {
val prevSize = results.size()
val start = System.currentTimeMillis
while (results.size == prevSize && System.currentTimeMillis - start < MaxAwaitTimeout) {
Thread.sleep(AwaitStep)
}
}

def decodeSpan(logEntryMessage: String): thrift.Span = {
val protocolFactory = new TBinaryProtocol.Factory()
val thriftBytes = DatatypeConverter.parseBase64Binary(logEntryMessage.dropRight(1))
Expand All @@ -79,20 +91,35 @@ trait MockCollector { this: Specification =>
}

def receiveSpan(): thrift.Span = {
Thread.sleep(4000)
val start = System.currentTimeMillis
while (results.size < 1 && System.currentTimeMillis - start < MaxAwaitTimeout) {
Thread.sleep(AwaitStep)
}
val spans = results.map(e => decodeSpan(e.message))
spans.size mustEqual 1
results.clear()
spans.size mustEqual 1
spans.head
}

def receiveSpans(): List[thrift.Span] = {
Thread.sleep(4000)
Thread.sleep(AwaitTimeout)
val spans = results.map(e => decodeSpan(e.message))
results.clear()
spans.toList
}

def expectSpans(count: Int): MatchResult[_] = {
val start = System.currentTimeMillis
if (count == 0)
Thread.sleep(AwaitTimeout)
while (results.size < count && System.currentTimeMillis - start < MaxAwaitTimeout) {
Thread.sleep(AwaitStep)
}
val checkResult = results.size mustEqual count
results.clear()
checkResult
}

private[this] def checkBinaryAnnotationInt[T](span: thrift.Span, key: String, expValue: T)(f: Array[Byte] => T): MatchResult[Any] = {
span.binary_annotations.find(_.get_key == key) match {
case Some(ba) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class PerformanceSpec extends Specification with TracingTestCommons with Tracing
println(s"benchmark: TPS = $tracesPerSecond")

tracesPerSecond must beGreaterThan(ExpectedTPS.toLong)
receiveSpans().size must beEqualTo(SpanCount / sampleRate)
expectSpans(SpanCount / sampleRate)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class TracingExtensionSpec extends Specification with TracingTestCommons with Tr
def generateTracesWithSampleRate(count: Int, sampleRate: Int): Unit = {
val system = testActorSystem(sampleRate)
generateTraces(count, TracingExtension(system))
Thread.sleep(4000)
awaitSpanSubmission()
system.shutdown()
system.awaitTermination(FiniteDuration(5, SECONDS)) must not(throwA[TimeoutException])
}
Expand All @@ -50,17 +50,17 @@ class TracingExtensionSpec extends Specification with TracingTestCommons with Tr
generateTracesWithSampleRate(60, 2)
generateTracesWithSampleRate(500, 5)

receiveSpans().size must beEqualTo(132)
expectSpans(132)
}

"allow forced sampling" in {
val system = testActorSystem(sampleRate = Int.MaxValue)
generateForcedTraces(100, TracingExtension(system))
Thread.sleep(3000)
awaitSpanSubmission()
system.shutdown()
system.awaitTermination(FiniteDuration(5, SECONDS)) must not(throwA[TimeoutException])

receiveSpans().size must beEqualTo(100)
expectSpans(100)
}

def testTraceRecording(f: TracingSupport => Unit)(check: thrift.Span => MatchResult[_]): MatchResult[_] = {
Expand Down Expand Up @@ -178,48 +178,46 @@ class TracingExtensionSpec extends Specification with TracingTestCommons with Tr
for (_ <- 0 until messageCount) {
testActor ? nextRandomMessage
}
receiveSpans().size must beEqualTo(messageCount)
expectSpans(messageCount)
}

"handle collector connectivity problems" in {
// collector won't stop until some message's arrival
generateTraces(1, trace)
collector.stop()

Thread.sleep(3000)
awaitSpanSubmission()
results.clear()

generateTraces(100, trace)

// wait for submission while collector is down
Thread.sleep(3000)
awaitSpanSubmission()

collector = startCollector()

// extension should wait for some time before retrying
receiveSpans().size must beEqualTo(0)
Thread.sleep(4000)

receiveSpans().size must beEqualTo(100)
expectSpans(0)
expectSpans(100)
}

"limit size of span submission buffer" in {
// collector won't stop until some message's arrival
generateTraces(1, trace)
collector.stop()

Thread.sleep(3000)
awaitSpanSubmission()
results.clear()

generateTraces(10000, trace)

// wait for submission while collector is down
Thread.sleep(15000)
// wait for submission while collector is down (it can be 2 batches)
expectSpans(0)
expectSpans(0)

collector = startCollector()
Thread.sleep(10000)

receiveSpans().size must beEqualTo(1000)
expectSpans(1000)
}

"flush traces before stop" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class TracingLoggerSpec extends Specification with TracingTestCommons with Traci
testActor ! nextRandomMessage
}

Thread.sleep(3000)
awaitSpanSubmission()

val spans = receiveSpans()

Expand Down
1 change: 0 additions & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ object AkkaTracingBuild extends Build {
CoverallsPlugin.projectSettings ++
mimaDefaultSettings ++
Seq(
parallelExecution in Test := false,
// TODO: check why %% doesn't work
previousArtifact := Some(organization.value % (moduleName.value + '_' + scalaBinaryVersion.value) % "0.4"),
scalacOptions in Test ++= Seq("-Yrangepos"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class ForcedSamplingSpec extends Specification with TracingTestCommons
response.status mustEqual StatusCodes.OK
}
}
receiveSpans().size mustEqual SpanCount
expectSpans(SpanCount)
}

"force sampling of requests with X-B3-Flags containing Debug flag" in {
Expand All @@ -55,7 +55,7 @@ class ForcedSamplingSpec extends Specification with TracingTestCommons
response.status mustEqual StatusCodes.OK
}
}
receiveSpans().size mustEqual SpanCount
expectSpans(SpanCount)
}

}
Expand All @@ -71,7 +71,7 @@ class ForcedSamplingSpec extends Specification with TracingTestCommons
response.status mustEqual StatusCodes.OK
}
}
receiveSpans().size mustEqual SpanCount
expectSpans(SpanCount)
}

"force sampling of requests with X-B3-Flags containing Debug flag" in {
Expand All @@ -82,7 +82,7 @@ class ForcedSamplingSpec extends Specification with TracingTestCommons
response.status mustEqual StatusCodes.OK
}
}
receiveSpans().size mustEqual SpanCount
expectSpans(SpanCount)
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class TracingDirectivesSpec extends Specification with TracingTestCommons
HttpResponse(StatusCodes.OK)
}) ~> check {
response.status mustEqual statusCode
receiveSpans().size mustEqual 0
expectSpans(0)
}
}

Expand All @@ -141,7 +141,7 @@ class TracingDirectivesSpec extends Specification with TracingTestCommons
"not sample requests without tracing headers" in {
Get(testPath) ~> tracedCompleteRoute ~> check {
response.status mustEqual StatusCodes.OK
receiveSpans().size mustEqual 0
expectSpans(0)
}
}

Expand Down

0 comments on commit 340952b

Please sign in to comment.