Skip to content

Commit

Permalink
Introduce an etcd client (#303)
Browse files Browse the repository at this point in the history
* Introduce an etcd client

io.buoyant.etcd provides a client library over etcd's HTTP api. This will
facilitate #301 and #302.
  • Loading branch information
olix0r committed Apr 29, 2016
1 parent 8995e9b commit a944f2a
Show file tree
Hide file tree
Showing 12 changed files with 1,386 additions and 20 deletions.
17 changes: 17 additions & 0 deletions etcd/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Finagle Etcd client #

## `io.buoyant.etcd` ##

The `io.buoyant.etcd` package contains a generically useful Etcd
client built on finagle-http. The client supports a variety of
asynchronous `key` operations on etcd. It goes beyond the basic etcd
operations to provide a `watch` utility that models a key's (or
tree's) state as an `com.twitter.util.Activity`.

### TODO ###

- `compareAndDelete` isn't yet implemented. minor oversight.
- Use residual paths on the client to instrument chrooted key
operations
- Admin/Stats APIs
- move to gRPC api
194 changes: 194 additions & 0 deletions etcd/src/integration/scala/io/buoyant/etcd/EtcdIntegrationTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package io.buoyant.etcd

import com.twitter.conversions.time._
import com.twitter.finagle.{Http, Path}
import com.twitter.finagle.util.DefaultTimer
import com.twitter.io.Buf
import com.twitter.util.{Events => _, _}
import io.buoyant.test.{Awaits, Events}
import java.io.File
import java.util.UUID
import org.scalatest.BeforeAndAfterAll
import org.scalatest.fixture.FunSuite
import scala.sys.process.{Process, ProcessLogger}
import scala.util.Random

/**
* Etcd client integration tests.
*
* Boots an etcd instance on a random local port.
*/
class EtcdIntegrationTest extends FunSuite with Awaits with BeforeAndAfterAll {

lazy val devNull = new File("/dev/null")
lazy val etcdDir = s"/tmp/io.buoyant.etcd-${UUID.randomUUID.toString}"
def randomPort = 32000 + (Random.nextDouble * 30000).toInt
lazy val etcdPort = randomPort
lazy val etcdUrl = s"http://127.0.0.1:$etcdPort"
lazy val etcdPeerUrl = s"http://127.0.0.1:$randomPort"
lazy val etcdCmd = Seq(
"etcd",
"--data-dir", etcdDir,
"--listen-client-urls", etcdUrl,
"--advertise-client-urls", etcdUrl,
"--listen-peer-urls", etcdPeerUrl,
"--initial-advertise-peer-urls", etcdPeerUrl,
"--initial-cluster", s"default=$etcdPeerUrl",
"--force-new-cluster"
)

var process: Process = _

override def beforeAll: Unit = {
val which = Process(Seq("which", "etcd")).run(ProcessLogger(devNull))
which.exitValue match {
case 0 =>
info(s"""${etcdCmd mkString " "}""")
try {
process = Process(etcdCmd).run(ProcessLogger(devNull))
} catch {
case e: Exception => fail(s"etcd failed to start: ${e.getMessage}")
}
Thread.sleep(5000) // give some time to initialize

case _ => cancel("etcd not on the PATH")
}
}

override def afterAll: Unit = {
if (process != null) {
process.destroy()
}
Process(Seq("rm", "-rf", etcdDir)).!
}

def serverName = s"/$$/inet/127.1/$etcdPort"

private[this] implicit val timer = DefaultTimer.twitter
override def defaultWait = 1.second

type FixtureParam = Etcd

def withFixture(test: OneArgTest) = {
val client = Http.newService(serverName)
try withFixture(test.toNoArgTest(new Etcd(client)))
finally await { client.close() }
}


test("version") { etcd =>
val version = await { etcd.version() }

assert(version.etcdserver.length > 0)
info(s"release: ${version.etcdserver}")

assert(version.etcdcluster.length > 0)
info(s"internal: ${version.etcdcluster}")
}

val rootPath = Path.Utf8("it")

test("create and delete data node") { etcd =>
val key = etcd.key(rootPath)
val value = UUID.randomUUID().toString

val createOp = await { key.create(Some(Buf.Utf8(value))) }
assert(createOp.action == NodeOp.Action.Create)
createOp.node match {
case createData@Node.Data(createKey, createModified, createCreated, None, createVal) =>
assert(createKey.take(1) == key.path)
assert(createModified == createCreated)
assert(createModified == createOp.etcd.index)
assert(createVal == Buf.Utf8(value))
info("create data")

val delOp = await { etcd.key(createKey).delete() }
assert(delOp.action == NodeOp.Action.Delete)
delOp.node match {
case Node.Data(delKey, delModified, delCreated, None, delVal) =>
assert(delKey == createKey)
assert(delModified > createModified)
assert(delCreated == createCreated)
assert(delVal == Buf.Empty)

case node =>
fail(s"expected deleted data node, found $node")
}
assert(createOp.etcd.index < delOp.etcd.index)
assert(createOp.etcd.clusterId == delOp.etcd.clusterId)

delOp.prevNode match {
case Some(Node.Data(prevKey, prevModified, prevCreated, None, prevVal)) =>
assert(prevKey == createKey)
assert(prevModified == createModified)
assert(prevCreated == createCreated)
assert(prevVal == Buf.Utf8(value))
case node =>
fail(s"expected previous data node, found $node")
}
info("deleted key")

case node =>
fail(s"expected set data node, found $node")
}
}

test("get(wait), set(ttl), get(wait, idx), expire") { etcd =>
val key = etcd.key(Path.Utf8(UUID.randomUUID().toString))

info("waiting for data")
val wait = key.get(wait = true)

assert(!wait.isDefined)

val woof = Buf.Utf8("woof woof woof")
val ttl = 10.seconds
val created = await { key.set(value = Some(woof), ttl=Some(ttl)) }
assert(created.action == NodeOp.Action.Set)
created.node match {
case Node.Data(path, _, _, lease, value) =>
assert(path == key.path)
assert(lease.isDefined)
assert(value == woof)
info("setting data")

case node =>
fail(s"setting data node, found $node")
}

val set = await(wait)
assert(set.action == NodeOp.Action.Set)
set.node match {
case Node.Data(path, _, _, lease, value) =>
assert(path == created.node.key)
assert(lease.isDefined)
assert(lease.map(_.expiration) == created.node.lease.map(_.expiration))
assert(value == woof)
case node =>
fail(s"observing data $node")
}

info("waiting for expiration")
val expire = await(ttl+1.second) {
key.get(wait = true, waitIndex = Some(set.node.modifiedIndex+1))
}
assert(expire.action == NodeOp.Action.Expire)
expire.node match {
case Node.Data(path, _, _, None, Buf.Empty) =>
assert(path == created.node.key)
case node =>
fail(s"expecting expired data node, found $node")
}
assert(expire.prevNode.isDefined)
val Some(prev) = expire.prevNode
prev match {
case Node.Data(path, _, _, lease, value) =>
assert(path == created.node.key)
assert(lease == None)
assert(value == woof)
case node =>
fail(s"expecting expired data node, found $node")
}
}

}
38 changes: 38 additions & 0 deletions etcd/src/main/scala/com/twitter/finagle/buoyant/FormParams.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.twitter.finagle.buoyant

import com.twitter.finagle.http.Request
import org.jboss.netty.handler.codec.http.multipart._
import scala.collection.JavaConverters._

/**
* Finagle's request type doesn't expose any way to interact with form
* parameters. Netty provides utilities for this, but the underlying
* netty message types can't be accessed from outside of the
* finagle. SO, we just shove this into the finagle package so that we
* can do this. This should be fed back upstream and then removed.
*/
object FormParams {
type Params = Map[String, Seq[String]]

def set(req: Request, params: Seq[(String, String)]): Unit = {
val enc = new HttpPostRequestEncoder(req.httpRequest, false)
for ((k, v) <- params) {
enc.addBodyAttribute(k, v)
}
enc.finalizeRequest()
}

def get(req: Request): Params = {
val dec = new HttpPostRequestDecoder(req.httpRequest)
dec.getBodyHttpDatas().asScala.foldLeft[Params](Map.empty) {
case (attrs, attr: Attribute) =>
val k = attr.getName
val v = attr.getValue
val vals = attrs.getOrElse(k, Seq.empty) :+ v
attrs + (k -> vals)

case (attrs, _) => attrs
}
}

}
64 changes: 64 additions & 0 deletions etcd/src/main/scala/io/buoyant/etcd/ApiError.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.buoyant.etcd

case class ApiError(
errorCode: Int,
message: String,
cause: String,
index: Int
) extends Exception(message)

// see: https://github.com/coreos/etcd/blob/master/Documentation/v2/errorcode.md
object ApiError {

class ErrorGroup(val codes: Int*) {
def unapply(code: Int): Option[Int] =
if (codes contains code) Some(code) else None
}

val KeyNotFound = 100
val TestFailed = 101
val NotFile = 102
val NotDir = 104
val NodeExist = 105
val RootReadOnly = 107
val DirNotEmpty = 108
object Command extends ErrorGroup(
KeyNotFound,
TestFailed,
NotFile,
NotDir,
NodeExist,
RootReadOnly,
DirNotEmpty
)

val PrevValueRequired = 201
val TtlNan = 202
val IndexNan = 203
val InvalidField = 209
val InvalidForm = 210
object Content extends ErrorGroup(
PrevValueRequired,
TtlNan,
IndexNan,
InvalidField,
InvalidForm
)

val RaftInternal = 300
val LeaderElect = 301
object Raft extends ErrorGroup(
RaftInternal,
LeaderElect
)

val WatcherCleared = 400
val EventIndexCleared = 401
object Etcd extends ErrorGroup(
WatcherCleared,
EventIndexCleared
)

object Known extends ErrorGroup(Command.codes ++ Content.codes ++ Raft.codes ++ Etcd.codes: _*)
}

0 comments on commit a944f2a

Please sign in to comment.