Skip to content

Commit

Permalink
finagle-http: RecordSchema.Record is now thread-safe
Browse files Browse the repository at this point in the history
Problem

`c.t.f.http.collection.RecordSchema.Record` is not thread-safe, and
this differs from most other public facing APIs in Finagle.

Solution

Push the thread-safety down into `Record` to prefer correctness. It
is not likely that this code is used in any highly contended manner
across threads.

JIRA Issues: CSL-8270

Differential Revision: https://phabricator.twitter.biz/D325700
  • Loading branch information
kevinoliver authored and jenkins committed Jun 11, 2019
1 parent 40431bb commit 4e343f0
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 29 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ Bug Fixes
* finagle-http: Finagle now properly sets the `Transport.peerCertificate` local context
when using HTTP/2. ``PHAB_ID=D324392``

* finagle-http: `c.t.f.http.collection.RecordSchema.Record` is now thread-safe.
``PHAB_ID=D325700``

* finagle-zipkin-core: Fix a race condition which could cause a span to get logged
missing some annotations. ``PHAB_ID=D319367``

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,30 @@ import java.util.IdentityHashMap
*/
final class RecordSchema {

private[RecordSchema] final class Entry(var value: Any) {
var locked: Boolean = false
private[RecordSchema] final class Entry(@volatile var value: Any) {
@volatile var locked: Boolean = false
}

/**
* Record is an instance of a [[com.twitter.finagle.http.collection.RecordSchema RecordSchema]] declaration.
* Records are mutable; the `update` method assigns or reassigns a value to a given field. If
* the user requires that a field's assigned value is never reassigned later, the user can `lock`
* that field.
*
* '''Note that this implementation is not synchronized.''' If multiple threads access a record
* concurrently, and at least one of the threads modifies the record, it ''must'' be synchronized
* externally.
*/
final class Record private[RecordSchema] (
// note: modifications to `fields` must be synchronized as `IdentityHashMap` itself
// is not thread-safe.
fields: IdentityHashMap[Field[_], Entry] = new IdentityHashMap[Field[_], Entry]) {

private[this] def getOrInitializeEntry(field: Field[_]): Entry = {
var entry = fields.get(field)
if (entry eq null) {
entry = new Entry(field.default())
fields.put(field, entry)
fields.synchronized {
var entry = fields.get(field)
if (entry eq null) {
entry = new Entry(field.default())
fields.put(field, entry)
}
entry
}
entry
}

/**
Expand Down Expand Up @@ -99,15 +99,17 @@ final class RecordSchema {
*/
@throws(classOf[IllegalStateException])
def update[A](field: Field[A], value: A): Record = {
val entry = fields.get(field)
if (entry eq null) {
fields.put(field, new Entry(value))
} else if (entry.locked) {
throw new IllegalStateException(
s"attempt to assign $value to a locked field (with current value ${entry.value})"
)
} else {
entry.value = value
fields.synchronized {
val entry = fields.get(field)
if (entry eq null) {
fields.put(field, new Entry(value))
} else if (entry.locked) {
throw new IllegalStateException(
s"attempt to assign $value to a locked field (with current value ${entry.value})"
)
} else {
entry.value = value
}
}
this
}
Expand All @@ -131,16 +133,18 @@ final class RecordSchema {
update(field, value).lock(field)

private[this] def copyFields(): IdentityHashMap[Field[_], Entry] = {
val newFields = new IdentityHashMap[Field[_], Entry]
val iter = fields.entrySet().iterator()
while (iter.hasNext()) {
val kv = iter.next()
val entry = kv.getValue()
val newEntry = new Entry(entry.value)
newEntry.locked = entry.locked
newFields.put(kv.getKey(), newEntry)
fields.synchronized {
val newFields = new IdentityHashMap[Field[_], Entry]
val iter = fields.entrySet().iterator()
while (iter.hasNext()) {
val kv = iter.next()
val entry = kv.getValue()
val newEntry = new Entry(entry.value)
newEntry.locked = entry.locked
newFields.put(kv.getKey(), newEntry)
}
newFields
}
newFields
}

/**
Expand Down

0 comments on commit 4e343f0

Please sign in to comment.