Skip to content

Commit

Permalink
util: Use a state machine in Reader.fromSeq
Browse files Browse the repository at this point in the history
Problem

Some race conditions exist in `Reader.fromSeq`

Solution

Use a state machine to monitor the state and update `closep` only when state is
updated

Result

No more race conditions in `Reader.fromSeq`, and keep one state machine for
`Reader.fromFuture` and `Reader.fromSeq`

JIRA Issues: CSL-7659

Differential Revision: https://phabricator.twitter.biz/D280723
  • Loading branch information
jyanJing authored and jenkins committed Mar 19, 2019
1 parent f06508e commit db8ad2a
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 106 deletions.
104 changes: 104 additions & 0 deletions util-core/src/main/scala/com/twitter/io/FutureReader.scala
@@ -0,0 +1,104 @@
package com.twitter.io

import com.twitter.util.{Future, Promise, Return, Throw}
import java.util.concurrent.atomic.AtomicReference

/**
* We want to ensure that this reader always satisfies these invariants:
* 1. Satisfied closep is always aligned with the state
* 2. Reading from a discarded reader will always return ReaderDiscardedException
* 3. Reading from a fully read reader will always return None
* 4. Reading from a failed reader will always return the exception it has thrown
*
* We achieved this with a state machine where any access to the state is synchronized,
* and by ensuring that we always verify the update of state succeeded before setting closep,
* and by preventing changing the state when the reader is failed, fully read, or discarded
*/
private[io] final class FutureReader[A](fa: Future[A]) extends Reader[A] {
import FutureReader._

private[this] val closep = Promise[StreamTermination]()
private[this] val state = new AtomicReference[State](State.Idle)

def read(): Future[Option[A]] = {
state.get() match {
case State.Idle =>
if (state.compareAndSet(State.Idle, State.Reading)) {
fa.map(Some.apply).respond {
case t: Throw[_] =>
if (state.compareAndSet(State.Reading, State.Failed)) {
closep.update(t.cast[StreamTermination])
}
case Return(_) =>
// it is safe to return the value even when the `state` is not updated, which could happen
// when the `state` is updated to `Discarded` in `discard()`, in that case, we still retain
// the execution order because `read()` happens before `discard()`
state.compareAndSet(State.Reading, State.Read)
}
} else {
// when `state` is updated to `Reading` or `Discarded`
read()
}
case State.Reading =>
// multiple outstanding reads are not allowed
Future.exception(new IllegalStateException("read() while read is pending"))
case State.Read =>
// update would fail if `state` is already updated to `Discarded` in `discard()`
if (state.compareAndSet(State.Read, State.FullyRead)) {
closep.update(StreamTermination.FullyRead.Return)
}
closep.flatMap {
case StreamTermination.FullyRead => Future.None
case StreamTermination.Discarded => Future.exception(new ReaderDiscardedException)
}
case State.Failed =>
// closep is guaranteed to be an exception, flatMap should never be triggered but return the exception
closep.flatMap(_ => Future.None)
case State.FullyRead =>
Future.None
case State.Discarded =>
Future.exception(new ReaderDiscardedException)
}
}

def discard(): Unit = {
if (state.compareAndSet(State.Idle, State.Discarded) || state
.compareAndSet(State.Read, State.Discarded) || state.compareAndSet(
State.Reading,
State.Discarded)) {
closep.update(StreamTermination.Discarded.Return)
fa.raise(new ReaderDiscardedException)
}
}

def onClose: Future[StreamTermination] = closep
}

object FutureReader {

/**
* Indicates reader state when the reader is created via FutureReader
*/
sealed trait State
object State {

/** Indicates the reader is ready to be read. */
case object Idle extends State

/** Indicates the reader has been read. */
case object Read extends State

/** Indicates a reading is in progress. */
case object Reading extends State

/** Indicates an exception occurred during reading */
case object Failed extends State

/** Indicates the EOS has been observed. */
case object FullyRead extends State

/** Indicates the reader has been discarded. */
case object Discarded extends State

}
}
110 changes: 4 additions & 106 deletions util-core/src/main/scala/com/twitter/io/Reader.scala
Expand Up @@ -3,7 +3,6 @@ package com.twitter.io
import com.twitter.concurrent.AsyncStream
import com.twitter.util._
import java.io.{File, FileInputStream, InputStream}
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

