Permalink
Browse files

! can: require services to respond to ChunkedRequestStart with Regist…

…erChunkHandler, fixes #473
  • Loading branch information...
jrudolph committed Oct 14, 2013
1 parent 979fc31 commit d738e54fc7d0b553d546aee9447ce4913e501745
@@ -10,15 +10,12 @@ import spray.util._
import spray.http._
import HttpMethods._
import MediaTypes._
+import spray.can.Http.RegisterChunkHandler
class DemoService extends Actor with ActorLogging {
implicit val timeout: Timeout = 1.second // for the actor 'asks'
import context.dispatcher // ExecutionContext for the futures and scheduler
- // a map from connections to chunk handlers
- // will be created on ChunkedRequestStart and removed on ChunkedRequestEnd
- var chunkHandlers = Map.empty[ActorRef, ActorRef]
-
def receive = {
// when a new connection comes in we register ourselves as the connection handler
case _: Http.Connected => sender ! Http.Register(self)
@@ -56,15 +53,9 @@ class DemoService extends Actor with ActorLogging {
r.asPartStream().foreach(self.tell(_, sender))
case s@ChunkedRequestStart(HttpRequest(POST, Uri.Path("/file-upload"), _, _, _)) =>
- require(!chunkHandlers.contains(sender))
val client = sender
val handler = context.actorOf(Props(new FileUploadHandler(client, s)))
- chunkHandlers += (client -> handler)
- handler.tell(s, client)
- case c: MessageChunk => chunkHandlers(sender).tell(c, sender)
- case e: ChunkedMessageEnd =>
- chunkHandlers(sender).tell(e, sender)
- chunkHandlers -= sender
+ sender ! RegisterChunkHandler(handler)
case _: HttpRequest => sender ! HttpResponse(status = 404, entity = "Unknown resource!")
@@ -22,7 +22,7 @@ import akka.actor.{ ActorRef, Status, ActorSystem }
import akka.io.IO
import akka.testkit.TestProbe
import spray.can.Http
-import spray.can.Http.ClientConnectionType
+import spray.can.Http.{ RegisterChunkHandler, ClientConnectionType }
import spray.io.ClientSSLEngineProvider
import spray.util.Utils._
import spray.httpx.RequestBuilding._
@@ -36,6 +36,7 @@ class SprayCanClientSpec extends Specification {
akka.loglevel = ERROR
akka.io.tcp.trace-logging = off
spray.can.client.request-timeout = 500ms
+ spray.can.server.verbose-error-messages = on
spray.can.host-connector.max-retries = 1
spray.can.host-connector.idle-timeout = infinite
spray.can.host-connector.client.request-timeout = 500ms
@@ -66,10 +67,14 @@ class SprayCanClientSpec extends Specification {
client.send(clientConnection, Get("/def") ~> Host(hostname, port))
val server = acceptConnection()
+ val chunkHandler = TestProbe()
+
server.expectMsgType[ChunkedRequestStart].request.uri.path.toString === "/abc"
- server.expectMsg(MessageChunk("123"))
- server.expectMsg(MessageChunk("456"))
- server.expectMsg(ChunkedMessageEnd)
+ server.reply(RegisterChunkHandler(chunkHandler.ref))
+
+ chunkHandler.expectMsg(MessageChunk("123"))
+ chunkHandler.expectMsg(MessageChunk("456"))
+ chunkHandler.expectMsg(ChunkedMessageEnd)
val firstRequestSender = server.sender
server.expectMsgType[HttpRequest].uri.path.toString === "/def"
server.reply(HttpResponse(entity = "ok-def")) // reply to the second request first
@@ -31,6 +31,7 @@ import spray.util.Utils.temporaryServerHostnameAndPort
import spray.httpx.RequestBuilding._
import spray.http._
import HttpProtocols._
+import spray.can.Http.RegisterChunkHandler
import spray.can.client.ClientConnectionSettings
class SprayCanServerSpec extends Specification with NoTimeConversions {
@@ -96,21 +97,27 @@ class SprayCanServerSpec extends Specification with NoTimeConversions {
val probe = sendRequest(connection, ChunkedRequestStart(Get("/abc")))
serverHandler.expectMsgType[ChunkedRequestStart].request.uri === Uri(s"http://$hostname:$port/abc")
probe.send(connection, MessageChunk("123"))
+
+ val chunkHandler = TestProbe()
+ serverHandler.reply(RegisterChunkHandler(chunkHandler.ref))
+
probe.send(connection, MessageChunk("456"))
- serverHandler.expectMsg(MessageChunk("123"))
- serverHandler.expectMsg(MessageChunk("456"))
+
+ chunkHandler.expectMsg(MessageChunk("123"))
+ chunkHandler.expectMsg(MessageChunk("456"))
probe.send(connection, ChunkedMessageEnd)
- serverHandler.expectMsg(ChunkedMessageEnd)
+ chunkHandler.expectMsg(ChunkedMessageEnd)
- serverHandler.reply(ChunkedResponseStart(HttpResponse(entity = "yeah")))
- serverHandler.reply(MessageChunk("234"))
- serverHandler.reply(MessageChunk("345"))
- serverHandler.reply(ChunkedMessageEnd)
+ chunkHandler.reply(ChunkedResponseStart(HttpResponse(entity = "yeah")))
+ chunkHandler.reply(MessageChunk("234"))
+ chunkHandler.reply(MessageChunk("345"))
+ chunkHandler.reply(ChunkedMessageEnd)
probe.expectMsgType[ChunkedResponseStart].response.entity === HttpEntity.Empty
probe.expectMsg(MessageChunk("yeah"))
probe.expectMsg(MessageChunk("234"))
probe.expectMsg(MessageChunk("345"))
probe.expectMsg(ChunkedMessageEnd)
+ serverHandler.expectNoMsg()
}
"maintain response order for pipelined requests" in new TestSetup {
@@ -47,6 +47,12 @@ spray.can {
# Set to `infinite` to disable timeout timeouts.
timeout-timeout = 2 s
+ # The time during which a service must respond to a `ChunkedRequestStart` message
+ # with a `RegisterChunkHandler` message and during which incoming request chunks
+ # have to be buffered. If the timeout is triggered the connection is immediately
+ # aborted to avoid memory being overflowed with buffered chunks.
+ chunkhandler-registration-timeout = 500 ms
+
# The path of the actor to send `spray.http.Timedout` messages to.
# If empty all `Timedout` messages will go to the "regular" request
# handling actor.
@@ -88,6 +88,7 @@ object Http extends ExtensionKey[HttpExt] {
case class Register(handler: ActorRef,
keepOpenOnPeerClosed: Boolean = false,
fastPath: FastPath = EmptyFastPath) extends Command
+ case class RegisterChunkHandler(handler: ActorRef) extends Command
case class Unbind(timeout: Duration) extends Command
object Unbind extends Unbind(Duration.Zero)
@@ -42,12 +42,15 @@ private sealed trait OpenRequest {
def handleResponseEndAndReturnNextOpenRequest(part: HttpMessagePartWrapper): OpenRequest
def handleResponsePart(part: HttpMessagePartWrapper)
def enqueueCommand(command: Command)
+ def registerChunkHandler(handler: ActorRef)
// events
def handleMessageChunk(chunk: MessageChunk)
def handleChunkedMessageEnd(part: ChunkedMessageEnd)
def handleSentAckAndReturnNextUnconfirmed(ev: AckEventWithReceiver): OpenRequest
def handleClosed(ev: Http.ConnectionClosed)
+
+ def isWaitingForChunkHandler: Boolean
}
private trait OpenRequestComponent { component
@@ -80,8 +83,10 @@ private trait OpenRequestComponent { component ⇒
request.copy(method = HttpMethods.GET)
else request
val partToDispatch: HttpRequestPart =
- if (state == WaitingForChunkedEnd) ChunkedRequestStart(requestToDispatch)
- else requestToDispatch
+ state match {
+ case _: WaitingForChunkHandler ChunkedRequestStart(requestToDispatch)
+ case _ requestToDispatch
+ }
if (context.log.isDebugEnabled)
context.log.debug("Dispatching {} to handler {}", format(partToDispatch), handler)
downstreamCommandPL(Pipeline.Tell(handler, partToDispatch, receiverRef))
@@ -96,9 +101,15 @@ private trait OpenRequestComponent { component ⇒
def checkForTimeout(now: Timestamp): Unit = {
state match {
- case WaitingForChunkedEnd
+ case w: WaitingForChunkHandler
+ if ((w.timestamp + settings.chunkHandlerRegistrationTimeout).isPast(now)) {
+ context.log.warning("A chunk handler wasn't registered timely. Aborting the connection.")
+ downstreamCommandPL(Tcp.Abort)
+ }
+ case _: ReceivingRequestChunks
+ case StreamingResponseChunks
case WaitingForResponse(timestamp)
- if ((timestamp + requestTimeout).isPast) {
+ if ((timestamp + requestTimeout).isPast(now)) {
val timeoutHandler =
if (settings.timeoutHandler.isEmpty) handler
else context.actorContext.actorFor(settings.timeoutHandler)
@@ -109,7 +120,7 @@ private trait OpenRequestComponent { component ⇒
state = WaitingForTimeoutResponse()
}
case WaitingForTimeoutResponse(timestamp)
- if ((timestamp + timeoutTimeout).isPast) {
+ if ((timestamp + timeoutTimeout).isPast(now)) {
val response = timeoutResponse(request)
// we always close the connection after a timeout-timeout
sendPart(response.withHeaders(HttpHeaders.Connection("close") :: response.headers))
@@ -136,7 +147,7 @@ private trait OpenRequestComponent { component ⇒
}
def handleResponsePart(part: HttpMessagePartWrapper): Unit = {
- state = WaitingForChunkedEnd // disable request timeout checking once the first response part has come in
+ state = StreamingResponseChunks // disable request timeout checking once the first response part has come in
handler = context.actorContext.sender // remember who to send Closed events to
sendPart(part)
dispatchNextQueuedResponse()
@@ -146,25 +157,36 @@ private trait OpenRequestComponent { component ⇒
responseQueue = responseQueue enqueue command
}
- /***** EVENTS *****/
+ def registerChunkHandler(handler: ActorRef): Unit = {
+ def dispatch(part: HttpRequestPart) = downstreamCommandPL(Pipeline.Tell(handler, part, receiverRef))
- def handleMessageChunk(chunk: MessageChunk): Unit = {
- if (nextInChain.isEmpty)
- downstreamCommandPL(Pipeline.Tell(handler, chunk, receiverRef))
- else
- // we accept non-tail recursion since HTTP pipeline depth is limited (and small)
- nextInChain handleMessageChunk chunk
+ state =
+ state match {
+ case WaitingForChunkHandlerBuffering(_, receiveds) receiveds.foreach(dispatch); ReceivingRequestChunks(handler)
+ case WaitingForChunkHandlerReceivedAll(_, receiveds) receiveds.foreach(dispatch); WaitingForResponse()
+ case x throw new IllegalStateException("Didn't expect " + x)
+ }
}
- def handleChunkedMessageEnd(part: ChunkedMessageEnd): Unit = {
- if (nextInChain.isEmpty) {
- // only start request timeout checking after request has been completed
- state = WaitingForResponse()
- downstreamCommandPL(Pipeline.Tell(handler, part, receiverRef))
- } else
- // we accept non-tail recursion since HTTP pipeline depth is limited (and small)
- nextInChain handleChunkedMessageEnd part
- }
+ /***** EVENTS *****/
+
+ def handleMessageChunk(chunk: MessageChunk): Unit =
+ state match {
+ case WaitingForChunkHandlerBuffering(timeout, receiveds) state = WaitingForChunkHandlerBuffering(timeout, receiveds.enqueue(chunk))
+ case ReceivingRequestChunks(chunkHandler) downstreamCommandPL(Pipeline.Tell(chunkHandler, chunk, receiverRef))
+ case x if nextInChain.isEmpty throw new IllegalArgumentException(s"$this Didn't expect message chunks in state $state")
+ case _ nextInChain handleMessageChunk chunk
+ }
+
+ def handleChunkedMessageEnd(part: ChunkedMessageEnd): Unit =
+ state match {
+ case WaitingForChunkHandlerBuffering(timeout, receiveds) state = WaitingForChunkHandlerReceivedAll(timeout, receiveds.enqueue(part))
+ case ReceivingRequestChunks(chunkHandler)
+ state = WaitingForResponse()
+ downstreamCommandPL(Pipeline.Tell(chunkHandler, part, receiverRef))
+ case x if nextInChain.isEmpty throw new IllegalArgumentException(s"$this Didn't expect ChunkedMessageEnd in state $state")
+ case _ nextInChain handleChunkedMessageEnd part
+ }
def handleSentAckAndReturnNextUnconfirmed(ev: AckEventWithReceiver) = {
downstreamCommandPL(Pipeline.Tell(ev.receiver, ev.ack, receiverRef))
@@ -202,6 +224,8 @@ private trait OpenRequestComponent { component ⇒
case MessageChunk(body, _) body.length.toString + " byte request chunk"
case x x.toString
}
+
+ def isWaitingForChunkHandler: Boolean = state.isInstanceOf[WaitingForChunkHandler]
}
object EmptyOpenRequest extends OpenRequest {
@@ -223,6 +247,9 @@ private trait OpenRequestComponent { component ⇒
def enqueueCommand(command: Command): Unit = {}
+ def registerChunkHandler(handler: ActorRef): Unit =
+ throw new IllegalStateException("Received RegisterChunkHandler for non-existing request")
+
// events
def handleMessageChunk(chunk: MessageChunk): Unit = { throw new IllegalStateException }
def handleChunkedMessageEnd(part: ChunkedMessageEnd): Unit = { throw new IllegalStateException }
@@ -233,14 +260,63 @@ private trait OpenRequestComponent { component ⇒
def handleClosed(ev: Http.ConnectionClosed): Unit = {
downstreamCommandPL(Pipeline.Tell(context.handler, ev, context.self))
}
+
+ def isWaitingForChunkHandler: Boolean = false
}
}
private case class AckEventWithReceiver(ack: Any, receiver: ActorRef) extends Event
private case class PartAndSender(part: HttpResponsePart, sender: ActorRef)
+/**
+ * The state of a request. State transformations:
+ *
+ * Initial state:
+ * -> WaitingForChunkHandlerBuffering: if incoming request is chunked
+ * -> WaitingForResponse: if request was received completely
+ *
+ * WaitingForChunkHandlerBuffering:
+ * -> WaitingForChunkHandlerReceivedAll: if ChunkedMessageEnd was received before chunk handler was registered
+ * -> ReceivingRequestChunks: after chunk handler was registered
+ *
+ * WaitingForChunkHandlerReceivedAll:
+ * -> WaitingForResponse: after chunk handler was registered
+ *
+ * ReceivingRequestChunks:
+ * -> WaitingForResponse: after ChunkedMessageEnd was received and dispatched
+ *
+ * WaitingForResponse:
+ * -> -finish-: if complete response was received
+ * -> StreamingResponseChunks: after a ChunkedResponseStart was sent
+ * -> WaitingForTimeoutResponse: if the timeout triggered before a response (start) was produced
+ *
+ * StreamingResponseStart:
+ * -> -finish-: if ChunkedMessageEnd was sent
+ *
+ * WaitingForTimeoutResponse:
+ * -> -finish-: if the timeout response was delivered
+ * -> -finish-: if the timeout timeout triggered before the timeout response was produced
+ *
+ */
private sealed trait RequestState
-private case object WaitingForChunkedEnd extends RequestState
+private sealed abstract class WaitingForChunkHandler extends RequestState {
+ def timestamp: Timestamp
+ def receivedChunks: Queue[HttpRequestPart]
+}
+/** Got a ChunkedRequestStart, waiting for chunk handler to register */
+private case class WaitingForChunkHandlerBuffering(
+ timestamp: Timestamp = Timestamp.now,
+ receivedChunks: Queue[HttpRequestPart] = Queue.empty) extends WaitingForChunkHandler
+/** Got ChunkedMessageEnd, waiting for chunk handler to register */
+private case class WaitingForChunkHandlerReceivedAll(
+ timestamp: Timestamp,
+ receivedChunks: Queue[HttpRequestPart]) extends WaitingForChunkHandler
+/** Got the chunk handler, receiving and dispatching request chunks */
+private case class ReceivingRequestChunks(chunkHandler: ActorRef) extends RequestState
+/** Request was fully delivered waiting for response */
private case class WaitingForResponse(timestamp: Timestamp = Timestamp.now) extends RequestState
-private case class WaitingForTimeoutResponse(timestamp: Timestamp = Timestamp.now) extends RequestState
+/** ChunkedRequestStart was sent waiting for remaining chunks */
+private case object StreamingResponseChunks extends RequestState
+/** Timed-out while waiting for request, `Timeout` was dispatched, now waiting for timeout response */
+private case class WaitingForTimeoutResponse(timestamp: Timestamp = Timestamp.now) extends RequestState
Oops, something went wrong.

0 comments on commit d738e54

Please sign in to comment.