Permalink
Browse files

! can, io: use util.Timestamp instead of longs for timeout checking

The breaking part is that ConnectionTimeouts pipeline stage will now
reset the timeout when it receives an `SetIdleTimeout` message.
  • Loading branch information...
jrudolph committed Sep 19, 2013
1 parent 87720e8 commit ab17f009a151a85eecd159d544ec0c942910fcd4
@@ -24,7 +24,7 @@ import spray.can.rendering.RequestPartRenderingContext
import spray.can.Http
import spray.http._
import spray.io._
-import System.{ nanoTime now }
+import spray.util.Timestamp
object ClientFrontend {
@@ -37,30 +37,32 @@ object ClientFrontend {
var requestTimeout = initialRequestTimeout
var closeCommanders = Set.empty[ActorRef]
+ def lastRequestComplete = openRequests.isEmpty || openRequests.last.state.isComplete
+
val commandPipeline: CPL = {
case Http.MessageCommand(HttpMessagePartWrapper(x: HttpRequest, ack)) if closeCommanders.isEmpty
- if (openRequests.isEmpty || openRequests.last.timestamp > 0) {
+ if (lastRequestComplete) {
render(x, ack)
- openRequests = openRequests enqueue new RequestRecord(x, context.sender, timestamp = now)
+ openRequests = openRequests enqueue new RequestRecord(x, context.sender, state = Complete(Timestamp.now))
} else log.warning("Received new HttpRequest before previous chunking request was " +
"finished, ignoring...")
case Http.MessageCommand(HttpMessagePartWrapper(x: ChunkedRequestStart, ack)) if closeCommanders.isEmpty
- if (openRequests.isEmpty || openRequests.last.timestamp > 0) {
+ if (lastRequestComplete) {
render(x, ack)
- openRequests = openRequests enqueue new RequestRecord(x, context.sender, timestamp = 0)
+ openRequests = openRequests enqueue new RequestRecord(x, context.sender, state = WaitingForChunkedEnd)
} else log.warning("Received new ChunkedRequestStart before previous chunking " +
"request was finished, ignoring...")
case Http.MessageCommand(HttpMessagePartWrapper(x: MessageChunk, ack)) if closeCommanders.isEmpty
- if (!openRequests.isEmpty && openRequests.last.timestamp == 0) {
+ if (!lastRequestComplete) {
render(x, ack)
} else log.warning("Received MessageChunk outside of chunking request context, ignoring...")
case Http.MessageCommand(HttpMessagePartWrapper(x: ChunkedMessageEnd, ack)) if closeCommanders.isEmpty
- if (!openRequests.isEmpty && openRequests.last.timestamp == 0) {
+ if (!lastRequestComplete) {
render(x, ack)
- openRequests.last.timestamp = now // only start timer once the request is completed
+ openRequests.last.state = Complete(Timestamp.now) // only start timer once the request is completed
} else log.warning("Received ChunkedMessageEnd outside of chunking request " +
"context, ignoring...")
@@ -127,7 +129,7 @@ object ClientFrontend {
def checkForTimeout(): Unit =
if (!openRequests.isEmpty && requestTimeout.isFinite) {
val rec = openRequests.head
- if (rec.timestamp > 0 && rec.timestamp + requestTimeout.toNanos < now) {
+ if (rec.state.isOverdue(requestTimeout)) {
log.warning("Request timed out after {}, closing connection", requestTimeout)
dispatch(rec.sender, Timedout(rec.request))
commandPL(Http.Close)
@@ -140,7 +142,19 @@ object ClientFrontend {
}
}
- private class RequestRecord(val request: HttpRequestPart with HttpMessageStart, val sender: ActorRef, var timestamp: Long)
+ sealed trait RequestState {
+ def isComplete: Boolean
+ def isOverdue(timeout: Duration): Boolean
+ }
+ case object WaitingForChunkedEnd extends RequestState {
+ def isComplete: Boolean = false
+ def isOverdue(timeout: Duration): Boolean = false
+ }
+ case class Complete(timestamp: Timestamp) extends RequestState {
+ def isComplete: Boolean = true
+ def isOverdue(timeout: Duration): Boolean = (timestamp + timeout).isPast
+ }
+ private class RequestRecord(val request: HttpRequestPart with HttpMessageStart, val sender: ActorRef, var state: RequestState)
private case class PartAndSender(part: HttpRequestPart, sender: ActorRef)
}
@@ -26,6 +26,7 @@ import spray.http._
import spray.can.Http
import spray.can.server.ServerFrontend.Context
import akka.io.Tcp
+import spray.util.Timestamp
sealed trait OpenRequest {
def context: ServerFrontend.Context
@@ -34,7 +35,7 @@ sealed trait OpenRequest {
def appendToEndOfChain(openRequest: OpenRequest): OpenRequest
def dispatchInitialRequestPartToHandler()
def dispatchNextQueuedResponse()
- def checkForTimeout(now: Long)
+ def checkForTimeout(now: Timestamp)
def nextIfNoAcksPending: OpenRequest
// commands
@@ -58,7 +59,7 @@ trait OpenRequestComponent { component ⇒
class DefaultOpenRequest(val request: HttpRequest,
private[this] val closeAfterResponseCompletion: Boolean,
- private[this] var timestamp: Long) extends OpenRequest {
+ private[this] var state: RequestState) extends OpenRequest {
private[this] val receiverRef = new ResponseReceiverRef(this)
private[this] var handler = context.handler
private[this] var nextInChain: OpenRequest = EmptyOpenRequest
@@ -79,7 +80,7 @@ trait OpenRequestComponent { component ⇒
request.copy(method = HttpMethods.GET)
else request
val partToDispatch: HttpRequestPart =
- if (timestamp == 0L) ChunkedRequestStart(requestToDispatch)
+ if (state == WaitingForChunkedEnd) ChunkedRequestStart(requestToDispatch)
else requestToDispatch
if (context.log.isDebugEnabled)
context.log.debug("Dispatching {} to handler {}", format(partToDispatch), handler)
@@ -93,22 +94,26 @@ trait OpenRequestComponent { component ⇒
}
}
- def checkForTimeout(now: Long): Unit = {
- if (timestamp > 0) {
- if (timestamp + requestTimeout.toNanos < now) {
- val timeoutHandler =
- if (settings.timeoutHandler.isEmpty) handler
- else context.actorContext.actorFor(settings.timeoutHandler)
- if (RefUtils.isLocal(timeoutHandler))
- downstreamCommandPL(Pipeline.Tell(timeoutHandler, Timedout(request), receiverRef))
- else context.log.warning("The TimeoutHandler '{}' is not a local actor and thus cannot be used as a " +
- "timeout handler", timeoutHandler)
- timestamp = -now // we record the time of the Timeout dispatch as negative timestamp value
- }
- } else if (timestamp < -1 && timeoutTimeout.isFinite() && (-timestamp + timeoutTimeout.toNanos < now)) {
- val response = timeoutResponse(request)
- // we always close the connection after a timeout-timeout
- sendPart(response.withHeaders(HttpHeaders.Connection("close") :: response.headers))
+ def checkForTimeout(now: Timestamp): Unit = {
+ state match {
+ case WaitingForChunkedEnd
+ case WaitingForResponse(timestamp)
+ if ((timestamp + requestTimeout).isPast) {
+ val timeoutHandler =
+ if (settings.timeoutHandler.isEmpty) handler
+ else context.actorContext.actorFor(settings.timeoutHandler)
+ if (RefUtils.isLocal(timeoutHandler))
+ downstreamCommandPL(Pipeline.Tell(timeoutHandler, Timedout(request), receiverRef))
+ else context.log.warning("The TimeoutHandler '{}' is not a local actor and thus cannot be used as a " +
+ "timeout handler", timeoutHandler)
+ state = WaitingForTimeoutResponse()
+ }
+ case WaitingForTimeoutResponse(timestamp)
+ if ((timestamp + timeoutTimeout).isPast) {
+ val response = timeoutResponse(request)
+ // we always close the connection after a timeout-timeout
+ sendPart(response.withHeaders(HttpHeaders.Connection("close") :: response.headers))
+ }
}
nextInChain checkForTimeout now // we accept non-tail recursion since HTTP pipeline depth is limited (and small)
}
@@ -131,7 +136,7 @@ trait OpenRequestComponent { component ⇒
}
def handleResponsePart(part: HttpMessagePartWrapper): Unit = {
- timestamp = 0L // disable request timeout checking once the first response part has come in
+ state = WaitingForChunkedEnd // disable request timeout checking once the first response part has come in
handler = context.actorContext.sender // remember who to send Closed events to
sendPart(part)
dispatchNextQueuedResponse()
@@ -154,7 +159,7 @@ trait OpenRequestComponent { component ⇒
def handleChunkedMessageEnd(part: ChunkedMessageEnd): Unit = {
if (nextInChain.isEmpty) {
// only start request timeout checking after request has been completed
- timestamp = System.nanoTime()
+ state = WaitingForResponse()
downstreamCommandPL(Pipeline.Tell(handler, part, receiverRef))
} else
// we accept non-tail recursion since HTTP pipeline depth is limited (and small)
@@ -206,7 +211,7 @@ trait OpenRequestComponent { component ⇒
def request = throw new IllegalStateException
def dispatchInitialRequestPartToHandler(): Unit = { throw new IllegalStateException }
def dispatchNextQueuedResponse(): Unit = {}
- def checkForTimeout(now: Long): Unit = {}
+ def checkForTimeout(now: Timestamp): Unit = {}
def nextIfNoAcksPending = throw new IllegalStateException
// commands
@@ -234,3 +239,8 @@ trait OpenRequestComponent { component ⇒
private[server] case class AckEventWithReceiver(ack: Any, receiver: ActorRef) extends Event
private[server] case class PartAndSender(part: HttpResponsePart, sender: ActorRef)
+
+private[server] sealed trait RequestState
+private[server] case object WaitingForChunkedEnd extends RequestState
+private[server] case class WaitingForResponse(timestamp: Timestamp = Timestamp.now) extends RequestState
+private[server] case class WaitingForTimeoutResponse(timestamp: Timestamp = Timestamp.now) extends RequestState
@@ -25,6 +25,7 @@ import spray.can.rendering.ResponsePartRenderingContext
import spray.can.Http
import spray.http._
import spray.io._
+import spray.util.Timestamp
object ServerFrontend {
@@ -109,10 +110,10 @@ object ServerFrontend {
}
else throw new NotImplementedError("fastPath is not yet supported with pipelining enabled")
- } else openNewRequest(request, closeAfterResponseCompletion, System.nanoTime())
+ } else openNewRequest(request, closeAfterResponseCompletion, WaitingForResponse())
case HttpMessageStartEvent(ChunkedRequestStart(request), closeAfterResponseCompletion)
- openNewRequest(request, closeAfterResponseCompletion, 0L)
+ openNewRequest(request, closeAfterResponseCompletion, WaitingForChunkedEnd)
case Http.MessageEvent(x: MessageChunk)
firstOpenRequest handleMessageChunk x
@@ -137,7 +138,7 @@ object ServerFrontend {
case TickGenerator.Tick
if (requestTimeout.isFinite())
- firstOpenRequest checkForTimeout System.nanoTime()
+ firstOpenRequest checkForTimeout Timestamp.now
eventPL(TickGenerator.Tick)
case Pipeline.ActorDeath(actor) if actor == context.handler
@@ -147,8 +148,8 @@ object ServerFrontend {
case ev eventPL(ev)
}
- def openNewRequest(request: HttpRequest, closeAfterResponseCompletion: Boolean, timestamp: Long): Unit = {
- val nextOpenRequest = new DefaultOpenRequest(request, closeAfterResponseCompletion, timestamp)
+ def openNewRequest(request: HttpRequest, closeAfterResponseCompletion: Boolean, state: RequestState): Unit = {
+ val nextOpenRequest = new DefaultOpenRequest(request, closeAfterResponseCompletion, state)
firstOpenRequest = firstOpenRequest appendToEndOfChain nextOpenRequest
nextOpenRequest.dispatchInitialRequestPartToHandler()
if (firstUnconfirmed.isEmpty) firstUnconfirmed = firstOpenRequest
@@ -22,12 +22,12 @@ import spray.can.rendering.ResponsePartRenderingContext
import spray.can.server.RequestParsing.HttpMessageStartEvent
import spray.io._
import spray.can.Http
-import spray.util.PaddedAtomicLong
+import spray.util.{ Timestamp, PaddedAtomicLong }
object StatsSupport {
class StatsHolder {
- val startTimestamp = System.nanoTime()
+ val startTimestamp = Timestamp.now
val requestStarts = new PaddedAtomicLong
val responseStarts = new PaddedAtomicLong
val maxOpenRequests = new PaddedAtomicLong
@@ -57,7 +57,7 @@ object StatsSupport {
}
def toStats = Stats(
- uptime = (System.nanoTime() - startTimestamp).nanos,
+ uptime = (Timestamp.now - startTimestamp).asInstanceOf[FiniteDuration],
totalRequests = requestStarts.get,
openRequests = requestStarts.get - responseStarts.get,
maxOpenRequests = maxOpenRequests.get,
@@ -16,10 +16,9 @@
package spray.io
-import scala.concurrent.duration.{ FiniteDuration, Deadline, Duration }
+import scala.concurrent.duration.Duration
import akka.io.Tcp
-import spray.util.requirePositive
-import System.{ nanoTime now }
+import spray.util.{ Timestamp, requirePositive }
object ConnectionTimeouts {
@@ -29,25 +28,20 @@ object ConnectionTimeouts {
new PipelineStage {
def apply(context: PipelineContext, commandPL: CPL, eventPL: EPL): Pipelines = new Pipelines {
var timeout = idleTimeout
- var lastActivity = now
+ var idleDeadline = Timestamp.never
+ def refreshDeadline() = idleDeadline = Timestamp.now + timeout
+ refreshDeadline()
val commandPipeline: CPL = {
- case x: Tcp.Write
- commandPL(x)
- lastActivity = now
-
- case SetIdleTimeout(x) timeout = x
-
+ case x: Tcp.Write commandPL(x); refreshDeadline()
+ case SetIdleTimeout(x) timeout = x; refreshDeadline()
case cmd commandPL(cmd)
}
val eventPipeline: EPL = {
- case x: Tcp.Received
- lastActivity = now
- eventPL(x)
-
+ case x: Tcp.Received refreshDeadline(); eventPL(x)
case tick @ TickGenerator.Tick
- if (timeout.isFinite && (lastActivity + timeout.toNanos < System.nanoTime())) {
+ if (idleDeadline.isPast) {
context.log.debug("Closing connection due to idle timeout...")
commandPL(Tcp.Close)
}

0 comments on commit ab17f00

Please sign in to comment.