Permalink
Browse files

Consul: Add tag weight configuration applied on node addresses (#1653)

* Consul: Add tag weight configuration applied on node addresses

The consul namer can now be configured to set weights on bound addresses
based on tag-weights pairs in consul namer configuration.

* Consul: Add documentation for weights, use Seq over Set, and fix formatting.

* ConsulTest: Use Seq over Set
  • Loading branch information...
elecnix authored and adleong committed Nov 7, 2017
1 parent 42a00da commit 718514fb1d4b86153820880162d3c9559e115725
View
@@ -181,6 +181,18 @@ dtab: |
/svc => /#/io.l5d.consul/dc1/prod;
```
> Optionally, define node weight using tags:
```yaml
namers:
- kind: io.l5d.consul
weights:
- tag: experimental
weight: 0.1
- tag: primary
weight: 5.0
```
linkerd provides support for service discovery via [Consul](https://www.consul.io/).
Key | Default Value | Description
@@ -196,6 +208,7 @@ setHost | `false` | If `true`, HTTP requests resolved by Consul will have their
consistencyMode | `default` | Select between [Consul API consistency modes](https://www.consul.io/docs/agent/http.html) such as `default`, `stale` and `consistent`.
failFast | `false` | If `false`, disable fail fast and failure accrual for Consul client. Keep it `false` when using a local agent but change it to `true` when talking directly to an HA Consul API.
preferServiceAddress | `true` | If `true` use the service address if defined and default to the node address. If `false` always use the node address.
weights | none | List of tag-weight configurations, for adjusting the weights of node addresses. When a node matches more than one tag, it gets the highest matching weight. In the absence of match or configuration, nodes get a default weight of `1.0`.
### Consul Path Parameters
@@ -7,8 +7,7 @@ import com.twitter.finagle.tracing.NullTracer
import io.buoyant.config.types.Port
import io.buoyant.consul.utils.RichConsulClient
import io.buoyant.consul.v1
import io.buoyant.consul.v1.ConsistencyMode
import io.buoyant.consul.v1.HealthStatus
import io.buoyant.consul.v1.{ConsistencyMode, HealthStatus}
import io.buoyant.namer.{NamerConfig, NamerInitializer}
/**
@@ -28,6 +27,9 @@ import io.buoyant.namer.{NamerConfig, NamerInitializer}
* consistencyMode: default
* failFast: false
* preferServiceAddress: true
* weights:
* - tag: primary
* weight: 100
* </pre>
*/
class ConsulInitializer extends NamerInitializer {
@@ -37,6 +39,8 @@ class ConsulInitializer extends NamerInitializer {
object ConsulInitializer extends ConsulInitializer
case class TagWeight(tag: String, weight: Double)
case class ConsulConfig(
host: Option[String],
port: Option[Port],
@@ -47,7 +51,8 @@ case class ConsulConfig(
setHost: Option[Boolean] = None,
consistencyMode: Option[ConsistencyMode] = None,
failFast: Option[Boolean] = None,
preferServiceAddress: Option[Boolean] = None
preferServiceAddress: Option[Boolean] = None,
weights: Option[Seq[TagWeight]] = None
) extends NamerConfig {
@JsonIgnore
@@ -84,16 +89,21 @@ case class ConsulConfig(
}
val agent = v1.AgentApi(service)
val tagWeights: Map[String, Double] = weights match {
case Some(ws) => ws.map(tw => tw.tag -> tw.weight).toMap
case None => Map.empty
}
val stats = params[param.Stats].statsReceiver.scope(prefix.show.stripPrefix("/"))
includeTag match {
case Some(true) =>
ConsulNamer.tagged(
prefix, consul, agent, setHost.getOrElse(false), consistencyMode, preferServiceAddress, stats
prefix, consul, agent, setHost.getOrElse(false), consistencyMode, preferServiceAddress, tagWeights, stats
)
case _ =>
ConsulNamer.untagged(
prefix, consul, agent, setHost.getOrElse(false), consistencyMode, preferServiceAddress, stats
prefix, consul, agent, setHost.getOrElse(false), consistencyMode, preferServiceAddress, tagWeights, stats
)
}
}
@@ -14,9 +14,10 @@ object ConsulNamer {
setHost: Boolean = false,
consistency: Option[v1.ConsistencyMode] = None,
preferServiceAddress: Option[Boolean] = None,
weights: Map[String, Double] = Map.empty,
stats: StatsReceiver = NullStatsReceiver
): Namer = {
val lookup = new LookupCache(consulApi, agentApi, setHost, consistency, preferServiceAddress, stats)
val lookup = new LookupCache(consulApi, agentApi, setHost, consistency, preferServiceAddress, weights, stats)
new TaggedNamer(lookup, prefix)
}
@@ -27,9 +28,10 @@ object ConsulNamer {
setHost: Boolean = false,
consistency: Option[v1.ConsistencyMode] = None,
preferServiceAddress: Option[Boolean] = None,
weights: Map[String, Double] = Map.empty,
stats: StatsReceiver = NullStatsReceiver
): Namer = {
val lookup = new LookupCache(consulApi, agentApi, setHost, consistency, preferServiceAddress, stats)
val lookup = new LookupCache(consulApi, agentApi, setHost, consistency, preferServiceAddress, weights, stats)
new UntaggedNamer(lookup, prefix)
}
@@ -1,5 +1,6 @@
package io.buoyant.namer.consul
import com.twitter.finagle.Address.Inet
import com.twitter.finagle._
import com.twitter.finagle.stats.{NullStatsReceiver, StatsReceiver}
import com.twitter.util._
@@ -17,6 +18,7 @@ private[consul] class LookupCache(
setHost: Boolean = false,
consistency: Option[v1.ConsistencyMode] = None,
preferServiceAddress: Option[Boolean] = None,
weights: Map[String, Double] = Map.empty,
stats: StatsReceiver = NullStatsReceiver
) {
@@ -37,6 +39,7 @@ private[consul] class LookupCache(
domainOption,
consistency = consistency,
preferServiceAddress = preferServiceAddress,
weights,
serviceStats
)
log.debug("consul ns %s service %s found + %s", dc, key, residual.show)
@@ -5,6 +5,8 @@ import com.twitter.finagle.stats.StatsReceiver
import com.twitter.util._
import io.buoyant.consul.v1
import io.buoyant.namer.Metadata
import java.net.InetSocketAddress
import scala.util.control.NoStackTrace
private[consul] case class SvcKey(name: String, tag: Option[String]) {
@@ -34,6 +36,7 @@ private[consul] object SvcAddr {
domain: Option[String],
consistency: Option[v1.ConsistencyMode] = None,
preferServiceAddress: Option[Boolean] = None,
tagWeights: Map[String, Double] = Map.empty,
stats: Stats
): Var[Addr] = {
val meta = mkMeta(key, datacenter, domain)
@@ -45,7 +48,7 @@ private[consul] object SvcAddr {
blockingIndex = index,
consistency = consistency,
retry = true
).map(indexedToAddresses(preferServiceAddress))
).map(indexedToAddresses(preferServiceAddress, tagWeights))
// Start by fetching the service immediately, and then long-poll
// for service updates.
@@ -150,34 +153,45 @@ private[consul] object SvcAddr {
Addr.Metadata(Metadata.authority -> authority)
}
private[this] def indexedToAddresses(preferServiceAddress: Option[Boolean]): v1.Indexed[Seq[v1.ServiceNode]] => v1.Indexed[Set[Address]] = {
private[this] def indexedToAddresses(preferServiceAddress: Option[Boolean], tagWeights: Map[String, Double]): v1.Indexed[Seq[v1.ServiceNode]] => v1.Indexed[Set[Address]] = {
case v1.Indexed(nodes, idx) =>
val addrs = preferServiceAddress match {
case Some(false) => nodes.flatMap(serviceNodeToNodeAddr).toSet
case _ => nodes.flatMap(serviceNodeToAddr).toSet
case Some(false) => nodes.flatMap(serviceNodeToNodeAddr(_, tagWeights)).toSet
case _ => nodes.flatMap(serviceNodeToAddr(_, tagWeights)).toSet
}
v1.Indexed(addrs, idx)
}
/**
* Prefer service IPs to node IPs. Invalid addresses are ignored.
*/
private val serviceNodeToAddr: v1.ServiceNode => Traversable[Address] = { n =>
private def serviceNodeToAddr(n: v1.ServiceNode, w: Map[String, Double]): Traversable[Address] =
(n.Address, n.ServiceAddress, n.ServicePort) match {
case (_, Some(ip), Some(port)) if !ip.isEmpty => Try(Address(ip, port)).toOption
case (Some(ip), _, Some(port)) if !ip.isEmpty => Try(Address(ip, port)).toOption
case (_, Some(ip), Some(port)) if !ip.isEmpty => weightedAddress(ip, port, n, w)
case (Some(ip), _, Some(port)) if !ip.isEmpty => weightedAddress(ip, port, n, w)
case _ => None
}
}
/**
* Always use node IPs. Invalid addresses are ignored.
*/
private val serviceNodeToNodeAddr: v1.ServiceNode => Traversable[Address] = { n =>
private def serviceNodeToNodeAddr(n: v1.ServiceNode, w: Map[String, Double]): Traversable[Address] =
(n.Address, n.ServicePort) match {
case (Some(ip), Some(port)) if !ip.isEmpty => Try(Address(ip, port)).toOption
case (Some(ip), Some(port)) if !ip.isEmpty => weightedAddress(ip, port, n, w)
case _ => None
}
/**
* Apply weight to the address, taking the heaviest tag of the service.
*/
private[this] def weightedAddress(ip: String, port: Int, n: v1.ServiceNode, w: Map[String, Double]) = {
val weight = n.ServiceTags.map(_.flatMap(w.get)) match {
case None => 1.0
case Some(Nil) => 1.0
case Some(ws) => ws.max
}
val meta = Addr.Metadata((Metadata.endpointWeight, weight))
Try(Address.Inet(new InetSocketAddress(ip, port), meta)).toOption
}
private[this] val ServiceRelease =
@@ -647,4 +647,43 @@ class ConsulNamerTest extends FunSuite with Awaits {
assert(stats.counters.get(Seq("service", "closes")).contains(1))
assert(stats.counters.get(Seq("service", "errors")) == None)
}
test("Namer returns weighted bound address metadata when service has configured weight tag") {
class TestApi extends CatalogApi(null, "/v1") {
override def serviceNodes(
serviceName: String,
datacenter: Option[String],
tag: Option[String] = None,
blockingIndex: Option[String] = None,
consistency: Option[ConsistencyMode] = None,
retry: Boolean = false
): Future[Indexed[Seq[ServiceNode]]] = blockingIndex match {
case Some("0") | None =>
val node = testServiceNode.copy(ServiceTags = Some(Seq("production", "primary")))
Future.value(Indexed[Seq[ServiceNode]](Seq(node), Some("1")))
case _ => Future.never //don't respond to blocking index calls
}
}
val stats = new InMemoryStatsReceiver
val namer = ConsulNamer.untagged(
Path.read("/test"),
new TestApi(),
new TestAgentApi("consul.acme.co"),
weights = Map("primary" -> 100),
stats = stats
)
@volatile var state: Activity.State[NameTree[Name]] = Activity.Pending
namer.lookup(Path.read("/dc1/servicename/residual")).states respond { state = _ }
assertOnAddrs(state) { (addrs, _) =>
assert(addrs.size == 1)
val inet = addrs.head.asInstanceOf[Address.Inet]
assert(inet.metadata == Map(Metadata.endpointWeight -> 100))
()
}
assert(stats.counters.get(Seq("service", "opens")).contains(1))
assert(stats.counters.get(Seq("service", "updates")).contains(1))
assert(stats.counters.get(Seq("lookups")).contains(1))
}
}
@@ -46,6 +46,9 @@ class ConsulTest extends FunSuite {
|consistencyMode: stale
|failFast: true
|preferServiceAddress: false
|weights:
| - tag: primary
| weight: 100
""".stripMargin
val mapper = Parser.objectMapper(yaml, Iterable(Seq(ConsulInitializer)))
@@ -60,6 +63,7 @@ class ConsulTest extends FunSuite {
assert(consul.consistencyMode == Some(ConsistencyMode.Stale))
assert(consul.failFast == Some(true))
assert(consul.preferServiceAddress == Some(false))
assert(consul.weights == Some(Seq(TagWeight("primary", 100.0))))
assert(!consul.disabled)
}
}

0 comments on commit 718514f

Please sign in to comment.