Skip to content

Commit

Permalink
finagle-memcached: Move all paritioning-related code under the partit…
Browse files Browse the repository at this point in the history
…ioning package

Problem:

In order to make use of the Ketama related partitioning classes for
other protocols, it will be necessary to move them to a new top-level
finagle package that the memcache and redis protocols can share.

Solution:

Create a new top-level finagle project 'finagle-partitioning' and move all relevant classes to the new module. Update references throughout the codebase.

Result:

The memcache client partitioning classes are all collected under the
c.t.f.m.partitioning package in the finagle-partitioning module.

JIRA Issues: CSL-8740

Differential Revision: https://phabricator.twitter.biz/D359303
  • Loading branch information
slyphon authored and jenkins committed Aug 27, 2019
1 parent 368f6ac commit f27073d
Show file tree
Hide file tree
Showing 41 changed files with 434 additions and 301 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -26,6 +26,17 @@ Breaking API Changes
`private[this] count(Long, Response)` and is no longer part of the public API.
``PHAB_ID=D350733``

* finagle-partitioning: the hash-based routing that memcached uses has been relocated to a new
top-level module so that it can be used more broadly across protocols. This results
in several classes moving to the c.t.f.partitioning package:
1. The `Memcached.param.EjectFailedHost`, `KeyHasher`, and `NumReps` parameters are now
available under `c.t.f.partitioning.param`
2. The `FailureAccrualException` and `CacheNode` definitions are now in the `c.t.f.paritioning`
package.
3. The `ZkMetadata` class has moved to `c.t.f.p.zk` and the finagle-serverset module now depends
on finagle-partitioning.
``PHAB_ID=D359303``

Runtime Behavior Changes
~~~~~~~~~~~~~~~~~~~~~~~~

Expand Down
20 changes: 19 additions & 1 deletion build.sbt
Expand Up @@ -259,6 +259,7 @@ lazy val projectList = Seq[sbt.ProjectReference](
finagleZipkinCore,
finagleZipkinScribe,
finagleServersets,
finaglePartitioning,
finagleTunable,
finagleException,
finagleIntegration,
Expand Down Expand Up @@ -503,7 +504,23 @@ lazy val finagleServersets = Project(
if (scalaVersion.value.startsWith("2.12")) Seq("-no-java-comments")
else Nil
}
).dependsOn(finagleCore)
).dependsOn(finagleCore, finaglePartitioning)

lazy val finaglePartitioning = Project(
id = "finagle-partitioning",
base = file("finagle-partitioning")
).settings(
sharedSettings
).settings(
name := "finagle-partitioning",
libraryDependencies ++= Seq(
util("core"),
util("hashing")
)
).dependsOn(
finagleNetty4,
finagleCore % "compile->compile;test->test",
)

