[finagle/finagle-core] Factor shared methods out of TrafficDistributor
Problem: We want to make some substantial changes to TrafficDistributor as part
of CSL-10039. However, the class is currently fairly large and handles multiple
concerns: lifecycle methods for adding new nodes, and distributing traffic between
differently weighted load balancers. Some of its methods are also used by other
classes.

Solution: Before we work on major functional changes, refactor shared concerns
into separate files. This should make our future changes somewhat simpler.

This change creates two new classes, DistributorTypes and AddrLifecycle, and
factors some of the shared logic out of TrafficDistributor.

No functional changes are intended.

Differential Revision: https://phabricator.twitter.biz/D600745
bonniee authored and jenkins committed Jan 20, 2021
1 parent e4006b6 commit 56e904e
Showing 11 changed files with 258 additions and 189 deletions.
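
Most of the diff is mechanical: call sites that previously reached shared helpers through the TrafficDistributor companion object now import them from the new com.twitter.finagle.loadbalancer.distributor package. Condensed for illustration from the hunks below (not a literal hunk; dest and label are as in the LoadBalancerFactory code):

// Before: shared helpers lived on the TrafficDistributor companion object.
TrafficDistributor.varAddrToActivity(dest, label)

// After: the lifecycle helpers live on AddrLifecycle in the distributor package.
import com.twitter.finagle.loadbalancer.distributor.AddrLifecycle
AddrLifecycle.varAddrToActivity(dest, label)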
LoadBalancerFactory.scala
@@ -3,11 +3,13 @@ package com.twitter.finagle.loadbalancer
import com.twitter.finagle._
import com.twitter.finagle.client.Transporter
import com.twitter.finagle.loadbalancer.aperture.EagerConnections
import com.twitter.finagle.loadbalancer.distributor.AddressedFactory
import com.twitter.finagle.service.FailFastFactory
import com.twitter.finagle.stats._
import com.twitter.finagle.util.DefaultMonitor
import com.twitter.util.{Activity, Event, Var}
import java.util.logging.{Level, Logger}
import com.twitter.finagle.loadbalancer.distributor.AddrLifecycle
import scala.util.control.NonFatal

/**
@@ -72,11 +74,11 @@ object LoadBalancerFactory {
* If this is configured, the [[Dest]] param will be ignored.
*/
private[finagle] case class Endpoints(
va: Event[Activity.State[Set[TrafficDistributor.AddressedFactory[_, _]]]])
va: Event[Activity.State[Set[AddressedFactory[_, _]]]])

private[finagle] object Endpoints {
implicit val param = Stack.Param(
Endpoints(Event[Activity.State[Set[TrafficDistributor.AddressedFactory[_, _]]]]()))
implicit val param =
Stack.Param(Endpoints(Event[Activity.State[Set[AddressedFactory[_, _]]]]()))
}
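
A sketch of how a caller might supply pre-materialized endpoints via this param (assumes finagle-internal access, since Endpoints is private[finagle]; the shared Event value here is hypothetical):

import com.twitter.finagle.Stack
import com.twitter.finagle.loadbalancer.LoadBalancerFactory
import com.twitter.finagle.loadbalancer.distributor.AddressedFactory
import com.twitter.util.{Activity, Event}

// A shared endpoint stream, e.g. reused across clients of the same cluster.
val endpointStream = Event[Activity.State[Set[AddressedFactory[_, _]]]]()
val params = Stack.Params.empty + LoadBalancerFactory.Endpoints(endpointStream)
// With Endpoints configured, the Dest param is ignored (see the doc above).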

/**
@@ -349,10 +351,10 @@ object LoadBalancerFactory {
// cluster to another, and crucially, to share data between endpoints
val endpoints = if (params.contains[LoadBalancerFactory.Endpoints]) {
params[LoadBalancerFactory.Endpoints].va
.asInstanceOf[Event[Activity.State[Set[TrafficDistributor.AddressedFactory[Req, Rep]]]]]
.asInstanceOf[Event[Activity.State[Set[AddressedFactory[Req, Rep]]]]]
} else {
TrafficDistributor.weightEndpoints(
TrafficDistributor.varAddrToActivity(dest, label),
AddrLifecycle.varAddrToActivity(dest, label),
newEndpointFn(params, next),
!probationEnabled
)
TrafficDistributor.scala
@@ -2,143 +2,23 @@ package com.twitter.finagle.loadbalancer

import com.twitter.finagle._
import com.twitter.finagle.addr.WeightedAddress
import com.twitter.finagle.loadbalancer.distributor.AddrLifecycle._
import com.twitter.finagle.loadbalancer.distributor.{
AddressedFactory,
BalancerEndpoints,
CachedBalancer,
WeightClass
}
import com.twitter.finagle.service.{DelayedFactory, FailingFactory, ServiceFactoryRef}
import com.twitter.finagle.stats.{Counter, NullStatsReceiver, StatsReceiver, Verbosity}
import com.twitter.finagle.util.{Drv, Rng}
import com.twitter.logging.Logger
import com.twitter.util._
import scala.collection.mutable.Builder
import scala.collection.{immutable, mutable}
import scala.util.control.NonFatal

private[finagle] object TrafficDistributor {
val log = Logger.get(classOf[TrafficDistributor[_, _]])

/**
* A [[ServiceFactory]] and its associated [[Address]].
*/
case class AddressedFactory[Req, Rep](factory: EndpointFactory[Req, Rep], address: Address) {
def weight: Double = WeightedAddress.extract(address)._2
}
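
For reference, the weight comes from [[WeightedAddress]] metadata; a small round-trip sketch (our assumption: extract defaults to weight 1.0 for unweighted addresses):

import com.twitter.finagle.Address
import com.twitter.finagle.addr.WeightedAddress

val addr = Address("localhost", 8080)
val weighted = WeightedAddress(addr, 2.5) // attach weight metadata
val (base, weight) = WeightedAddress.extract(weighted)
// base == addr, weight == 2.5; extract(addr) itself would yield weight 1.0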

/**
* An intermediate representation of the endpoints that a load balancer
* operates over, capable of being updated.
*/
type BalancerEndpoints[Req, Rep] =
Var[Activity.State[Set[EndpointFactory[Req, Rep]]]]
with Updatable[
Activity.State[Set[EndpointFactory[Req, Rep]]]
]

/**
* Represents cache entries for load balancer instances. Stores both
* the load balancer instance and its backing updatable collection.
* Size refers to the number of elements in `endpoints`.
*/
case class CachedBalancer[Req, Rep](
balancer: ServiceFactory[Req, Rep],
endpoints: BalancerEndpoints[Req, Rep],
size: Int)

/**
* A load balancer and its associated weight. Size refers to the
* size of the balancer's backing collection. The [[Distributor]]
* operates over these.
*/
case class WeightClass[Req, Rep](
balancer: ServiceFactory[Req, Rep],
endpoints: BalancerEndpoints[Req, Rep],
weight: Double,
size: Int)

/**
* Transforms a [[Var]] of bound Addresses to an [[Activity]] of bound Addresses
* @note The Addr.Bound metadata is stripped out in this method; the metadata can
* be found in [[AddrMetadataExtraction]]
*/
private[finagle] def varAddrToActivity(dest: Var[Addr], label: String): Activity[Set[Address]] =
Activity(dest.map {
case Addr.Bound(set, _) => Activity.Ok(set)
case Addr.Neg =>
log.info(s"$label: name resolution is negative (local dtab: ${Dtab.local})")
Activity.Ok(Set.empty[Address])
case Addr.Failed(e) =>
log.info(s"$label: name resolution failed (local dtab: ${Dtab.local})", e)
Activity.Failed(e)
case Addr.Pending =>
log.debug(s"$label: name resolution is pending")
Activity.Pending
})
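
A usage sketch (finagle-internal, since the method is private[finagle]; after this change it lives on AddrLifecycle, and the "example-client" label is just an illustrative string):

import com.twitter.finagle.{Addr, Address}
import com.twitter.util.{Activity, Var}

val dest = Var[Addr](Addr.Pending)
val act: Activity[Set[Address]] = AddrLifecycle.varAddrToActivity(dest, "example-client")
act.states.respond(println)

dest() = Addr.Bound(Address("localhost", 8080)) // Ok(Set(...))
dest() = Addr.Neg                               // Ok(Set()): negative resolution maps to an empty set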

/**
* Folds and accumulates over an [[Activity]] based event `stream` while biasing
* for success by suppressing intermediate failures.
*
* Once we have seen data (via Activity.Ok), ignore Activity.Pending/Failed state
* changes and keep using stale data to prevent discarding possibly valid data
* if the updating activity is having transient failures.
*/
private[finagle] def safelyScanLeft[T, U](
init: U,
stream: Event[Activity.State[T]]
)(
f: (U, T) => U
): Event[Activity.State[U]] = {
val initState: Activity.State[U] = Activity.Ok(init)
stream.foldLeft(initState) {
case (Activity.Pending, Activity.Ok(update)) => Activity.Ok(f(init, update))
case (Activity.Failed(_), Activity.Ok(update)) => Activity.Ok(f(init, update))
case (Activity.Ok(state), Activity.Ok(update)) => Activity.Ok(f(state, update))
case (stale @ Activity.Ok(state), Activity.Failed(_)) if init != state => stale
case (stale @ Activity.Ok(state), Activity.Pending) if init != state => stale
case (_, failed @ Activity.Failed(_)) => failed
case (_, Activity.Pending) => Activity.Pending
}
}
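
For example, once the scan has seen an Ok, later Pending or Failed updates are swallowed and the stale value is re-emitted. A minimal sketch, assuming AddrLifecycle is where this commit moves the helper:

import com.twitter.util.{Activity, Event}

val updates = Event[Activity.State[Set[String]]]()
val scanned = AddrLifecycle.safelyScanLeft(Set.empty[String], updates)(_ ++ _)
scanned.respond(println)

updates.notify(Activity.Ok(Set("a")))          // Ok(Set(a))
updates.notify(Activity.Pending)               // Ok(Set(a)) again: stale data kept
updates.notify(Activity.Failed(new Exception)) // Ok(Set(a)) again: transient failure suppressed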

/**
* This interface handles operations when partitioning a Set of elements into a Map.
*/
trait DiffOps[U, Partition] {

/** Removes/cleans up an existing partition */
def remove(partition: Partition): Unit

/** Constructs a new partition from an added set of elements */
def add(current: Set[U]): Partition

/** Updates the existing partition with a set of elements */
def update(current: Set[U], partition: Partition): Partition
}

/**
* Transforms the current element Set into a new Partition Map, updating the previous
* Partition Map via the provided operations.
*
* @param accumulated Previous Partition Map
* @param current Current Set of elements
* @param getKeys A discriminator function for current Set to get keys
* @param diffOps [[DiffOps]] to handle the diff between the previous partition map and
* the newly transformed partition map from the current Set. This function
* usually handles the side effects of creating, updating, and removing partitions.
*/
private[loadbalancer] def updatePartitionMap[Key, Partition, U](
accumulated: Map[Key, Partition],
current: Set[U],
getKeys: U => Seq[Key],
diffOps: DiffOps[U, Partition]
): Map[Key, Partition] = {
val grouped = groupBy(current, getKeys)
val removals = accumulated.keySet &~ grouped.keySet
val additions = grouped.keySet &~ accumulated.keySet
val updates = grouped.keySet & accumulated.keySet
removals.foreach { key => diffOps.remove(accumulated(key)) }
val added = additions.map { key => key -> diffOps.add(grouped(key)) }.toMap
val updated = updates.map { key => key -> diffOps.update(grouped(key), accumulated(key)) }.toMap
added ++ updated
}
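
A minimal sketch of DiffOps and updatePartitionMap together, using a toy partition type (the member set itself) keyed by parity; the real code partitions AddressedFactory instances into balancer caches. Assumes AddrLifecycle._ is imported, as the new code does:

val parityOps = new DiffOps[Int, Set[Int]] {
  def remove(partition: Set[Int]): Unit = () // nothing to clean up in this toy example
  def add(current: Set[Int]): Set[Int] = current
  def update(current: Set[Int], partition: Set[Int]): Set[Int] = current
}
val even = (i: Int) => Seq(i % 2 == 0)

val first = updatePartitionMap(Map.empty[Boolean, Set[Int]], Set(1, 2, 3), even, parityOps)
// first == Map(false -> Set(1, 3), true -> Set(2))
val second = updatePartitionMap(first, Set(2, 4), even, parityOps)
// second == Map(true -> Set(2, 4)); remove() ran for the stale `false` partition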

/**
* Creates a `newEndpoint` for each distinct [[Address]] in the `addrs`
* stream. Calls to `newEndpoint` are cached based on the input address. The cache is
@@ -183,48 +63,15 @@ private[finagle] object TrafficDistributor {

// Remove stale cache entries. When `eagerEviction` is false cache
// entries are only removed in subsequent stream updates.
val removed = merged.keySet -- addrs.map { case WeightedAddress(addr, _) => addr }
removed.foldLeft(merged) {
case (cache, addr) =>
cache.get(addr) match {
case Some(AddressedFactory(f, _)) if eagerEviction || f.status != Status.Open =>
try f.close()
catch {
case NonFatal(t) => log.warning(t, s"unable to close endpoint $addr")
}
cache - addr
case _ => cache
}
}
removeStaleAddresses(merged, addrs, eagerEviction)

}.map {
case Activity.Ok(cache) => Activity.Ok(cache.values.toSet)
case Activity.Pending => Activity.Pending
case failed @ Activity.Failed(_) => failed
}
}

/**
* A modified version of the Scala collections' groupBy. `f` is a multi-mapping function,
* so a single element may map to several keys, giving a many-to-many grouping.
*/
private[finagle] def groupBy[U, Key](
coll: Set[U],
f: U => Seq[Key]
): immutable.Map[Key, Set[U]] = {
val m = mutable.Map.empty[Key, Builder[U, Set[U]]]
for (elem <- coll) {
val keys = f(elem)
for (k <- keys) {
val bldr = m.getOrElseUpdate(k, Set.newBuilder)
bldr += elem
}
}
val b = immutable.Map.newBuilder[Key, Set[U]]
for ((k, v) <- m)
b += ((k, v.result))
b.result
}
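
Unlike the standard groupBy, an element appears under every key `f` returns. For example (a sketch; `groupBy` here is the private helper above, not scala.collection's):

// Key each string by every character it contains: many-to-many.
val m = groupBy(Set("ab", "bc"), (s: String) => s.map(_.toString))
// m == Map("a" -> Set("ab"), "b" -> Set("ab", "bc"), "c" -> Set("bc"))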

/**
* Distributes requests to `classes` according to their weight and size.
*/
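
The pick is weight-proportional; a sketch of the underlying idea using c.t.f.util.Drv (which the file already imports), assuming each class's share is weight times size as the doc suggests. This is not the Distributor's literal code:

import com.twitter.finagle.util.{Drv, Rng}

// Two weight classes: (weight 1.0, 3 endpoints) and (weight 2.0, 1 endpoint).
val shares = Seq(1.0 * 3, 2.0 * 1)
val drv = Drv.fromWeights(shares) // normalizes shares into a distribution
val rng = Rng.threadLocal
val pick = drv(rng) // 0 with probability 3/5, 1 with probability 2/5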
@@ -320,7 +167,7 @@ private[finagle] object TrafficDistributor {
* [[c.t.f.loadbalancer.aperture.EagerConnections]] feature disabled.
*/
private class TrafficDistributor[Req, Rep](
dest: Event[Activity.State[Set[TrafficDistributor.AddressedFactory[Req, Rep]]]],
dest: Event[Activity.State[Set[AddressedFactory[Req, Rep]]]],
newBalancer: (Activity[Set[EndpointFactory[Req, Rep]]], Boolean) => ServiceFactory[Req, Rep],
rng: Rng = Rng.threadLocal,
statsReceiver: StatsReceiver = NullStatsReceiver)
@@ -338,7 +185,10 @@ private class TrafficDistributor[Req, Rep](
// which is updatable. The entries are keyed by weight class.
val init = Map.empty[Double, CachedBalancer[Req, Rep]]

val balancerDiffOps = new DiffOps[AddressedFactory[Req, Rep], CachedBalancer[Req, Rep]] {
val balancerDiffOps = new DiffOps[
AddressedFactory[Req, Rep],
CachedBalancer[Req, Rep]
] {
// Close balancers that don't correspond to new endpoints.
def remove(cachedBalancer: CachedBalancer[Req, Rep]): Unit =
try cachedBalancer.balancer.close()
@@ -347,7 +197,9 @@ }
}

// Construct new balancers from new endpoints.
def add(factories: Set[AddressedFactory[Req, Rep]]): CachedBalancer[Req, Rep] = {
def add(
factories: Set[AddressedFactory[Req, Rep]]
): CachedBalancer[Req, Rep] = {
val group = factories.map(_.factory)
val weight = if (factories.isEmpty) 1D else factories.head.weight
val endpoints: BalancerEndpoints[Req, Rep] = Var(Activity.Ok(group))
@@ -415,7 +267,9 @@ private class TrafficDistributor[Req, Rep](

private[this] val busyWeightClasses: Counter = statsReceiver.counter("busy_weight_classes")

private[this] def updateGauges(classes: Iterable[WeightClass[Req, Rep]]): Unit = {
private[this] def updateGauges(
classes: Iterable[WeightClass[Req, Rep]]
): Unit = {
numWeightClasses = classes.size.toFloat
val numEndpoints = classes.map(_.size).sum
meanWeight =
(diff truncated: the remaining 9 changed files are not shown)
