Skip to content

Commit

Permalink
Instrument watches in the io.l5d.namerd interpreter (#1943)
Browse files Browse the repository at this point in the history
It can be difficult to know which watches the Namerd interpreter has cached, which of them are active, and if they are receiving updates from the Namerd. This has made issues with the Namerd extremely difficult to diagnose and debug.

We instrument the Activities in the Namerd interpreter to keep track of metadata about themselves. This metadata is exposed though an admin endpoint.  We also remove the existing NamerdHandler since this information is now available elsewhere (the dtab interface and this state endpoint).

Fixes #1913 

Signed-off-by: Alex Leong <alex@buoyant.io>
  • Loading branch information
adleong committed May 23, 2018
1 parent 49229b7 commit d10220e
Show file tree
Hide file tree
Showing 14 changed files with 412 additions and 142 deletions.
@@ -1,3 +1,4 @@
io.buoyant.config.types.ByteBufferSerializer
io.buoyant.config.types.DirectorySerializer
io.buoyant.config.types.DtabSerializer
io.buoyant.config.types.FileSerializer
Expand Down
@@ -0,0 +1,18 @@
package io.buoyant.config.types

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.SerializerProvider
import com.twitter.io.Buf
import io.buoyant.config.ConfigSerializer
import java.nio.ByteBuffer

class ByteBufferSerializer extends ConfigSerializer[ByteBuffer] {
override def serialize(
value: ByteBuffer,
jgen: JsonGenerator,
provider: SerializerProvider
): Unit = {
val Buf.Utf8(s) = Buf.ByteBuffer.Shared(value)
jgen.writeString(s)
}
}

This file was deleted.

Expand Up @@ -120,14 +120,7 @@ case class NamerdInterpreterConfig(
val ns = namespace.getOrElse("default")
val Label(routerLabel) = params[Label]

new ThriftNamerClient(iface, ns, backoffs, stats) with Admin.WithHandlers with Admin.WithNavItems {
val handler = new NamerdHandler(Seq(routerLabel -> config), Map(routerLabel -> this))

override def adminHandlers: Seq[Handler] =
Seq(Handler("/namerd", handler, css = Seq("delegator.css")))

override def navItems: Seq[NavItem] = Seq(NavItem("namerd", "namerd"))
}
new ThriftNamerClient(iface, ns, backoffs, stats)
}
}

Expand Down
3 changes: 3 additions & 0 deletions linkerd/docs/interpreter.md
Expand Up @@ -39,6 +39,9 @@ interpreter uses Namerd's long-poll thrift interface
(`io.l5d.thriftNameInterpreter`). Note that the protocol that the interpreter
uses to talk to Namerd is unrelated to the protocols of Linkerd's routers.

The internal state of the Namerd interpreter can be viewed at the
admin endpoint: `/interpreter_state/io.l5d.namerd/<namespace>.json`.

Key | Default Value | Description
--- | ------------- | -----------
dst | _required_ | A Finagle path locating the Namerd service.
Expand Down
Expand Up @@ -11,7 +11,9 @@ case class VarState[T](
lastStoppedAt: Option[String],
lastUpdatedAt: Option[String],
value: T
)
) {
def map[U](f: T => U): VarState[U] = copy(value = f(value))
}

