Skip to content

Commit

Permalink
finagle-core: Split the ClientDispatcher.scala file into parts
Browse files Browse the repository at this point in the history
Problem

We have a lot of different things going on in ClientDispatcher.scala that
don't need to be in the same file.

Solution

Split it into multiple files.

Differential Revision: https://phabricator.twitter.biz/D342883
  • Loading branch information
Bryce Anderson authored and jenkins committed Jul 22, 2019
1 parent 44b11e3 commit 4b0493c
Show file tree
Hide file tree
Showing 15 changed files with 176 additions and 85 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
Unreleased
----------

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

* finagle-core: The contents of the `c.t.f.dispatch.GenSerialClientDispatcher` object have been
moved to the new `c.t.f.dispatch.ClientDispatcher` object. The stats receiver free constructors
of `GenSerialClientDispatcher` and `SerialClientDispatcher` have been removed.
``PHAB_ID=D342883``

19.7.0
------

Expand Down
Original file line number Diff line number Diff line change
@@ -1,49 +1,26 @@
package com.twitter.finagle.dispatch

import com.twitter.concurrent.AsyncSemaphore
import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.transport.Transport
import com.twitter.finagle.{Failure, FailureFlags, Service, Status, WriteException}
import com.twitter.util._
import java.net.InetSocketAddress

