Skip to content

Commit

Permalink
finagle-base-http: external broadcast contexts to http headers
Browse files Browse the repository at this point in the history
Problem:
Finagle will not by default propagate broadcast contexts over http outside
of its internal defaults (Retries, BackupRequest, Deadline, and Tracing information). We'd like to
support the option to include external context types.

Solution:
Support external context types via LoadService. HttpContext will pickup
LoadableHttpContext via LoadService and by default use the
`Contexts.broadcast.Key` interface to create the header (key, value) pair.
The `HttpContext` interface can be overridden to provide a custom encoding &
decoding scheme.

Result:
New api internal to `com.twitter`, `trait HttpContext` & `LoadableHttpContext`.
We can make this a public api once we're fully confident in this approach.

JIRA Issues: CSL-8894, CSL-8154

Differential Revision: https://phabricator.twitter.biz/D380407
  • Loading branch information
hamdiallam authored and jenkins committed Oct 16, 2019
1 parent c335b29 commit cc29b26
Show file tree
Hide file tree
Showing 16 changed files with 395 additions and 121 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ Runtime Behavior Changes

* finagle: Upgrade to caffeine 2.8.0 ``PHAB_ID=D384592``

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

* finagle-base-http: `c.t.f.http.codec.HttpContext` moved into `c.t.f.http.codec.context.HttpContext`
``PHAB_ID=D380407``

19.10.0
-------

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.twitter.finagle.http.codec.context

import com.twitter.finagle.context.BackupRequest
import com.twitter.util.{Try, Throw}

