Skip to content

Commit

Permalink
finagle-thrift: Public the experimental partitioning APIs
Browse files Browse the repository at this point in the history
Problem/Solution

1. This set of APIs is tested from simulations and exposed to experiments.

2. The `responseMergerRegistry` and `requestMergerRegistry` are called
per-request, they should be `val`s to avoid evaluate every time.

JIRA Issues: CSL-9213

Differential Revision: https://phabricator.twitter.biz/D503436
  • Loading branch information
yufangong authored and jenkins committed Jun 22, 2020
1 parent fb39d24 commit bf1d47b
Show file tree
Hide file tree
Showing 7 changed files with 20 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -41,6 +41,10 @@ Breaking API Changes
New Features
~~~~~~~~~~~~

* finagle-thrift: Expose `c.t.f.thrift.exp.partitioning.PartitioningStrategy`,
the bundled PartitioningStrategy APIs are public for experiments.
``PHAB_ID=D503436``

* finagle-http: Add LoadBalancedHostFilter to allow setting host header after LoadBalancer
``PHAB_ID=D498954``

Expand Down
Expand Up @@ -6,7 +6,7 @@ import com.twitter.scrooge.{ThriftMethodIface, ThriftStructIface}
import com.twitter.util.{Future, Try}
import scala.collection.mutable

private[partitioning] object PartitioningStrategy {
object PartitioningStrategy {

/**
*
Expand Down Expand Up @@ -147,20 +147,20 @@ sealed trait PartitioningStrategy {
* For message fan-out cases.
* @see [[ResponseMerger]]
*/
def responseMergerRegistry(): ResponseMergerRegistry = ResponseMergerRegistry.create()
val responseMergerRegistry: ResponseMergerRegistry = ResponseMergerRegistry.create()
}

private[partitioning] sealed trait HashingPartitioningStrategy extends PartitioningStrategy {
sealed trait HashingPartitioningStrategy extends PartitioningStrategy {

/**
* A RequestMergerRegistry implemented by client to supply [[RequestMerger]]s.
* For message fan-out cases.
* @see [[RequestMerger]]
*/
def requestMergerRegistry(): RequestMergerRegistry = RequestMergerRegistry.create()
val requestMergerRegistry: RequestMergerRegistry = RequestMergerRegistry.create()
}

private[partitioning] sealed trait CustomPartitioningStrategy extends PartitioningStrategy {
sealed trait CustomPartitioningStrategy extends PartitioningStrategy {

/**
* Gets the logical partition identifier from a host identifier, host identifiers are derived
Expand All @@ -180,7 +180,7 @@ private[partitioning] sealed trait CustomPartitioningStrategy extends Partitioni
}
private[partitioning] object Disabled extends PartitioningStrategy

private[partitioning] object ClientHashingStrategy {
object ClientHashingStrategy {

/**
* Thrift requests not specifying hashing keys will fall in here. This allows a
Expand All @@ -195,7 +195,7 @@ private[partitioning] object ClientHashingStrategy {
/**
* An API to set a consistent hashing partitioning strategy for a Thrift/ThriftMux Client.
*/
private[partitioning] abstract class ClientHashingStrategy extends HashingPartitioningStrategy {
abstract class ClientHashingStrategy extends HashingPartitioningStrategy {
// input: original thrift request
// output: a Map of hashing keys and split requests
type ToPartitionedMap = PartialFunction[ThriftStructIface, Map[Any, ThriftStructIface]]
Expand All @@ -211,7 +211,7 @@ private[partitioning] abstract class ClientHashingStrategy extends HashingPartit
def getHashingKeyAndRequest: ToPartitionedMap
}

private[partitioning] object ClientCustomStrategy {
object ClientCustomStrategy {

/**
* Thrift requests not specifying partition ids will fall in here. This allows a
Expand All @@ -230,7 +230,7 @@ private[partitioning] object ClientCustomStrategy {
/**
* An API to set a custom partitioning strategy for a Thrift/ThriftMux Client.
*/
private[partitioning] abstract class ClientCustomStrategy extends CustomPartitioningStrategy {
abstract class ClientCustomStrategy extends CustomPartitioningStrategy {
// input: original thrift request
// output: Future Map of partition ids and split requests
type ToPartitionedMap = PartialFunction[ThriftStructIface, Future[Map[Int, ThriftStructIface]]]
Expand Down
Expand Up @@ -102,7 +102,7 @@ class ThriftCustomPartitioningService[Req, Rep](
val responseMerger = customStrategy match {
case clientCustomStrategy: ClientCustomStrategy =>
ClientDeserializeCtx.get.rpcName.flatMap { rpcName =>
clientCustomStrategy.responseMergerRegistry().get(rpcName)
clientCustomStrategy.responseMergerRegistry.get(rpcName)
} match {
case Some(merger) => merger
case None =>
Expand Down
Expand Up @@ -68,7 +68,7 @@ final private[partitioning] class ThriftHashingPartitioningService[Req, Rep](
val requestMerger: String => Option[RequestMerger[ThriftStructIface]] = { rpcName: String =>
hashingStrategy match {
case clientHashingStrategy: ClientHashingStrategy =>
clientHashingStrategy.requestMergerRegistry().get(rpcName)
clientHashingStrategy.requestMergerRegistry.get(rpcName)
}
}

Expand Down Expand Up @@ -101,7 +101,7 @@ final private[partitioning] class ThriftHashingPartitioningService[Req, Rep](
val responseMerger = hashingStrategy match {
case clientHashingStrategy: ClientHashingStrategy =>
ClientDeserializeCtx.get.rpcName.flatMap { rpcName =>
clientHashingStrategy.responseMergerRegistry().get(rpcName)
clientHashingStrategy.responseMergerRegistry.get(rpcName)
} match {
case Some(merger) => merger
case None =>
Expand Down
Expand Up @@ -229,7 +229,7 @@ abstract class PartitionAwareClientEndToEndTest extends FunSuite {
Future.value(partitionIdAndRequest)
}

override def responseMergerRegistry: ResponseMergerRegistry = {
override val responseMergerRegistry: ResponseMergerRegistry = {
ResponseMergerRegistry.create.add(GetBoxes, getBoxesRepMerger)
}
}
Expand Down Expand Up @@ -265,7 +265,7 @@ abstract class PartitionAwareClientEndToEndTest extends FunSuite {
Future.value(partitionIdAndRequest)
}

override def responseMergerRegistry: ResponseMergerRegistry = {
override val responseMergerRegistry: ResponseMergerRegistry = {
ResponseMergerRegistry.create.add(GetBoxes, getBoxesRepMerger)
}

Expand Down
Expand Up @@ -30,7 +30,7 @@ class ThriftCustomPartitioningServiceTest
Future.value(idsAndRequests)
}

override def responseMergerRegistry: PartitioningStrategy.ResponseMergerRegistry =
override val responseMergerRegistry: PartitioningStrategy.ResponseMergerRegistry =
ResponseMergerRegistry.create.add(AMethod, aResponseMerger)

}
Expand Down
Expand Up @@ -29,7 +29,7 @@ class ThriftHashingPartitioningServiceTest
override val requestMergerRegistry: RequestMergerRegistry =
RequestMergerRegistry.create.add(AMethod, aRequestMerger)

override def responseMergerRegistry: ResponseMergerRegistry =
override val responseMergerRegistry: ResponseMergerRegistry =
ResponseMergerRegistry.create.add(AMethod, aResponseMerger)
}

Expand Down

0 comments on commit bf1d47b

Please sign in to comment.