Skip to content

Commit

Permalink
finagle: mv ConcurrentLoadBalanacerFactory into finagle-memcached
Browse files Browse the repository at this point in the history
Summary:
Problem / Solution

ConcurrentLoadBalanacerFactory is only used in one place – finagle-memcached.
It doesn't seem generally useful since it is specific to pipelined protocols that
have stateless sessions, of which we only have one (finagle-memcached). This patch
moves it into the finagle-memcached directory and fixes up source to reflect
the change.

Test Plan: ./pants test <relevant projects>

Reviewers: O22826 source:/src/scala/com/twitter/mediascience/!, banderson, O22963 source:/src/scala/com/twitter/taxi/!, vkostyukov, O1553 source:/media-analysis-service/!, mnakamura, O1805 source:/notifications-delivery/!, jillianc, O5308 source:/user-image-service/!, koliver, O1567 source:/mediaservices/!, yao, O5146 source:/thumbingbird/!, ryano, O1802 source:/mynahbird/!, dschobel, O948 source:/iesource/!, jdonham, O523 source:/bouncer/!, O5276 source:/udaan/!, O536 source:/cache/!, O8689 source:/ads/review/common/!, cflanagan

Reviewed By: jillianc, koliver, dschobel

Subscribers: ee-phabricator-cron, #rb_traffic, #rb_notifications-infra-team, #rb_sss, #rb_media-platform, #rb_csl, #rb_media-science-eng, #rb_growth-bangalore-eng, #rb_te-eng, #rb_cache-team, #rb_core-data-metrics, #rb_abuse-eng

JIRA Issues: CSL-4067

TBR:

TBR=true

Differential Revision: https://phabricator.twitter.biz/D29215
  • Loading branch information
Ruben Oanta authored and jenkins committed Feb 24, 2017
1 parent 29f3fef commit 36ab44e
Show file tree
Hide file tree
Showing 28 changed files with 157 additions and 220 deletions.
7 changes: 5 additions & 2 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ Breaking API Changes
`JdkServerEngineFactory` respectively.
``RB_ID=907923``

* finagle-core: `withLoadBalancer.connectionsPerEndpoint` was removed and moved
into finagle-memcached, which was the only client that uses the feature. ``RB_ID=908354``

* finagle-kestrel: Remove the deprecated `codec` method on `c.t.f.kestrel.MultiReaderMemcache`.
Use `.stack(Kestrel.client)` on the configured `c.t.f.builder.ClientBuilder` instead.
``RB_ID=907184``
Expand All @@ -77,7 +80,7 @@ Breaking API Changes
* finagle-kestrel: Removed deprecated `c.t.f.kestrel.protocol.Kestrel`. To create a Finagle
Kestrel client, use `c.t.f.Kestrel.client`. ``RB_ID=907422``

* finagle-serversets: Removed the unapply method and modified the signature of
* finagle-serversets: Removed the unapply method and modified the signature of
fromAddrMetadata method in `c.t.f.serverset2.addr.ZkMetadata`. Instead of pattern
matching use the modified fromAddrMetadata method. ``RB_ID=908186``

Expand All @@ -88,7 +91,7 @@ Breaking API Changes

Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~

* finagle-netty4: Servers no longer set SO_LINGER=0 on sockets. ``RB_ID=907325``

Deprecations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,56 +234,6 @@ object LoadBalancerFactory {
}
}

/**
* A load balancer that balances among multiple connections,
* useful for managing concurrency in pipelining protocols.
*
* Each endpoint can open multiple connections. For N endpoints,
* each opens M connections, load balancer balances among N*M
* options. Thus, it increases concurrency of each endpoint.
*/
object ConcurrentLoadBalancerFactory {
import LoadBalancerFactory._

private val ReplicaKey = "concurrent_lb_replica"

// package private for testing
private[finagle] def replicate(num: Int): Address => Set[Address] = {
case Address.Inet(ia, metadata) =>
for (i: Int <- (0 until num).toSet) yield
Address.Inet(ia, metadata + (ReplicaKey -> i))
case addr => Set(addr)
}

/**
* A class eligible for configuring the number of connections
* a single endpoint has.
*/
case class Param(numConnections: Int) {
def mk(): (Param, Stack.Param[Param]) = (this, Param.param)
}
object Param {
implicit val param = Stack.Param(Param(4))
}

private[finagle] def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new StackModule[Req, Rep] {
val description = "Balance requests across multiple connections on a single " +
"endpoint, used for pipelining protocols"

override def make(params: Stack.Params, next: Stack[ServiceFactory[Req, Rep]]) = {
val Param(numConnections) = params[Param]
val Dest(dest) = params[Dest]
val newDest = dest.map {
case bound@Addr.Bound(set, _) =>
bound.copy(addrs = set.flatMap(replicate(numConnections)))
case addr => addr
}
super.make(params + Dest(newDest), next)
}
}
}