private object HttpBackupRequest extends HttpContext {

type ContextKeyType = BackupRequest
val key = BackupRequest.Ctx

def toHeader(backupRequests: BackupRequest): String = "1"

def fromHeader(header: String): Try[BackupRequest] = {
header match {
case "1" => BackupRequest.ReturnBackupRequest
case _ =>
Throw(
new IllegalArgumentException(
"Could not extract BackupRequest from Buf. Expected \"1\""
)
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package com.twitter.finagle.http.codec.context

import com.twitter.finagle.context.Contexts
import com.twitter.finagle.http.{HeaderMap, Message}
import com.twitter.finagle.util.LoadService
import com.twitter.logging.{Level, Logger}
import com.twitter.util.{Return, Throw, Try}
import scala.collection.mutable
import scala.util.control.NonFatal

/**
* HttpContext is an interface to serialize finagle broadcast contexts
* into http headers. All HttpContext headers are prefixed with `Finagle-Ctx-`.
*
* See [[LoadableHttpContext]] to include custom finagle broadcast contexts via
* [[com.twitter.finagle.util.LoadService]].
*/
private[twitter] trait HttpContext {
type ContextKeyType

def key: Contexts.broadcast.Key[ContextKeyType]

def id: String = key.id

@volatile private[this] var _headerKey: String = null
final def headerKey: String = {
// There's a race condition here for setting _headerKey that
// will occur on startup and worst-case causes a few extra string
// allocations
if (_headerKey == null)
_headerKey = HttpContext.Prefix + id

_headerKey
}

def toHeader(value: ContextKeyType): String
def fromHeader(header: String): Try[ContextKeyType]

}

object HttpContext {

private val log = Logger(getClass.getName)

private[codec] val Prefix = "Finagle-Ctx-"

private[this] val knownContextTypes: Array[HttpContext] = {
Array[HttpContext](
HttpDeadline,
HttpRetries,
HttpBackupRequest
)
}

private[this] val loadedContextTypes: Array[HttpContext] = {
val loaded: mutable.ArrayBuffer[HttpContext] = scala.collection.mutable.ArrayBuffer.empty
val ctxKeys: mutable.Set[String] = mutable.Set.empty ++ knownContextTypes.map(_.headerKey).toSet

LoadService[LoadableHttpContext]().foreach { httpCtx =>
if (ctxKeys.contains(httpCtx.headerKey)) {
log.warning(s"skipping duplicate http header context key ${httpCtx.headerKey}")
} else {
ctxKeys += httpCtx.headerKey
loaded += httpCtx
}
}

loaded.toArray
}

// we differentiate between known and loaded context types for the `write` case, where the loaded
// types cannot be added to the HeaderMap using `addUnsafe`. The key/value pairs for these must be validated
private[this] val allContextTypes: Array[HttpContext] = knownContextTypes ++ loadedContextTypes

/**
* Remove the Deadline header from the Message. May be used
* when it is not desirable for clients to be able to set
* bogus or expired Deadline headers in an HTTP request.
*/
def removeDeadline(msg: Message): Unit =
msg.headerMap.remove(HttpDeadline.headerKey)

/**
* Read Finagle-Ctx header pairs from the given message for Contexts. This includes "Deadline",
* "Retries", and "BackupRequests" Finagle contexts, as well as any contexts loaded via
* [[LoadableHttpContext]]
*
* and run `fn`.
*/
private[http] def read[R](msg: Message)(fn: => R): R = {
var ctxValues: List[Contexts.broadcast.KeyValuePair[_]] = Nil

var i: Int = 0
while (i < allContextTypes.length) {
val contextType = allContextTypes(i)

// mutate `ctxValues` if the corresponding header is present and we can decode
// the header successfully
msg.headerMap.get(contextType.headerKey) match {
case Some(header) =>
contextType.fromHeader(header) match {
case Return(ctxVal) =>
ctxValues = Contexts.broadcast.KeyValuePair(contextType.key, ctxVal) :: ctxValues
case Throw(_) =>
if (log.isLoggable(Level.DEBUG))
log.debug(s"could not unmarshal ${contextType.key} from the header value")
}
case None =>
}

i += 1
}

Contexts.broadcast.let(ctxValues)(fn)
}

/**
* Write Finagle-Ctx header pairs to the given message for Contexts. This includes "Deadline",
* "Retries", and "BackupRequests" Finagle contexts, as well as any contexts loaded via
* [[LoadableHttpContext]]
*/
private[http] def write(msg: Message): Unit = {
var i = 0
while (i < knownContextTypes.length) {
val contextType = knownContextTypes(i)
writeToHeader(contextType, msg.headerMap, isSafe = true)

i += 1
}

i = 0
while (i < loadedContextTypes.length) {
val contextType = loadedContextTypes(i)
writeToHeader(contextType, msg.headerMap, isSafe = false)

i += 1
}
}

private[this] def writeToHeader(
contextType: HttpContext,
headerMap: HeaderMap,
isSafe: Boolean
): Unit = {
Contexts.broadcast.get(contextType.key) match {
case Some(ctxVal) =>
if (isSafe) {
headerMap.addUnsafe(contextType.headerKey, contextType.toHeader(ctxVal))
} else {
try {
headerMap.add(contextType.headerKey, contextType.toHeader(ctxVal))
} catch {
case NonFatal(exc) =>
if (log.isLoggable(Level.DEBUG))
log.debug(s"unable to add ${contextType.key} to the header")
}
}

case None =>
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.twitter.finagle.http.codec.context

import com.twitter.finagle.context.Deadline
import com.twitter.util.{Return, Throw, Time, Try}
import scala.util.control.NonFatal

private object HttpDeadline extends HttpContext {

type ContextKeyType = Deadline
val key = Deadline

def toHeader(deadline: Deadline): String = {
deadline.timestamp.inNanoseconds + " " + deadline.deadline.inNanoseconds
}

def fromHeader(header: String): Try[Deadline] = {
try {
val values = header.split(' ')
val timestamp = values(0).toLong
val deadline = values(1).toLong

Return(
Deadline(Time.fromNanoseconds(timestamp), Time.fromNanoseconds(deadline))
)
} catch {
case NonFatal(e) => Throw(e)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.twitter.finagle.http.codec.context

import com.twitter.finagle.context.{Contexts, Retries}
import com.twitter.util.{Return, Throw, Try}
import scala.util.control.NonFatal

private object HttpRetries extends HttpContext {

type ContextKeyType = Retries
val key: Contexts.broadcast.Key[Retries] = Retries

def toHeader(retries: Retries): String = {
retries.attempt.toString
}

def fromHeader(header: String): Try[Retries] = {
try {
Return(Retries(header.toInt))
} catch {
case NonFatal(e) => Throw(new NumberFormatException)
}
}
}
Loading

0 comments on commit cc29b26

Please sign in to comment.