Skip to content

Commit

Permalink
finagle-core: Remove AddressedFactory
Browse files Browse the repository at this point in the history
[This reverts commit 9a0e91cc20bd3f224fa90eff10a432723e42370e.]

Problem:

AddressedFactory was a useful abstraction, specifically for
updating address weights without having to close the underlying
EndpointFactory. However, it had its limitations, specifically in that
updating an AddressedFactory would not update its underlying
EndpointFactory at all. This resulted in some weights not being
propagated properly throughout LoadBalancer code

Solution:

We remove AddressedFactory and introduce an EndpointFactoryProxy
where necessary, which gives us the benefit of updating EndpointFactory
address weights while consolidating weights to one "true" value.

JIRA Issues: CSL-11291

Differential Revision: https://phabricator.twitter.biz/D751145
  • Loading branch information
joybestourous authored and jenkins committed Oct 11, 2021
1 parent 4651b6f commit 4043382
Show file tree
Hide file tree
Showing 17 changed files with 313 additions and 147 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
Unreleased
----------

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

* finagle-core: `c.t.f.loadbalancer.distributor.AddressedFactory` has been removed. Use
`c.t.f.loadbalancer.EndpointFactory` directly instead. ``PHAB_ID=D751145``

21.9.0
------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ trait EndpointFactory[Req, Rep] extends ServiceFactory[Req, Rep] {
/**
* Returns the address which this endpoint connects to.
*/
def address: Address
val address: Address

private[loadbalancer] lazy val weight: Double = WeightedAddress.extract(address)._2

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import com.twitter.finagle._
import com.twitter.finagle.client.Transporter
import com.twitter.finagle.loadbalancer.aperture.EagerConnections
import com.twitter.finagle.loadbalancer.aperture.WeightedApertureToggle
import com.twitter.finagle.loadbalancer.distributor.AddressedFactory
import com.twitter.finagle.service.FailFastFactory
import com.twitter.finagle.stats._
import com.twitter.finagle.util.DefaultLogger
Expand Down Expand Up @@ -114,11 +113,11 @@ object LoadBalancerFactory {
* If this is configured, the [[Dest]] param will be ignored.
*/
private[finagle] case class Endpoints(
va: Event[Activity.State[Set[AddressedFactory[_, _]]]])
va: Event[Activity.State[Set[EndpointFactory[_, _]]]])

private[finagle] object Endpoints {
implicit val param =
Stack.Param(Endpoints(Event[Activity.State[Set[AddressedFactory[_, _]]]]()))
Stack.Param(Endpoints(Event[Activity.State[Set[EndpointFactory[_, _]]]]()))
}

/**
Expand Down Expand Up @@ -398,7 +397,7 @@ object LoadBalancerFactory {
// cluster to another, and crucially, to share data between endpoints
val endpoints = if (params.contains[LoadBalancerFactory.Endpoints]) {
params[LoadBalancerFactory.Endpoints].va
.asInstanceOf[Event[Activity.State[Set[AddressedFactory[Req, Rep]]]]]
.asInstanceOf[Event[Activity.State[Set[EndpointFactory[Req, Rep]]]]]
} else {
TrafficDistributor.weightEndpoints(
AddrLifecycle.varAddrToActivity(dest, label),
Expand All @@ -411,18 +410,10 @@ object LoadBalancerFactory {
// newBalancer in a TrafficDistributor.
if (loadBalancerFactory.supportsWeighted && WeightedApertureToggle(label)) {

// Convert endpoints from AddressedFactories to EndpointFactories
val formattedEndpoints: Activity[Set[EndpointFactory[Req, Rep]]] = {
Activity(endpoints).map { set: Set[AddressedFactory[Req, Rep]] =>
set.map { af: AddressedFactory[Req, Rep] =>
af.factory
}
}
}
// Add the newBalancer to the stack
Stack.leaf(
role,
newBalancer(formattedEndpoints, disableEagerConnections = false, manageWeights = true)
newBalancer(Activity(endpoints), disableEagerConnections = false, manageWeights = true)
)
} else {
// Instead of simply creating a newBalancer here, we defer to the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.twitter.finagle.loadbalancer
import com.twitter.finagle._
import com.twitter.finagle.addr.WeightedAddress
import com.twitter.finagle.loadbalancer.distributor.AddrLifecycle._
import com.twitter.finagle.loadbalancer.distributor.AddressedFactory
import com.twitter.finagle.loadbalancer.distributor.BalancerEndpoints
import com.twitter.finagle.loadbalancer.distributor.CachedBalancer
import com.twitter.finagle.loadbalancer.distributor.WeightClass
Expand Down Expand Up @@ -42,9 +41,23 @@ private[finagle] object TrafficDistributor {
private[finagle] def weightEndpoints[Req, Rep](
addrs: Activity[Set[Address]],
newEndpoint: Address => ServiceFactory[Req, Rep],
eagerEviction: Boolean
): Event[Activity.State[Set[AddressedFactory[Req, Rep]]]] = {
val init = Map.empty[Address, AddressedFactory[Req, Rep]]
eagerEviction: Boolean,
): Event[Activity.State[Set[EndpointFactory[Req, Rep]]]] = {

// The EndpointFactoryProxy enables us to replace an EndpointFactory's
// Address without having to close the EndpointFactory and build a new one.
final case class EndpointFactoryProxy(
factory: EndpointFactory[Req, Rep],
address: Address)
extends EndpointFactory[Req, Rep] {

override def status: Status = factory.status
override def remake(): Unit = factory.remake()
override def close(deadline: Time): Future[Unit] = factory.close(deadline)
override def apply(conn: ClientConnection): Future[Service[Req, Rep]] = factory(conn)
}

val init = Map.empty[Address, EndpointFactoryProxy]
safelyScanLeft(init, addrs.run.changes.dedup) {
case (active, addrs) =>
// Note, if an update contains multiple `Address` instances that are the same
Expand All @@ -56,11 +69,12 @@ private[finagle] object TrafficDistributor {
// An update with an existing Address that has a new weight
// results in the the weight being overwritten but the [[ServiceFactory]]
// instance is maintained.
case Some(af @ AddressedFactory(_, WeightedAddress(_, w))) if w != weight =>
cache.updated(addr, af.copy(address = weightedAddr))
case Some(efp) if efp.weight != weight =>
cache.updated(addr, efp.copy(address = weightedAddr))
case None =>
val endpoint = new LazyEndpointFactory(() => newEndpoint(addr), addr)
cache.updated(addr, AddressedFactory(endpoint, weightedAddr))
val endpoint: LazyEndpointFactory[Req, Rep] =
new LazyEndpointFactory(() => newEndpoint(addr), addr)
cache.updated(addr, EndpointFactoryProxy(endpoint, weightedAddr))
case _ => cache
}
}
Expand All @@ -76,6 +90,31 @@ private[finagle] object TrafficDistributor {
}
}

private def removeStaleAddresses[EFactoryT <: EndpointFactory[_, _]](
merged: Map[Address, EFactoryT],
addresses: Set[Address],
eagerEviction: Boolean
): Map[Address, EFactoryT] = {
// Remove stale cache entries. When `eagerEviction` is false cache
// entries are only removed in subsequent stream updates.
val removed: Set[Address] = merged.keySet -- addresses.map {
case WeightedAddress(addr, _) => addr
}
removed.foldLeft(merged) {
case (cache, addr) =>
cache.get(addr) match {
case Some(f) if eagerEviction || f.status != Status.Open =>
try f.close()
catch {
case NonFatal(t) => log.warning(t, s"unable to close endpoint $addr")
}
cache - addr
case _ =>
cache
}
}
}

/**
* Distributes requests to `classes` according to their weight and size.
*/
Expand Down Expand Up @@ -177,7 +216,7 @@ private[finagle] object TrafficDistributor {
* [[c.t.f.loadbalancer.aperture.EagerConnections]] feature disabled.
*/
private class TrafficDistributor[Req, Rep](
dest: Event[Activity.State[Set[AddressedFactory[Req, Rep]]]],
dest: Event[Activity.State[Set[EndpointFactory[Req, Rep]]]],
newBalancer: (Activity[Set[EndpointFactory[Req, Rep]]], Boolean) => ServiceFactory[Req, Rep],
reuseEndpoints: Boolean,
rng: Rng = Rng.threadLocal,
Expand All @@ -190,14 +229,14 @@ private class TrafficDistributor[Req, Rep](
* Because balancer instances are stateful, they need to be cached across updates.
*/
private[this] def partition(
endpoints: Event[Activity.State[Set[AddressedFactory[Req, Rep]]]]
endpoints: Event[Activity.State[Set[EndpointFactory[Req, Rep]]]]
): Event[Activity.State[Iterable[WeightClass[Req, Rep]]]] = {
// Cache entries are balancer instances together with their backing collection
// which is updatable. The entries are keyed by weight class.
val init = Map.empty[Double, CachedBalancer[Req, Rep]]

val balancerDiffOps = new DiffOps[
AddressedFactory[Req, Rep],
EndpointFactory[Req, Rep],
CachedBalancer[Req, Rep]
] {
// Close balancers that don't correspond to new endpoints.
Expand All @@ -209,28 +248,26 @@ private class TrafficDistributor[Req, Rep](

// Construct new balancers from new endpoints.
def add(
factories: Set[AddressedFactory[Req, Rep]]
factories: Set[EndpointFactory[Req, Rep]]
): CachedBalancer[Req, Rep] = {
val group = factories.map(_.factory)
val weight = if (factories.isEmpty) 1D else factories.head.weight
val endpoints: BalancerEndpoints[Req, Rep] = Var(Activity.Ok(group))
val endpoints: BalancerEndpoints[Req, Rep] = Var(Activity.Ok(factories))

// we disable eager connections for non 1.0 weight class balancers. We assume the 1.0
// weight balancer to be the main balancer and because sessions are managed independently
// by each balancer, we avoid eagerly creating connections for balancers that may not be
// long-lived.
val bal = newBalancer(Activity(endpoints), weight != 1.0)
CachedBalancer(bal, endpoints, group.size)
CachedBalancer(bal, endpoints, factories.size)
}

// Update existing balancers with new endpoints.
def update(
factories: Set[AddressedFactory[Req, Rep]],
factories: Set[EndpointFactory[Req, Rep]],
cachedBalancer: CachedBalancer[Req, Rep]
): CachedBalancer[Req, Rep] = {
val group = factories.map(_.factory)
cachedBalancer.endpoints.update(Activity.Ok(group))
cachedBalancer.copy(size = group.size)
cachedBalancer.endpoints.update(Activity.Ok(factories))
cachedBalancer.copy(size = factories.size)
}
}

Expand All @@ -239,7 +276,7 @@ private class TrafficDistributor[Req, Rep](
val result = updatePartitionMap(
balancers,
activeSet,
(addressedFactory: AddressedFactory[Req, Rep]) => Seq(addressedFactory.weight),
(addressedFactory: EndpointFactory[Req, Rep]) => Seq(addressedFactory.weight),
balancerDiffOps)

// Intercept the empty balancer set and replace it with a single balancer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,11 @@ private class WeightedAperture[Req, Rep, NodeT <: ApertureNode[Req, Rep]](
"nodes" -> idxs.toSeq.sorted.map { i =>
Map[String, Any](
"index" -> i,
"name_server_weight" -> endpoints(i).factory.weight,
"weight" -> pdist.weight(i),
"address" -> endpoints(i).factory.toString,
"status" -> endpoints(i).factory.status.toString)
"status" -> endpoints(i).factory.status.toString
)
}
)

Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package com.twitter.finagle.loadbalancer.distributor

import com.twitter.finagle.addr.WeightedAddress
import com.twitter.finagle.Addr
import com.twitter.finagle.Address
import com.twitter.finagle.Dtab
import com.twitter.finagle.Status
import com.twitter.logging.Logger
import com.twitter.util.Activity
import com.twitter.util.Event
import com.twitter.util.Var
import scala.collection.mutable.Builder
import scala.collection.immutable
import scala.collection.mutable
import scala.util.control.NonFatal

/**
* A collection of methods and traits used for managing Address lifecycles
Expand Down Expand Up @@ -133,28 +130,4 @@ private[finagle] object AddrLifecycle {
b += ((k, v.result))
b.result
}

def removeStaleAddresses[Req, Rep](
merged: Map[Address, AddressedFactory[Req, Rep]],
addresses: Set[Address],
eagerEviction: Boolean
): Map[Address, AddressedFactory[Req, Rep]] = {
// Remove stale cache entries. When `eagerEviction` is false cache
// entries are only removed in subsequent stream updates.
val removed: Set[Address] = merged.keySet -- addresses.map {
case WeightedAddress(addr, _) => addr
}
removed.foldLeft(merged) {
case (cache, addr) =>
cache.get(addr) match {
case Some(AddressedFactory(f, _)) if eagerEviction || f.status != Status.Open =>
try f.close()
catch {
case NonFatal(t) => log.warning(t, s"unable to close endpoint $addr")
}
cache - addr
case _ => cache
}
}
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package com.twitter.finagle.loadbalancer

import com.twitter.conversions.DurationOps._
import com.twitter.finagle._
import com.twitter.finagle.stats.{Counter, InMemoryStatsReceiver}
import com.twitter.util.{Await, Future, Time}
import com.twitter.finagle.stats.Counter
import com.twitter.finagle.stats.InMemoryStatsReceiver
import com.twitter.util.Await
import com.twitter.util.Future
import com.twitter.util.Time
import org.scalacheck.Gen
import org.scalatest.concurrent.Conductors
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
Expand Down Expand Up @@ -68,7 +71,7 @@ class BalancerTest extends AnyFunSuite with Conductors with ScalaCheckDrivenProp
}

def newFac(_status: Status = Status.Open) = new EndpointFactory[Unit, Unit] {
def address: Address = Address.Failed(new Exception)
val address: Address = Address.Failed(new Exception)
def remake(): Unit = ()

def apply(conn: ClientConnection): Future[Service[Unit, Unit]] = Future.never
Expand Down
Loading

0 comments on commit 4043382

Please sign in to comment.