/**
* A thin interface around a Balancer's constructor that allows Finagle to pass in
* context from the stack to the balancers at construction time.
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.twitter.conversions.time._
import com.twitter.finagle._
import com.twitter.finagle.addr.WeightedAddress
import com.twitter.finagle.client.StringClient
import com.twitter.finagle.loadbalancer.{ConcurrentLoadBalancerFactory, DefaultBalancerFactory}
import com.twitter.finagle.loadbalancer.DefaultBalancerFactory
import com.twitter.finagle.server.StringServer
import com.twitter.finagle.stats._
import com.twitter.finagle.util.Rng
Expand All @@ -22,7 +22,7 @@ private object TrafficDistributorTest {

def apply(port: Int, weight: Double): Address =
Address.Inet(new InetSocketAddress(port), Addr.Metadata(key -> weight))

def unapply(addr: Address): Option[(Int, Double)] = addr match {
case Address.Inet(ia, metadata) =>
Some((ia.getPort, metadata(key).asInstanceOf[Double]))
Expand Down Expand Up @@ -335,7 +335,6 @@ class TrafficDistributorTest extends FunSuite {
Await.result(dist())
})


test("status is bestOf all weight classes") (new Ctx {
val weightClasses = Seq((1.0, 1), (busyWeight, 2))
val classes = weightClasses.flatMap(weightClass.tupled).toSet
Expand All @@ -352,30 +351,6 @@ class TrafficDistributorTest extends FunSuite {
assert(dist.status == Status.Open)
})

test("handles replicated addresses") (new Ctx {
val init: Set[Address] = (1 to 5).map(Address(_)).toSet
val dest = Var(Activity.Ok(init))
val newDest = dest.map {
case Activity.Ok(set) =>
Activity.Ok(set.flatMap(ConcurrentLoadBalancerFactory.replicate(4)))
case state => state
}
val dist = newDist(newDest)

assert(newEndpointCalls == 20)
assert(newBalancerCalls == 1)

val update: Set[Address] = (2 to 5).map(Address(_)).toSet
resetCounters()
dest() = Activity.Ok(update)
assert(newEndpointCalls == 0)
assert(newBalancerCalls == 0)

assert(balancers.head.endpoints.sample().size == 16)
assert(balancers.head.endpoints.sample() == update.flatMap(
ConcurrentLoadBalancerFactory.replicate(4)).map(AddressFactory))
})

// todo: move this to util-stats?
private class CumulativeGaugeInMemoryStatsReceiver
extends StatsReceiverWithCumulativeGauges
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.twitter.finagle.loadbalancer

import com.twitter.finagle._
import com.twitter.finagle.client.{StackClient, StringClient}
import com.twitter.finagle.client.StringClient
import com.twitter.finagle.param.Stats
import com.twitter.finagle.server.StringServer
import com.twitter.finagle.stats.{InMemoryHostStatsReceiver, InMemoryStatsReceiver}
Expand Down Expand Up @@ -97,44 +97,4 @@ class LoadBalancerFactoryTest extends FunSuite
intercept[NoBrokersAvailableException](Await.result(factory()))
}
}
}

@RunWith(classOf[JUnitRunner])
class ConcurrentLoadBalancerFactoryTest extends FunSuite with StringClient with StringServer {
val echoService = Service.mk[String, String](Future.value(_))

test("makes service factory stack") {
val address = new InetSocketAddress(InetAddress.getLoopbackAddress, 0)
val server = stringServer.serve(address, echoService)

val sr = new InMemoryStatsReceiver
val clientStack =
StackClient.newStack.replace(
LoadBalancerFactory.role, ConcurrentLoadBalancerFactory.module[String, String])
val client = stringClient.withStack(clientStack)
.configured(Stats(sr))
.newService(Name.bound(Address(server.boundAddress.asInstanceOf[InetSocketAddress])), "client")

assert(sr.counters(Seq("client", "loadbalancer", "adds")) == 4)
assert(Await.result(client("hello\n")) == "hello")
}

test("creates fixed number of service factories based on params") {
val addr1 = new InetSocketAddress(InetAddress.getLoopbackAddress, 0)
val server1 = stringServer.serve(addr1, echoService)

val addr2 = new InetSocketAddress(InetAddress.getLoopbackAddress, 0)
val server2 = stringServer.serve(addr2, echoService)

val sr = new InMemoryStatsReceiver
val clientStack =
StackClient.newStack.replace(
LoadBalancerFactory.role, ConcurrentLoadBalancerFactory.module[String, String])
val client = stringClient.withStack(clientStack)
.configured(Stats(sr))
.configured(ConcurrentLoadBalancerFactory.Param(3))
.newService(Name.bound(Address(server1.boundAddress.asInstanceOf[InetSocketAddress]), Address(server2.boundAddress.asInstanceOf[InetSocketAddress])), "client")

assert(sr.counters(Seq("client", "loadbalancer", "adds")) == 6)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object MemcacheStress extends App {
def main() {
var client = Memcached.client
.withLabel("mc")
.withLoadBalancer.connectionsPerEndpoint(config.concurrency())
.connectionsPerEndpoint(config.concurrency())

if (config.nworkers() > 0)
client = client.configured(Netty3Transporter.ChannelFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

import com.twitter.finagle.Memcached;
import com.twitter.finagle.Service;
import com.twitter.finagle.loadbalancer.ConcurrentLoadBalancerFactory;
import com.twitter.finagle.memcached.CacheNode;
import com.twitter.finagle.memcached.CachePoolCluster;
import com.twitter.finagle.memcached.KetamaClientBuilder;
import com.twitter.finagle.memcached.loadbalancer.ConcurrentLoadBalancerFactory;
import com.twitter.finagle.memcached.protocol.Command;
import com.twitter.finagle.memcached.protocol.Response;
import com.twitter.io.Buf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,33 +6,34 @@ import com.twitter.conversions.time._
import com.twitter.finagle
import com.twitter.finagle.client.{ClientRegistry, DefaultPool, StackClient, StdStackClient, Transporter}
import com.twitter.finagle.dispatch.{GenSerialClientDispatcher, PipeliningDispatcher, SerialServerDispatcher}
import com.twitter.finagle.loadbalancer.{Balancers, ConcurrentLoadBalancerFactory, LoadBalancerFactory}
import com.twitter.finagle.loadbalancer.{Balancers, LoadBalancerFactory}
import com.twitter.finagle.memcached._
import com.twitter.finagle.memcached.Toggles
import com.twitter.finagle.memcached.exp.LocalMemcached
import com.twitter.finagle.memcached.protocol.text.CommandToEncoding
import com.twitter.finagle.memcached.loadbalancer.ConcurrentLoadBalancerFactory
import com.twitter.finagle.memcached.protocol.text.client.ClientTransport
import com.twitter.finagle.memcached.protocol.text.server.ServerTransport
import com.twitter.finagle.memcached.protocol.text.client.DecodingToResponse
import com.twitter.finagle.memcached.protocol.text.CommandToEncoding
import com.twitter.finagle.memcached.protocol.text.server.ServerTransport
import com.twitter.finagle.memcached.protocol.text.transport.{Netty3ClientFramer, Netty3ServerFramer, Netty4ClientFramer, Netty4ServerFramer}
import com.twitter.finagle.memcached.protocol.{Command, Response, RetrievalCommand, Values}
import com.twitter.finagle.memcached.Toggles
import com.twitter.finagle.netty3.{Netty3Listener, Netty3Transporter}
import com.twitter.finagle.netty4.{Netty4Listener, Netty4Transporter}
import com.twitter.finagle.param.{ExceptionStatsHandler => _, Monitor => _, ResponseClassifier => _, Tracer => _, _}
import com.twitter.finagle.pool.SingletonPool
import com.twitter.finagle.server.ServerInfo
import com.twitter.finagle.server.{Listener, StackServer, StdStackServer}
import com.twitter.finagle.service._
import com.twitter.finagle.service.exp.FailureAccrualPolicy
import com.twitter.finagle.stats.{ExceptionStatsHandler, StatsReceiver}
import com.twitter.finagle.server.ServerInfo
import com.twitter.finagle.toggle.Toggle
import com.twitter.finagle.tracing._
import com.twitter.finagle.transport.Transport
import com.twitter.finagle.util.DefaultTimer
import com.twitter.hashing
import com.twitter.io.Buf
import com.twitter.util.{Closable, Duration, Monitor}
import com.twitter.util.registry.GlobalRegistry
import com.twitter.util.{Closable, Duration, Monitor}
import scala.collection.mutable

private[finagle] object MemcachedTracingFilter {
Expand Down Expand Up @@ -288,7 +289,6 @@ object Memcached extends finagle.Client[Command, Response]
stack: Stack[ServiceFactory[Command, Response]] = Client.newStack,
params: Stack.Params = Client.defaultParams)
extends StdStackClient[Command, Response, Client]
with WithConcurrentLoadBalancer[Client]
with MemcachedRichClient {

import Client.mkDestination
Expand Down Expand Up @@ -376,10 +376,17 @@ object Memcached extends finagle.Client[Command, Response]
def withNumReps(reps: Int): Client =
configured(param.NumReps(reps))

// Java-friendly forwarders
// See https://issues.scala-lang.org/browse/SI-8905
override val withLoadBalancer: ConcurrentLoadBalancingParams[Client] =
new ConcurrentLoadBalancingParams(this)
/**
* Configures the number of concurrent `connections` a single endpoint has.
* The connections are load balanced over which allows the pipelined client to
* avoid head-of-line blocking and reduce its latency.
*
* We've empirically found that four is a good default for this, but it can be
* increased at the cost of additional connection overhead.
*/
def connectionsPerEndpoint(connections: Int): Client =
configured(ConcurrentLoadBalancerFactory.Param(connections))

