Skip to content

Commit

Permalink
finagle/finagle-http: propagate contexts over HTTP
Browse files Browse the repository at this point in the history
Problem
Contexts are not sent over HTTP, which means deadlines are not propagated.

Solution
Deadlines are sent in Finagle-Ctx request headers.

RB_ID=778123
TBR=true
  • Loading branch information
jcrossley authored and jenkins committed Dec 14, 2015
1 parent 6e75832 commit 24fe827
Show file tree
Hide file tree
Showing 10 changed files with 238 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ object Deadline extends Contexts.broadcast.Key[Deadline]("com.twitter.finagle.De
}

def tryUnmarshal(body: Buf): Try[Deadline] = {
if (body.length != 16)
return Throw(new IllegalArgumentException("Invalid body"))

if (body.length != 16)
return Throw(new IllegalArgumentException(
s"Invalid body. Length ${body.length} but required 16"))

val bytes = Buf.ByteArray.Owned.extract(body)
val timestamp = ByteArrays.get64be(bytes, 0)
val deadline = ByteArrays.get64be(bytes, 8)

Return(Deadline(Time.fromNanoseconds(timestamp), Time.fromNanoseconds(deadline)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ final class MarshalledContext extends Context {
* and unmarshaller.
*/

abstract class Key[A](id: String) {
abstract class Key[A](val id: String) {
/**
* A unique identifier defining this marshaller. This is
* transmitted together with marshalled values in order to
Expand Down
13 changes: 7 additions & 6 deletions finagle-http/src/main/scala/com/twitter/finagle/Http.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import com.twitter.finagle.client._
import com.twitter.finagle.dispatch.GenSerialClientDispatcher
import com.twitter.finagle.http.{HttpClientTraceInitializer, HttpServerTraceInitializer, HttpTransport, Request, Response}
import com.twitter.finagle.http.codec.{HttpClientDispatcher, HttpServerDispatcher}
import com.twitter.finagle.http.filter.{DtabFilter, HttpNackFilter}
import com.twitter.finagle.http.filter.{ClientContextFilter, DtabFilter, HttpNackFilter, ServerContextFilter}
import com.twitter.finagle.netty3._
import com.twitter.finagle.param.{ProtocolLibrary, Stats}
import com.twitter.finagle.server._
Expand Down Expand Up @@ -121,10 +121,10 @@ object Http extends Client[Request, Response] with HttpRichClient
): Client = copy(stack, params)

protected def newDispatcher(transport: Transport[Any, Any]): Service[Request, Response] =
new HttpClientDispatcher(
transport,
params[Stats].statsReceiver.scope(GenSerialClientDispatcher.StatsScope)
)
(new ClientContextFilter[Request, Response])
.andThen(new HttpClientDispatcher(
transport,
params[Stats].statsReceiver.scope(GenSerialClientDispatcher.StatsScope)))

def withTls(cfg: Netty3TransporterTLSConfig): Client =
configured(Transport.TLSClientEngine(Some(cfg.newEngine)))
Expand Down Expand Up @@ -212,9 +212,10 @@ object Http extends Client[Request, Response] with HttpRichClient
protected def newDispatcher(transport: Transport[In, Out],
service: Service[Request, Response]) = {
val dtab = new DtabFilter.Finagle[Request]
val context = new ServerContextFilter[Request, Response]
val Stats(stats) = params[Stats]

new HttpServerDispatcher(new HttpTransport(transport), dtab andThen service, stats.scope("dispatch"))
new HttpServerDispatcher(new HttpTransport(transport), dtab andThen context andThen service, stats.scope("dispatch"))
}

protected def copy1(
Expand Down
13 changes: 8 additions & 5 deletions finagle-http/src/main/scala/com/twitter/finagle/http/Codec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.twitter.conversions.storage._
import com.twitter.finagle._
import com.twitter.finagle.dispatch.GenSerialClientDispatcher
import com.twitter.finagle.http.codec._
import com.twitter.finagle.http.filter.{DtabFilter, HttpNackFilter}
import com.twitter.finagle.http.filter.{ClientContextFilter, DtabFilter, HttpNackFilter, ServerContextFilter}
import com.twitter.finagle.stats.StatsReceiver
import com.twitter.finagle.tracing._
import com.twitter.finagle.transport.Transport
Expand Down Expand Up @@ -126,12 +126,13 @@ case class Http(

override def prepareConnFactory(
underlying: ServiceFactory[Request, Response]
): ServiceFactory[Request, Response] = {
): ServiceFactory[Request, Response] =
// Note: This is a horrible hack to ensure that close() calls from
// ExpiringService do not propagate until all chunks have been read
// Waiting on CSL-915 for a proper fix.
underlying.map(new DelayedReleaseService(_))
}
underlying.map( u =>
(new ClientContextFilter[Request, Response])
.andThen(new DelayedReleaseService(u)))

override def newClientTransport(ch: Channel, statsReceiver: StatsReceiver): Transport[Any,Any] =
new HttpTransport(super.newClientTransport(ch, statsReceiver))
Expand Down Expand Up @@ -197,7 +198,9 @@ case class Http(
override def prepareConnFactory(
underlying: ServiceFactory[Request, Response]
): ServiceFactory[Request, Response] =
(new HttpNackFilter).andThen(new DtabFilter.Finagle[Request]).andThen(underlying)
(new HttpNackFilter).andThen(new DtabFilter.Finagle[Request])
.andThen(new ServerContextFilter[Request, Response])
.andThen(underlying)

override def newTraceInitializer =
if (_enableTracing) new HttpServerTraceInitializer[Request, Response]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.twitter.finagle.http.codec

import com.twitter.finagle.{Dtab, Failure}
import com.twitter.finagle.{Deadline, Dtab, Failure}
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.dispatch.GenSerialClientDispatcher
import com.twitter.finagle.http.{Fields, ReaderUtils, Request, Response}
import com.twitter.finagle.http.filter.HttpNackFilter
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.twitter.finagle.http.codec

import com.twitter.finagle._
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.http.Message
import com.twitter.logging.Logger
import com.twitter.util.{NonFatal, Time}

private[http] object HttpContext {

private[this] val Prefix = "Finagle-Ctx-"
private[this] val DeadlineHeaderKey = Prefix+Deadline.id

private val log = Logger(getClass.getName)

private[this] def marshalDeadline(deadline: Deadline): String =
deadline.timestamp.inNanoseconds + " " + deadline.deadline.inNanoseconds

private[this] def unmarshalDeadline(header: String): Option[Deadline] =
try {
val values = header.split(' ')
val timestamp = values(0).toLong
val deadline = values(1).toLong
Some(Deadline(Time.fromNanoseconds(timestamp), Time.fromNanoseconds(deadline)))
} catch {
case NonFatal(exc) =>
log.debug(s"Could not unmarshall Deadline from header value: ${header}")
None
}

/**
* Read Finagle-Ctx header pairs from the given message for Contexts:
* - Deadline
* and run `fn`.
*/
def read[R](msg: Message)(fn: => R): R =
msg.headerMap.get(DeadlineHeaderKey) match {
case Some(str) =>
unmarshalDeadline(str) match {
case Some(deadline) => Contexts.broadcast.let(Deadline, deadline)(fn)
case None => fn
}
case None =>
fn
}

/**
* Write Finagle-Ctx header pairs into the given message for Contexts:
* - Deadline
*/
def write(msg: Message): Unit =
Contexts.broadcast.get(Deadline) match {
case Some(deadline) =>
msg.headerMap.set(DeadlineHeaderKey, marshalDeadline(deadline))
case None =>
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.twitter.finagle.http.filter

import com.twitter.finagle._
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.http.Request
import com.twitter.finagle.http.codec.HttpContext
import com.twitter.util.Future

/**
* Sets the following Context values from the request headers:
* - request deadline
*/
private[finagle] class ServerContextFilter[Req <: Request, Rep]
extends SimpleFilter[Req, Rep] {

def apply(req: Req, service: Service[Req, Rep]): Future[Rep] =
HttpContext.read(req)(service(req))
}

/**
* Sets the following header values for the request Context:
* - request deadline
*/
private[finagle] class ClientContextFilter[Req <: Request, Rep]
extends SimpleFilter[Req, Rep] {

def apply(req: Req, service: Service[Req, Rep]): Future[Rep] = {
HttpContext.write(req)
service(req)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,26 @@ class EndToEndTest extends FunSuite with BeforeAndAfter {
client.close()
}

test(name + ": context") {
val writtenDeadline = Deadline.ofTimeout(5.seconds)
val service = new HttpService {
def apply(request: Request) = {
val deadline = Contexts.broadcast.get(Deadline).get
assert(deadline.deadline == writtenDeadline.deadline)
val response = Response(request)
Future.value(response)
}
}

Contexts.broadcast.let(Deadline, writtenDeadline) {
val req = Request()
val client = connect(service)
val res = Await.result(client(Request("/")))
assert(res.status == Status.Ok)
client.close()
}
}

test(name + ": stream") {
def service(r: Reader) = new HttpService {
def apply(request: Request) = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.twitter.finagle.http.codec

import com.twitter.conversions.time._
import com.twitter.finagle._
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.Deadline
import com.twitter.finagle.http.{Message, Method, Request, Version}
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class HttpContextTest extends FunSuite {

def newMsg(): Message = Request(Version.Http11, Method.Get, "/")

test("written request context matches read request context") {
val m = newMsg()
val writtenDeadline = Deadline.ofTimeout(5.seconds)
Contexts.broadcast.let(Deadline, writtenDeadline) {
HttpContext.write(m)

// Clear the deadline value in the context
Contexts.broadcast.letClear(Deadline) {

// ensure the deadline was cleared
assert(Contexts.broadcast.get(Deadline) == None)

HttpContext.read(m) {
val readDeadline = Contexts.broadcast.get(Deadline).get
assert(writtenDeadline == readDeadline)
}
}
}
}

test("invalid context header value causes context to not be set") {
val m = newMsg()
m.headers.set("Finagle-Ctx-com.twitter.finagle.foo", ",,,");
HttpContext.read(m) {
assert(Contexts.broadcast.marshal.isEmpty)
}
}

test("when there are no context headers, reading returns an empty iterator") {
val m = newMsg()
HttpContext.read(m) {
assert(Contexts.broadcast.marshal.isEmpty)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.twitter.finagle.http.filter

import com.twitter.conversions.time._
import com.twitter.finagle.{Deadline, Service}
import com.twitter.finagle.context.Contexts
import com.twitter.finagle.http.{Status, Response, Request}
import com.twitter.finagle.http.codec.HttpContext
import com.twitter.util.{Await, Future}
import org.junit.runner.RunWith
import org.scalatest.FunSuite
import org.scalatest.junit.JUnitRunner

@RunWith(classOf[JUnitRunner])
class ContextFilterTest extends FunSuite {

test("parses Finagle-Ctx headers") {
val writtenDeadline = Deadline.ofTimeout(5.seconds)
val service =
new ClientContextFilter[Request, Response] andThen
new ServerContextFilter[Request, Response] andThen
Service.mk[Request, Response] { req =>
assert(Contexts.broadcast.get(Deadline).get == writtenDeadline)
Future.value(Response())
}

Contexts.broadcast.let(Deadline, writtenDeadline) {
val req = Request()
HttpContext.write(req)

// Clear the deadline value in the context
Contexts.broadcast.letClear(Deadline) {
// ensure the deadline was cleared
assert(Contexts.broadcast.get(Deadline) == None)

val rsp = Await.result(service(req))
assert(rsp.status == Status.Ok)
}
}
}

test("does not set incorrectly encoded context headers") {
val service =
new ClientContextFilter[Request, Response] andThen
new ServerContextFilter[Request, Response] andThen
Service.mk[Request, Response] { _ =>
assert(Contexts.broadcast.marshal.isEmpty)
Future.value(Response())
}

val req = Request()
req.headers().add("Finagle-Ctx-com.twitter.finagle.Deadline", "foo")

val rsp = Await.result(service(req))
assert(rsp.status == Status.Ok)
}
}

0 comments on commit 24fe827

Please sign in to comment.