From a944f2a88497bf76486686aa8d949dd34a325c98 Mon Sep 17 00:00:00 2001 From: Oliver Gould Date: Fri, 29 Apr 2016 15:30:55 -0700 Subject: [PATCH] Introduce an etcd client (#303) * Introduce an etcd client io.buoyant.etcd provides a client library over etcd's HTTP api. This will facilitate #301 and #302. --- etcd/README.md | 17 + .../io/buoyant/etcd/EtcdIntegrationTest.scala | 194 ++++++++ .../twitter/finagle/buoyant/FormParams.scala | 38 ++ .../main/scala/io/buoyant/etcd/ApiError.scala | 64 +++ .../src/main/scala/io/buoyant/etcd/Etcd.scala | 115 +++++ etcd/src/main/scala/io/buoyant/etcd/Key.scala | 284 +++++++++++ .../src/main/scala/io/buoyant/etcd/Node.scala | 104 +++++ .../main/scala/io/buoyant/etcd/NodeOp.scala | 111 +++++ .../test/scala/io/buoyant/etcd/KeyTest.scala | 441 ++++++++++++++++++ project/Base.scala | 7 +- project/Deps.scala | 6 + project/LinkerdBuild.scala | 25 +- 12 files changed, 1386 insertions(+), 20 deletions(-) create mode 100644 etcd/README.md create mode 100644 etcd/src/integration/scala/io/buoyant/etcd/EtcdIntegrationTest.scala create mode 100644 etcd/src/main/scala/com/twitter/finagle/buoyant/FormParams.scala create mode 100644 etcd/src/main/scala/io/buoyant/etcd/ApiError.scala create mode 100644 etcd/src/main/scala/io/buoyant/etcd/Etcd.scala create mode 100644 etcd/src/main/scala/io/buoyant/etcd/Key.scala create mode 100644 etcd/src/main/scala/io/buoyant/etcd/Node.scala create mode 100644 etcd/src/main/scala/io/buoyant/etcd/NodeOp.scala create mode 100644 etcd/src/test/scala/io/buoyant/etcd/KeyTest.scala diff --git a/etcd/README.md b/etcd/README.md new file mode 100644 index 0000000000..5f87936187 --- /dev/null +++ b/etcd/README.md @@ -0,0 +1,17 @@ +# Finagle Etcd client # + +## `io.buoyant.etcd` ## + +The `io.buoyant.etcd` package contains a generically useful Etcd +client built on finagle-http. The client supports a variety of +asynchronous `key` operations on etcd. It goes beyond the basic etcd +operations to provide a `watch` utility that models a key's (or +tree's) state as an `com.twitter.util.Activity`. + +### TODO ### + +- `compareAndDelete` isn't yet implemented. minor oversight. +- Use residual paths on the client to instrument chrooted key +operations +- Admin/Stats APIs +- move to gRPC api diff --git a/etcd/src/integration/scala/io/buoyant/etcd/EtcdIntegrationTest.scala b/etcd/src/integration/scala/io/buoyant/etcd/EtcdIntegrationTest.scala new file mode 100644 index 0000000000..62fc3cf424 --- /dev/null +++ b/etcd/src/integration/scala/io/buoyant/etcd/EtcdIntegrationTest.scala @@ -0,0 +1,194 @@ +package io.buoyant.etcd + +import com.twitter.conversions.time._ +import com.twitter.finagle.{Http, Path} +import com.twitter.finagle.util.DefaultTimer +import com.twitter.io.Buf +import com.twitter.util.{Events => _, _} +import io.buoyant.test.{Awaits, Events} +import java.io.File +import java.util.UUID +import org.scalatest.BeforeAndAfterAll +import org.scalatest.fixture.FunSuite +import scala.sys.process.{Process, ProcessLogger} +import scala.util.Random + +/** + * Etcd client integration tests. + * + * Boots an etcd instance on a random local port. + */ +class EtcdIntegrationTest extends FunSuite with Awaits with BeforeAndAfterAll { + + lazy val devNull = new File("/dev/null") + lazy val etcdDir = s"/tmp/io.buoyant.etcd-${UUID.randomUUID.toString}" + def randomPort = 32000 + (Random.nextDouble * 30000).toInt + lazy val etcdPort = randomPort + lazy val etcdUrl = s"http://127.0.0.1:$etcdPort" + lazy val etcdPeerUrl = s"http://127.0.0.1:$randomPort" + lazy val etcdCmd = Seq( + "etcd", + "--data-dir", etcdDir, + "--listen-client-urls", etcdUrl, + "--advertise-client-urls", etcdUrl, + "--listen-peer-urls", etcdPeerUrl, + "--initial-advertise-peer-urls", etcdPeerUrl, + "--initial-cluster", s"default=$etcdPeerUrl", + "--force-new-cluster" + ) + + var process: Process = _ + + override def beforeAll: Unit = { + val which = Process(Seq("which", "etcd")).run(ProcessLogger(devNull)) + which.exitValue match { + case 0 => + info(s"""${etcdCmd mkString " "}""") + try { + process = Process(etcdCmd).run(ProcessLogger(devNull)) + } catch { + case e: Exception => fail(s"etcd failed to start: ${e.getMessage}") + } + Thread.sleep(5000) // give some time to initialize + + case _ => cancel("etcd not on the PATH") + } + } + + override def afterAll: Unit = { + if (process != null) { + process.destroy() + } + Process(Seq("rm", "-rf", etcdDir)).! + } + + def serverName = s"/$$/inet/127.1/$etcdPort" + + private[this] implicit val timer = DefaultTimer.twitter + override def defaultWait = 1.second + + type FixtureParam = Etcd + + def withFixture(test: OneArgTest) = { + val client = Http.newService(serverName) + try withFixture(test.toNoArgTest(new Etcd(client))) + finally await { client.close() } + } + + + test("version") { etcd => + val version = await { etcd.version() } + + assert(version.etcdserver.length > 0) + info(s"release: ${version.etcdserver}") + + assert(version.etcdcluster.length > 0) + info(s"internal: ${version.etcdcluster}") + } + + val rootPath = Path.Utf8("it") + + test("create and delete data node") { etcd => + val key = etcd.key(rootPath) + val value = UUID.randomUUID().toString + + val createOp = await { key.create(Some(Buf.Utf8(value))) } + assert(createOp.action == NodeOp.Action.Create) + createOp.node match { + case createData@Node.Data(createKey, createModified, createCreated, None, createVal) => + assert(createKey.take(1) == key.path) + assert(createModified == createCreated) + assert(createModified == createOp.etcd.index) + assert(createVal == Buf.Utf8(value)) + info("create data") + + val delOp = await { etcd.key(createKey).delete() } + assert(delOp.action == NodeOp.Action.Delete) + delOp.node match { + case Node.Data(delKey, delModified, delCreated, None, delVal) => + assert(delKey == createKey) + assert(delModified > createModified) + assert(delCreated == createCreated) + assert(delVal == Buf.Empty) + + case node => + fail(s"expected deleted data node, found $node") + } + assert(createOp.etcd.index < delOp.etcd.index) + assert(createOp.etcd.clusterId == delOp.etcd.clusterId) + + delOp.prevNode match { + case Some(Node.Data(prevKey, prevModified, prevCreated, None, prevVal)) => + assert(prevKey == createKey) + assert(prevModified == createModified) + assert(prevCreated == createCreated) + assert(prevVal == Buf.Utf8(value)) + case node => + fail(s"expected previous data node, found $node") + } + info("deleted key") + + case node => + fail(s"expected set data node, found $node") + } + } + + test("get(wait), set(ttl), get(wait, idx), expire") { etcd => + val key = etcd.key(Path.Utf8(UUID.randomUUID().toString)) + + info("waiting for data") + val wait = key.get(wait = true) + + assert(!wait.isDefined) + + val woof = Buf.Utf8("woof woof woof") + val ttl = 10.seconds + val created = await { key.set(value = Some(woof), ttl=Some(ttl)) } + assert(created.action == NodeOp.Action.Set) + created.node match { + case Node.Data(path, _, _, lease, value) => + assert(path == key.path) + assert(lease.isDefined) + assert(value == woof) + info("setting data") + + case node => + fail(s"setting data node, found $node") + } + + val set = await(wait) + assert(set.action == NodeOp.Action.Set) + set.node match { + case Node.Data(path, _, _, lease, value) => + assert(path == created.node.key) + assert(lease.isDefined) + assert(lease.map(_.expiration) == created.node.lease.map(_.expiration)) + assert(value == woof) + case node => + fail(s"observing data $node") + } + + info("waiting for expiration") + val expire = await(ttl+1.second) { + key.get(wait = true, waitIndex = Some(set.node.modifiedIndex+1)) + } + assert(expire.action == NodeOp.Action.Expire) + expire.node match { + case Node.Data(path, _, _, None, Buf.Empty) => + assert(path == created.node.key) + case node => + fail(s"expecting expired data node, found $node") + } + assert(expire.prevNode.isDefined) + val Some(prev) = expire.prevNode + prev match { + case Node.Data(path, _, _, lease, value) => + assert(path == created.node.key) + assert(lease == None) + assert(value == woof) + case node => + fail(s"expecting expired data node, found $node") + } + } + +} diff --git a/etcd/src/main/scala/com/twitter/finagle/buoyant/FormParams.scala b/etcd/src/main/scala/com/twitter/finagle/buoyant/FormParams.scala new file mode 100644 index 0000000000..842ebae760 --- /dev/null +++ b/etcd/src/main/scala/com/twitter/finagle/buoyant/FormParams.scala @@ -0,0 +1,38 @@ +package com.twitter.finagle.buoyant + +import com.twitter.finagle.http.Request +import org.jboss.netty.handler.codec.http.multipart._ +import scala.collection.JavaConverters._ + +/** + * Finagle's request type doesn't expose any way to interact with form + * parameters. Netty provides utilities for this, but the underlying + * netty message types can't be accessed from outside of the + * finagle. SO, we just shove this into the finagle package so that we + * can do this. This should be fed back upstream and then removed. + */ +object FormParams { + type Params = Map[String, Seq[String]] + + def set(req: Request, params: Seq[(String, String)]): Unit = { + val enc = new HttpPostRequestEncoder(req.httpRequest, false) + for ((k, v) <- params) { + enc.addBodyAttribute(k, v) + } + enc.finalizeRequest() + } + + def get(req: Request): Params = { + val dec = new HttpPostRequestDecoder(req.httpRequest) + dec.getBodyHttpDatas().asScala.foldLeft[Params](Map.empty) { + case (attrs, attr: Attribute) => + val k = attr.getName + val v = attr.getValue + val vals = attrs.getOrElse(k, Seq.empty) :+ v + attrs + (k -> vals) + + case (attrs, _) => attrs + } + } + +} diff --git a/etcd/src/main/scala/io/buoyant/etcd/ApiError.scala b/etcd/src/main/scala/io/buoyant/etcd/ApiError.scala new file mode 100644 index 0000000000..0dd9448075 --- /dev/null +++ b/etcd/src/main/scala/io/buoyant/etcd/ApiError.scala @@ -0,0 +1,64 @@ +package io.buoyant.etcd + +case class ApiError( + errorCode: Int, + message: String, + cause: String, + index: Int +) extends Exception(message) + +// see: https://github.com/coreos/etcd/blob/master/Documentation/v2/errorcode.md +object ApiError { + + class ErrorGroup(val codes: Int*) { + def unapply(code: Int): Option[Int] = + if (codes contains code) Some(code) else None + } + + val KeyNotFound = 100 + val TestFailed = 101 + val NotFile = 102 + val NotDir = 104 + val NodeExist = 105 + val RootReadOnly = 107 + val DirNotEmpty = 108 + object Command extends ErrorGroup( + KeyNotFound, + TestFailed, + NotFile, + NotDir, + NodeExist, + RootReadOnly, + DirNotEmpty + ) + + val PrevValueRequired = 201 + val TtlNan = 202 + val IndexNan = 203 + val InvalidField = 209 + val InvalidForm = 210 + object Content extends ErrorGroup( + PrevValueRequired, + TtlNan, + IndexNan, + InvalidField, + InvalidForm + ) + + val RaftInternal = 300 + val LeaderElect = 301 + object Raft extends ErrorGroup( + RaftInternal, + LeaderElect + ) + + val WatcherCleared = 400 + val EventIndexCleared = 401 + object Etcd extends ErrorGroup( + WatcherCleared, + EventIndexCleared + ) + + object Known extends ErrorGroup(Command.codes ++ Content.codes ++ Raft.codes ++ Etcd.codes: _*) +} + diff --git a/etcd/src/main/scala/io/buoyant/etcd/Etcd.scala b/etcd/src/main/scala/io/buoyant/etcd/Etcd.scala new file mode 100644 index 0000000000..a2ad67fc5b --- /dev/null +++ b/etcd/src/main/scala/io/buoyant/etcd/Etcd.scala @@ -0,0 +1,115 @@ +package io.buoyant.etcd + +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import com.fasterxml.jackson.module.scala.DefaultScalaModule +import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper +import com.twitter.finagle.{Path, Service} +import com.twitter.finagle.buoyant.FormParams +import com.twitter.finagle.http.{MediaType, Message, Method, Request, Response, Status} +import com.twitter.io.Buf +import com.twitter.util._ + +case class Version(etcdserver: String, etcdcluster: String) + +case class UnexpectedResponse( + method: Method, + uri: String, + params: Seq[(String, String)], + status: Status, + state: Etcd.State +) extends Exception({ + val ps = params.map { case (k, v) => s"($k -> $v)" }.mkString(", ") + s"""$method $uri [$ps] $status""" +}) + +object Etcd { + + private[etcd] val keysPrefixPath = Path.Utf8("v2", "keys") + private[etcd] val versionPath = Path.Utf8("version") + + private[etcd] def mkReq( + path: Path, + method: Method = Method.Get, + params: Seq[(String, String)] = Nil + ): Request = { + method match { + case Method.Post | Method.Put => + val req = Request(method, path.show) + req.contentType = MediaType.WwwForm + FormParams.set(req, params) + req + + case _ => + val req = Request(path.show, params: _*) + req.method = method + req + } + } + + private[etcd] val mapper = new ObjectMapper with ScalaObjectMapper + mapper.registerModule(DefaultScalaModule) + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + def readJson[T: Manifest](buf: Buf): Try[T] = { + val Buf.ByteArray.Owned(bytes, begin, end) = Buf.ByteArray.coerce(buf) + Try(mapper.readValue[T](bytes, begin, end - begin)) + } + + /** + * An Etcd cluster's state as described in response headers. + * + * `index` reflects the X-Etcd-Index value as described at + * https://coreos.com/etcd/docs/latest/api.html. + */ + case class State( + index: Long, + clusterId: String = "" + ) + + object State { + private[etcd] object Headers { + val ClusterId = "x-etcd-cluster-id" + val EtcdIndex = "x-etcd-index" + } + + private[etcd] def mk(msg: Message): State = { + val index = msg.headerMap.get(Headers.EtcdIndex) + .flatMap { i => Try(i.toLong).toOption } + .getOrElse(0L) + + val id = msg.headerMap.getOrElse(Headers.ClusterId, "") + + State(index, id) + } + } +} + +/** + * An etcd client. + * + * The client service is responsible for setting the Host: header, etc... + */ +class Etcd(client: Service[Request, Response]) extends Closable { + + import Etcd._ + + /** Fetch etcd server version info */ + def version(): Future[Version] = { + val req = mkReq(versionPath) + req.headerMap("accept") = MediaType.Json + + client(req).flatMap { rsp => + rsp.status match { + case Status.Ok => + Future.const(readJson[Version](rsp.content)) + + case status => + Future.exception(UnexpectedResponse(req.method, req.uri, Nil, status, Etcd.State(0))) + } + } + } + + def key(k: Path): Key = new Key(k, client) + def key(k: String): Key = key(Path.read(k)) + + def close(deadline: Time) = client.close() +} diff --git a/etcd/src/main/scala/io/buoyant/etcd/Key.scala b/etcd/src/main/scala/io/buoyant/etcd/Key.scala new file mode 100644 index 0000000000..d166a5d71d --- /dev/null +++ b/etcd/src/main/scala/io/buoyant/etcd/Key.scala @@ -0,0 +1,284 @@ +package io.buoyant.etcd + +import com.twitter.conversions.time._ +import com.twitter.finagle.{Path, Service} +import com.twitter.finagle.http._ +import com.twitter.finagle.service.Backoff +import com.twitter.io.Buf +import com.twitter.util._ + +case class BackoffsExhausted(key: Path, throwable: Throwable) + extends Exception(key.show, throwable) + +object Key { + + /* + * Helpers for building request params + */ + + private val Params = Seq.empty[(String, String)] + + private def trueParam(name: String, cond: Boolean): Option[(String, String)] = + if (cond) Some(name -> "true") else None + + private def falseParam(name: String, cond: Boolean): Option[(String, String)] = + if (!cond) Some(name -> "false") else None + + private[this] val dirParam = ("dir" -> "true") + private def dirOrValueParam(v: Option[Buf]): (String, String) = v match { + case None => dirParam + case Some(buf) => + val Buf.Utf8(v) = buf + ("value" -> v) + } + + private def ttlParam(ttl: Option[Duration]): Option[(String, String)] = ttl.map { + case Duration.Zero => "ttl" -> "" + case ttl => "ttl" -> ttl.inSeconds.toString + } + +} + +class Key(key: Path, client: Service[Request, Response]) { + + import Etcd._ + import Key._ + + def path: Path = key + + def key(path: Path): Key = new Key(key ++ path, client) + def key(name: String): Key = key(Path.read(name)) + + private[this] lazy val uriPath = keysPrefixPath ++ key + + /** + * Set the contents of a key. + * + * If `recursive` is true and this key is a directory, the returned + * Node contains the entire tree of children. + * + * If `wait` is true and `waitIndex` is specified, a response is not + * received until the node (or one of its children, if `recursive` + * is specified) is updated. If `wait` is true and `waitIndex` is + * not specified, a response is not received until the next update + * to this node (or its children, if `recursive`). + * + * If `quorum` is specified, etcd will ensure that the etcd instance + * is at quorum with the cluster. + */ + def get( + recursive: Boolean = false, + wait: Boolean = false, + waitIndex: Option[Long] = None, + quorum: Boolean = false + ): Future[NodeOp] = { + val params = Params ++ + trueParam("recursive", recursive) ++ + trueParam("quorum", quorum) ++ + trueParam("wait", wait) ++ + waitIndex.map("waitIndex" -> _.toString) + val req = mkReq(uriPath, params = params) + req.accept = MediaType.Json + client(req).flatMap { rsp => Future.const(NodeOp.mk(req, rsp, key, params)) } + } + + /** + * Set the contents of a key. + * + * If value is None, the key is treated as a directory. In order to + * unset the value of a data node, use `Some(Buf.Empty)`. + * + * Optionally, a `ttl` may be specified to inform etcd to remove the + * node after some time period (only second-granularity is supported + * by etcd). + * + * If `prevExist` is true, the node operation will fail if the node + * does not already exist. + */ + def set( + value: Option[Buf], + ttl: Option[Duration] = None, + prevExist: Boolean = false + ): Future[NodeOp] = { + val params = Params ++ + ttlParam(ttl) ++ + trueParam("prevExist", prevExist) :+ + dirOrValueParam(value) + val req = mkReq(uriPath, Method.Put, params) + client(req).flatMap { rsp => Future.const(NodeOp.mk(req, rsp, key, params)) } + } + + /** + * Create a new key. + * + * If the key exists, an error is returned. + * + * If value is None, the key is treated as a directory. In order to + * create an empty data node, use `Some(Buf.Empty)`. + * + * Optionally, a `ttl` may be specified to inform etcd to remove the + * node after some time period (only second-granularity is supported + * by etcd). + */ + def create( + value: Option[Buf], + ttl: Option[Duration] = None + ): Future[NodeOp] = { + val params = Params ++ + ttlParam(ttl) :+ + dirOrValueParam(value) + val req = mkReq(uriPath, Method.Post, params) + client(req).flatMap { rsp => Future.const(NodeOp.mk(req, rsp, key, params)) } + } + + /** + * Set the node's data if the provided preconditions apply to the + * existing state of the node. + * + * If `prevIndex` is specified, the current node must have the + * provided `index` value. + * + * If `prevValue` is specified, the current node must have the + * provided value. + * + * If `prevExist` is false, the node is not required to exist. + */ + def compareAndSwap( + value: Buf, + prevIndex: Option[Long] = None, + prevValue: Option[Buf] = None, + prevExist: Boolean = true + ): Future[NodeOp] = { + require(prevIndex.isDefined || prevValue.isDefined || !prevExist) + + val Buf.Utf8(vstr) = value + val params = Seq("value" -> vstr) ++ + prevIndex.map("prevIndex" -> _.toString) ++ + prevValue.map { case Buf.Utf8(v) => "prevValue" -> v } ++ + falseParam("prevExist", prevExist) + + val req = mkReq(uriPath, Method.Put, params) + client(req).flatMap { rsp => Future.const(NodeOp.mk(req, rsp, key, params)) } + } + + /** + * Delete a node. + * + * If `dir` is not true and the key is a directory, this will + * operation fail. + * + * If `dir` and `recursive` are true, the entire tree is deleted. + */ + def delete( + dir: Boolean = false, + recursive: Boolean = false + ): Future[NodeOp] = { + val params = Params ++ + trueParam("dir", dir) ++ + trueParam("recursive", recursive) + val req = mkReq(uriPath, Method.Delete, params) + client(req).flatMap { rsp => Future.const(NodeOp.mk(req, rsp, key, params)) } + } + + /** Find the greatest index referenced in a tree of nodes. */ + private[this] def getIndex(node: Node): Long = node match { + case Node.Data(_, idx, _, _, _) => idx + case Node.Dir(_, idx, _, _, nodes) => nodes.foldLeft(idx)(_ max getIndex(_)) + } + + /** + * An Event constructed by watching an etcd key. + * + * If `recursive` is true, the key's subtree is observed for + * changes. + * + * When an unexpected error is encountered communicating with the + * API, the failure is published on the Event and the `backoff` + * stream is used to compute the time to wait before retrying. If + * the `backoff` stream is exhausted or a fatal error is + * encountered, it is reported and polling stops. + * + * The Event is not reference-counted, so each observer initiates + * its own polling loop. This ensures that the initial state of a + * tree is reported properly. + */ + def events( + recursive: Boolean = false, + backoff: Stream[Duration] = Backoff.exponentialJittered(10.millis, 10.minutes) + ): Event[Try[NodeOp]] = new Event[Try[NodeOp]] { + private[this] val origBackoff = backoff + + def register(witness: Witness[Try[NodeOp]]) = { + @volatile var closed = false + + def loop(idx: Option[Long], backoff: Stream[Duration]): Future[Unit] = + if (!closed) { + get(recursive, wait = idx.isDefined, waitIndex = idx).transform { + case note@Return(op) => + witness.notify(note) + loop(Some(getIndex(op.node) + 1), origBackoff) + + case note@Throw(ApiError(ApiError.KeyNotFound, _, _, idx)) => + witness.notify(note) + loop(Some(idx + 1), origBackoff) + + case note@Throw(ApiError(ApiError.EventIndexCleared, _, _, idx)) => + loop(Some(idx + 1), origBackoff) + + case note@Throw(NonFatal(e)) => + backoff match { + case wait #:: backoff => + witness.notify(note) + loop(None, backoff) + + case _ => + witness.notify(Throw(BackoffsExhausted(key, e))) + Future.Unit + } + + case note@Throw(_) => + witness.notify(note) + Future.Unit + } + } else Future.Unit + + val pending = loop(None, origBackoff) + + Closable.make { _ => + closed = true + pending.raise(new FutureCancelledException) + Future.Unit + } + } + + } + + // TODO: removed until a concrete use for this has been identified + // to determine whether this makes sense. + /* + * Watch a single node for updates. + * + * Observation of the returned Activity is reference-counted so that + * multiple concurrent observers will not incur unnecessary requests + * against the API. + */ + // def watch(backoff: Stream[Duration] = Stream.empty): Activity[NodeOp] = { + // val event = events(false, backoff) + // val states = Var.async[Activity.State[NodeOp]](Activity.Pending) { state => + // event.respond { op => + // state() = op match { + // case Return(op) => Activity.Ok(op) + // case Throw(e) => Activity.Failed(e) + // } + // } + // } + // Activity(states) + // } + + // TODO + // + // Slightly more complicated since it requires aggregating/updating + // state across updates. + // + // def watchTree(backoff: Stream[Duration] = Stream.empty) +} diff --git a/etcd/src/main/scala/io/buoyant/etcd/Node.scala b/etcd/src/main/scala/io/buoyant/etcd/Node.scala new file mode 100644 index 0000000000..ec5c6cf2d6 --- /dev/null +++ b/etcd/src/main/scala/io/buoyant/etcd/Node.scala @@ -0,0 +1,104 @@ +package io.buoyant.etcd + +import com.twitter.conversions.time._ +import com.twitter.finagle.Path +import com.twitter.io.Buf +import com.twitter.util.{Duration, Return, Throw, Time, Try} +import org.joda.time.DateTime +import org.joda.time.format.ISODateTimeFormat + +sealed trait Node { + def key: Path + def modifiedIndex: Long + def createdIndex: Long + def lease: Option[Node.Lease] +} + +object Node { + case class Lease(expiration: Time, ttl: Duration) + + case class Dir( + key: Path, + modifiedIndex: Long, + createdIndex: Long, + lease: Option[Lease] = None, + nodes: Seq[Node] = Seq.empty + ) extends Node { + def :+(node: Node): Dir = + copy(nodes = nodes :+ node) + } + + case class Data( + key: Path, + modifiedIndex: Long, + createdIndex: Long, + lease: Option[Lease] = None, + value: Buf = Buf.Empty + ) extends Node + + /** + * Representation of a Node, as returned from etcd. + */ + private[etcd] case class Json( + key: String, + modifiedIndex: Long, + createdIndex: Long, + dir: Boolean = false, + value: Option[String] = None, + nodes: Option[Seq[Json]] = None, + expiration: Option[String] = None, + ttl: Option[Int] = None + ) { + + def toLease: Try[Option[Lease]] = + for { + expiration <- Try { + expiration.map { e => + val ms = ISODateTimeFormat.dateTime.parseDateTime(e).getMillis + Time.fromMilliseconds(ms) + } + } + ttl <- Try(ttl.map(_.seconds)) + } yield for { + e <- expiration + t <- ttl + } yield Lease(e, t) + + def toNode: Try[Node] = { + val k = if (key == null || key == "") "/" else key + Try(Path.read(key)).flatMap { key => + toLease.flatMap { lease => + if (dir) { + val init = Dir(key, modifiedIndex, createdIndex, lease) + nodes.getOrElse(Seq.empty).foldLeft[Try[Dir]](Return(init)) { + case (e@Throw(_), _) => e + case (Return(tree), n) => n.toNode.map(tree :+ _) + } + } else { + val buf = value.map(Buf.Utf8(_)) getOrElse Buf.Empty + Return(Data(key, modifiedIndex, createdIndex, lease, buf)) + } + } + } + } + + } + + private[etcd] object Json { + + private[this] def toIsoDate(t: Time): String = + ISODateTimeFormat.dateTime.print(t.inMillis) + + def apply(node: Node): Json = node match { + case Data(key, modified, created, lease, Buf.Utf8(value)) => + Json(key.show, modified, created, false, Some(value), None, + lease.map { l => toIsoDate(l.expiration) }, + lease.map(_.ttl.inSeconds)) + + case Dir(key, modified, created, lease, nodes) => + Json(key.show, modified, created, true, None, Some(nodes.map(Json(_))), + lease.map { l => toIsoDate(l.expiration) }, + lease.map(_.ttl.inSeconds)) + } + } +} diff --git a/etcd/src/main/scala/io/buoyant/etcd/NodeOp.scala b/etcd/src/main/scala/io/buoyant/etcd/NodeOp.scala new file mode 100644 index 0000000000..a15c274207 --- /dev/null +++ b/etcd/src/main/scala/io/buoyant/etcd/NodeOp.scala @@ -0,0 +1,111 @@ +package io.buoyant.etcd + +import com.twitter.finagle.Path +import com.twitter.finagle.http._ +import com.twitter.util._ + +case class NodeOp( + action: NodeOp.Action, + node: Node, + etcd: Etcd.State, + prevNode: Option[Node] = None +) + +object NodeOp { + + sealed abstract class Action(val name: String) { + override def toString = name + } + + object Action { + + object Create extends Action("create") + object CompareAndSwap extends Action("compareAndSwap") + object CompareAndDelete extends Action("compareAndDelete") + object Delete extends Action("delete") + object Expire extends Action("expire") + object Get extends Action("get") + object Set extends Action("set") + object Update extends Action("update") + object Watch extends Action("watch") + + val All: Seq[Action] = Seq( + Create, + CompareAndSwap, + CompareAndDelete, + Delete, + Expire, + Get, + Set, + Update, + Watch + ) + + val ByName: Map[String, Action] = + All.map(a => a.name -> a).toMap + + case class Invalid(name: String) + extends Exception(s"Invalid action: $name") + + def mk(name: String): Try[Action] = + Try.orThrow(ByName.get(name)) { () => Invalid(name) } + } + + private[etcd] def mk( + req: Request, + rsp: Response, + key: Path, + params: Seq[(String, String)] = Seq.empty + ): Try[NodeOp] = { + val state = Etcd.State.mk(rsp) + (req.method, rsp.status) match { + case (Method.Get | Method.Head | Method.Delete, Status.Ok) => + Etcd.readJson[Json](rsp.content).flatMap(_.toNodeOp(state)) + + case (Method.Put | Method.Post, Status.Created) => + Etcd.readJson[Json](rsp.content).flatMap(_.toNodeOp(state)) + + case (method, status) => + Etcd.readJson[ApiError](rsp.content).transform { + case Return(ae) => Throw(ae) + case Throw(_) => Throw(UnexpectedResponse(method, req.uri, params, status, state)) + } + } + } + + /** + * Representation of a Node operation, as returned from etcd. + */ + private[etcd] case class Json( + action: String, + node: Option[Node.Json] = None, + prevNode: Option[Node.Json] = None + ) { + + def toNodeOp(etcd: Etcd.State): Try[NodeOp] = + Action.mk(action).flatMap { action => + node match { + case None => + Throw(new Exception("node not specified")) + + case Some(node) => + node.toNode match { + case Throw(e) => Throw(e) + + case Return(node) => + val prev = prevNode match { + case None => Return(None) + case Some(prev) => prev.toNode.map(Some(_)) + } + prev.map(NodeOp(action, node, etcd, _)) + } + } + } + } + + private[etcd] object Json { + def apply(op: NodeOp): Json = + Json(op.action.name, Some(Node.Json(op.node)), op.prevNode.map(Node.Json(_))) + } + +} diff --git a/etcd/src/test/scala/io/buoyant/etcd/KeyTest.scala b/etcd/src/test/scala/io/buoyant/etcd/KeyTest.scala new file mode 100644 index 0000000000..8d93577ecc --- /dev/null +++ b/etcd/src/test/scala/io/buoyant/etcd/KeyTest.scala @@ -0,0 +1,441 @@ +package io.buoyant.etcd + +import com.twitter.conversions.time._ +import com.twitter.finagle.{Filter, Path, Service} +import com.twitter.finagle.buoyant.FormParams +import com.twitter.finagle.http.{Message, Method, Request, Response, Status} +import com.twitter.io.Buf +import com.twitter.util.{Events => _, _} +import io.buoyant.test.Events +import org.scalatest._ + +class KeyTest extends FunSuite with ParallelTestExecution { + + private[this]type Params = Map[String, Seq[String]] + + private[this] def getParam(params: Params, param: String): Option[String] = + params.get(param).flatMap(_.headOption) + + def writeJson[T](t: T): Buf = Buf.ByteArray.Owned(Etcd.mapper.writeValueAsBytes(t)) + + /* + * Mock the etcd server using the provided (simplified) request handler. + * If the handler returns a NodeOp, it is properly encoded into the response. + * If it returns a Version, it is encoded. + * If it returns a Future, the value is treated as described above once the future is satisfied. + */ + private[this] def mkClient(handle: PartialFunction[(Method, Path, Map[String, Seq[String]]), Any]) = + Service.mk[Request, Response] { req => + val (path, params) = req.method match { + case Method.Post | Method.Put => + val path = Path.read(req.uri) + val params = FormParams.get(req) + (path, params) + + case _ => + val path = Path.read(req.path) + val params = req.params.keys.foldLeft(Map.empty[String, Seq[String]]) { + case (params, k) => params + (k -> req.params.getAll(k).toSeq) + } + (path, params) + } + + val rsp = Response() + rsp.version = req.version + + val k = (req.method, path, params) + if (handle.isDefinedAt(k)) { + Future(handle(k)).flatMap(serve(req.method, rsp, _)) handle { + case e@ApiError(code, _, _, index) => + rsp.status = code match { + case ApiError.KeyNotFound => Status.NotFound + case ApiError.NodeExist => Status.Forbidden + case _ => Status.BadRequest + } + addState(Etcd.State(index), rsp) + rsp.content = writeJson(e) + rsp + } + } else { + rsp.status = Status.InternalServerError + info(s"method=${req.method} path=$path params=$params") + Future.value(rsp) + } + } + + private[this] def mkEtcd( + filter: Filter[Request, Response, Request, Response] = Filter.identity + )(handle: PartialFunction[(Method, Path, Params), Any]) = + new Etcd(filter andThen mkClient(handle)) + + private[this] def mkKey( + key: Path, + filter: Filter[Request, Response, Request, Response] = Filter.identity + )(handle: PartialFunction[(Method, Params), Any]): Key = { + val uri = Path.Utf8("v2", "keys") ++ key + val etcd = mkEtcd(filter) { + case (method, `uri`, params) if handle.isDefinedAt(method, params) => + handle((method, params)) + } + etcd.key(key) + } + + private[this] def serve(method: Method, rsp: Response, v: Any): Future[Response] = + v match { + case f: Future[_] => + f.flatMap(serve(method, rsp, _)) + + case Some(v) => + serve(method, rsp, v) + + case op: NodeOp => + rsp.content = writeJson(NodeOp.Json(op)) + addState(op.etcd, rsp) + rsp.status = (rsp.status, method) match { + case (Status.Ok, Method.Put | Method.Post) => Status.Created + case (status, _) => status + } + Future.value(rsp) + + case v: Version => + rsp.content = writeJson(v) + Future.value(rsp) + + case idk => + rsp.status = Status.InternalServerError + rsp.contentString = idk.toString + Future.value(rsp) + } + + private[this] def addState(etcd: Etcd.State, msg: Message): Unit = { + msg.headerMap("X-Etcd-Index") = etcd.index.toString + msg.headerMap("X-Etcd-Cluster-Id") = etcd.clusterId + } + + private[this] object Recursive { + def unapply(params: Params): Option[Boolean] = + Some(getParam(params, "recursive").exists(_ == "true")) + } + private[this] object DirParam { + def unapply(params: Params): Option[Boolean] = + Some(getParam(params, "dir").exists(_ == "true")) + } + private[this] object DirRecursive { + def unapply(params: Params): Option[(Boolean, Boolean)] = { + val DirParam(dir) = params + val Recursive(recursive) = params + Some((dir, recursive)) + } + } + + private[this] object Watch { + def unapply(params: Params): Option[(Boolean, Option[Int])] = { + val Recursive(recursive) = params + val idx = getParam(params, "wait").find(_ == "true") flatMap { _ => + getParam(params, "waitIndex") flatMap { waitIndex => + Try(waitIndex.toInt).toOption + } + } + Some((recursive, idx)) + } + } + + test("Etcd.version") { + val uri = Path.Utf8("version") + val version = Version("a.b.c.d", "a") + val etcd = mkEtcd() { case (Method.Get, `uri`, _) => version } + val v = etcd.version() + assert(Await.result(v, 250.millis) == version) + } + + test("Key.delete") { + val path = Path.read("/some/test/path") + val op = NodeOp( + NodeOp.Action.Delete, + Node.Data(path, 124, 100), + Etcd.State(124), + Some(Node.Data(path, 123, 100, value = Buf.Utf8("I like dogs"))) + ) + val key = mkKey(path) { case (Method.Delete, _) => op } + val del = key.delete() + assert(Await.result(del, 250.millis) == op) + } + + test("Key.delete: dir") { + val path = Path.read("/some/test/path") + val op = NodeOp( + NodeOp.Action.Delete, + Node.Dir(path, 124, 100), + Etcd.State(124), + Some(Node.Dir(path, 123, 100)) + ) + + val key = mkKey(path) { case (Method.Delete, DirParam(true)) => op } + val del = key.delete(dir = true) + assert(Await.result(del, 250.millis) == op) + } + + test("Key.delete: tree") { + val path = Path.read("/some/test/path") + val op = NodeOp( + NodeOp.Action.Delete, + Node.Dir(path, 124, 100), + Etcd.State(124), + Some(Node.Dir(path, 123, 100, nodes = Seq( + Node.Data(path ++ Path.Utf8("child"), 123, 123) + ))) + ) + + val key = mkKey(path) { case (Method.Delete, DirRecursive(true, true)) => op } + val del = key.delete(dir = true, recursive = true) + assert(Await.result(del, 250.millis) == op) + } + + test("Key.get: data") { + val data = Node.Data(Path.read("/some/test/path"), 123, 100, value = Buf.Utf8("I like dogs")) + val op = NodeOp(NodeOp.Action.Get, data, Etcd.State(123)) + val key = mkKey(op.node.key) { case (Method.Get, _) => op } + assert(Await.result(key.get(), 250.millis) == op) + } + + test("Key.get: quorum") { + val data = Node.Data(Path.read("/some/test/path"), 123, 100, value = Buf.Utf8("I like dogs")) + val op = NodeOp(NodeOp.Action.Get, data, Etcd.State(123)) + val key = mkKey(op.node.key) { + case (Method.Get, params) if getParam(params, "quorum").exists(_ == "true") => op + } + val get = key.get(quorum = true) + assert(Await.result(get, 250.millis) == op) + } + + test("Key.get: dir") { + val op = NodeOp( + NodeOp.Action.Get, + Node.Dir(Path.read("/some"), 123, 100, nodes = Seq( + Node.Dir(Path.read("/some/test"), 123, 100), + Node.Data(Path.read("/some/data"), 111, 111) + )), + Etcd.State(123) + ) + + val key = mkKey(op.node.key) { case (Method.Get, _) => op } + val get = key.get() + assert(Await.result(get, 250.millis) == op) + } + + test("Key.get: dir: recursive") { + val op = NodeOp( + NodeOp.Action.Get, + Node.Dir(Path.read("/some"), 123, 100, nodes = Seq( + Node.Dir(Path.read("/some/test"), 123, 100, nodes = Seq( + Node.Data(Path.read("/some/test/path"), 123, 100) + )), + Node.Data(Path.read("/some/data"), 111, 111) + )), + Etcd.State(123) + ) + + val key = mkKey(op.node.key) { + case (Method.Get, Recursive(true)) => op + } + val get = key.get(recursive = true) + assert(Await.result(get, 250.millis) == op) + } + + test("Key.set: data") { + val op = NodeOp(NodeOp.Action.Set, Node.Data(Path.Utf8("k"), 1, 1, value = Buf.Utf8("v")), Etcd.State(1)) + val key = mkKey(op.node.key) { + case (Method.Put, params) if getParam(params, "value").exists(_ == "v") => op + } + val set = key.set(Some(Buf.Utf8("v"))) + assert(Await.result(set, 250.millis) == op) + } + + test("Key.set: data ttl") { + val op = NodeOp( + NodeOp.Action.Set, + Node.Data(Path.Utf8("k"), 1, 1, value = Buf.Utf8("v")), Etcd.State(1) + ) + val ttl = 17.minutes + val key = mkKey(op.node.key) { + case (Method.Put, params) => + assert(getParam(params, "value") == Some("v")) + assert(getParam(params, "ttl") == Some(ttl.inSeconds.toString)) + assert(getParam(params, "prevExist") == Some("true")) + op + } + val set = key.set( + Some(Buf.Utf8("v")), + ttl = Some(ttl), + prevExist = true + ) + assert(Await.result(set, 250.millis) == op) + } + + test("Key.set: dir") { + val op = NodeOp(NodeOp.Action.Set, Node.Dir(Path.Utf8("dir"), 1, 1), Etcd.State(1)) + val key = mkKey(op.node.key) { + case (Method.Put, DirParam(true)) => op + } + val set = key.set(None) + assert(Await.result(set, 250.millis) == op) + } + + test("Key.compareAndSwap: fails requirement") { + val key = mkKey(Path.Utf8("caskey")) { + case (_, _) => + Future(fail("should not have called the web service")) + } + intercept[IllegalArgumentException] { + key.compareAndSwap(Buf.Utf8("newval")) + } + } + + test("Key.compareAndSwap: prevIndex") { + val path = Path.Utf8("caskey") + val op = NodeOp( + NodeOp.Action.CompareAndSwap, + Node.Data(path, 124, 123, None, Buf.Utf8("newval")), + Etcd.State(123), + Some(Node.Data(path, 123, 123, None, Buf.Utf8("oldval"))) + ) + val params = Map( + "prevIndex" -> Seq("123"), + "value" -> Seq("newval") + ) + val key = mkKey(path) { case (Method.Put, `params`) => op } + val cas = key.compareAndSwap(Buf.Utf8("newval"), prevIndex = Some(123)) + val casOp = Await.result(cas, 250.millis) + assert(casOp == op) + } + + test("Key.compareAndSwap: prevValue") { + val path = Path.Utf8("caskey") + val op = NodeOp( + NodeOp.Action.CompareAndSwap, + Node.Data(path, 124, 123, None, Buf.Utf8("newval")), + Etcd.State(123), + Some(Node.Data(path, 123, 123, None, Buf.Utf8("oldval"))) + ) + val params = Map( + "prevValue" -> Seq("oldval"), + "value" -> Seq("newval") + ) + val key = mkKey(path) { case (Method.Put, `params`) => op } + val cas = key.compareAndSwap(Buf.Utf8("newval"), prevValue = Some(Buf.Utf8("oldval"))) + val casOp = Await.result(cas, 250.millis) + assert(casOp == op) + } + + test("Key.compareAndSwap: prevExist=false") { + val path = Path.Utf8("caskey") + val op = NodeOp( + NodeOp.Action.CompareAndSwap, + Node.Data(path, 124, 123, None, Buf.Utf8("newval")), + Etcd.State(123), + Some(Node.Data(path, 123, 123, None, Buf.Utf8("oldval"))) + ) + val params = Map( + "prevExist" -> Seq("false"), + "value" -> Seq("newval") + ) + val key = mkKey(path) { case (Method.Put, `params`) => op } + val cas = key.compareAndSwap(Buf.Utf8("newval"), prevExist = false) + val casOp = Await.result(cas, 250.millis) + assert(casOp == op) + } + + test("Key.create: data") { + val base = Path.Utf8("base") + val op = NodeOp(NodeOp.Action.Create, Node.Data(Path.Utf8("base", "1"), 1, 1, None, Buf.Utf8("dogs")), Etcd.State(1)) + val key = mkKey(base) { + case (Method.Post, params) if getParam(params, "value").exists(_ == "dogs") => op + } + val create = key.create(Some(Buf.Utf8("dogs"))) + assert(Await.result(create, 250.millis) == op) + } + + test("Key.create: dir") { + val base = Path.Utf8("base") + val op = NodeOp(NodeOp.Action.Create, Node.Dir(Path.Utf8("base", "1"), 1, 1), Etcd.State(1)) + val key = mkKey(base) { + case (Method.Post, DirParam(true)) => op + } + val create = key.create(None) + assert(Await.result(create, 250.millis) == op) + } + + test("Key.create: ttl") { + val base = Path.Utf8("base") + val ttl = 10.seconds + val op = NodeOp( + NodeOp.Action.Create, + Node.Dir(Path.Utf8("base", "1"), 1, 1, Some(Node.Lease(ttl.fromNow, ttl))), + Etcd.State(1) + ) + val key = mkKey(base) { + case (Method.Post, params) if getParam(params, "ttl").exists(_ == "10") => op + } + val create = key.create(None, Some(10.seconds)) + assert(Await.result(create, 250.millis) == op) + } + + test("Key.events: recursively") { + val base = Path.read("/base") + val newKey = base ++ Path.Utf8("1") + + val ops = Seq( + NodeOp( + NodeOp.Action.Get, + Node.Dir(base, 0, 0, None, Seq(Node.Data(base ++ Path.Utf8("bah"), 0, 0))), + Etcd.State(0) + ), + NodeOp( + NodeOp.Action.Create, + Node.Dir(newKey, 1, 0, Some(Node.Lease(30.seconds.fromNow, 30.seconds))), + Etcd.State(0) + ), + NodeOp( + NodeOp.Action.Expire, + Node.Dir(newKey, 2, 0, None), + Etcd.State(0), + Some(Node.Dir(newKey, 1, 0, None)) + ) + ) + val responses = (0 to ops.size).map(_ => new Promise[NodeOp]) + val requested = (0 until responses.size).map(_ => new Promise[Unit]) + + @volatile var currentIndex = 0L + val key = mkKey(base) { + case (Method.Get, Watch(true, None)) if currentIndex < responses.length => + val idx = currentIndex.toInt + requested(idx).setDone() + responses(idx) + + case (Method.Get, Watch(true, Some(idx))) if idx < responses.length => + requested(idx).setDone() + responses(idx) + } + + val before = Events.take(4, key.events(recursive = true)) + val after = ops.zip(responses).zip(requested).zipWithIndex.foldLeft(before) { + case (Events.None(), _) => + fail("events underflow") + + case (events, (((op, response), requested), idx)) => + currentIndex = op.etcd.index + assert(requested.isDefined) + response.setValue(op) + val (state, nextEvents) = Await.result(events.next(), 250.millis) + assert(state == Return(op)) + nextEvents + } + + // closes properly + assert(after.size == 1) + assert(requested.last.isDefined) + assert(!responses.last.isDefined) + Await.result(after.close(), 250.millis) + assert(responses.last.isInterrupted != None) + } +} diff --git a/project/Base.scala b/project/Base.scala index feb19bc425..0db77f668e 100644 --- a/project/Base.scala +++ b/project/Base.scala @@ -178,7 +178,8 @@ class Base extends Build { /** Enables e2e test config for a project with basic dependencies */ def withE2e(): Project = project - .configs(EndToEndTest).settings(inConfig(EndToEndTest)(Defaults.testSettings)) + .configs(EndToEndTest) + .settings(inConfig(EndToEndTest)(Defaults.testSettings)) .dependsOn(testUtil % EndToEndTest) def withExamples(runtime: Project, configs: Seq[(Configuration, Configuration)]): Project = { @@ -193,9 +194,7 @@ class Base extends Build { def withIntegration(): Project = project .configs(IntegrationTest) - .settings(inConfig(IntegrationTest)( - Defaults.testSettings :+ (parallelExecution := false) - )) + .settings(inConfig(IntegrationTest)(Defaults.testSettings :+ (parallelExecution := false))) .dependsOn(testUtil % IntegrationTest) /** Writes build metadata into the projects resources */ diff --git a/project/Deps.scala b/project/Deps.scala index 06c865f700..c2dc6a2760 100644 --- a/project/Deps.scala +++ b/project/Deps.scala @@ -30,6 +30,12 @@ object Deps { val jacksonYaml = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-yaml" % jacksonVersion + // parses a variety of timestamp formats (like RFC3339) + val jodaTime = Seq( + "joda-time" % "joda-time" % "2.7", + "org.joda" % "joda-convert" % "1.7" + ) + // testing. duh. val scalatest = "org.scalatest" %% "scalatest" % "2.2.4" } diff --git a/project/LinkerdBuild.scala b/project/LinkerdBuild.scala index 9a035394b0..6b1731e255 100644 --- a/project/LinkerdBuild.scala +++ b/project/LinkerdBuild.scala @@ -5,30 +5,22 @@ import sbtdocker.DockerKeys._ import sbtunidoc.Plugin._ import pl.project13.scala.sbt.JmhPlugin -/** - * Project layout. - * - * - consul/ -- consul client - * - k8s/ -- finagle kubernetes client - * - marathon/ -- marathon client - * - router/ -- finagle router libraries - * - namer/ -- name resolution - * - config/ -- configuration utilities - * - linkerd/ -- linkerd runtime and modules - * - namerd/ -- namerd runtime and modules - * - test-util/ -- async test helpers; provided by [[Base]] - */ object LinkerdBuild extends Base { val Minimal = config("minimal") val Bundle = config("bundle") extend Minimal - val k8s = projectDir("k8s") + val consul = projectDir("consul") .withTwitterLib(Deps.finagle("http")) .withLibs(Deps.jackson) .withTests() - val consul = projectDir("consul") + val etcd = projectDir("etcd") + .withTwitterLib(Deps.finagle("http")) + .withLibs(Deps.jackson ++ Deps.jodaTime) + .withTests().withIntegration() + + val k8s = projectDir("k8s") .withTwitterLib(Deps.finagle("http")) .withLibs(Deps.jackson) .withTests() @@ -529,10 +521,12 @@ object LinkerdBuild extends Base { // Unified documentation via the sbt-unidoc plugin val all = project("all", file(".")) + .settings(unidocSettings) .aggregate( admin, configCore, consul, + etcd, k8s, marathon, testUtil, @@ -542,5 +536,4 @@ object LinkerdBuild extends Base { Namerd.all, Router.all ) - .settings(unidocSettings) }