lazy val finagleTunable = Project(
id = "finagle-tunable",
Expand Down Expand Up @@ -610,6 +627,7 @@ lazy val finagleMemcached = Project(
finagleNetty4,
finagleCore % "compile->compile;test->test",
finagleServersets,
finaglePartitioning,
finagleStats,
finagleToggle
)
Expand Down
2 changes: 2 additions & 0 deletions finagle-memcached/src/main/scala/BUILD
Expand Up @@ -26,6 +26,7 @@ scala_library(
"finagle/finagle-memcached/src/main/java:pants-workaround",
"finagle/finagle-memcached/src/main/resources",
"finagle/finagle-netty4",
"finagle/finagle-partitioning",
"finagle/finagle-serversets",
"finagle/finagle-serversets/src/main/java",
"finagle/finagle-serversets/src/main/thrift:thrift-java",
Expand All @@ -41,6 +42,7 @@ scala_library(
exports = [
"3rdparty/jvm/com/twitter/bijection:core",
"finagle/finagle-core/src/main/scala",
"finagle/finagle-partitioning",
"util/util-core/src/main/scala",
],
)
Expand Up @@ -36,6 +36,13 @@ import com.twitter.finagle.param.{
Tracer => _,
_
}
import com.twitter.finagle.partitioning.param.{EjectFailedHost, KeyHasher, NumReps}
import com.twitter.finagle.partitioning.{
CacheNode,
KetamaClientKey,
KetamaFailureAccrualFactory,
NodeHealth
}
import com.twitter.finagle.pool.SingletonPool
import com.twitter.finagle.server.{Listener, ServerInfo, StackServer, StdStackServer}
import com.twitter.finagle.service._
Expand Down Expand Up @@ -184,38 +191,6 @@ object Memcached extends finagle.Client[Command, Response] with finagle.Server[C
private[this] val toggle = Toggles(UsePartitioningMemcachedClientToggle)
private[this] def UsePartitioningMemcachedClient = toggle(ServerInfo().id.hashCode)

/**
* Memcached specific stack params.
*/
object param {
case class EjectFailedHost(v: Boolean) {
def mk(): (EjectFailedHost, Stack.Param[EjectFailedHost]) =
(this, EjectFailedHost.param)
}

object EjectFailedHost {
implicit val param = Stack.Param(EjectFailedHost(false))
}

case class KeyHasher(hasher: hashing.KeyHasher) {
def mk(): (KeyHasher, Stack.Param[KeyHasher]) =
(this, KeyHasher.param)
}

object KeyHasher {
implicit val param = Stack.Param(KeyHasher(hashing.KeyHasher.KETAMA))
}

case class NumReps(reps: Int) {
def mk(): (NumReps, Stack.Param[NumReps]) =
(this, NumReps.param)
}

object NumReps {
implicit val param = Stack.Param(NumReps(KetamaPartitionedClient.DefaultNumReps))
}
}

object Client {

private[Memcached] val ProtocolLibraryName = "memcached"
Expand Down Expand Up @@ -331,7 +306,7 @@ object Memcached extends finagle.Client[Command, Response] with finagle.Server[C
val Logger(logger) = params[Logger]
val label0 = if (label == "") params[Label].label else label

val param.KeyHasher(hasher) = params[param.KeyHasher]
val KeyHasher(hasher) = params[KeyHasher]
registerClient(label0, hasher.toString)

def partitionAwareFinagleClient() = {
Expand All @@ -350,7 +325,7 @@ object Memcached extends finagle.Client[Command, Response] with finagle.Server[C
logger.info(s"Using the old memcached client: $destination")

val finagle.param.Stats(sr) = params[finagle.param.Stats]
val param.NumReps(numReps) = params[param.NumReps]
val NumReps(numReps) = params[NumReps]

val scopedSr = sr.scope(label0)
val healthBroker = new Broker[NodeHealth]
Expand Down Expand Up @@ -395,14 +370,14 @@ object Memcached extends finagle.Client[Command, Response] with finagle.Server[C
* cache host from a separate mechanism that's based on a global view.
*/
def withEjectFailedHost(eject: Boolean): Client =
configured(param.EjectFailedHost(eject))
configured(EjectFailedHost(eject))

/**
* Defines the hash function to use for partitioned clients when
* mapping keys to partitions.
*/
def withKeyHasher(hasher: hashing.KeyHasher): Client =
configured(param.KeyHasher(hasher))
configured(KeyHasher(hasher))

/**
* Duplicate each node across the hash ring according to `reps`.
Expand All @@ -411,7 +386,7 @@ object Memcached extends finagle.Client[Command, Response] with finagle.Server[C
* details.
*/
def withNumReps(reps: Int): Client =
configured(param.NumReps(reps))
configured(NumReps(reps))

/**
* Configures the number of concurrent `connections` a single endpoint has.
Expand Down
@@ -1,33 +1,14 @@
package com.twitter.finagle.memcached

import _root_.java.net.{SocketAddress, InetSocketAddress}
import _root_.java.net.{InetSocketAddress, SocketAddress}
import com.twitter.finagle.{Addr, Address, Group, Resolver}
import com.twitter.finagle.common.zookeeper._
import com.twitter.finagle.stats.{ClientStatsReceiver, StatsReceiver, NullStatsReceiver}
import com.twitter.finagle.zookeeper.{ZkGroup, DefaultZkClientFactory}
import com.twitter.finagle.partitioning.{CacheNode, CacheNodeMetadata}
import com.twitter.finagle.stats.{ClientStatsReceiver, NullStatsReceiver, StatsReceiver}
import com.twitter.finagle.zookeeper.{DefaultZkClientFactory, ZkGroup}
import com.twitter.thrift.Status.ALIVE
import com.twitter.util._

object CacheNode {

/**
* Utility method for translating a `CacheNode` to an `Address`
* (used when constructing a `Name` representing a `Cluster`).
*/
private[memcached] val toAddress: CacheNode => Address = {
case CacheNode(host, port, weight, key) =>
val metadata = CacheNodeMetadata.toAddrMetadata(CacheNodeMetadata(weight, key))
Address.Inet(new InetSocketAddress(host, port), metadata)
}
}

// Type definition representing a cache node
case class CacheNode(host: String, port: Int, weight: Int, key: Option[String] = None)
extends SocketAddress {
// Use overloads to keep the same ABI
def this(host: String, port: Int, weight: Int) = this(host, port, weight, None)
}

/**
* Indicates that an error occurred while resolving a cache address.
* See [[com.twitter.finagle.memcached.TwitterCacheResolver]] for details.
Expand Down

0 comments on commit f27073d

Please sign in to comment.