Permalink
Browse files

! io: abort connection on idle-timeout, fixes #539

Also make sure idle-timeout only triggers if no write is currently
pending.
  • Loading branch information...
jrudolph committed Sep 27, 2013
1 parent 681485f commit 76345ba3313fc52424f3b3a5a92f575c438f6656
@@ -22,6 +22,7 @@ import org.specs2.time.NoTimeConversions
import akka.io.Tcp
import akka.util.ByteString
import spray.testkit.Specs2PipelineStageTest
+import akka.io.Tcp.CommandFailed
class ConnectionTimeoutsSpec extends Specification with Specs2PipelineStageTest with NoTimeConversions {
val stage = ConnectionTimeouts(200.millis)
@@ -46,7 +47,7 @@ class ConnectionTimeoutsSpec extends Specification with Specs2PipelineStageTest
connectionActor ! Tcp.Received(testData)
Thread.sleep(210)
connectionActor ! TickGenerator.Tick
- commands.expectMsg(Tcp.Close)
+ commands.expectMsg(Tcp.Abort)
}
"reset the idle timer on Received events" in new Fixture(stage) {
@@ -56,12 +57,28 @@ class ConnectionTimeoutsSpec extends Specification with Specs2PipelineStageTest
commands.expectNoMsg(100.millis)
}
- "reset the idle timer on Send commands" in new Fixture(stage) {
+ "start the idle timer when no writes are pending any more" in new Fixture(stage) {
Thread.sleep(210)
- connectionActor ! Tcp.Write(testData)
+ object customAck extends Tcp.Event
+ val write = Tcp.Write(testData)
+ connectionActor ! write
connectionActor ! TickGenerator.Tick
- commands.expectMsg(Tcp.Write(testData))
+ commands.expectMsg(write)
+ commands.expectNoMsg(210.millis)
+ connectionActor ! TickGenerator.Tick
+ commands.expectMsg(ConnectionTimeouts.TestWrite)
commands.expectNoMsg(100.millis)
+
+ // former write is still pending
+ connectionActor ! CommandFailed(ConnectionTimeouts.TestWrite)
+ commands.expectNoMsg(210.millis)
+ connectionActor ! TickGenerator.Tick
+ commands.expectMsg(ConnectionTimeouts.TestWrite)
+ commands.expectNoMsg(100.millis)
+ connectionActor ! ConnectionTimeouts.NoWritePending
+ commands.expectNoMsg(210.millis)
+ connectionActor ! TickGenerator.Tick
+ commands.expectMsg(Tcp.Abort)
}
}
}
@@ -19,36 +19,76 @@ package spray.io
import scala.concurrent.duration.Duration
import akka.io.Tcp
import spray.util.{ Timestamp, requirePositive }
+import akka.io.Tcp._
+import akka.util.ByteString
+/**
+ * A pipeline stage that will abort a connection after an idle timeout has elapsed.
+ * The idle timer is not exact but will abort the connection earliest when the timeout
+ * has passed after these events:
+ * - the last Tcp.Received message was received
+ * - no Write was pending according to an empty test write sent after the last Write
+ * - a new timeout was set
+ */
object ConnectionTimeouts {
def apply(idleTimeout: Duration): PipelineStage = {
requirePositive(idleTimeout)
new PipelineStage {
- def apply(context: PipelineContext, commandPL: CPL, eventPL: EPL): Pipelines = new Pipelines {
+ def apply(context: PipelineContext, commandPL: CPL, eventPL: EPL): Pipelines = new DynamicPipelines { outer
var timeout = idleTimeout
var idleDeadline = Timestamp.never
- def refreshDeadline() = idleDeadline = Timestamp.now + timeout
- refreshDeadline()
+ def resetDeadline() = idleDeadline = Timestamp.now + timeout
- val commandPipeline: CPL = {
- case x: Tcp.Write commandPL(x); refreshDeadline()
- case SetIdleTimeout(x) timeout = x; refreshDeadline()
- case cmd commandPL(cmd)
- }
+ def initialPipeline = atWork(writePossiblyPending = false)
- val eventPipeline: EPL = {
- case x: Tcp.Received refreshDeadline(); eventPL(x)
- case tick @ TickGenerator.Tick
- if (idleDeadline.isPast) {
- context.log.debug("Closing connection due to idle timeout...")
- commandPL(Tcp.Close)
- }
- eventPL(tick)
+ def atWork(writePossiblyPending: Boolean): Pipelines = new Pipelines {
+ resetDeadline()
+ val commandPipeline: CPL = {
+ case write: Tcp.WriteCommand
+ commandPL(write)
+ become(atWork(writePossiblyPending = true))
+ case SetIdleTimeout(newTimeout) timeout = newTimeout; resetDeadline()
+ case cmd commandPL(cmd)
+ }
+ val eventPipeline: EPL = {
+ case x: Tcp.Received resetDeadline(); eventPL(x)
+ case tick @ TickGenerator.Tick
+ if (idleDeadline.isPast && writePossiblyPending) become(checkForPendingWrite())
+ else shutdownIfIdle()
- case ev eventPL(ev)
+ eventPL(tick)
+
+ case CommandFailed(TestWrite) | NoWritePending // ignore
+ case ev eventPL(ev)
+ }
}
+ def checkForPendingWrite(): Pipelines = new Pipelines {
+ resetDeadline()
+ commandPL(TestWrite)
+
+ def commandPipeline = {
+ case write: Tcp.WriteCommand become(atWork(writePossiblyPending = true)); outer.commandPipeline(write)
+ case SetIdleTimeout(newTimeout) timeout = newTimeout; resetDeadline()
+ case cmd commandPL(cmd)
+ }
+ def eventPipeline = {
+ case r: Tcp.Received resetDeadline(); outer.eventPipeline(r)
+ case CommandFailed(TestWrite) become(atWork(writePossiblyPending = true)) // there's a write still pending
+ case NoWritePending become(atWork(writePossiblyPending = false))
+ case tick @ TickGenerator.Tick // happens only if connection actor is too busy to react
+ shutdownIfIdle()
+ eventPL(tick)
+ case ev eventPL(ev)
+ }
+ }
+
+ def shutdownIfIdle(): Unit =
+ if (idleDeadline.isPast) {
+ context.log.debug("Closing connection due to idle timeout...")
+ commandPL(Tcp.Abort)
+ }
}
}
}
@@ -58,4 +98,7 @@ object ConnectionTimeouts {
case class SetIdleTimeout(timeout: Duration) extends Command {
requirePositive(timeout)
}
+
+ private[io] case object NoWritePending extends Event
+ private[io] val TestWrite = Tcp.Write(ByteString.empty, NoWritePending)
}

0 comments on commit 76345ba

Please sign in to comment.