Skip to content

Commit

Permalink
finagle-core: Removing "pipelining/pending" stat
Browse files Browse the repository at this point in the history
Summary: Problem  / Solution

The "pipelining/pending" stat is not helpful; we already
have a "pending" stat, and because pending accounting
is different in the push/non-push pipelining implementations,
it's confusing for users. Remove it.

JIRA Issues: CSL-5583

Differential Revision: https://phabricator.twitter.biz/D113424
  • Loading branch information
jcrossley authored and jenkins committed Nov 28, 2017
1 parent fb37aed commit 0d162d1
Show file tree
Hide file tree
Showing 6 changed files with 19 additions and 29 deletions.
3 changes: 3 additions & 0 deletions CHANGES
Expand Up @@ -21,6 +21,9 @@ Runtime Behavior Changes
added to finagle-http2 via finagle-http. It is no longer part of the
finagle-{memcached,mysql,redis} client stacks. ``PHAB_ID=D111722``

* finagle-core: The "pipelining/pending" stat has been removed from protocols
using `c.t.f.dispatch.PipeliningClientDispatcher`. Refer to the "pending" stat
for the number of outstanding requests. ``PHAB_ID=D113424``

Deprecations
~~~~~~~~~~~~
Expand Down
3 changes: 0 additions & 3 deletions doc/src/sphinx/Metrics.rst
Expand Up @@ -142,9 +142,6 @@ queueing rules.
A gauge used by serial dispatchers that can only have a single request
per connection at a time that represents the number of pending requests.

**pipelining/pending**
A gauge used by pipelining dispatchers that represents how many
pipelined requests are currently outstanding.

Thread Usage
------------
Expand Down
Expand Up @@ -37,10 +37,8 @@ abstract class GenPipeliningDispatcher[Req, Rep, In, Out, T](
private[this] var stalled = false
private[this] val q = new AsyncQueue[Pending[T, Rep]]

private[this] val queueSize =
statsReceiver.scope("pipelining").addGauge("pending") {
q.size
}
// exposed for testing
private[dispatch] def queueSize: Int = q.size

private[this] val transRead: Try[Pending[T, Rep]] => Unit = {
case Return(p) =>
Expand Down
Expand Up @@ -34,10 +34,8 @@ private[finagle] final class PipeliningClientPushSession[In, Out](
@volatile private[this] var queueSize: Int = 0 // avoids synchronization on `queue`
@volatile private[this] var running: Boolean = true

private[this] val queueSizeGauge =
statsReceiver.scope("pipelining").addGauge("pending") {
queueSize
}
// exposed for testing
private[pushsession] def getQueueSize: Int = queueSize

handle.onClose.respond { result =>
if (running) handle.serialExecutor.execute(new Runnable {
Expand Down
Expand Up @@ -94,13 +94,10 @@ class PipeliningDispatcherTest extends FunSuite with MockitoSugar {
}
}

test("queue_size gauge") {
test("queue size") {
val stats = new InMemoryStatsReceiver()
val timer = new MockTimer

def assertGaugeSize(size: Int): Unit =
assert(size == stats.gauges(Seq("pipelining", "pending"))())

val p0, p1, p2 = new Promise[String]()
val trans = mock[Transport[String, String]]
when(trans.write(any[String])).thenReturn(Future.Done)
Expand All @@ -112,7 +109,7 @@ class PipeliningDispatcherTest extends FunSuite with MockitoSugar {
when(trans.onClose).thenReturn(closeP)
val dispatcher = new PipeliningDispatcher[String, String](trans, stats, 10.seconds, timer)

assertGaugeSize(0)
assert(dispatcher.queueSize == 0)

// issue 3 pipelined requests that immediately get
// written to the transport, and thus put into the queue.
Expand All @@ -121,17 +118,17 @@ class PipeliningDispatcherTest extends FunSuite with MockitoSugar {
dispatcher("0")
dispatcher("1")
dispatcher("2")
assertGaugeSize(2) // as noted above, the "0" has been removed from the queue
assert(dispatcher.queueSize == 2) // as noted above, the "0" has been removed from the queue

// then even if we fulfil them out of order...
p2.setValue("2")
assertGaugeSize(2)
assert(dispatcher.queueSize == 2)

// this will complete 0, triggering 1 to be removed from the q.
p0.setValue("0")
assertGaugeSize(1)
assert(dispatcher.queueSize == 1)

p1.setValue("1")
assertGaugeSize(0)
assert(dispatcher.queueSize == 0)
}
}
Expand Up @@ -110,13 +110,10 @@ class PipeliningClientPushSessionTest extends FunSuite with MockitoSugar {
}
}

test("queue_size gauge") {
test("queue size") {
val stats = new InMemoryStatsReceiver()
val timer = new MockTimer

def assertGaugeSize(size: Int): Unit =
assert(size == stats.gauges(Seq("pipelining", "pending"))())

var p0, p1, p2 = new Promise[String]()
val handle = new PipeliningMockChannelHandle[String, String]()
val session =
Expand All @@ -127,23 +124,23 @@ class PipeliningClientPushSessionTest extends FunSuite with MockitoSugar {
timer
)
val service = session.toService
assertGaugeSize(0)
assert(session.getQueueSize == 0)

service("0")
service("1")
service("2")
handle.serialExecutor.executeAll()
assertGaugeSize(3)
assert(session.getQueueSize == 3)

session.receive("resp")
assertGaugeSize(2)
assert(session.getQueueSize == 2)


session.receive("resp")
assertGaugeSize(1)
assert(session.getQueueSize == 1)

session.receive("resp")
assertGaugeSize(0)
assert(session.getQueueSize == 0)
}

}

0 comments on commit 0d162d1

Please sign in to comment.