Skip to content

Commit

Permalink
finagle-mux: rewrite server dispatcher
Browse files Browse the repository at this point in the history
Problem

My previous change to add metrics made it very obvious that
the Mux server dispatcher was becoming increasingly difficult
to reason about, especially in terms of session state.

Solution

Rewrite the server dispatcher in a more sustainable way. Indeed
this process found more races. Among the changes are:

 *	Separate out request processing from session management
 *	Remove unnecessary configuration (canDispatch, ping), and
	move their functionality into a filter chain.
 *	Introduce a separate transaction tracker that manages draining.
 *	Introduce a fine-grained session state so that we can tell the
	difference between draining, client, and server hangups.

This also introduces 4 new metrics to track draining stats:

**draining**
  the number of times the server has initiated session draining

**drained**
  the number of times the server has succesfully completed the draining protocol within its allotted time

**clienthangup**
  the number of times sessions have been abruptly terminated by the client

**serverhangup**
  the number of times sessions have been abruptly terminated by the server

Result

Much easier to reason about the server dispatcher code and session states.

Smokestack

old:
	mux	1000000	16555 bytes/iter
	muxsess	1000000	30348 bytes/iter

new:
	mux	1000000	16867 bytes/iter
	muxsess	1000000	31555 bytes/iter

RB_ID=580754
  • Loading branch information
mariusae authored and jenkins committed Apr 6, 2015
1 parent 02704ce commit dc6bd22
Show file tree
Hide file tree
Showing 11 changed files with 392 additions and 290 deletions.
9 changes: 9 additions & 0 deletions doc/src/sphinx/Metrics.rst
Expand Up @@ -101,3 +101,12 @@ Understanding these stats often requires deep knowledge of the protocol, or indi
(e.g. Netty) internals.

.. include:: metrics/Transport.rst

Mux
---

.. _mux_stats:

These stats pertain to :ref:`Mux <mux>`.

.. include:: metrics/Mux.rst
12 changes: 12 additions & 0 deletions doc/src/sphinx/metrics/Mux.rst
@@ -0,0 +1,12 @@
**draining**
the number of times the server has initiated session draining

**drained**
the number of times the server has succesfully completed the draining protocol within its allotted time

**clienthangup**
the number of times sessions have been abruptly terminated by the client

**serverhangup**
the number of times sessions have been abruptly terminated by the server

@@ -1,6 +1,7 @@
package com.twitter.finagle.pool

import com.twitter.finagle._
import com.twitter.finagle.client.StackClient
import com.twitter.finagle.service.FailedService
import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
import com.twitter.util.{Future, Return, Throw, Time, Promise}
Expand All @@ -9,7 +10,7 @@ import scala.annotation.tailrec
import scala.collection.immutable

private[finagle] object SingletonPool {
val role = Stack.Role("SingletonPool")
val role = StackClient.Role.pool

/**
* Creates a [[com.twitter.finagle.Stackable]] [[com.twitter.finagle.pool.SingletonPool]].
Expand Down
Expand Up @@ -235,14 +235,16 @@ class QueueTransport[In, Out](writeq: AsyncQueue[In], readq: AsyncQueue[Out])
writeq.offer(input)
Future.Done
}

def read(): Future[Out] =
readq.poll() onFailure { exc =>
closep.setValue(exc)
closep.updateIfEmpty(Throw(exc))
}

def status = if (closep.isDefined) Status.Closed else Status.Open
def close(deadline: Time) = {
val ex = new IllegalStateException("close() is undefined on QueueTransport")
closep.updateIfEmpty(Throw(ex))
closep.updateIfEmpty(Return(ex))
Future.exception(ex)
}

Expand Down
9 changes: 7 additions & 2 deletions finagle-mux/src/main/scala/com/twitter/finagle/Mux.scala
Expand Up @@ -102,6 +102,11 @@ object Mux extends Client[mux.Request, mux.Response] with Server[mux.Request, mu

protected type In = CB
protected type Out = CB

private[this] val statsReceiver = {
val param.Stats(statsReceiver) = params[param.Stats]
statsReceiver.scope("mux")
}

protected def newListener(): Listener[In, Out] =
Netty3Listener(mux.PipelineFactory, params)
Expand All @@ -111,8 +116,8 @@ object Mux extends Client[mux.Request, mux.Response] with Server[mux.Request, mu
) = {
val param.Tracer(tracer) = params[param.Tracer]
val Lessor.Param(lessor) = params[Lessor.Param]
def ping() = Future.Done
new mux.ServerDispatcher(transport, service, true, lessor, tracer, ping)

mux.ServerDispatcher.newRequestResponse(transport, service, lessor, tracer, statsReceiver)
}
}

Expand Down

0 comments on commit dc6bd22

Please sign in to comment.