Skip to content

Commit

Permalink
finagle-memcached: Use fixed number of connections for pipelining mem…
Browse files Browse the repository at this point in the history
…cache client

Problem

When memcache client use pipelining with singleton pool,
p9999 is worse than non pipelining with multiple
connections due to head of line blocking.

Solution

Using a samll fixed number of connections, and load
balance among those connections based on outstanding
requests and time.

Result

A customized pipline memcache client, using ewma P2C
loadbalancer. With much fewer connections, Memcache
pipelining client achieve similar tail latency as non
pipelining client.

RB_ID=614228
  • Loading branch information
blackicewei authored and jenkins committed May 18, 2015
1 parent ede2aff commit 9cd3d44
Show file tree
Hide file tree
Showing 16 changed files with 802 additions and 467 deletions.
11 changes: 10 additions & 1 deletion CHANGES
Expand Up @@ -5,8 +5,17 @@
6.x
-----

6.26.0
------

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

* finagle-core: Define `WeightedSocketAddress` as a case class. Add
`WeightedSocketAddress.extract` method to extract weight.

6.25.0
~~~~~~~
------

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~
Expand Down
Expand Up @@ -298,7 +298,7 @@ private class SyncInetResolver extends InetResolver {
val (hosts, ports, weights) = whp.unzip3
val hostports = hosts.zip(ports)
val addrs = resolveHostPortsSeq(hostports)
val weighted = addrs.zip(weights) collect {
val weighted: Seq[SocketAddress] = addrs.zip(weights) collect {
case (Seq(a, _*), w) => WeightedSocketAddress(a, w)
}

Expand Down
Expand Up @@ -3,40 +3,37 @@ package com.twitter.finagle
import java.net.{SocketAddress, InetSocketAddress}

/**
* A SocketAddress with a weight.
* A SocketAddress with a weight. `WeightedSocketAddress` can be nested.
*/
object WeightedSocketAddress {
private case class Impl(
addr: SocketAddress,
weight: Double
) extends SocketAddress

/**
* Create a weighted socket address with weight `weight`.
*/
def apply(addr: SocketAddress, weight: Double): SocketAddress =
Impl(addr, weight)
case class WeightedSocketAddress(addr: SocketAddress, weight: Double) extends SocketAddress

object WeightedSocketAddress {
/**
* Destructuring a weighted socket address is liberal: we return a
* weight of 1 if it is unweighted.
* Extract `SocketAddress` and weight recursively until it reaches
* an unweighted address instance. Weights are multiplied.
*
* If the input `addr` is an unweighted instance, return a weight of 1.0.
*/
def unapply(addr: SocketAddress): Option[(SocketAddress, Double)] =
def extract(addr: SocketAddress): (SocketAddress, Double) =
addr match {
case Impl(addr, weight) => Some(addr, weight)
case addr => Some(addr, 1D)
case WeightedSocketAddress(sa, weight) =>
val (underlying, anotherW) = extract(sa)
(underlying, weight * anotherW)
case _ =>
(addr, 1.0)
}
}

object WeightedInetSocketAddress {
/**
/**
* Destructuring a weighted inet socket address is liberal: we
* return a weight of 1 if it is unweighted.
*/
def unapply(addr: SocketAddress): Option[(InetSocketAddress, Double)] =
addr match {
case WeightedSocketAddress(ia: InetSocketAddress, weight) => Some(ia, weight)
case ia: InetSocketAddress => Some(ia, 1D)
def unapply(addr: SocketAddress): Option[(InetSocketAddress, Double)] = {
val (base, weight) = WeightedSocketAddress.extract(addr)
base match {
case sa: InetSocketAddress => Some(sa, weight)
case _ => None
}
}
}
}
@@ -0,0 +1,198 @@
package com.twitter.finagle.loadbalancer

import com.twitter.finagle._
import com.twitter.finagle.client.Transporter
import com.twitter.finagle.service.DelayedFactory
import com.twitter.finagle.stats._
import com.twitter.finagle.util.OnReady
import com.twitter.util.{Activity, Future}
import java.net.SocketAddress
import java.util.logging.Level
import scala.collection.mutable

/**
* A load balancer module that operates over stacks and the
* stack parameters, and creates a WeightedServiceFactory for
* each resolved SocketAddress.
*/
private[loadbalancer] trait BalancerStackModule[Req, Rep]
extends Stack.Module[ServiceFactory[Req, Rep]] {
import com.twitter.finagle.loadbalancer.LoadBalancerFactory._

/**
* A tuple containing a [[com.twitter.finagle.ServiceFactory]] and its
* associated weight.
*/
type WeightedFactory[Req, Rep] = (ServiceFactory[Req, Rep], Double)

/**
* Allows implementations to process `addrs` before they are committed
* to the load balancer's active set.
*
* @param params stack parameters which may contain configurations about
* how to process the addresses.
* @param addrs input addresses
* @return processed addresses
*/
protected def processAddrs(
params: Stack.Params,
addrs: Set[SocketAddress]
): Set[SocketAddress] = addrs

val role = LoadBalancerFactory.role

/**
* Update a mutable Map of `WeightedFactory`s according to a set of
* active SocketAddresses and a factory construction function.
*
* `cachedFactories` are keyed by its unweighted address.
* When an active address has a new weight compared with its
* counterpart in the cache, the cached `WeightedFactory` is updated
* with the new weight; and the `ServiceFactory` is reused.
*
* A new `ServiceFactory` is created only when an active address
* does not have a unweighted counterpart in the cache. The newly
* created `ServiceFactory` along with the weight of the `SocketAddress`
* (if any) are cached. If no weight is provided from the `SocketAddress`,
* a default weight is used in `WeightedFactory`.
*
* If `activeAddrs` contains duplicated host socket addresses, only one
* such socket address and its weighted factory is cached. The weight in
* the last socket address wins.
*
* When `probationEnabled` is true, `ServiceFactory` is only removed from
* the active set when its status has changed to !Open. i.e. Removal may
* not happen in the current update.
*/
def updateFactories[Req, Rep](
activeAddrs: Set[SocketAddress],
cachedFactories: mutable.Map[SocketAddress, WeightedFactory[Req, Rep]],
mkFactory: SocketAddress => ServiceFactory[Req, Rep],
probationEnabled: Boolean
): Unit = cachedFactories.synchronized {
val addrsWithWeight = activeAddrs.map(WeightedSocketAddress.extract)

addrsWithWeight.foreach { case (unweightedAddr, weight) =>
cachedFactories.get(unweightedAddr) match {
case Some((f, oldWeight)) if weight != oldWeight =>
// update factory with weight
cachedFactories += unweightedAddr -> (f, weight)
case None =>
// add new factory
cachedFactories += unweightedAddr -> (mkFactory(unweightedAddr), weight)
case _ =>
// nothing to do
}
}

// remove a serviceFactory only when its status is !open with probationEnabled enabled
(cachedFactories.keySet &~ addrsWithWeight.map(_._1)).foreach { sa =>
cachedFactories.get(sa) match {
case Some((factory, _)) if !probationEnabled || factory.status != Status.Open =>
factory.close()
cachedFactories -= sa

case _ => // nothing to do
}
}
}

def make(
params: Stack.Params,
next: Stack[ServiceFactory[Req, Rep]]
): Stack[ServiceFactory[Req, Rep]] = {
val ErrorLabel(errorLabel) = params[ErrorLabel]
val Dest(dest) = params[Dest]
val Param(loadBalancerFactory) = params[Param]
val EnableProbation(probationEnabled) = params[EnableProbation]


/**
* Determine which stats receiver to use based on `perHostStats`
* flag and the configured `HostStats` param. Report per-host stats
* only when the flag is set.
*/
val hostStatsReceiver =
if (!perHostStats()) NullStatsReceiver
else params[LoadBalancerFactory.HostStats].hostStatsReceiver

val param.Stats(statsReceiver) = params[param.Stats]
val param.Logger(log) = params[param.Logger]
val param.Label(label) = params[param.Label]
val param.Monitor(monitor) = params[param.Monitor]
val param.Reporter(reporter) = params[param.Reporter]

val noBrokersException = new NoBrokersAvailableException(errorLabel)

def mkFactory(sockaddr: SocketAddress): ServiceFactory[Req, Rep] = {
val stats = if (hostStatsReceiver.isNull) statsReceiver else {
val scope = sockaddr match {
case WeightedInetSocketAddress(addr, _) =>
"%s:%d".format(addr.getHostName, addr.getPort)
case other => other.toString
}
val host = hostStatsReceiver.scope(label).scope(scope)
BroadcastStatsReceiver(Seq(host, statsReceiver))
}

val composite = reporter(label, Some(sockaddr)) andThen monitor

val underlying = next.make(params +
Transporter.EndpointAddr(SocketAddresses.unwrap(sockaddr)) +
param.Stats(stats) +
param.Monitor(composite))

new ServiceFactoryProxy(underlying) {
override def toString = sockaddr.toString
}
}

val cachedFactories = mutable.Map.empty[SocketAddress, WeightedFactory[Req, Rep]]
val endpoints = Activity(
dest.map {
case Addr.Bound(sockaddrs, metadata) =>
updateFactories(
processAddrs(params, sockaddrs.toSet), cachedFactories, mkFactory, probationEnabled)
Activity.Ok(cachedFactories.values.toSet)

case Addr.Neg =>
log.info(s"$label: name resolution is negative")
updateFactories(
Set.empty, cachedFactories, mkFactory, probationEnabled)
Activity.Ok(cachedFactories.values.toSet)

case Addr.Failed(e) =>
log.log(Level.INFO, s"$label: name resolution failed", e)
Activity.Failed(e)

case Addr.Pending =>
if (log.isLoggable(Level.FINE)) {
log.fine(s"$label: name resolution is pending")
}
Activity.Pending
}
)

val rawStatsReceiver = statsReceiver match {
case sr: RollupStatsReceiver => sr.self
case sr => sr
}

val lb = loadBalancerFactory.newWeightedLoadBalancer(
endpoints,
rawStatsReceiver.scope(role.toString),
noBrokersException)

val lbReady = lb match {
case onReady: OnReady =>
onReady.onReady before Future.value(lb)
case _ =>
log.warning("Load balancer cannot signal readiness and may throw "+
"NoBrokersAvailableExceptions during resolution.")
Future.value(lb)
}

val delayed = DelayedFactory.swapOnComplete(lbReady)
Stack.Leaf(role, delayed)
}
}
@@ -0,0 +1,50 @@
package com.twitter.finagle.loadbalancer

import com.twitter.finagle._
import com.twitter.util.Activity
import java.net.SocketAddress
import java.util.logging.Level

/**
* A load balancer that balances among multiple connections,
* useful for managing concurrency in pipelining protocols.
*
* Each endpoint can open multiple connections. For N endpoints,
* each opens M connections, load balancer balances among N*M
* options. Thus, it increases concurrency of each endpoint.
*/
private[finagle] object ConcurrentLoadBalancerFactory {
import LoadBalancerFactory._

/**
* A class eligible for configuring the number of connections
* a single endpoint has.
*/
case class Param(numConnections: Int) {
def mk(): (Param, Stack.Param[Param]) = (this, Param.param)
}
object Param {
implicit val param = Stack.Param(Param(4))
}

def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new BalancerStackModule[Req, Rep] {
val description = "Balance requests across multiple connections on a single endpoint, used for pipelining protocols"
val parameters = Seq(
implicitly[Stack.Param[ErrorLabel]],
implicitly[Stack.Param[Dest]],
implicitly[Stack.Param[param.Stats]],
implicitly[Stack.Param[param.Logger]],
implicitly[Stack.Param[param.Monitor]],
implicitly[Stack.Param[param.Reporter]],
implicitly[Stack.Param[Param]])

override protected def processAddrs(
params: Stack.Params,
addrs: Set[SocketAddress]
): Set[SocketAddress] = {
val n = params[Param].numConnections
addrs.flatMap(SocketAddresses.replicate(n))
}
}
}

0 comments on commit 9cd3d44

Please sign in to comment.