override val withTransport: ClientTransportParams[Client] =
new ClientTransportParams(this)
override val withSession: ClientSessionParams[Client] =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package com.twitter.finagle.memcached.loadbalancer

import com.twitter.finagle.loadbalancer.LoadBalancerFactory
import com.twitter.finagle.{Addr, Address, Stack, ServiceFactory, Stackable}

/**
* Exposes a [[StackModule]] which composes over the [[LoadBalancerFactory]] to
* change the way the way endpoints are created for a load balancer. In particular,
* each endpoint that is resolved is replicated by the given [[Param]]. This, increases
* concurrency for each identical endpoint and allows them to be load balanced over. This
* is useful for pipelining protocols that may incur head-of-line blocking without
* this replication.
*/
object ConcurrentLoadBalancerFactory {
private val ReplicaKey = "concurrent_lb_replica"

// package private for testing
private[finagle] def replicate(num: Int): Address => Set[Address] = {
case Address.Inet(ia, metadata) =>
for (i: Int <- (0 until num).toSet) yield
Address.Inet(ia, metadata + (ReplicaKey -> i))
case addr => Set(addr)
}

/**
* A class eligible for configuring the number of connections
* a single endpoint has.
*/
case class Param(numConnections: Int) {
def mk(): (Param, Stack.Param[Param]) = (this, Param.param)
}
object Param {
implicit val param = Stack.Param(Param(4))
}

private[finagle] def module[Req, Rep]: Stackable[ServiceFactory[Req, Rep]] =
new LoadBalancerFactory.StackModule[Req, Rep] {
val description = "Balance requests across multiple connections on a single " +
"endpoint, used for pipelining protocols"

override def make(params: Stack.Params, next: Stack[ServiceFactory[Req, Rep]]) = {
val Param(numConnections) = params[Param]
val LoadBalancerFactory.Dest(dest) = params[LoadBalancerFactory.Dest]
val newDest = dest.map {
case bound@Addr.Bound(set, _) =>
bound.copy(addrs = set.flatMap(replicate(numConnections)))
case addr => addr
}
super.make(params + LoadBalancerFactory.Dest(newDest), next)
}
}
}
Loading

0 comments on commit 36ab44e

Please sign in to comment.