Skip to content

Commit

Permalink
Restructure
Browse files Browse the repository at this point in the history
  • Loading branch information
takezoe committed Feb 17, 2017
1 parent f0f029f commit 6e9de42
Show file tree
Hide file tree
Showing 21 changed files with 284 additions and 220 deletions.
@@ -0,0 +1,7 @@
package jp.co.bizreach.trace

/**
* Created by nishiyama on 2016/12/08.
*/
case class TraceData(span: brave.Span)

This file was deleted.

@@ -1,4 +1,4 @@
package jp.co.bizreach.trace.zipkin
package jp.co.bizreach.trace

object ZipkinTraceConfig {
val zipkinAkkaName = "zipkin-trace-context"
Expand Down
@@ -0,0 +1,83 @@
package jp.co.bizreach.trace

import brave.propagation.Propagation
import brave.{Span, Tracer}

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Failure

/**
* Created by nishiyama on 2016/12/08.
*/
trait ZipkinTraceServiceLike {

type ServerSpan = Span

implicit val executionContext: ExecutionContext
val tracer: Tracer


def serverReceived[A](spanName: String, span: Span): ServerSpan = {
span.name(spanName).kind(Span.Kind.SERVER).start()
}

def serverSend(span: ServerSpan, tags: (String, String)*): ServerSpan = {
Future {
tags.foreach { case (key, value) => span.tag(key, value) }
span.finish()
}
span
}

// Request Headers => Span
def newSpan[A](headers: A)(getHeader: (A, String) => Option[String]): Span = {
val contextOrFlags = Propagation.B3_STRING.extractor(new Propagation.Getter[A, String] {
def get(carrier: A, key: String): String = getHeader(carrier, key).orNull
}).extract(headers)

Option(contextOrFlags.context())
.map(tracer.newChild)
.getOrElse(tracer.newTrace(contextOrFlags.samplingFlags()))
}

// Span => Request Headers
def toMap(span: Span): Map[String, String] = {
val data = collection.mutable.Map[String, String]()

Propagation.B3_STRING.injector(new Propagation.Setter[collection.mutable.Map[String, String], String] {
def put(carrier: collection.mutable.Map[String, String], key: String, value: String): Unit = carrier += key -> value
}).inject(span.context(), data)

data.toMap
}

def toMap(traceData: TraceData): Map[String, String] = {
toMap(traceData.span)
}

// TODO implement this method!
def trace[A](traceName: String, tags: (String, String)*)(f: TraceData => A)(implicit parentData: TraceData): A = ???

def traceFuture[A](traceName: String, tags: (String, String)*)(f: TraceData => Future[A])(implicit parentDate: TraceData): Future[A] = {
sample(traceName, parentDate.span, tags: _*)(s => f(TraceData(s)))
}

private[trace] def sample[A](name: String, parent: Span, tags: (String, String)*)(f: Span => Future[A]): Future[A] = {
val childSpan = tracer.newChild(parent.context()).name(name).kind(Span.Kind.CLIENT)
tags.foreach { case (key, value) => childSpan.tag(key, value) }
childSpan.start()

val result = f(childSpan)

result.onComplete {
case Failure(t) =>
childSpan.tag("failed", s"Finished with exception: ${t.getMessage}")
childSpan.finish()
case _ =>
childSpan.finish()
}

result
}

}

This file was deleted.

This file was deleted.

@@ -0,0 +1,55 @@
package jp.co.bizreach.trace.play23

import jp.co.bizreach.trace.TraceData
import play.api.Application
import play.api.libs.iteratee.Enumerator
import play.api.libs.ws._

import scala.concurrent.Future

object TraceWS {

def url(spanName: String, url: String)(implicit app: Application, traceData: TraceData): play.api.libs.ws.WSRequestHolder = {
new TraceWSRequest(spanName, WS.url(url).withHeaders(ZipkinTraceService.toMap(traceData).toSeq: _*), traceData)
}

}

class TraceWSRequest(spanName: String, request: WSRequestHolder, traceData: TraceData) extends WSRequestHolder {
override val url: String = request.url
override val method: String = request.method
override val body: WSBody = request.body
override val headers: Map[String, Seq[String]] = request.headers
override val queryString: Map[String, Seq[String]] = request.queryString
override val calc: Option[WSSignatureCalculator] = request.calc
override val auth: Option[(String, String, WSAuthScheme)] = request.auth
override val followRedirects: Option[Boolean] = request.followRedirects
override val requestTimeout: Option[Int] = request.requestTimeout
override val virtualHost: Option[String] = request.virtualHost
override val proxyServer: Option[WSProxyServer] = request.proxyServer

override def sign(calc: SignatureCalculator): WSRequestHolder = new TraceWSRequest(spanName, request.sign(calc), traceData)

override def withAuth(username: String, password: String, scheme: WSAuthScheme): WSRequestHolder = new TraceWSRequest(spanName, request.withAuth(username, password, scheme), traceData)

override def withHeaders(hdrs: (String, String)*): WSRequestHolder = new TraceWSRequest(spanName, request.withHeaders(hdrs:_*), traceData)

override def withQueryString(parameters: (String, String)*): WSRequestHolder = new TraceWSRequest(spanName, request.withQueryString(parameters:_*), traceData)

override def withFollowRedirects(follow: Boolean): WSRequestHolder = new TraceWSRequest(spanName, request.withFollowRedirects(follow), traceData)

override def withRequestTimeout(timeout: Int): WSRequestHolder = new TraceWSRequest(spanName, request.withRequestTimeout(timeout), traceData)

override def withVirtualHost(vh: String): WSRequestHolder = new TraceWSRequest(spanName, request.withVirtualHost(vh), traceData)

override def withProxyServer(proxyServer: WSProxyServer): WSRequestHolder = new TraceWSRequest(spanName, request.withProxyServer(proxyServer), traceData)

override def withBody(body: WSBody): WSRequestHolder = new TraceWSRequest(spanName, request.withBody(body), traceData)

override def withMethod(method: String): WSRequestHolder = new TraceWSRequest(spanName, request.withMethod(method), traceData)

override def execute(): Future[Response] = ZipkinTraceService.traceFuture(spanName){ _ => request.execute() }(traceData)

override def stream(): Future[(WSResponseHeaders, Enumerator[Array[Byte]])] = request.stream()

}
@@ -1,7 +1,7 @@
package jp.co.bizreach.trace.play23

import brave.Tracer
import jp.co.bizreach.trace.zipkin._
import jp.co.bizreach.trace._
import play.api.{Play, Configuration}
import play.api.libs.concurrent.Akka
import zipkin.reporter.AsyncReporter
Expand Down
Expand Up @@ -15,21 +15,20 @@ class ZipkinTraceFilter extends Filter {
private val reqHeaderToSpanName: RequestHeader => String = ZipkinTraceFilter.ParamAwareRequestNamer

def apply(nextFilter: (RequestHeader) => Future[Result])(req: RequestHeader): Future[Result] = {
ZipkinTraceService.serverReceived(
traceName = reqHeaderToSpanName(req),
span = ZipkinTraceService.toSpan(req.headers)((headers, key) => headers.get(key))
).flatMap { serverSpan =>
val result = nextFilter(req.copy(headers = new Headers {
protected val data: Seq[(String, Seq[String])] = {
req.headers.toMap ++ ZipkinTraceService.toMap(serverSpan).map { case (key, value) => key -> Seq(value) }
}.toSeq
}))
result.onComplete {
case Failure(t) => ZipkinTraceService.serverSend(serverSpan, "failed" -> s"Finished with exception: ${t.getMessage}")
case _ => ZipkinTraceService.serverSend(serverSpan)
}
result
val serverSpan = ZipkinTraceService.serverReceived(
spanName = reqHeaderToSpanName(req),
span = ZipkinTraceService.newSpan(req.headers)((headers, key) => headers.get(key))
)
val result = nextFilter(req.copy(headers = new Headers {
protected val data: Seq[(String, Seq[String])] = {
req.headers.toMap ++ ZipkinTraceService.toMap(serverSpan).map { case (key, value) => key -> Seq(value) }
}.toSeq
}))
result.onComplete {
case Failure(t) => ZipkinTraceService.serverSend(serverSpan, "failed" -> s"Finished with exception: ${t.getMessage}")
case _ => ZipkinTraceService.serverSend(serverSpan)
}
result
}
}

Expand Down
@@ -1,26 +1,18 @@
package jp.co.bizreach.trace.play23.implicits

import jp.co.bizreach.trace.{TraceCassette, TraceImplicits}
import jp.co.bizreach.trace.play23.ZipkinTraceService
import jp.co.bizreach.trace.zipkin.ZipkinTraceCassette
import play.api.libs.ws._
import jp.co.bizreach.trace.TraceData
import play.api.mvc.RequestHeader

/**
* Created by nishiyama on 2016/12/08.
*/
trait ZipkinTraceImplicits extends TraceImplicits {
trait ZipkinTraceImplicits {

implicit def request2trace(implicit req: RequestHeader): TraceCassette = {
ZipkinTraceCassette(
span = ZipkinTraceService.toSpan(req.headers)((headers, key) => headers.get(key))
implicit def request2trace(implicit req: RequestHeader): TraceData = {
TraceData(
span = ZipkinTraceService.newSpan(req.headers)((headers, key) => headers.get(key))
)
}

implicit class RichWSRequest(r: WSRequestHolder) {
def withTraceHeader(cassette: TraceCassette): WSRequestHolder = {
r.withHeaders(ZipkinTraceService.toMap(cassette).toSeq: _*)
}
}

}

0 comments on commit 6e9de42

Please sign in to comment.