Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

finagle-base-http: add MapBackedHeaderMap #805

Closed
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ package com.twitter.finagle.http.headers
import com.twitter.finagle.http.HeaderMap

object DefaultHeaderMap {
def apply(headers: (String, String)*): HeaderMap = HashBackedHeaderMap(headers: _*)
def apply(headers: (String, String)*): HeaderMap = JTreeMapBackedHeaderMap(headers: _*)
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private final class HashBackedHeaderMap extends http.HeaderMap {
}
}

private object HashBackedHeaderMap {
object HashBackedHeaderMap {

/** Construct a new `HeaderMap` with the header list
*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package com.twitter.finagle.http.headers

import com.twitter.finagle.http.HeaderMap
import scala.annotation.tailrec
import scala.collection.mutable
import scala.collection.AbstractIterator
import com.twitter.finagle.http.HeaderMap

private[http] sealed class Header private (final val name: String, final val value: String)
extends HeaderMap.NameValue {
Expand All @@ -14,6 +16,21 @@ private[http] sealed class Header private (final val name: String, final val val
private[http] object Header {

final class Root private[Header] (name: String, value: String) extends Header(name, value) {
self =>

def iterator: Iterator[HeaderMap.NameValue] =
if (next == null) Iterator.single(this)
else {
new AbstractIterator[HeaderMap.NameValue] {
var cur: Header = self
martijnhoekstra marked this conversation as resolved.
Show resolved Hide resolved
def hasNext: Boolean = cur != null
def next(): HeaderMap.NameValue = {
var n = cur
cur = n.next
n
}
}
}

// We want to keep a reference to the last element of this linked list
// so we don't need to traverse the whole list to add an element.
Expand Down Expand Up @@ -49,4 +66,65 @@ private[http] object Header {
/** Create a new root node */
def root(name: String, value: String): Root =
new Root(name, value)

def uniqueNames(orig: Iterator[Header]): Iterator[String] = new Iterator[String] {
private[this] val it = orig
private[this] var current: Iterator[String] = Iterator.empty

private[this] def collectUnique(from: Header): Iterator[String] = new Iterator[String] {

// We need to keep `_next` current so that we can reliably determine `.hasNext`.
// That means that if `_next` is not null, it is, in fact, the next value.
private[this] var nextUnique = from

// We keep a set of observed names so that we don't accidentally return
// duplicates. Another way to have done this would be to simply add all names
// to a set and return an iterator over that set, but this way is more lazy.
private[this] var seen = Set.empty + from.name

def hasNext: Boolean = nextUnique != null

def next(): String = {
if (!hasNext) throw new NoSuchElementException

val result = nextUnique.name
// We then need to determine the next `_next` so we can
// again have a sane `.hasNext`.
prepareNext()
result
}

@tailrec
private[this] def prepareNext(): Unit = {
nextUnique = nextUnique.next
if (nextUnique != null) {
val prevSize = seen.size
seen = seen + nextUnique.name
if (seen.size == prevSize) {
// already observed. Try again.
prepareNext()
}
}
}
}

def hasNext: Boolean =
it.hasNext || current.hasNext

def next(): String = {
if (current.isEmpty) {
val hs = it.next()
if (hs.next == null) {
// A shortcut to see if we have only a single entry for this name
hs.name
} else {
// slow path: we need to make a sub-iterator
current = collectUnique(hs)
current.next()
}
} else {
current.next()
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ import scala.collection.mutable
// - iteration by gaining access to `entriesIterator` (protected method).
// - get/add functions by providing custom hashCode and equals methods for a key
private[http] final class HeadersHash extends mutable.HashMap[String, Header.Root] {

private def hashChar(c: Char): Int =
if (c >= 'A' && c <= 'Z') c + 32
else c
import HeadersHash._

// Adopted from Netty 3 HttpHeaders.
override protected def elemHashCode(key: String): Int = {
Expand Down Expand Up @@ -68,67 +65,9 @@ private[http] final class HeadersHash extends mutable.HashMap[String, Header.Roo
array.iterator
}

def uniqueNamesIterator: Iterator[String] = new Iterator[String] {
private[this] val it = copiedEntitiesIterator
private[this] var current: Iterator[String] = Iterator.empty

private[this] def collectUnique(from: Header): Iterator[String] = new Iterator[String] {

// We need to keep `_next` current so that we can reliably determine `.hasNext`.
// That means that if `_next` is not null, it is, in fact, the next value.
private[this] var nextUnique = from

// We keep a set of observed names so that we don't accidentally return
// duplicates. Another way to have done this would be to simply add all names
// to a set and return an iterator over that set, but this way is more lazy.
private[this] var seen = Set.empty + from.name

def hasNext: Boolean = nextUnique != null

def next(): String = {
if (!hasNext) throw new NoSuchElementException

val result = nextUnique.name
// We then need to determine the next `_next` so we can
// again have a sane `.hasNext`.
prepareNext()
result
}

@tailrec
private[this] def prepareNext(): Unit = {
nextUnique = nextUnique.next
if (nextUnique != null) {
val prevSize = seen.size
seen = seen + nextUnique.name
if (seen.size == prevSize) {
// already observed. Try again.
prepareNext()
}
}
}
}

def hasNext: Boolean =
it.hasNext || current.hasNext

def next(): String = {
if (current.isEmpty) {
val hs = it.next()
if (hs.next == null) {
// A shortcut to see if we have only a single entry for this name
hs.name
} else {
// slow path: we need to make a sub-iterator
current = collectUnique(hs)
current.next()
}
} else {
current.next()
}
}
}

def uniqueNamesIterator: Iterator[String] =
Header.uniqueNames(copiedEntitiesIterator)

def flattenIterator: Iterator[(String, String)] =
flattenedNameValueIterator.map(nv => (nv.name, nv.value))

Expand Down Expand Up @@ -181,3 +120,9 @@ private[http] final class HeadersHash extends mutable.HashMap[String, Header.Roo
def removeAll(key: String): Unit = remove(key)

}

object HeadersHash {
final def hashChar(c: Char): Int =
if (c >= 'A' && c <= 'Z') c + 32
else c
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package com.twitter.finagle.http.headers

import com.twitter.finagle.http.HeaderMap
import scala.annotation.tailrec
import java.util.function.BiConsumer
import scala.collection.AbstractIterator

/**
* Mutable, thread-safe [[HeaderMap]] implementation, backed by
* a mutable [[Map[String, Header]]], where the map key
* is forced to lower case
*/
final private class JTreeMapBackedHeaderMap extends HeaderMap {
import HeaderMap._

// In general, Map's that are not thread safe are not
// durable to concurrent modification and can result in infinite loops
// and exceptions.
// As such, we synchronize on the underlying collection when performing
// accesses to avoid this. In the common case of no concurrent access,
// this should be cheap.
val underlying: java.util.TreeMap[String, Header.Root] =
martijnhoekstra marked this conversation as resolved.
Show resolved Hide resolved
new java.util.TreeMap[String, Header.Root](JTreeMapBackedHeaderMap.SharedComparitor)
martijnhoekstra marked this conversation as resolved.
Show resolved Hide resolved

private def foreachConsumer[U](f: ((String, String)) => U):
BiConsumer[String, Header.Root] = new BiConsumer[String, Header.Root](){
def accept(key: String, header: Header.Root): Unit = header.iterator.foreach(
nv => f(nv.name, nv.value)
)
}

override def foreach[U](f: ((String, String)) => U): Unit =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was talking to @bryce-anderson and we think this might open us to ConcurrentModificationException issues, since we don't synchronize while iterating, and it iterates over the underlying map directly. it seems like we might have a few options here:

  1. instead of TreeMap use a ConcurrentSkipListMap
  2. synchronize foreach on underlying
  3. remove the optimization, instead synchronize and copy the iterator when we call foreach

The risk of synchronizing foreach is that we're going to lock on someone else's lambda, but it might be worth just doing it for now, since it will still let us use your optimization at very little cost in the common case (single-threaded access). On the other hand, it would be pretty cool to use a ConcurrentSkipListMap 😈. how do you feel about it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now I wish I never removed the abstraction to swap out the underlying Map implementation.

But let's take a step back. How often do people really concurrently iterate over headers and also mutate the header map and don't synchronize that access because they don't care whether or not the modified data should or shouldn't be part of the iteration? Is that a use-case you want to facilitate in the first place?

Thinking about those questions, my conclusion is that throwing a ConcurrentModificationException that makes the user aware of the situation/race condition/potential bug is better than synchronizing and not being deterministic in what happens first. If the user really wants that for some unforeseen, unspoken and unspeakable reason, they can always still externally synchronize (maybe with a ReadWriteLock or something).

I think in terms of common usage patterns, that's the fastest and most reasonable thing to do too. Even if a ConcurrentSkipListMap would be pretty cool.

If we do want to get overboard and fancy, it should be validated with benchmarks that reflect real-world usage patterns, including the real-world concurrent patterns, because otherwise we're not really measuring anything close to reality.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the good outcome is that they get a ConcurrentModificationException–a bad outcome might be an infinite loop or worse. See https://ivoanjo.me/blog/2018/07/21/writing-to-a-java-treemap-concurrently-can-lead-to-an-infinite-loop-during-reads/

I think correctness needs to be our #1 concern. I agree with you that it's extremely unlikely–that's one of the reasons I'm willing to just wrap it in a synchronized block and call it a day.

I'm not sure I understand your concern about not being deterministic about what happens first–that's always going to be the case with concurrent use of a data structure. My concern is that HeaderMap used to be threadsafe, so people may have been already using it assuming it was threadsafe, and then one day we merge this in and their application breaks in an extremely subtle way. If this was a new abstraction, I would be open to considering, "HeaderMap is not threadsafe, all concurrent access must be synchronized by the user" but I don't think that's the right way forward given the circumstances.

My bet is the same as yours, that it's extremely unusual to access it concurrently, so I don't think we need to benchmark it. But I do think that we should protect against it. Locking is pretty cheap when there's no concurrent access, so I don't think it should be that painful to just lock it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jikes! I didn't realize the concurrent case could lead to non-termination. That makes not synchronizing a non-starter, and the below just an aside for academic curiosity.

The concern about concurrent iteration and modification (specifically those two) is that the concept of iteration of all headers and concurrently modifying those headers makes no sense -- either you want those in or out, but you always care, or you wouldn't iteratate them. You must synchronise externally for the operation to be meaningful and correct.

That's not always the case for two concurrent modifications that will never modify the same key, or concurrently getting a key, and modifying some keys that you know to be different to the keys you're concurrently getting.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've run into this non-termination before... It was a nightmare, especially considering it was happening on a Netty thread.

Normally I'd agree that we'd prefer a ConcurrentModificationException and we've tried that in the past but many users are doing the wrong thing in benign places (think reusing a request to do warmup requests or something) and it just wasn't worth the hassle to introduce the behavior change.

I think the solution you've already committed of simply synchronizing the foreach call is fine since you're right: this shouldn't be happening very often, if at all. 🤞

underlying.forEach(foreachConsumer(f))

// ---- HeaderMap -----

// Validates key and value.
def add(key: String, value: String): this.type = {
validateName(key)
addUnsafe(key, foldReplacingValidateValue(key, value))
}

// Validates key and value.
def set(key: String, value: String): this.type = {
validateName(key)
setUnsafe(key, foldReplacingValidateValue(key, value))
}

// ---- Map/MapLike -----

def -=(key: String): this.type = removed(key)
def +=(kv: (String, String)): this.type = set(kv._1, kv._2)

/**
* Underlying headers eagerly copied to an array, without synchronizing
* on the underlying collection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe add a note that since it's not synchronized, it must be synchronized by the calling method? I think it might be safe to always synchronize here, since java synchronized does reentrancy pretty cheaply (I think)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the note -- I also think that re-entrance is pretty cheap, but I wasn't entirely sure, and it's called only a handful of times, and only private[this] I'm happy to change it if you see those trade-offs differently.

*/
private[this] def copyHeaders: Iterator[Header.Root] = underlying.values.toArray(new Array[Header.Root](underlying.size)).iterator

def iterator: Iterator[(String, String)] =
nameValueIterator.map(nv => (nv.name, nv.value))

override def keys: Set[String] = keysIterator.toSet

override def keysIterator: Iterator[String] = underlying.synchronized {
Header.uniqueNames(copyHeaders)
}

private[finagle] final override def nameValueIterator: Iterator[HeaderMap.NameValue] =
underlying.synchronized {
copyHeaders.flatMap(_.iterator)
}

def getAll(key: String): Seq[String] = underlying.synchronized {
underlying.get(key) match {
case null => Nil
case r: Header.Root => r.values
}
}

// Does not validate key and value.
def addUnsafe(key: String, value: String): this.type = underlying.synchronized {
underlying.get(key) match {
case null => underlying.put(key, Header.root(key, value))
case h => h.add(key, value)
}
this
}

// Does not validate key and value.
def setUnsafe(key: String, value: String): this.type = underlying.synchronized {
underlying.put(key, Header.root(key, value))
this
}

def get(key: String): Option[String] = underlying.synchronized {
underlying.get(key) match {
case null => None
case h => Some(h.value)
}
}

def removed(key: String): this.type = underlying.synchronized {
underlying.remove(key)
this
}
}


object JTreeMapBackedHeaderMap {

val SharedComparitor = new java.util.Comparator[String] {
martijnhoekstra marked this conversation as resolved.
Show resolved Hide resolved
def compare(key1: String, key2: String): Int = {
// Shorter strings are always less, regardless of their content
val lenthDiff = key1.length - key2.length
if (lenthDiff != 0) lenthDiff
else {
@tailrec
def go(i: Int): Int = {
if (i == key1.length) 0 // end, they are equal.
else {
val char1 = HeadersHash.hashChar(key1.charAt(i))
val char2 = HeadersHash.hashChar(key2.charAt(i))
val diff = char1 - char2
if (diff == 0) go(i + 1)
else diff
}
}
go(0)
}
}
}

def apply(headers: (String, String)*): HeaderMap = {
val result = new JTreeMapBackedHeaderMap
headers.foreach(t => result.add(t._1, t._2))
result
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package com.twitter.finagle.http.headers

import com.twitter.finagle.http.HeaderMap

class DefaultHeaderMapTest extends AbstractHeaderMapTest {
final def newHeaderMap(headers: (String, String)*): HeaderMap = DefaultHeaderMap(headers: _*)
class HashBackedHeaderMapTest extends AbstractHeaderMapTest {
final def newHeaderMap(headers: (String, String)*): HeaderMap = HashBackedHeaderMap(headers: _*)
}

class JTreeMapBackedHeaderMapTest extends AbstractHeaderMapTest {
final def newHeaderMap(headers: (String, String)*): HeaderMap = JTreeMapBackedHeaderMap(headers: _*)
}