Skip to content

Commit

Permalink
! can: upgrade to new HttpEntity / HttpData model
Browse files Browse the repository at this point in the history
 The breaking change here is the evolution of the `response-size-hint` and `request-size-hint` config settings to `response-header-size-hint` and `request-header-size-hint` respectively (since spray-can now only renders the HTTP message headers into a `Tcp.Write` command and can forward the message entity untouched to the IO layer).
  • Loading branch information
sirthias committed Sep 10, 2013
1 parent c6f49cc commit ba1ae77
Show file tree
Hide file tree
Showing 22 changed files with 113 additions and 128 deletions.
Expand Up @@ -107,7 +107,7 @@ class HttpClientConnectionPipelineSpec extends Specification with RawSpecs2Pipel
}
commands.expectMsgPF() {
case Pipeline.Tell(`probeRef`, response: HttpResponse, `connectionActor`) response.entity
} === HttpEntity("body123body123")
} === HttpEntity(ContentTypes.`text/plain(UTF-8)`, HttpData("body123") +: HttpData("body123"))
}

"properly complete a 3 requests pipelined dialog" in new Fixture(stage) {
Expand Down
Expand Up @@ -361,8 +361,8 @@ class RequestParserSpec extends Specification {
val data = CompactByteString(rawRequest)
parser.parse(data) match {
case Result.Ok(HttpRequest(m, u, h, e, p), rd, close) (m, u, p, h, e.asString, rd.utf8String, close)
case Result.Ok(ChunkedRequestStart(HttpRequest(m, u, h, EmptyEntity, p)), rd, close) (m, u, p, h, rd.utf8String, close)
case Result.Ok(MessageChunk(body, ext), rd, close) (new String(body), ext, rd.utf8String, close)
case Result.Ok(ChunkedRequestStart(HttpRequest(m, u, h, HttpEntity.Empty, p)), rd, close) (m, u, p, h, rd.utf8String, close)
case Result.Ok(MessageChunk(d, ext), rd, close) (d.asString, ext, rd.utf8String, close)
case Result.Ok(ChunkedMessageEnd(ext, trailer), rd, close) (ext, trailer, rd.utf8String, close)
case Result.ParsingError(status, info) (status, info.formatPretty)
case x x
Expand Down
Expand Up @@ -211,8 +211,8 @@ class ResponseParserSpec extends Specification {
val data = CompactByteString(rawResponse)
parser.parse(data) match {
case Result.Ok(HttpResponse(s, e, h, p), rd, close) (s, e.asString, h, p, rd.utf8String, close)
case Result.Ok(ChunkedResponseStart(HttpResponse(s, EmptyEntity, h, p)), rd, close) (s, h, p, rd.utf8String, close)
case Result.Ok(MessageChunk(body, ext), rd, close) (new String(body), ext, rd.utf8String, close)
case Result.Ok(ChunkedResponseStart(HttpResponse(s, HttpEntity.Empty, h, p)), rd, close) (s, h, p, rd.utf8String, close)
case Result.Ok(MessageChunk(d, ext), rd, close) (d.asString, ext, rd.utf8String, close)
case Result.Ok(ChunkedMessageEnd(ext, trailer), rd, close) (ext, trailer, rd.utf8String, close)
case Result.ParsingError(BadRequest, info) info.formatPretty
case x x
Expand Down
Expand Up @@ -140,7 +140,7 @@ class ResponseRendererSpec extends mutable.Specification with DataTables {
}

"a response chunk" in new TestSetup() {
render(MessageChunk("body123".getBytes("ISO-8859-1"), """key=value;another="tl;dr"""")) === result {
render(MessageChunk(HttpData("body123".getBytes), """key=value;another="tl;dr"""")) === result {
"""7;key=value;another="tl;dr"
|body123
|"""
Expand Down Expand Up @@ -180,7 +180,8 @@ class ResponseRendererSpec extends mutable.Specification with DataTables {
}

"a chunkless response chunk" in new TestSetup(chunklessStreaming = true) {
render(response = MessageChunk("body123".getBytes("ISO-8859-1"), """key=value;another="tl;dr"""")) === result {
render(response = MessageChunk(HttpData("body123".getBytes),
"""key=value;another="tl;dr"""")) === result {
"body123"
} -> false
}
Expand Down
Expand Up @@ -58,7 +58,7 @@ class HttpServerConnectionPipelineSpec extends Specification with RawSpecs2Pipel
`Transfer-Encoding`("chunked"),
`Content-Type`(ContentTypes.`text/plain(UTF-8)`),
`Host`("test.com")),
entity = HttpEntity("body123body123"))
entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, HttpData("body123") +: HttpData("body123")))
}

"dispatch Ack messages" in {
Expand Down
Expand Up @@ -79,7 +79,7 @@ class SprayCanServerSpec extends Specification with NoTimeConversions {
serverHandler.reply(MessageChunk("234"))
serverHandler.reply(MessageChunk("345"))
serverHandler.reply(ChunkedMessageEnd)
probe.expectMsgType[ChunkedResponseStart].response.entity === EmptyEntity
probe.expectMsgType[ChunkedResponseStart].response.entity === HttpEntity.Empty
probe.expectMsg(MessageChunk("yeah"))
probe.expectMsg(MessageChunk("234"))
probe.expectMsg(MessageChunk("345"))
Expand Down
8 changes: 4 additions & 4 deletions spray-can/src/main/resources/reference.conf
Expand Up @@ -108,10 +108,10 @@ spray.can {
# dispatched to the handler.
request-chunk-aggregation-limit = 1m

# The initial size if the buffer to render the response in.
# The initial size if the buffer to render the response headers in.
# Can be used for fine-tuning response rendering performance but probably
# doesn't have to be fiddled with in most applications.
response-size-hint = 1k
response-header-size-hint = 512

# The time period within which the TCP binding process must be completed.
# Set to `infinite` to disable.
Expand Down Expand Up @@ -187,10 +187,10 @@ spray.can {
# dispatched to the application.
response-chunk-aggregation-limit = 1m

# The initial size if the buffer to render the request in.
# The initial size if the buffer to render the request headers in.
# Can be used for fine-tuning request rendering performance but probably
# doesn't have to be fiddled with in most applications.
request-size-hint = 512
request-header-size-hint = 512

# The time period within which the TCP connecting process must be completed.
# Set to `infinite` to disable.
Expand Down
Expand Up @@ -28,7 +28,7 @@ case class ClientConnectionSettings(
requestTimeout: Duration,
reapingCycle: Duration,
responseChunkAggregationLimit: Int,
requestSizeHint: Int,
requestHeaderSizeHint: Int,
connectingTimeout: Duration,
parserSettings: ParserSettings) {

Expand All @@ -37,7 +37,7 @@ case class ClientConnectionSettings(
requirePositive(reapingCycle)
require(0 <= responseChunkAggregationLimit && responseChunkAggregationLimit <= Int.MaxValue,
"response-chunk-aggregation-limit must be >= 0 and <= Int.MaxValue")
require(0 <= requestSizeHint && requestSizeHint <= Int.MaxValue,
require(0 <= requestHeaderSizeHint && requestHeaderSizeHint <= Int.MaxValue,
"request-size-hint must be >= 0 and <= Int.MaxValue")
requirePositive(connectingTimeout)
}
Expand All @@ -55,7 +55,7 @@ object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettin
c getDuration "request-timeout",
c getDuration "reaping-cycle",
c getBytes "response-chunk-aggregation-limit" toInt,
c getBytes "request-size-hint" toInt,
c getBytes "request-header-size-hint" toInt,
c getDuration "connecting-timeout",
ParserSettings fromSubConfig c.getConfig("parsing"))
}
Expand Down
Expand Up @@ -16,9 +16,9 @@

package spray.can.client

import akka.io.Tcp
import spray.can.rendering.{ ByteStringRendering, RequestRenderingComponent, RequestPartRenderingContext }
import spray.http.HttpHeaders.`User-Agent`
import spray.http.HttpDataRendering
import spray.can.rendering._
import spray.io._
import spray.util._

Expand All @@ -32,9 +32,9 @@ object RequestRendering {
new Pipelines {
val commandPipeline: CPL = {
case RequestPartRenderingContext(requestPart, ack)
val rendering = new ByteStringRendering(settings.requestSizeHint)
val rendering = new HttpDataRendering(settings.requestHeaderSizeHint)
renderRequestPart(rendering, requestPart, context.remoteAddress, context.log)
commandPL(Tcp.Write(rendering.get, ack))
commandPL(toTcpWriteCommand(rendering.get, ack))

case cmd commandPL(cmd)
}
Expand Down
Expand Up @@ -16,7 +16,6 @@

package spray.can.client

import akka.util.{ ByteString, ByteStringBuilder }
import spray.http._
import spray.io._
import spray.can.Http
Expand All @@ -31,22 +30,23 @@ object ResponseChunkAggregation {

val initialEventPipeline: EPL = {
case Http.MessageEvent(ChunkedResponseStart(response))
eventPipeline.become(aggregating(response))
eventPipeline.become(aggregating(response, HttpData.newBuilder += response.entity.data))

case ev eventPL(ev)
}

def aggregating(response: HttpResponse, bb: ByteStringBuilder = ByteString.newBuilder): EPL = {
case Http.MessageEvent(MessageChunk(body, _))
if (bb.length + body.length <= limit) bb.putBytes(body)
def aggregating(response: HttpResponse, builder: HttpData.Builder): EPL = {
case Http.MessageEvent(MessageChunk(data, _))
if (builder.byteCount + data.length <= limit)
builder += data
else closeWithError()

case Http.MessageEvent(_: ChunkedMessageEnd)
val contentType = response.header[HttpHeaders.`Content-Type`] match {
case Some(x) x.contentType
case None ContentTypes.`application/octet-stream`
}
eventPL(Http.MessageEvent(response.copy(entity = HttpEntity(contentType, bb.result().toArray[Byte]))))
eventPL(Http.MessageEvent(response.copy(entity = HttpEntity(contentType, builder.result()))))
eventPipeline.become(initialEventPipeline)

case ev eventPL(ev)
Expand Down
Expand Up @@ -127,7 +127,7 @@ private[parsing] abstract class HttpMessagePartParser[Part <: HttpMessagePart](v
else if (bodyStart.toLong + length <= input.length) {
val intLength = length.toInt
parse = this
val part = message(headers, entity(cth, input.iterator.slice(bodyStart, bodyStart + intLength).toArray[Byte]))
val part = message(headers, entity(cth, input.slice(bodyStart, bodyStart + intLength)))
Result.Ok(part, drop(input, bodyStart + intLength), closeAfterResponseCompletion)
} else {
parse = more parseFixedLengthBody(headers, input ++ more, bodyStart, length, cth, closeAfterResponseCompletion)
Expand All @@ -154,7 +154,7 @@ private[parsing] abstract class HttpMessagePartParser[Part <: HttpMessagePart](v
val chunkBodyEnd = cursor + chunkSize
def result(terminatorLen: Int) = {
parse = parseChunk(closeAfterResponseCompletion)
val chunk = MessageChunk(input.iterator.slice(cursor, chunkBodyEnd).toArray[Byte], extension)
val chunk = MessageChunk(HttpData(input.slice(cursor, chunkBodyEnd)), extension)
Result.Ok(chunk.asInstanceOf[Part], drop(input, chunkBodyEnd + terminatorLen), closeAfterResponseCompletion)
}
byteChar(input, chunkBodyEnd) match {
Expand Down Expand Up @@ -202,7 +202,7 @@ private[parsing] abstract class HttpMessagePartParser[Part <: HttpMessagePart](v
}

val consumed = math.min(remainingBytes, input.size).toInt // safe conversion because input.size returns an Int
val chunk = MessageChunk(input.take(consumed).toArray[Byte])
val chunk = MessageChunk(HttpData(input.take(consumed)))

val remaining =
consumed match {
Expand All @@ -223,12 +223,12 @@ private[parsing] abstract class HttpMessagePartParser[Part <: HttpMessagePart](v
Result.Ok(chunk.asInstanceOf[Part], remaining, closeAfterResponseCompletion)
}

def entity(cth: Option[`Content-Type`], body: Array[Byte]): HttpEntity = {
def entity(cth: Option[`Content-Type`], body: ByteString): HttpEntity = {
val contentType = cth match {
case Some(x) x.contentType
case None ContentTypes.`application/octet-stream`
}
HttpEntity(contentType, body)
HttpEntity(contentType, HttpData(body))
}

def closeAfterResponseCompletion(connectionHeader: Option[Connection]) =
Expand Down
Expand Up @@ -115,7 +115,7 @@ class HttpRequestPartParser(_settings: ParserSettings)(_headerParser: HttpHeader
}
if (contentLength == 0) {
parse = this
Result.Ok(message(headers, EmptyEntity), drop(input, bodyStart), closeAfterResponseCompletion)
Result.Ok(message(headers, HttpEntity.Empty), drop(input, bodyStart), closeAfterResponseCompletion)
} else if (contentLength <= settings.maxContentLength)
parseFixedLengthBody(headers, input, bodyStart, contentLength, cth, closeAfterResponseCompletion)
else fail(RequestEntityTooLarge, s"Request Content-Length $contentLength exceeds the configured limit of " +
Expand All @@ -127,5 +127,5 @@ class HttpRequestPartParser(_settings: ParserSettings)(_headerParser: HttpHeader
HttpRequest(method, uri, headers, entity, protocol)

def chunkStartMessage(headers: List[HttpHeader]) =
ChunkedRequestStart(message(headers, EmptyEntity))
ChunkedRequestStart(message(headers, HttpEntity.Empty))
}
Expand Up @@ -95,7 +95,7 @@ class HttpResponsePartParser(_settings: ParserSettings)(_headerParser: HttpHeade
case Some(te) if te.encodings.size == 1 && te.hasChunked
if (clh.isEmpty) {
parse = parseChunk(closeAfterResponseCompletion)
Result.Ok(ChunkedResponseStart(message(headers, EmptyEntity)), drop(input, bodyStart), closeAfterResponseCompletion)
Result.Ok(ChunkedResponseStart(message(headers, HttpEntity.Empty)), drop(input, bodyStart), closeAfterResponseCompletion)
} else fail("A chunked request must not contain a Content-Length header.")

case Some(te) fail(te.toString + " is not supported by this client")
Expand All @@ -104,7 +104,7 @@ class HttpResponsePartParser(_settings: ParserSettings)(_headerParser: HttpHeade
case Some(`Content-Length`(contentLength))
if (contentLength == 0) {
parse = this
Result.Ok(message(headers, EmptyEntity), drop(input, bodyStart), closeAfterResponseCompletion)
Result.Ok(message(headers, HttpEntity.Empty), drop(input, bodyStart), closeAfterResponseCompletion)
} else if (contentLength <= settings.maxContentLength)
parseFixedLengthBody(headers, input, bodyStart, contentLength, cth, closeAfterResponseCompletion)
else fail(s"Response Content-Length $contentLength exceeds the configured limit of " +
Expand All @@ -115,7 +115,7 @@ class HttpResponsePartParser(_settings: ParserSettings)(_headerParser: HttpHeade
}
} else {
parse = this
Result.Ok(message(headers, EmptyEntity), drop(input, bodyStart), closeAfterResponseCompletion)
Result.Ok(message(headers, HttpEntity.Empty), drop(input, bodyStart), closeAfterResponseCompletion)
}
}

Expand All @@ -125,7 +125,7 @@ class HttpResponsePartParser(_settings: ParserSettings)(_headerParser: HttpHeade
parse = { more
if (more.isEmpty) {
parse = this
val part = message(headers, entity(cth, input.iterator.drop(bodyStart).toArray[Byte]))
val part = message(headers, entity(cth, input drop bodyStart))
Result.Ok(part, CompactByteString.empty, closeAfterResponseCompletion = true)
} else parseToCloseBody(headers, input ++ more, bodyStart, cth)
}
Expand All @@ -137,5 +137,5 @@ class HttpResponsePartParser(_settings: ParserSettings)(_headerParser: HttpHeade
HttpResponse(statusCode, entity, headers, protocol)

def chunkStartMessage(headers: List[HttpHeader]): ChunkedResponseStart =
ChunkedResponseStart(message(headers, EmptyEntity))
ChunkedResponseStart(message(headers, HttpEntity.Empty))
}

This file was deleted.

Expand Up @@ -31,9 +31,9 @@ private[rendering] object RenderSupport {
implicit object MessageChunkRenderer extends Renderer[MessageChunk] {
def render[R <: Rendering](r: R, chunk: MessageChunk): r.type = {
import chunk._
r ~~ Integer.toHexString(body.length)
r ~~% data.length
if (!extension.isEmpty) r ~~ ';' ~~ extension
r ~~ CrLf ~~ body ~~ CrLf
r ~~ CrLf ~~ data ~~ CrLf
}
}

Expand Down

0 comments on commit ba1ae77

Please sign in to comment.