finagle-core: Fix bug in scheduler metrics
Problem

In rb 828289 I broke the "scheduler/dispatches" and
"scheduler/blocking_ms" gauges.

Solution

Keep references to the gauges by mutating the `gauges` list, and add a
test to verify this.

RB_ID=854494
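
For background on why the fix takes this shape: stats implementations commonly hold registered gauges only weakly, so a gauge whose returned handle is not kept strongly reachable can be garbage collected and silently stop reporting. The sketch below illustrates that pattern under stated assumptions; `Gauge`, `StatsReceiver`, and `SchedulerStatsRegistry` are illustrative stand-ins, not the actual finagle-core types touched by this commit.

import scala.collection.mutable.ListBuffer

// Illustrative stand-ins (assumptions), not Finagle's real stats API.
trait Gauge { def remove(): Unit }
trait StatsReceiver {
  // Implementations typically keep only a weak reference to each gauge,
  // so the caller must retain the returned Gauge for it to keep reporting.
  def addGauge(name: String)(read: => Float): Gauge
}

// Hypothetical registry showing the pattern described in the Solution:
// each returned gauge is appended to a `gauges` list so it stays reachable.
class SchedulerStatsRegistry(stats: StatsReceiver) {
  private[this] val gauges = ListBuffer.empty[Gauge]

  def register(name: String)(read: => Float): Unit =
    // Dropping this `+=` is the kind of bug being fixed here: the gauge is
    // then only weakly referenced, may be collected, and the metric vanishes.
    gauges += stats.addGauge(name)(read)
}

// Example wiring (scheduler.numDispatches is the counter exercised by the
// new test below):
//   registry.register("scheduler/dispatches")(scheduler.numDispatches.toFloat)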
kevinoliver authored and jenkins committed Jul 22, 2016
1 parent b65fda6 commit 90f2180
Showing 2 changed files with 34 additions and 11 deletions.
11 changes: 5 additions & 6 deletions util-core/src/main/scala/com/twitter/concurrent/Scheduler.scala
@@ -229,10 +229,10 @@ class LocalScheduler(lifo: Boolean) extends Scheduler {
activation
}

-/** An implementaiton of Iterator over runnable tasks */
+/** An implementation of Iterator over runnable tasks */
@inline def hasNext: Boolean = get().hasNext

-/** An implementaiton of Iterator over runnable tasks */
+/** An implementation of Iterator over runnable tasks */
@inline def next(): Runnable = get().next()

// Scheduler implementation:
@@ -262,12 +262,11 @@ trait ExecutorScheduler { self: Scheduler =>
val executorFactory: ThreadFactory => ExecutorService

protected val threadGroup: ThreadGroup = new ThreadGroup(name)
-@volatile private[this] var threads = Set[Thread]()

protected val threadFactory: ThreadFactory = new ThreadFactory {
private val n = new AtomicInteger(1)

-def newThread(r: Runnable) = {
+def newThread(r: Runnable): Thread = {
val thread = new Thread(threadGroup, r, name + "-" + n.getAndIncrement())
thread.setDaemon(true)
thread
@@ -280,10 +279,10 @@ trait ExecutorScheduler { self: Scheduler =>
// don't try too hard.
val threads = new Array[Thread](threadGroup.activeCount*2)
val n = threadGroup.enumerate(threads)
-threads take n
+threads.take(n)
}

-protected[this] val executor = executorFactory(threadFactory)
+protected[this] val executor: ExecutorService = executorFactory(threadFactory)

def shutdown(): Unit = executor.shutdown()
def submit(r: Runnable): Unit = executor.execute(r)
@@ -2,15 +2,16 @@ package com.twitter.concurrent

import com.twitter.conversions.time._
import com.twitter.util._
+import java.util.concurrent.{Executors, TimeUnit}
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.concurrent.{Eventually, IntegrationPatience}
import org.scalatest.junit.JUnitRunner

abstract class LocalSchedulerTest(lifo: Boolean) extends FunSuite {
private val scheduler = new LocalScheduler(lifo)
-def submit(f: => Unit) = scheduler.submit(new Runnable {
-def run() = f
+def submit(f: => Unit): Unit = scheduler.submit(new Runnable {
+def run(): Unit = f
})

val N = 100
@@ -50,9 +51,9 @@ abstract class LocalSchedulerTest(lifo: Boolean) extends FunSuite {
}
}
if (lifo)
-assert(ran == (0 until N))
+assert(ran == 0.until(N).toList)
else
-assert(ran == (0 until N).reverse)
+assert(ran == 0.until(N).reverse.toList)
}

test("tracks blocking time") {
@@ -83,6 +84,29 @@ abstract class LocalSchedulerTest(lifo: Boolean) extends FunSuite {
Scheduler.setUnsafe(prevScheduler)
}
}

test("numDispatches") {
val runnable = new Runnable {
def run(): Unit = ()
}
val start = scheduler.numDispatches

// verify increments are seen by the calling thread
scheduler.submit(runnable)
assert(start + 1 == scheduler.numDispatches)

// verify increments are seen by a different thread
val exec = Executors.newCachedThreadPool()
val result = exec.submit {
new Runnable {
def run(): Unit = scheduler.submit(runnable)
}
}
exec.shutdown()
result.get(5, TimeUnit.SECONDS)
assert(start + 2 == scheduler.numDispatches)
}

}

@RunWith(classOf[JUnitRunner])
@@ -97,7 +121,7 @@ class ThreadPoolSchedulerTest extends FunSuite with Eventually with IntegrationPatience
val p = new Promise[Unit]
val scheduler = new ThreadPoolScheduler("test")
scheduler.submit(new Runnable {
-def run() { p.setDone() }
+def run(): Unit = p.setDone()
})

eventually { p.isDone }
