Skip to content
This repository has been archived by the owner on Apr 24, 2024. It is now read-only.

Commit

Permalink
! can: require services to respond to ChunkedRequestStart with Regist…
Browse files Browse the repository at this point in the history
…erChunkHandler, fixes #473
  • Loading branch information
jrudolph committed Oct 15, 2013
1 parent 979fc31 commit d738e54
Show file tree
Hide file tree
Showing 10 changed files with 154 additions and 55 deletions.
Expand Up @@ -10,15 +10,12 @@ import spray.util._
import spray.http._
import HttpMethods._
import MediaTypes._
import spray.can.Http.RegisterChunkHandler

class DemoService extends Actor with ActorLogging {
implicit val timeout: Timeout = 1.second // for the actor 'asks'
import context.dispatcher // ExecutionContext for the futures and scheduler

// a map from connections to chunk handlers
// will be created on ChunkedRequestStart and removed on ChunkedRequestEnd
var chunkHandlers = Map.empty[ActorRef, ActorRef]

def receive = {
// when a new connection comes in we register ourselves as the connection handler
case _: Http.Connected => sender ! Http.Register(self)
Expand Down Expand Up @@ -56,15 +53,9 @@ class DemoService extends Actor with ActorLogging {
r.asPartStream().foreach(self.tell(_, sender))

case s@ChunkedRequestStart(HttpRequest(POST, Uri.Path("/file-upload"), _, _, _)) =>
require(!chunkHandlers.contains(sender))
val client = sender
val handler = context.actorOf(Props(new FileUploadHandler(client, s)))
chunkHandlers += (client -> handler)
handler.tell(s, client)
case c: MessageChunk => chunkHandlers(sender).tell(c, sender)
case e: ChunkedMessageEnd =>
chunkHandlers(sender).tell(e, sender)
chunkHandlers -= sender
sender ! RegisterChunkHandler(handler)

case _: HttpRequest => sender ! HttpResponse(status = 404, entity = "Unknown resource!")

Expand Down
Expand Up @@ -22,7 +22,7 @@ import akka.actor.{ ActorRef, Status, ActorSystem }
import akka.io.IO
import akka.testkit.TestProbe
import spray.can.Http
import spray.can.Http.ClientConnectionType
import spray.can.Http.{ RegisterChunkHandler, ClientConnectionType }
import spray.io.ClientSSLEngineProvider
import spray.util.Utils._
import spray.httpx.RequestBuilding._
Expand All @@ -36,6 +36,7 @@ class SprayCanClientSpec extends Specification {
akka.loglevel = ERROR
akka.io.tcp.trace-logging = off
spray.can.client.request-timeout = 500ms
spray.can.server.verbose-error-messages = on
spray.can.host-connector.max-retries = 1
spray.can.host-connector.idle-timeout = infinite
spray.can.host-connector.client.request-timeout = 500ms
Expand Down Expand Up @@ -66,10 +67,14 @@ class SprayCanClientSpec extends Specification {
client.send(clientConnection, Get("/def") ~> Host(hostname, port))

val server = acceptConnection()
val chunkHandler = TestProbe()

server.expectMsgType[ChunkedRequestStart].request.uri.path.toString === "/abc"
server.expectMsg(MessageChunk("123"))
server.expectMsg(MessageChunk("456"))
server.expectMsg(ChunkedMessageEnd)
server.reply(RegisterChunkHandler(chunkHandler.ref))

chunkHandler.expectMsg(MessageChunk("123"))
chunkHandler.expectMsg(MessageChunk("456"))
chunkHandler.expectMsg(ChunkedMessageEnd)
val firstRequestSender = server.sender
server.expectMsgType[HttpRequest].uri.path.toString === "/def"
server.reply(HttpResponse(entity = "ok-def")) // reply to the second request first
Expand Down
Expand Up @@ -31,6 +31,7 @@ import spray.util.Utils.temporaryServerHostnameAndPort
import spray.httpx.RequestBuilding._
import spray.http._
import HttpProtocols._
import spray.can.Http.RegisterChunkHandler
import spray.can.client.ClientConnectionSettings

class SprayCanServerSpec extends Specification with NoTimeConversions {
Expand Down Expand Up @@ -96,21 +97,27 @@ class SprayCanServerSpec extends Specification with NoTimeConversions {
val probe = sendRequest(connection, ChunkedRequestStart(Get("/abc")))
serverHandler.expectMsgType[ChunkedRequestStart].request.uri === Uri(s"http://$hostname:$port/abc")
probe.send(connection, MessageChunk("123"))

val chunkHandler = TestProbe()
serverHandler.reply(RegisterChunkHandler(chunkHandler.ref))

probe.send(connection, MessageChunk("456"))
serverHandler.expectMsg(MessageChunk("123"))
serverHandler.expectMsg(MessageChunk("456"))

chunkHandler.expectMsg(MessageChunk("123"))
chunkHandler.expectMsg(MessageChunk("456"))
probe.send(connection, ChunkedMessageEnd)
serverHandler.expectMsg(ChunkedMessageEnd)
chunkHandler.expectMsg(ChunkedMessageEnd)

serverHandler.reply(ChunkedResponseStart(HttpResponse(entity = "yeah")))
serverHandler.reply(MessageChunk("234"))
serverHandler.reply(MessageChunk("345"))
serverHandler.reply(ChunkedMessageEnd)
chunkHandler.reply(ChunkedResponseStart(HttpResponse(entity = "yeah")))
chunkHandler.reply(MessageChunk("234"))
chunkHandler.reply(MessageChunk("345"))
chunkHandler.reply(ChunkedMessageEnd)
probe.expectMsgType[ChunkedResponseStart].response.entity === HttpEntity.Empty
probe.expectMsg(MessageChunk("yeah"))
probe.expectMsg(MessageChunk("234"))
probe.expectMsg(MessageChunk("345"))
probe.expectMsg(ChunkedMessageEnd)
serverHandler.expectNoMsg()
}

"maintain response order for pipelined requests" in new TestSetup {
Expand Down
6 changes: 6 additions & 0 deletions spray-can/src/main/resources/reference.conf
Expand Up @@ -47,6 +47,12 @@ spray.can {
# Set to `infinite` to disable timeout timeouts.
timeout-timeout = 2 s

# The time during which a service must respond to a `ChunkedRequestStart` message
# with a `RegisterChunkHandler` message and during which incoming request chunks
# have to be buffered. If the timeout is triggered the connection is immediately
# aborted to avoid memory being overflowed with buffered chunks.
chunkhandler-registration-timeout = 500 ms

# The path of the actor to send `spray.http.Timedout` messages to.
# If empty all `Timedout` messages will go to the "regular" request
# handling actor.
Expand Down
1 change: 1 addition & 0 deletions spray-can/src/main/scala/spray/can/Http.scala
Expand Up @@ -88,6 +88,7 @@ object Http extends ExtensionKey[HttpExt] {
case class Register(handler: ActorRef,
keepOpenOnPeerClosed: Boolean = false,
fastPath: FastPath = EmptyFastPath) extends Command
case class RegisterChunkHandler(handler: ActorRef) extends Command

case class Unbind(timeout: Duration) extends Command
object Unbind extends Unbind(Duration.Zero)
Expand Down
124 changes: 100 additions & 24 deletions spray-can/src/main/scala/spray/can/server/OpenRequest.scala
Expand Up @@ -42,12 +42,15 @@ private sealed trait OpenRequest {
def handleResponseEndAndReturnNextOpenRequest(part: HttpMessagePartWrapper): OpenRequest
def handleResponsePart(part: HttpMessagePartWrapper)
def enqueueCommand(command: Command)
def registerChunkHandler(handler: ActorRef)

// events
def handleMessageChunk(chunk: MessageChunk)
def handleChunkedMessageEnd(part: ChunkedMessageEnd)
def handleSentAckAndReturnNextUnconfirmed(ev: AckEventWithReceiver): OpenRequest
def handleClosed(ev: Http.ConnectionClosed)

def isWaitingForChunkHandler: Boolean
}

private trait OpenRequestComponent { component
Expand Down Expand Up @@ -80,8 +83,10 @@ private trait OpenRequestComponent { component ⇒
request.copy(method = HttpMethods.GET)
else request
val partToDispatch: HttpRequestPart =
if (state == WaitingForChunkedEnd) ChunkedRequestStart(requestToDispatch)
else requestToDispatch
state match {
case _: WaitingForChunkHandler ChunkedRequestStart(requestToDispatch)
case _ requestToDispatch
}
if (context.log.isDebugEnabled)
context.log.debug("Dispatching {} to handler {}", format(partToDispatch), handler)
downstreamCommandPL(Pipeline.Tell(handler, partToDispatch, receiverRef))
Expand All @@ -96,9 +101,15 @@ private trait OpenRequestComponent { component ⇒

def checkForTimeout(now: Timestamp): Unit = {
state match {
case WaitingForChunkedEnd
case w: WaitingForChunkHandler
if ((w.timestamp + settings.chunkHandlerRegistrationTimeout).isPast(now)) {
context.log.warning("A chunk handler wasn't registered timely. Aborting the connection.")
downstreamCommandPL(Tcp.Abort)
}
case _: ReceivingRequestChunks
case StreamingResponseChunks
case WaitingForResponse(timestamp)
if ((timestamp + requestTimeout).isPast) {
if ((timestamp + requestTimeout).isPast(now)) {
val timeoutHandler =
if (settings.timeoutHandler.isEmpty) handler
else context.actorContext.actorFor(settings.timeoutHandler)
Expand All @@ -109,7 +120,7 @@ private trait OpenRequestComponent { component ⇒
state = WaitingForTimeoutResponse()
}
case WaitingForTimeoutResponse(timestamp)
if ((timestamp + timeoutTimeout).isPast) {
if ((timestamp + timeoutTimeout).isPast(now)) {
val response = timeoutResponse(request)
// we always close the connection after a timeout-timeout
sendPart(response.withHeaders(HttpHeaders.Connection("close") :: response.headers))
Expand All @@ -136,7 +147,7 @@ private trait OpenRequestComponent { component ⇒
}

def handleResponsePart(part: HttpMessagePartWrapper): Unit = {
state = WaitingForChunkedEnd // disable request timeout checking once the first response part has come in
state = StreamingResponseChunks // disable request timeout checking once the first response part has come in
handler = context.actorContext.sender // remember who to send Closed events to
sendPart(part)
dispatchNextQueuedResponse()
Expand All @@ -146,25 +157,36 @@ private trait OpenRequestComponent { component ⇒
responseQueue = responseQueue enqueue command
}

/***** EVENTS *****/
def registerChunkHandler(handler: ActorRef): Unit = {
def dispatch(part: HttpRequestPart) = downstreamCommandPL(Pipeline.Tell(handler, part, receiverRef))

def handleMessageChunk(chunk: MessageChunk): Unit = {
if (nextInChain.isEmpty)
downstreamCommandPL(Pipeline.Tell(handler, chunk, receiverRef))
else
// we accept non-tail recursion since HTTP pipeline depth is limited (and small)
nextInChain handleMessageChunk chunk
state =
state match {
case WaitingForChunkHandlerBuffering(_, receiveds) receiveds.foreach(dispatch); ReceivingRequestChunks(handler)
case WaitingForChunkHandlerReceivedAll(_, receiveds) receiveds.foreach(dispatch); WaitingForResponse()
case x throw new IllegalStateException("Didn't expect " + x)
}
}

def handleChunkedMessageEnd(part: ChunkedMessageEnd): Unit = {
if (nextInChain.isEmpty) {
// only start request timeout checking after request has been completed
state = WaitingForResponse()
downstreamCommandPL(Pipeline.Tell(handler, part, receiverRef))
} else
// we accept non-tail recursion since HTTP pipeline depth is limited (and small)
nextInChain handleChunkedMessageEnd part
}
/***** EVENTS *****/

def handleMessageChunk(chunk: MessageChunk): Unit =
state match {
case WaitingForChunkHandlerBuffering(timeout, receiveds) state = WaitingForChunkHandlerBuffering(timeout, receiveds.enqueue(chunk))
case ReceivingRequestChunks(chunkHandler) downstreamCommandPL(Pipeline.Tell(chunkHandler, chunk, receiverRef))
case x if nextInChain.isEmpty throw new IllegalArgumentException(s"$this Didn't expect message chunks in state $state")
case _ nextInChain handleMessageChunk chunk
}

def handleChunkedMessageEnd(part: ChunkedMessageEnd): Unit =
state match {
case WaitingForChunkHandlerBuffering(timeout, receiveds) state = WaitingForChunkHandlerReceivedAll(timeout, receiveds.enqueue(part))
case ReceivingRequestChunks(chunkHandler)
state = WaitingForResponse()
downstreamCommandPL(Pipeline.Tell(chunkHandler, part, receiverRef))
case x if nextInChain.isEmpty throw new IllegalArgumentException(s"$this Didn't expect ChunkedMessageEnd in state $state")
case _ nextInChain handleChunkedMessageEnd part
}

def handleSentAckAndReturnNextUnconfirmed(ev: AckEventWithReceiver) = {
downstreamCommandPL(Pipeline.Tell(ev.receiver, ev.ack, receiverRef))
Expand Down Expand Up @@ -202,6 +224,8 @@ private trait OpenRequestComponent { component ⇒
case MessageChunk(body, _) body.length.toString + " byte request chunk"
case x x.toString
}

def isWaitingForChunkHandler: Boolean = state.isInstanceOf[WaitingForChunkHandler]
}

object EmptyOpenRequest extends OpenRequest {
Expand All @@ -223,6 +247,9 @@ private trait OpenRequestComponent { component ⇒

def enqueueCommand(command: Command): Unit = {}

def registerChunkHandler(handler: ActorRef): Unit =
throw new IllegalStateException("Received RegisterChunkHandler for non-existing request")

// events
def handleMessageChunk(chunk: MessageChunk): Unit = { throw new IllegalStateException }
def handleChunkedMessageEnd(part: ChunkedMessageEnd): Unit = { throw new IllegalStateException }
Expand All @@ -233,14 +260,63 @@ private trait OpenRequestComponent { component ⇒
def handleClosed(ev: Http.ConnectionClosed): Unit = {
downstreamCommandPL(Pipeline.Tell(context.handler, ev, context.self))
}

def isWaitingForChunkHandler: Boolean = false
}

}

private case class AckEventWithReceiver(ack: Any, receiver: ActorRef) extends Event
private case class PartAndSender(part: HttpResponsePart, sender: ActorRef)

/**
* The state of a request. State transformations:
*
* Initial state:
* -> WaitingForChunkHandlerBuffering: if incoming request is chunked
* -> WaitingForResponse: if request was received completely
*
* WaitingForChunkHandlerBuffering:
* -> WaitingForChunkHandlerReceivedAll: if ChunkedMessageEnd was received before chunk handler was registered
* -> ReceivingRequestChunks: after chunk handler was registered
*
* WaitingForChunkHandlerReceivedAll:
* -> WaitingForResponse: after chunk handler was registered
*
* ReceivingRequestChunks:
* -> WaitingForResponse: after ChunkedMessageEnd was received and dispatched
*
* WaitingForResponse:
* -> -finish-: if complete response was received
* -> StreamingResponseChunks: after a ChunkedResponseStart was sent
* -> WaitingForTimeoutResponse: if the timeout triggered before a response (start) was produced
*
* StreamingResponseStart:
* -> -finish-: if ChunkedMessageEnd was sent
*
* WaitingForTimeoutResponse:
* -> -finish-: if the timeout response was delivered
* -> -finish-: if the timeout timeout triggered before the timeout response was produced
*
*/
private sealed trait RequestState
private case object WaitingForChunkedEnd extends RequestState
private sealed abstract class WaitingForChunkHandler extends RequestState {
def timestamp: Timestamp
def receivedChunks: Queue[HttpRequestPart]
}
/** Got a ChunkedRequestStart, waiting for chunk handler to register */
private case class WaitingForChunkHandlerBuffering(
timestamp: Timestamp = Timestamp.now,
receivedChunks: Queue[HttpRequestPart] = Queue.empty) extends WaitingForChunkHandler
/** Got ChunkedMessageEnd, waiting for chunk handler to register */
private case class WaitingForChunkHandlerReceivedAll(
timestamp: Timestamp,
receivedChunks: Queue[HttpRequestPart]) extends WaitingForChunkHandler
/** Got the chunk handler, receiving and dispatching request chunks */
private case class ReceivingRequestChunks(chunkHandler: ActorRef) extends RequestState
/** Request was fully delivered waiting for response */
private case class WaitingForResponse(timestamp: Timestamp = Timestamp.now) extends RequestState
private case class WaitingForTimeoutResponse(timestamp: Timestamp = Timestamp.now) extends RequestState
/** ChunkedRequestStart was sent waiting for remaining chunks */
private case object StreamingResponseChunks extends RequestState
/** Timed-out while waiting for request, `Timeout` was dispatched, now waiting for timeout response */
private case class WaitingForTimeoutResponse(timestamp: Timestamp = Timestamp.now) extends RequestState

0 comments on commit d738e54

Please sign in to comment.