/**
* Dispatches requests one at a time; concurrent requests are queued.
* Dispatches requests.
*
* @param statsReceiver typically scoped to `clientName/dispatcher`
* @note this construct makes no guarantees with regard to concurrent
* dispatches. For implementations that require serial dispatch see the
* [[GenSerialClientDispatcher]].
*/
abstract class GenSerialClientDispatcher[Req, Rep, In, Out](
trans: Transport[In, Out],
statsReceiver: StatsReceiver)
abstract class ClientDispatcher[Req, Rep, In, Out](trans: Transport[In, Out])
extends Service[Req, Rep] {

def this(trans: Transport[In, Out]) =
this(trans, NullStatsReceiver)

private[this] val semaphore = new AsyncSemaphore(1)

private[this] val queueSize =
statsReceiver.scope("serial").addGauge("queue_size") {
semaphore.numWaiters
}

private[this] val localAddress: InetSocketAddress = trans.context.localAddress match {
private[this] def localAddress: InetSocketAddress = trans.context.localAddress match {
case ia: InetSocketAddress => ia
case _ => new InetSocketAddress(0)
}

// satisfy pending requests on transport close with a retryable failure
trans.onClose.respond { res =>
val exc = res match {
case Return(exc) => exc
case Throw(exc) => exc
}

queueSize.remove() // ensure that we don't leak the gauge
semaphore.fail(Failure.retryable(exc))
}

/**
* Dispatch a request, satisfying Promise `p` with the response;
* the returned Future is satisfied when the dispatch is complete:
Expand Down Expand Up @@ -77,18 +54,10 @@ abstract class GenSerialClientDispatcher[Req, Rep, In, Out](

def apply(req: Req): Future[Rep] = {
val p = new Promise[Rep]

semaphore.acquire().respond {
case Return(permit) =>
tryDispatch(req, p).respond {
case t @ Throw(_) =>
p.updateIfEmpty(t.cast[Rep])
permit.release()
case Return(_) =>
permit.release()
}
tryDispatch(req, p).respond {
case t @ Throw(_) =>
p.update(t.cast[Rep])
p.updateIfEmpty(t.cast[Rep])
case Return(_) =>
}

p
Expand All @@ -99,36 +68,10 @@ abstract class GenSerialClientDispatcher[Req, Rep, In, Out](
override def close(deadline: Time): Future[Unit] = trans.close()
}

object GenSerialClientDispatcher {
object ClientDispatcher {

val StatsScope: String = "dispatcher"

def wrapWriteException(exc: Throwable): Future[Nothing] =
Future.exception(WriteException(exc))
}

/**
* @param statsReceiver typically scoped to `clientName/dispatcher`
*/
class SerialClientDispatcher[Req, Rep](trans: Transport[Req, Rep], statsReceiver: StatsReceiver)
extends GenSerialClientDispatcher[Req, Rep, Req, Rep](trans, statsReceiver) {

import GenSerialClientDispatcher.wrapWriteException

def this(trans: Transport[Req, Rep]) =
this(trans, NullStatsReceiver)

private[this] val tryReadTheTransport: Try[Unit] => Future[Rep] = {
case Return(_) => trans.read()
case Throw(exc) => wrapWriteException(exc)
}

protected def dispatch(req: Req, p: Promise[Rep]): Future[Unit] =
trans
.write(req)
.transform(tryReadTheTransport)
.respond(rep => p.updateIfEmpty(rep))
.unit

protected def write(req: Req): Future[Unit] = trans.write(req)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package com.twitter.finagle.dispatch

import com.twitter.concurrent.AsyncSemaphore
import com.twitter.finagle.{Failure, FailureFlags, Service, Status}
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.tracing.Trace
import com.twitter.finagle.transport.Transport
import com.twitter.util.{Future, Promise, Return, Throw, Time}
import java.net.InetSocketAddress

/**
* Dispatches requests one at a time; concurrent requests are queued.
*
* @param statsReceiver typically scoped to `clientName/dispatcher`
*/
abstract class GenSerialClientDispatcher[Req, Rep, In, Out](
trans: Transport[In, Out],
statsReceiver: StatsReceiver)
extends Service[Req, Rep] {

private[this] val semaphore = new AsyncSemaphore(1)

private[this] val queueSize =
statsReceiver.scope("serial").addGauge("queue_size") {
semaphore.numWaiters
}

private[this] val localAddress: InetSocketAddress = trans.context.localAddress match {
case ia: InetSocketAddress => ia
case _ => new InetSocketAddress(0)
}

// satisfy pending requests on transport close with a retryable failure
trans.onClose.respond { res =>
val exc = res match {
case Return(exc) => exc
case Throw(exc) => exc
}

queueSize.remove() // ensure that we don't leak the gauge
semaphore.fail(Failure.retryable(exc))
}

/**
* Dispatch a request, satisfying Promise `p` with the response;
* the returned Future is satisfied when the dispatch is complete:
* only one request is admitted at any given time.
*
* Note that GenSerialClientDispatcher manages interrupts,
* satisfying `p` should it be interrupted -- implementors beware:
* use only `updateIfEmpty` variants for satisfying the Promise.
*
* GenSerialClientDispatcher will also attempt to satisfy the promise
* if the returned `Future[Unit]` fails.
*/
protected def dispatch(req: Req, p: Promise[Rep]): Future[Unit]

private[this] def tryDispatch(req: Req, p: Promise[Rep]): Future[Unit] =
p.isInterrupted match {
case Some(intr) =>
p.setException(Failure.adapt(intr, FailureFlags.Interrupted))
Future.Done
case None =>
Trace.recordClientAddr(localAddress)

p.setInterruptHandler {
case intr =>
if (p.updateIfEmpty(Throw(intr)))
trans.close()
}

dispatch(req, p)
}

def apply(req: Req): Future[Rep] = {
val p = new Promise[Rep]

semaphore.acquire().respond {
case Return(permit) =>
tryDispatch(req, p).respond {
case t @ Throw(_) =>
p.updateIfEmpty(t.cast[Rep])
permit.release()
case Return(_) =>
permit.release()
}
case t @ Throw(_) =>
p.update(t.cast[Rep])
}

p
}

override def status: Status = trans.status

override def close(deadline: Time): Future[Unit] = trans.close()
}

object GenSerialClientDispatcher {

@deprecated("Use the value in `ClientDispatcher` object instead.", "2019-07-18")
val StatsScope: String = ClientDispatcher.StatsScope

@deprecated("Use the function in `ClientDispatcher` object instead.", "2019-07-18")
def wrapWriteException(exc: Throwable): Future[Nothing] =
ClientDispatcher.wrapWriteException(exc)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.twitter.finagle.dispatch

import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.transport.Transport
import com.twitter.util.{Future, Promise, Return, Throw, Try}

/**
* @param statsReceiver typically scoped to `clientName/dispatcher`
*/
class SerialClientDispatcher[Req, Rep](trans: Transport[Req, Rep], statsReceiver: StatsReceiver)
extends GenSerialClientDispatcher[Req, Rep, Req, Rep](trans, statsReceiver) {

import ClientDispatcher.wrapWriteException

private[this] val tryReadTheTransport: Try[Unit] => Future[Rep] = {
case Return(_) => trans.read()
case Throw(exc) => wrapWriteException(exc)
}

protected def dispatch(req: Req, p: Promise[Rep]): Future[Unit] =
trans
.write(req)
.transform(tryReadTheTransport)
.respond(rep => p.updateIfEmpty(rep))
.unit

protected def write(req: Req): Future[Unit] = trans.write(req)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import com.twitter.finagle.netty4.Netty4Transporter
import com.twitter.finagle.server.utils.StringServer
import com.twitter.finagle.service.FailFastFactory.FailFast
import com.twitter.finagle.service.PendingRequestFilter
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.finagle.stats.{InMemoryStatsReceiver, NullStatsReceiver}
import com.twitter.finagle.transport.{Transport, TransportContext}
import com.twitter.finagle.util.StackRegistry
import com.twitter.finagle.{Name, param}
Expand Down Expand Up @@ -56,7 +56,7 @@ private object StackClientTest {
Future.exception(new IllegalStateException("should not have a local context: " + s))
)
case None =>
new SerialClientDispatcher(transport)
new SerialClientDispatcher(transport, NullStatsReceiver)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.twitter.finagle.client.{StackClient, StdStackClient, Transporter}
import com.twitter.finagle.dispatch.SerialClientDispatcher
import com.twitter.finagle.netty4.Netty4Transporter
import com.twitter.finagle.param.ProtocolLibrary
import com.twitter.finagle.stats.NullStatsReceiver
import com.twitter.finagle.transport.{Transport, TransportContext}
import com.twitter.finagle.{Service, ServiceFactory, Stack}
import io.netty.channel.{
Expand Down Expand Up @@ -72,7 +73,7 @@ object StringClient {
protected def newDispatcher(
transport: Transport[In, Out] { type Context <: Client.this.Context }
): Service[String, String] =
new SerialClientDispatcher(transport)
new SerialClientDispatcher(transport, NullStatsReceiver)

def withEndpoint(s: Service[String, String]): Client =
withStack(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.twitter.finagle.{Status => FinagleStatus, _}
import com.twitter.finagle.Http.{H2ClientImpl, HttpImpl}
import com.twitter.finagle.client.{EndpointerModule, Transporter}
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.dispatch.GenSerialClientDispatcher
import com.twitter.finagle.dispatch.ClientDispatcher
import com.twitter.finagle.http.codec.HttpClientDispatcher
import com.twitter.finagle.http2.transport.MultiplexTransporter
import com.twitter.finagle.http.exp.StreamTransport
Expand Down Expand Up @@ -80,7 +80,7 @@ private[finagle] object ClientEndpointer {
val modifier = prms[TransportModifier].modifier
val transporter = mkTransporter(prms)(addr)
val dispatcherStats =
prms[Stats].statsReceiver.scope(GenSerialClientDispatcher.StatsScope)
prms[Stats].statsReceiver.scope(ClientDispatcher.StatsScope)

new ServiceFactory[Request, Response] {
def apply(conn: ClientConnection): Future[Service[Request, Response]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.twitter.concurrent.Broker
import com.twitter.conversions.DurationOps._
import com.twitter.finagle.client._
import com.twitter.finagle.dispatch.{
GenSerialClientDispatcher,
ClientDispatcher,
SerialServerDispatcher,
StalledPipelineTimeout
}
Expand Down Expand Up @@ -308,7 +308,7 @@ object Memcached extends finagle.Client[Command, Response] with finagle.Server[C
Future.value(
new PipeliningClientPushSession[Response, Command](
handle,
params[finagle.param.Stats].statsReceiver.scope(GenSerialClientDispatcher.StatsScope),
params[finagle.param.Stats].statsReceiver.scope(ClientDispatcher.StatsScope),
params[StalledPipelineTimeout].timeout,
params[finagle.param.Timer].timer
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package com.twitter.finagle.mysql
import com.github.benmanes.caffeine.cache.{Caffeine, RemovalCause, RemovalListener}
import com.twitter.cache.caffeine.CaffeineCache
import com.twitter.finagle.dispatch.GenSerialClientDispatcher
import com.twitter.finagle.dispatch.GenSerialClientDispatcher.wrapWriteException
import com.twitter.finagle.dispatch.ClientDispatcher.wrapWriteException
import com.twitter.finagle.mysql.LostSyncException.const
import com.twitter.finagle.mysql.param.{MaxConcurrentPrepareStatements, UnsignedColumns}
import com.twitter.finagle.mysql.transport.{MysqlBuf, MysqlBufReader, Packet}
import com.twitter.finagle.param.Stats
import com.twitter.finagle.transport.Transport
import com.twitter.finagle.{Service, ServiceProxy, Stack}
import com.twitter.util._
Expand Down Expand Up @@ -94,7 +95,9 @@ private[finagle] final class ClientDispatcher(
trans: Transport[Packet, Packet],
params: Stack.Params,
performHandshake: Boolean)
extends GenSerialClientDispatcher[Request, Result, Packet, Packet](trans) {
extends GenSerialClientDispatcher[Request, Result, Packet, Packet](
trans,
params[Stats].statsReceiver) {
import ClientDispatcher._

// We only support plain handshaking when it's done inside
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.twitter.finagle.netty4.http

import com.twitter.finagle.dispatch.GenSerialClientDispatcher.wrapWriteException
import com.twitter.finagle.dispatch.ClientDispatcher.wrapWriteException
import com.twitter.finagle.http._
import com.twitter.finagle.http.exp.{Multi, StreamTransportProxy}
import com.twitter.finagle.netty4.ByteBufConversion
Expand Down
4 changes: 2 additions & 2 deletions finagle-redis/src/main/scala/com/twitter/finagle/Redis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.twitter.finagle

import com.twitter.finagle
import com.twitter.finagle.client._
import com.twitter.finagle.dispatch.{GenSerialClientDispatcher, StalledPipelineTimeout}
import com.twitter.finagle.dispatch.{ClientDispatcher, StalledPipelineTimeout}
import com.twitter.finagle.netty4.Netty4Transporter
import com.twitter.finagle.param.{
ExceptionStatsHandler => _,
Expand Down Expand Up @@ -85,7 +85,7 @@ object Redis extends Client[Command, Reply] with RedisRichClient {
): Service[Command, Reply] =
RedisPool.newDispatcher(
new StageTransport(transport),
params[finagle.param.Stats].statsReceiver.scope(GenSerialClientDispatcher.StatsScope),
params[finagle.param.Stats].statsReceiver.scope(ClientDispatcher.StatsScope),
params[StalledPipelineTimeout].timeout
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ object RedisPool {
stallTimeout: Duration
): Service[Command, Reply] =
useFor() match {
case Some(Subscription) => new SubscribeDispatcher(transport)
case Some(Subscription) => new SubscribeDispatcher(transport, statsReceiver)
case _ => new PipeliningDispatcher(transport, statsReceiver, stallTimeout, DefaultTimer)
}

Expand Down
Loading

0 comments on commit 4b0493c

Please sign in to comment.