/**
* InstrumentedVar holds a Var.async and also records metadata about it such as whether it is
Expand Down
@@ -0,0 +1,4 @@
io.buoyant.namerd.iface.AddrReqSerializer
io.buoyant.namerd.iface.AddrSerializer
io.buoyant.namerd.iface.BindReqSerializer
io.buoyant.namerd.iface.BoundSerializer
@@ -0,0 +1,23 @@
package io.buoyant.namerd.iface

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.SerializerProvider
import io.buoyant.config.ConfigSerializer
import io.buoyant.namerd.iface.{thriftscala => thrift}

class AddrReqSerializer extends ConfigSerializer[thrift.AddrReq] {
import ByteBufferSerializers._

override def serialize(
value: thrift.AddrReq,
gen: JsonGenerator,
provider: SerializerProvider
): Unit = {
gen.writeStartObject()
gen.writeStringField("name", path(value.name.name))
gen.writeStringField("stamp", stamp(value.name.stamp))
gen.writeStringField("namespace", value.name.ns)
gen.writeStringField("clientId", path(value.clientId))
gen.writeEndObject()
}
}
@@ -0,0 +1,44 @@
package io.buoyant.namerd.iface

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.SerializerProvider
import io.buoyant.config.ConfigSerializer
import io.buoyant.namerd.iface.{thriftscala => thrift}

class AddrSerializer extends ConfigSerializer[thrift.Addr] {
import ByteBufferSerializers._

override def serialize(
value: thrift.Addr,
gen: JsonGenerator,
provider: SerializerProvider
): Unit = {

gen.writeStartObject()
gen.writeStringField("stamp", stamp(value.stamp))
value.value match {
case bound: thrift.AddrVal.Bound =>
gen.writeStringField("type", "bound")
gen.writeArrayFieldStart("addresses")
for (address <- bound.bound.addresses) {
gen.writeStartObject()
gen.writeStringField("ip", ipv4(address.ip))
gen.writeNumberField("port", address.port)
for (meta <- address.meta) {
gen.writeObjectFieldStart("meta")
for (authority <- meta.authority) gen.writeStringField("authority", authority)
for (nodeName <- meta.nodeName) gen.writeStringField("nodeName", nodeName)
for (endpointAddrWeight <- meta.endpointAddrWeight) gen.writeNumberField("endpointAddrWeight", endpointAddrWeight)
gen.writeEndObject()
}
gen.writeEndObject()
}
gen.writeEndArray()
case neg: thrift.AddrVal.Neg =>
gen.writeStringField("type", "neg")
case unknown: thrift.AddrVal.UnknownUnionField =>
gen.writeStringField("type", "unknown")
}
gen.writeEndObject()
}
}
@@ -0,0 +1,24 @@
package io.buoyant.namerd.iface

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.SerializerProvider
import io.buoyant.config.ConfigSerializer
import io.buoyant.namerd.iface.{thriftscala => thrift}

class BindReqSerializer extends ConfigSerializer[thrift.BindReq] {
import ByteBufferSerializers._

override def serialize(
value: thrift.BindReq,
gen: JsonGenerator,
provider: SerializerProvider
): Unit = {
gen.writeStartObject()
gen.writeStringField("name", path(value.name.name))
gen.writeStringField("dtab", value.dtab)
gen.writeStringField("stamp", stamp(value.name.stamp))
gen.writeStringField("namespace", value.name.ns)
gen.writeStringField("clientId", path(value.clientId))
gen.writeEndObject()
}
}
@@ -0,0 +1,64 @@
package io.buoyant.namerd.iface

import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.SerializerProvider
import io.buoyant.config.ConfigSerializer
import io.buoyant.namerd.iface.{thriftscala => thrift}

class BoundSerializer extends ConfigSerializer[thrift.Bound] {
import ByteBufferSerializers._

def serializeNode(node: thrift.BoundNode, gen: JsonGenerator): Unit = {
gen.writeStartObject()
node match {
case alt: thrift.BoundNode.Alt =>
gen.writeStringField("type", "alt")
gen.writeFieldName("alt")
gen.writeArray(alt.alt.toArray, 0, alt.alt.length)
case empty: thrift.BoundNode.Empty =>
gen.writeStringField("type", "empty")
case fail: thrift.BoundNode.Fail =>
gen.writeStringField("type", "fail")
case leaf: thrift.BoundNode.Leaf =>
gen.writeStringField("type", "leaf")
gen.writeStringField("id", path(leaf.leaf.id))
gen.writeStringField("residual", path(leaf.leaf.residual))
case neg: thrift.BoundNode.Neg =>
gen.writeStringField("type", "neg")
case weighted: thrift.BoundNode.Weighted =>
gen.writeStringField("type", "union")
gen.writeArrayFieldStart("weighted")
for (node <- weighted.weighted) {
gen.writeStartObject()
gen.writeNumberField("weight", node.weight)
gen.writeNumberField("id", node.id)
gen.writeEndObject()
}
gen.writeEndArray()
case unknown: thrift.BoundNode.UnknownUnionField =>
gen.writeStringField("type", "unknown")
}
gen.writeEndObject()
}

override def serialize(
value: thrift.Bound,
gen: JsonGenerator,
provider: SerializerProvider
): Unit = {
gen.writeStartObject()
gen.writeStringField("stamp", stamp(value.stamp))
gen.writeStringField("namespace", value.ns)
gen.writeObjectFieldStart("tree")
gen.writeFieldName("root")
serializeNode(value.tree.root, gen)
gen.writeObjectFieldStart("nodes")
for ((id, node) <- value.tree.nodes) {
gen.writeFieldName(id.toString)
serializeNode(node, gen)
}
gen.writeEndObject()
gen.writeEndObject()
gen.writeEndObject()
}
}
@@ -0,0 +1,21 @@
package io.buoyant.namerd.iface

import com.twitter.io.Buf
import java.nio.ByteBuffer

object ByteBufferSerializers {

def utf8(bb: ByteBuffer): String =
Buf.Utf8.unapply(Buf.ByteBuffer.Shared(bb)).get

def path(path: Seq[ByteBuffer]): String =
path.map(utf8).mkString("/", "/", "")

def stamp(bb: ByteBuffer): String =
bb.duplicate().getLong.toString

def ipv4(bb: ByteBuffer): String = {
val dup = bb.duplicate()
s"${dup.get()}.${dup.get()}.${dup.get()}.${dup.get()}"
}
}
@@ -0,0 +1,33 @@
package io.buoyant.namerd.iface

import com.fasterxml.jackson.annotation.JsonIgnore
import com.twitter.util.{Return, Throw, Time, Try}

class PollState[Req, Rep] {

@JsonIgnore
def recordApiCall(req: Req): Unit = synchronized {
request = Some(req)
lastRequestAt = Some(Time.now.toString)
}

@JsonIgnore
def recordResponse(rep: Try[Rep]): Unit = synchronized {
lastResponseAt = Some(Time.now.toString)
rep match {
case Return(r) =>
response = Some(r)
error = None
case Throw(e) =>
error = Some(e)
response = None
}
}

// These fields exist to be serialized.
protected var request: Option[Req] = None
protected var lastRequestAt: Option[String] = None
protected var response: Option[Rep] = None
protected var lastResponseAt: Option[String] = None
protected var error: Option[Throwable] = None
}

0 comments on commit d10220e

Please sign in to comment.