Expand Down Expand Up @@ -212,63 +211,9 @@ object Reader {
/**
* Construct a `Reader` from a `Future`
*
* We want to ensure that this reader always satisfies these invariants:
* 1. Satisfied closep is always aligned with the state
* 2. Reading from a discarded reader will always return ReaderDiscardedException
* 3. Reading from a fully read reader will always return None
* 4. Reading from an exceptioned reader will always return the exception it has thrown
*
* We achieved this with a state machine where any access to the state is synchronized,
* and by ensuring that we always verify the update of state succeeded before setting closep,
* and by preventing changing the state when the reader is exceptioned, fully read, or discarded
*
* @note Multiple outstanding reads are not allowed on this reader
*
*/
def fromFuture[A](fa: Future[A]): Reader[A] = new Reader[A] {

private[this] val closep = Promise[StreamTermination]()
private[this] val state = new AtomicReference[State](State.Idle)

final def read(): Future[Option[A]] = {
state.get() match {
case State.Idle =>
fa.map(Some.apply).respond {
case t: Throw[_] =>
if (state.compareAndSet(State.Idle, State.Exception)) {
closep.update(t.cast[StreamTermination])
}
case Return(_) =>
state.compareAndSet(State.Idle, State.Read)
}
case State.Read =>
if (state.compareAndSet(State.Read, State.FullyRead)) {
closep.update(StreamTermination.FullyRead.Return)
}
closep.flatMap {
case StreamTermination.FullyRead => Future.None
case StreamTermination.Discarded => Future.exception(new ReaderDiscardedException)
}
case State.Exception =>
/** closep is guaranteed to be an exception, flatMap should never be triggered but return the exception */
closep.flatMap(_ => Future.None)
case State.FullyRead =>
Future.None
case State.Discard =>
Future.exception(new ReaderDiscardedException)
}
}

final def discard(): Unit = {
if (state.compareAndSet(State.Idle, State.Discard) || state
.compareAndSet(State.Read, State.Discard)) {
closep.update(StreamTermination.Discarded.Return)
fa.raise(new ReaderDiscardedException)
}
}

final def onClose: Future[StreamTermination] = closep
}
def fromFuture[A](fa: Future[A]): Reader[A] = new FutureReader(fa)

/**
* Construct a `Reader` from a value `a`
Expand Down Expand Up @@ -363,37 +308,10 @@ object Reader {
*
* The resources held by the returned [[Reader]] are released on reading of EOF and
* [[Reader.discard()]].
*
* @note Multiple outstanding reads are not allowed on this reader.
*/
def fromSeq[A](seq: Seq[A]): Reader[A] = new Reader[A] { self =>
private[this] val closep = Promise[StreamTermination]()
private[this] var state: Try[Seq[A]] = Return(seq)

def read(): Future[Option[A]] = {
val result: Future[Option[A]] = self.synchronized {
state match {
case Return(Nil) => Future.None
case Return(head +: tail) =>
state = Return(tail)
Future.value(Some(head))
case t: Throw[_] =>
Future.const(t.cast[Option[A]])
}
}
if (result eq Future.None) {
closep.updateIfEmpty(StreamTermination.FullyRead.Return)
}
result
}

def discard(): Unit = {
self.synchronized {
state = Throw(new ReaderDiscardedException)
}
closep.updateIfEmpty(StreamTermination.Discarded.Return)
}

def onClose: Future[StreamTermination] = closep
}
def fromSeq[A](seq: Seq[A]): Reader[A] = new SeqReader(seq)

/**
* Allow [[AsyncStream]] to be consumed as a [[Reader]]
Expand Down Expand Up @@ -540,24 +458,4 @@ object Reader {
*/
def framed(r: Reader[Buf], framer: Buf => Seq[Buf]): Reader[Buf] = new Framed(r, framer)

private sealed trait State

private object State {

/** Indicates no actions are taken on the Reader. */
case object Idle extends State

/** Indicates the reader has been read once. */
case object Read extends State

/** Indicates an exception occurred during reading */
case object Exception extends State

/** Indicates the reader is fully read. */
case object FullyRead extends State

/** Indicates the reader has been discarded. */
case object Discard extends State
}

}
78 changes: 78 additions & 0 deletions util-core/src/main/scala/com/twitter/io/SeqReader.scala
@@ -0,0 +1,78 @@
package com.twitter.io

import com.twitter.util.{Future, Promise}

/**
* We want to ensure that this reader always satisfies these invariants:
* 1. Reading from a discarded reader will always return ReaderDiscardedException
* 2. Reading from a fully read reader will always return None
*
* We achieved this with a state machine where any access to the state is synchronized,
* and by preventing changing the state when the reader is fully read or discarded.
*/
private[io] final class SeqReader[A](seq: Seq[A]) extends Reader[A] {
import SeqReader._

private[this] val closep = Promise[StreamTermination]()
private[this] var value: Seq[A] = seq
private[this] var state: State = State.Idle

def read(): Future[Option[A]] = {
val result = synchronized {
state match {
case State.Idle =>
value match {
case head +: tail =>
value = tail
Future.value(Some(head))
case Nil =>
state = State.FullyRead
Future.None
}
case State.FullyRead =>
Future.None
case State.Discarded =>
Future.exception(new ReaderDiscardedException)
}
}

if (result.eq(Future.None))
closep.updateIfEmpty(StreamTermination.FullyRead.Return)

result
}

def discard(): Unit = {
val discarded = synchronized {
state match {
case State.Idle =>
state = State.Discarded
true
case _ => false
}
}
if (discarded) closep.updateIfEmpty(StreamTermination.Discarded.Return)

}

def onClose: Future[StreamTermination] = closep
}

object SeqReader {

/**
* Indicates reader state when the reader is created via SeqReader
*/
sealed trait State
object State {

/** Indicates the reader is ready to be read. */
case object Idle extends State

/** Indicates the reader is fully read. */
case object FullyRead extends State

/** Indicates the reader has been discarded. */
case object Discarded extends State
}
}

0 comments on commit db8ad2a

Please sign in to comment.