Permalink
Browse files

! io: improve DynamicPipelines trait

- introduce `DynamicPipelines.State` type alias
- remove `initialPipelines` method
- introduce `process` method for easy wrapping of pipeline stage execution
  • Loading branch information...
sirthias committed Oct 16, 2013
1 parent e20bc7c commit 5f23219f7dcdde508fd0585c6bac9f859f892cc9
@@ -56,8 +56,7 @@ object BackPressureHandling {
new DynamicPipelines { effective
import context.log
- def initialPipeline =
- writeThrough(new OutQueue(ackRate), isReading = true, closeCommand = None)
+ become(writeThrough(new OutQueue(ackRate), isReading = true, closeCommand = None))
/**
* In this state all incoming write requests have already been relayed to the connection. There's a buffer
@@ -66,7 +65,7 @@ object BackPressureHandling {
* Invariant:
* * we've not experienced any failed writes
*/
- def writeThrough(out: OutQueue, isReading: Boolean, closeCommand: Option[Tcp.CloseCommand]): Pipelines = new Pipelines {
+ def writeThrough(out: OutQueue, isReading: Boolean, closeCommand: Option[Tcp.CloseCommand]): State = new State {
def resumeReading(): Unit = {
commandPL(Tcp.ResumeReading)
become(writeThrough(out, isReading = true, closeCommand))
@@ -128,13 +127,13 @@ object BackPressureHandling {
* The state where writing is suspended and we are waiting for WritingResumed. Reading will be suspended
* if it currently isn't and if the connection isn't already going to be closed.
*/
- def buffering(out: OutQueue, failedSeq: Int, isReading: Boolean, closeCommand: Option[CloseCommand]): Pipelines = {
+ def buffering(out: OutQueue, failedSeq: Int, isReading: Boolean, closeCommand: Option[CloseCommand]): State = {
def isClosing = closeCommand.isDefined
if (!isClosing && isReading) {
commandPL(Tcp.SuspendReading)
buffering(out, failedSeq, isReading = false, closeCommand)
- } else new Pipelines {
+ } else new State {
def commandPipeline = {
case w: Tcp.Write
if (isClosing) log.warning("Can't process more writes when closing. Dropping...")
@@ -177,7 +176,7 @@ object BackPressureHandling {
}
}
- def closed(): Pipelines = new Pipelines {
+ def closed(): State = new State {
def commandPipeline = {
case c @ (_: Tcp.Write | _: Tcp.CloseCommand) log.warning(s"Connection is already closed, dropping command $c")
case c commandPL(c)
@@ -41,9 +41,9 @@ object ConnectionTimeouts {
var idleDeadline = Timestamp.never
def resetDeadline() = idleDeadline = Timestamp.now + timeout
- def initialPipeline = atWork(writePossiblyPending = false)
+ become(atWork(writePossiblyPending = false))
- def atWork(writePossiblyPending: Boolean): Pipelines = new Pipelines {
+ def atWork(writePossiblyPending: Boolean): State = new State {
resetDeadline()
val commandPipeline: CPL = {
case write: Tcp.WriteCommand
@@ -64,7 +64,7 @@ object ConnectionTimeouts {
case ev eventPL(ev)
}
}
- def checkForPendingWrite(): Pipelines = new Pipelines {
+ def checkForPendingWrite(): State = new State {
resetDeadline()
commandPL(TestWrite)
@@ -40,18 +40,15 @@ object Pipelines {
}
trait DynamicPipelines extends Pipelines {
- def initialPipeline: Pipelines
- private[this] var _cpl: Pipeline[Command] = _
- private[this] var _epl: Pipeline[Event] = _
+ type State = Pipelines
+ private[this] var _cpl: Pipeline[Command] = Pipeline.Uninitialized
+ private[this] var _epl: Pipeline[Event] = Pipeline.Uninitialized
+
+ def commandPipeline = cmd process(cmd, _cpl)
+ def eventPipeline = ev process(ev, _epl)
+
+ protected def process[T](msg: T, pl: Pipeline[T]): Unit = pl(msg)
- def commandPipeline = {
- if (_cpl eq null) become(initialPipeline)
- cmd _cpl(cmd)
- }
- def eventPipeline = {
- if (_epl eq null) become(initialPipeline)
- event _epl(event)
- }
def become(newPipes: Pipelines): Unit = {
_cpl = newPipes.commandPipeline
_epl = newPipes.eventPipeline

0 comments on commit 5f23219

Please sign in to comment.