Skip to content

Commit

Permalink
Merge pull request #728 from openzipkin/json-api
Browse files Browse the repository at this point in the history
Converts the communication between web and query to http
  • Loading branch information
adriancole committed Sep 28, 2015
2 parents d1df85b + 7868354 commit 3a2cd65
Show file tree
Hide file tree
Showing 16 changed files with 827 additions and 437 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1,5 +1,5 @@
#Sat, 15 Aug 2015 15:15:18 +0000
version=1.12.1-SNAPSHOT
version=1.13.0-SNAPSHOT
group=io.zipkin
repo=https://github.com/openzipkin/zipkin
description=A distributed tracing system
Expand Down
Expand Up @@ -3,22 +3,20 @@ package com.twitter.zipkin.json
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.module.SimpleModule
import com.fasterxml.jackson.databind.{ObjectMapper, JsonSerializer, SerializerProvider}
import com.fasterxml.jackson.databind.{JsonSerializer, ObjectMapper, SerializerProvider}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.twitter.zipkin.common._

object ZipkinJson {

object ZipkinJson extends ObjectMapper with ScalaObjectMapper {
val module = new SimpleModule("ZipkinJson")
.addSerializer(classOf[Endpoint], serializer(JsonService))
.addSerializer(classOf[Annotation], serializer(JsonAnnotation))
.addSerializer(classOf[BinaryAnnotation], serializer(JsonBinaryAnnotation))
.addSerializer(classOf[Span], serializer(JsonSpan))

val mapper = new ObjectMapper()
.registerModule(new DefaultScalaModule())
.registerModule(module)
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
.addSerializer(classOf[Endpoint], serializer(JsonService))
.addSerializer(classOf[Annotation], serializer(JsonAnnotation))
.addSerializer(classOf[BinaryAnnotation], serializer(JsonBinaryAnnotation))
.addSerializer(classOf[Span], serializer(JsonSpan))
registerModule(new DefaultScalaModule())
registerModule(module)
setSerializationInclusion(JsonInclude.Include.NON_NULL)

private[this] def serializer[T, R](converter: (T) => R) = {
new JsonSerializer[T] {
Expand Down
Expand Up @@ -12,7 +12,7 @@ import org.scalatest.{FunSuite, Matchers}
* Tests that who how data is serialized, so that subtle code changes don't break users.
*/
class ZipkinJsonTest extends FunSuite with Matchers {
val mapper = ZipkinJson.mapper
val mapper = ZipkinJson

val web = Endpoint((192 << 24 | 168 << 16 | 1), 8080, "zipkin-web")
val query = Endpoint((192 << 24 | 168 << 16 | 1), 9411, "zipkin-query")
Expand Down
Expand Up @@ -16,10 +16,10 @@
package com.twitter.zipkin.builder

import com.twitter.finagle.ListeningServer
import com.twitter.finagle.tracing.{NullTracer, DefaultTracer}
import com.twitter.finagle.tracing.{DefaultTracer, NullTracer}
import com.twitter.finagle.zipkin.thrift.RawZipkinTracer
import com.twitter.ostrich.admin.RuntimeEnvironment
import com.twitter.zipkin.query.ZipkinQueryServerFactory
import com.twitter.zipkin.query.ZipkinQueryServer
import com.twitter.zipkin.storage.Store

case class QueryServiceBuilder(
Expand All @@ -28,9 +28,6 @@ case class QueryServiceBuilder(
) extends Builder[RuntimeEnvironment => ListeningServer] {

def apply(): (RuntimeEnvironment) => ListeningServer = (runtime: RuntimeEnvironment) => {
serverBuilder.apply().apply(runtime)
val store = storeBuilder.apply()

// If a scribe host is configured, send all traces to it, otherwise disable tracing
val scribeHost = sys.env.get("SCRIBE_HOST")
val scribePort = sys.env.get("SCRIBE_PORT")
Expand All @@ -40,8 +37,15 @@ case class QueryServiceBuilder(
NullTracer
}

object UseOnceFactory extends com.twitter.app.App with ZipkinQueryServerFactory
UseOnceFactory.queryServicePort.parse(serverBuilder.serverAddress.getHostAddress + ":" + serverBuilder.serverPort)
UseOnceFactory.newQueryServer(store.spanStore, store.dependencies, serverBuilder.statsReceiver)
val store = storeBuilder.apply()
object Main extends ZipkinQueryServer(store.spanStore, store.dependencies) {
def admin = adminHttpServer
override val defaultFinatraHttpPort = serverBuilder.serverAddress.getHostAddress + ":" + serverBuilder.serverPort
override val defaultHttpPort = serverBuilder.adminPort
}
Main.nonExitingMain(Array(
"-local.doc.root", "/"
))
Main.admin
}
}
16 changes: 12 additions & 4 deletions zipkin-query/build.gradle
@@ -1,8 +1,16 @@
repositories {
mavenCentral()
}

dependencies {
compile project(':zipkin-scrooge')
compile "com.twitter.finatra:finatra-http_${scalaInterfaceVersion}:${commonVersions.finatra}"
compile "com.twitter.finatra:finatra-slf4j_${scalaInterfaceVersion}:${commonVersions.finatra}"

compile "com.twitter:util-core_${scalaInterfaceVersion}:${commonVersions.twitterUtil}"
compile "com.twitter:finagle-core_${scalaInterfaceVersion}:${commonVersions.finagle}"
// for ZipkinQueryServiceFactory
compile "com.twitter:finagle-thriftmux_${scalaInterfaceVersion}:${commonVersions.finagle}"
testCompile "com.twitter.finatra:finatra-http_${scalaInterfaceVersion}:${commonVersions.finatra}:tests"
testCompile "com.twitter.finatra:finatra-jackson_${scalaInterfaceVersion}:${commonVersions.finatra}:tests"
testCompile "com.twitter.inject:inject-core_${scalaInterfaceVersion}:${commonVersions.finatra}:tests"
testCompile "com.twitter.inject:inject-modules_${scalaInterfaceVersion}:${commonVersions.finatra}:tests"
testCompile "com.twitter.inject:inject-app_${scalaInterfaceVersion}:${commonVersions.finatra}:tests"
testCompile "com.twitter.inject:inject-server_${scalaInterfaceVersion}:${commonVersions.finatra}:tests"
}
@@ -0,0 +1,105 @@
/*
* Copyright 2012 Twitter Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.zipkin.query

import com.twitter.finagle.httpx.Request
import com.twitter.finatra.annotations.Flag
import com.twitter.util.{Time, TwitterDateFormat}
import com.twitter.zipkin.storage.SpanStore
import com.twitter.zipkin.thriftscala.QueryRequest
import java.util.{Calendar, Date}
import javax.inject.Inject

// TODO: rewrite me into a normal finatra case class
class QueryExtractor @Inject()(spanStore: SpanStore,
@Flag("zipkin.queryService.durationBatchSize") defaultQueryLimit: Int) {
val fmt = TwitterDateFormat("MM-dd-yyyy'T'HH:mm:ss.SSSZ")

private[this] val dateFormat = TwitterDateFormat("MM-dd-yyyy")
private[this] val timeFormat = TwitterDateFormat("HH:mm")

def getDate(req: Request): Option[Date] =
req.params.get("date").map(dateFormat.parse)

def getDateStr(req: Request): String = {
val date = getDate(req).getOrElse(Calendar.getInstance().getTime)
dateFormat.format(date)
}

def getLimit(req: Request): Option[Int] = {
req.params.get("limit").map(_.toInt)
}

def getLimitStr(req: Request): String = {
getLimit(req).getOrElse(defaultQueryLimit).toString
}

def getTime(req: Request): Option[Date] =
req.params.get("time").map(timeFormat.parse)

def getTimeStr(req: Request): String = {
val time = getTime(req).getOrElse(Calendar.getInstance().getTime)
timeFormat.format(time)
}

def getTimestampStr(req: Request): String = {
getTimestamp(req).getOrElse(Time.now.inMicroseconds).toString
}

def getTimestamp(req: Request): Option[Long] = {
req.params.getLong("timestamp")
}

/**
* Takes a `Request` and produces the correct `QueryRequest` depending
* on the GET parameters present
*/
def apply(req: Request): Option[QueryRequest] = req.params.get("serviceName").filterNot(_ == "") map { serviceName =>
val spanName = req.params.get("spanName") filterNot { n => n == "all" || n == "" }

val timestamp = getTimestamp(req).getOrElse(Time.now.inMicroseconds)

val (annotations, binaryAnnotations) = req.params.get("annotationQuery") map { query =>
var anns = Seq.empty[String]
var binAnns = scala.collection.mutable.Map[String, String]()

query.split(" and ") foreach { ann =>
ann.split("=").toList match {
case "" :: Nil =>
case key :: value :: Nil =>
binAnns += key -> value
case key :: Nil =>
anns +:= key
case _ =>
}
}

( (if (anns.isEmpty) None else Some(anns)),
(if (binAnns.isEmpty) None else Some(binAnns))
)
} getOrElse {
(None, None)
}
val limit = getLimit(req).getOrElse(defaultQueryLimit)
QueryRequest(serviceName, spanName, annotations, binaryAnnotations, timestamp, limit, adjustClockSkew(req))
}

def adjustClockSkew(req: Request): Boolean =
req.params.get("adjust_clock_skew") match {
case Some("false") => false
case _ => true
}
}
Expand Up @@ -17,41 +17,42 @@
package com.twitter.zipkin.query

import com.google.common.base.Charsets.UTF_8
import com.twitter.finagle.stats.{DefaultStatsReceiver, Stat, StatsReceiver}
import com.twitter.finagle.tracing.{Trace => FTrace}
import com.twitter.logging.Logger
import com.twitter.util.Future
import com.twitter.zipkin.common.Span
import com.twitter.zipkin.conversions.thrift._
import com.twitter.zipkin.query.adjusters._
import com.twitter.zipkin.query.constants._
import com.twitter.zipkin.storage._
import com.twitter.zipkin.thriftscala
import com.twitter.zipkin.thriftscala.Dependencies
import com.twitter.zipkin.thriftscala.QueryRequest
import java.nio.ByteBuffer
import javax.inject.Inject

class ThriftQueryService(
spanStore: SpanStore,
dependencyStore: DependencyStore = new NullDependencyStore,
traceDurationFetchBatchSize: Int = 500,
stats: StatsReceiver = DefaultStatsReceiver.scope("ThriftQueryService"),
log: Logger = Logger.get("ThriftQueryService")
) extends thriftscala.ZipkinQuery[Future] with thriftscala.DependencyStore[Future] {
// TODO: this class has a lot of tech debt, and also hints spanstore needs to be redone to not require a preparatory id fetch.
class QueryTraceIds @Inject()(spanStore: SpanStore) extends ((thriftscala.QueryRequest) => Future[Seq[Long]]) {

private[this] val methodStats = stats.scope("perMethod")
private val timeSkewAdjuster = new TimeSkewAdjuster()
override def apply(qr: QueryRequest) = {
val sliceQueries = Seq[Option[Seq[SliceQuery]]](
qr.spanName.map { n => Seq(SpanSliceQuery(n)) },
qr.annotations.map { _.map { AnnotationSliceQuery(_, None) } },
qr.binaryAnnotations.map { _.map { e => AnnotationSliceQuery(e._1, Some(ByteBuffer.wrap(e._2.getBytes(UTF_8)))) }(collection.breakOut) }
).flatten.flatten

private[this] def opt[T](param: T): Option[T] = param match {
case null | "" => None
case s => Some(s)
}
sliceQueries match {
case Nil =>
spanStore.getTraceIdsByName(qr.serviceName, None, qr.endTs, qr.limit) flatMap {
queryResponse(_, qr)
}

case slice :: Nil =>
querySlices(sliceQueries, qr) flatMap { ids => queryResponse(ids.flatten, qr) }

private[this] def adjustedTraces(spans: Seq[Seq[Span]], adjustClockSkew: Boolean): Seq[Trace] = {
val traces = spans.map(Trace(_))
if (adjustClockSkew) {
traces.map(t => timeSkewAdjuster.adjust(_))
case _ =>
// TODO: timestamps endTs is the wrong name for all this
querySlices(sliceQueries, qr.copy(limit = 1)) flatMap { ids =>
val ts = padTimestamp(ids.flatMap(_.map(_.timestamp)).reduceOption(_ min _).getOrElse(0))
querySlices(sliceQueries, qr.copy(endTs = ts)) flatMap { ids =>
queryResponse(traceIdsIntersect(ids), qr)
}
}
}
traces
}

private[this] def padTimestamp(timestamp: Long): Long =
Expand Down Expand Up @@ -92,93 +93,4 @@ class ThriftQueryService(
case s =>
Future.exception(new Exception("Uknown SliceQuery: %s".format(s)))
})

private[this] def handle[T](name: String)(f: => Future[T]): Future[T] = {
val errorStats = methodStats.scope("errors")

val ret = try {
Stat.timeFuture(methodStats.stat(name))(f)
} catch {
case e: Exception => Future.exception(e)
}

ret rescue { case e: Exception =>
log.error(e, "%s error".format(name))
errorStats.counter(name).incr()
errorStats.scope(name).counter(e.getClass.getName).incr()
Future.exception(thriftscala.QueryException(e.toString))
}
}

private[this] val noServiceNameError = Future.exception(thriftscala.QueryException("No service name provided"))

private[this] def handleQuery[T](name: String, qr: thriftscala.QueryRequest)(f: => Future[T]): Future[T] = {
if (!opt(qr.serviceName).isDefined) noServiceNameError else {
FTrace.recordBinary("serviceName", qr.serviceName)
FTrace.recordBinary("endTs", qr.endTs)
FTrace.recordBinary("limit", qr.limit)
handle(name)(f)
}
}

def traceIds(qr: thriftscala.QueryRequest): Future[Seq[Long]] = {
val sliceQueries = Seq[Option[Seq[SliceQuery]]](
qr.spanName.map { n => Seq(SpanSliceQuery(n)) },
qr.annotations.map { _.map { AnnotationSliceQuery(_, None) } },
qr.binaryAnnotations.map { _.map { e => AnnotationSliceQuery(e._1, Some(ByteBuffer.wrap(e._2.getBytes(UTF_8)))) }(collection.breakOut) }
).flatten.flatten

sliceQueries match {
case Nil =>
spanStore.getTraceIdsByName(qr.serviceName, None, qr.endTs, qr.limit) flatMap {
queryResponse(_, qr)
}

case slice :: Nil =>
querySlices(sliceQueries, qr) flatMap { ids => queryResponse(ids.flatten, qr) }

case _ =>
// TODO: timestamps endTs is the wrong name for all this
querySlices(sliceQueries, qr.copy(limit = 1)) flatMap { ids =>
val ts = padTimestamp(ids.flatMap(_.map(_.timestamp)).reduceOption(_ min _).getOrElse(0))
querySlices(sliceQueries, qr.copy(endTs = ts)) flatMap { ids =>
queryResponse(traceIdsIntersect(ids), qr)
}
}
}
}

override def getTraces(qr: thriftscala.QueryRequest): Future[Seq[thriftscala.Trace]] =
handleQuery("getTraces", qr) {
traceIds(qr).flatMap(getTracesByIds(_, qr.adjustClockSkew))
}

override def getTracesByIds(traceIds: Seq[Long], adjustClockSkew: Boolean = true): Future[Seq[thriftscala.Trace]] =
handle("getTracesByIds") {
if (traceIds.isEmpty) {
return Future.value(Seq.empty)
}
FTrace.recordBinary("numIds", traceIds.length)
spanStore.getSpansByTraceIds(traceIds) map { adjustedTraces(_, adjustClockSkew).map(_.toThrift) }
}

override def getServiceNames: Future[Set[String]] =
handle("getServiceNames") {
spanStore.getAllServiceNames
}

override def getSpanNames(serviceName: String): Future[Set[String]] =
handle("getSpanNames") {
spanStore.getSpanNames(serviceName)
}

override def getDependencies(startTime: Option[Long], endTime: Option[Long]) =
handle("getDependencies") {
dependencyStore.getDependencies(startTime, endTime).map(_.toThrift)
}

override def storeDependencies(dependencies: Dependencies) =
handle("storeDependencies") {
dependencyStore.storeDependencies(dependencies.toDependencies)
}
}

0 comments on commit 3a2cd65

Please sign in to comment.