Permalink
Browse files

! can: upgrade to new HttpEntity / HttpData model

 The breaking change here is the evolution of the `response-size-hint` and `request-size-hint` config settings to `response-header-size-hint` and `request-header-size-hint` respectively (since spray-can now only renders the HTTP message headers into a `Tcp.Write` command and can forward the message entity untouched to the IO layer).
  • Loading branch information...
sirthias committed Aug 30, 2013
1 parent c6f49cc commit ba1ae7727ae890efba9e27ed2f7c04cf8a3d2d07
Showing with 113 additions and 128 deletions.
  1. +1 −1 spray-can-tests/src/test/scala/spray/can/client/HttpClientConnectionPipelineSpec.scala
  2. +2 −2 spray-can-tests/src/test/scala/spray/can/parsing/RequestParserSpec.scala
  3. +2 −2 spray-can-tests/src/test/scala/spray/can/parsing/ResponseParserSpec.scala
  4. +3 −2 spray-can-tests/src/test/scala/spray/can/rendering/ResponseRendererSpec.scala
  5. +1 −1 spray-can-tests/src/test/scala/spray/can/server/HttpServerConnectionPipelineSpec.scala
  6. +1 −1 spray-can-tests/src/test/scala/spray/can/server/SprayCanServerSpec.scala
  7. +4 −4 spray-can/src/main/resources/reference.conf
  8. +3 −3 spray-can/src/main/scala/spray/can/client/ClientConnectionSettings.scala
  9. +4 −4 spray-can/src/main/scala/spray/can/client/RequestRendering.scala
  10. +6 −6 spray-can/src/main/scala/spray/can/client/ResponseChunkAggregation.scala
  11. +5 −5 spray-can/src/main/scala/spray/can/parsing/HttpMessagePartParser.scala
  12. +2 −2 spray-can/src/main/scala/spray/can/parsing/HttpRequestPartParser.scala
  13. +5 −5 spray-can/src/main/scala/spray/can/parsing/HttpResponsePartParser.scala
  14. +0 −55 spray-can/src/main/scala/spray/can/rendering/ByteStringRendering.scala
  15. +2 −2 spray-can/src/main/scala/spray/can/rendering/RenderSupport.scala
  16. +11 −7 spray-can/src/main/scala/spray/can/rendering/RequestRenderingComponent.scala
  17. +10 −8 spray-can/src/main/scala/spray/can/rendering/ResponseRenderingComponent.scala
  18. +35 −0 spray-can/src/main/scala/spray/can/rendering/package.scala
  19. +6 −7 spray-can/src/main/scala/spray/can/server/RequestChunkAggregation.scala
  20. +5 −6 spray-can/src/main/scala/spray/can/server/ResponseRendering.scala
  21. +3 −3 spray-can/src/main/scala/spray/can/server/ServerSettings.scala
  22. +2 −2 spray-httpx/src/main/scala/spray/httpx/unmarshalling/BasicUnmarshallers.scala
@@ -107,7 +107,7 @@ class HttpClientConnectionPipelineSpec extends Specification with RawSpecs2Pipel
}
commands.expectMsgPF() {
case Pipeline.Tell(`probeRef`, response: HttpResponse, `connectionActor`) response.entity
- } === HttpEntity("body123body123")
+ } === HttpEntity(ContentTypes.`text/plain(UTF-8)`, HttpData("body123") +: HttpData("body123"))
}
"properly complete a 3 requests pipelined dialog" in new Fixture(stage) {
@@ -361,8 +361,8 @@ class RequestParserSpec extends Specification {
val data = CompactByteString(rawRequest)
parser.parse(data) match {
case Result.Ok(HttpRequest(m, u, h, e, p), rd, close) ⇒ (m, u, p, h, e.asString, rd.utf8String, close)
- case Result.Ok(ChunkedRequestStart(HttpRequest(m, u, h, EmptyEntity, p)), rd, close) ⇒ (m, u, p, h, rd.utf8String, close)
- case Result.Ok(MessageChunk(body, ext), rd, close) ⇒ (new String(body), ext, rd.utf8String, close)
+ case Result.Ok(ChunkedRequestStart(HttpRequest(m, u, h, HttpEntity.Empty, p)), rd, close) ⇒ (m, u, p, h, rd.utf8String, close)
+ case Result.Ok(MessageChunk(d, ext), rd, close) ⇒ (d.asString, ext, rd.utf8String, close)
case Result.Ok(ChunkedMessageEnd(ext, trailer), rd, close) ⇒ (ext, trailer, rd.utf8String, close)
case Result.ParsingError(status, info) ⇒ (status, info.formatPretty)
case x ⇒ x
@@ -211,8 +211,8 @@ class ResponseParserSpec extends Specification {
val data = CompactByteString(rawResponse)
parser.parse(data) match {
case Result.Ok(HttpResponse(s, e, h, p), rd, close) (s, e.asString, h, p, rd.utf8String, close)
- case Result.Ok(ChunkedResponseStart(HttpResponse(s, EmptyEntity, h, p)), rd, close) (s, h, p, rd.utf8String, close)
- case Result.Ok(MessageChunk(body, ext), rd, close) (new String(body), ext, rd.utf8String, close)
+ case Result.Ok(ChunkedResponseStart(HttpResponse(s, HttpEntity.Empty, h, p)), rd, close) (s, h, p, rd.utf8String, close)
+ case Result.Ok(MessageChunk(d, ext), rd, close) (d.asString, ext, rd.utf8String, close)
case Result.Ok(ChunkedMessageEnd(ext, trailer), rd, close) (ext, trailer, rd.utf8String, close)
case Result.ParsingError(BadRequest, info) info.formatPretty
case x x
@@ -140,7 +140,7 @@ class ResponseRendererSpec extends mutable.Specification with DataTables {
}
"a response chunk" in new TestSetup() {
- render(MessageChunk("body123".getBytes("ISO-8859-1"), """key=value;another="tl;dr"""")) === result {
+ render(MessageChunk(HttpData("body123".getBytes), """key=value;another="tl;dr"""")) === result {
"""7;key=value;another="tl;dr"
|body123
|"""
@@ -180,7 +180,8 @@ class ResponseRendererSpec extends mutable.Specification with DataTables {
}
"a chunkless response chunk" in new TestSetup(chunklessStreaming = true) {
- render(response = MessageChunk("body123".getBytes("ISO-8859-1"), """key=value;another="tl;dr"""")) === result {
+ render(response = MessageChunk(HttpData("body123".getBytes),
+ """key=value;another="tl;dr"""")) === result {
"body123"
} -> false
}
@@ -58,7 +58,7 @@ class HttpServerConnectionPipelineSpec extends Specification with RawSpecs2Pipel
`Transfer-Encoding`("chunked"),
`Content-Type`(ContentTypes.`text/plain(UTF-8)`),
`Host`("test.com")),
- entity = HttpEntity("body123body123"))
+ entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, HttpData("body123") +: HttpData("body123")))
}
"dispatch Ack messages" in {
@@ -79,7 +79,7 @@ class SprayCanServerSpec extends Specification with NoTimeConversions {
serverHandler.reply(MessageChunk("234"))
serverHandler.reply(MessageChunk("345"))
serverHandler.reply(ChunkedMessageEnd)
- probe.expectMsgType[ChunkedResponseStart].response.entity === EmptyEntity
+ probe.expectMsgType[ChunkedResponseStart].response.entity === HttpEntity.Empty
probe.expectMsg(MessageChunk("yeah"))
probe.expectMsg(MessageChunk("234"))
probe.expectMsg(MessageChunk("345"))
@@ -108,10 +108,10 @@ spray.can {
# dispatched to the handler.
request-chunk-aggregation-limit = 1m
- # The initial size if the buffer to render the response in.
+ # The initial size if the buffer to render the response headers in.
# Can be used for fine-tuning response rendering performance but probably
# doesn't have to be fiddled with in most applications.
- response-size-hint = 1k
+ response-header-size-hint = 512
# The time period within which the TCP binding process must be completed.
# Set to `infinite` to disable.
@@ -187,10 +187,10 @@ spray.can {
# dispatched to the application.
response-chunk-aggregation-limit = 1m
- # The initial size if the buffer to render the request in.
+ # The initial size if the buffer to render the request headers in.
# Can be used for fine-tuning request rendering performance but probably
# doesn't have to be fiddled with in most applications.
- request-size-hint = 512
+ request-header-size-hint = 512
# The time period within which the TCP connecting process must be completed.
# Set to `infinite` to disable.
@@ -28,7 +28,7 @@ case class ClientConnectionSettings(
requestTimeout: Duration,
reapingCycle: Duration,
responseChunkAggregationLimit: Int,
- requestSizeHint: Int,
+ requestHeaderSizeHint: Int,
connectingTimeout: Duration,
parserSettings: ParserSettings) {
@@ -37,7 +37,7 @@ case class ClientConnectionSettings(
requirePositive(reapingCycle)
require(0 <= responseChunkAggregationLimit && responseChunkAggregationLimit <= Int.MaxValue,
"response-chunk-aggregation-limit must be >= 0 and <= Int.MaxValue")
- require(0 <= requestSizeHint && requestSizeHint <= Int.MaxValue,
+ require(0 <= requestHeaderSizeHint && requestHeaderSizeHint <= Int.MaxValue,
"request-size-hint must be >= 0 and <= Int.MaxValue")
requirePositive(connectingTimeout)
}
@@ -55,7 +55,7 @@ object ClientConnectionSettings extends SettingsCompanion[ClientConnectionSettin
c getDuration "request-timeout",
c getDuration "reaping-cycle",
c getBytes "response-chunk-aggregation-limit" toInt,
- c getBytes "request-size-hint" toInt,
+ c getBytes "request-header-size-hint" toInt,
c getDuration "connecting-timeout",
ParserSettings fromSubConfig c.getConfig("parsing"))
}
@@ -16,9 +16,9 @@
package spray.can.client
-import akka.io.Tcp
-import spray.can.rendering.{ ByteStringRendering, RequestRenderingComponent, RequestPartRenderingContext }
import spray.http.HttpHeaders.`User-Agent`
+import spray.http.HttpDataRendering
+import spray.can.rendering._
import spray.io._
import spray.util._
@@ -32,9 +32,9 @@ object RequestRendering {
new Pipelines {
val commandPipeline: CPL = {
case RequestPartRenderingContext(requestPart, ack)
- val rendering = new ByteStringRendering(settings.requestSizeHint)
+ val rendering = new HttpDataRendering(settings.requestHeaderSizeHint)
renderRequestPart(rendering, requestPart, context.remoteAddress, context.log)
- commandPL(Tcp.Write(rendering.get, ack))
+ commandPL(toTcpWriteCommand(rendering.get, ack))
case cmd commandPL(cmd)
}
@@ -16,7 +16,6 @@
package spray.can.client
-import akka.util.{ ByteString, ByteStringBuilder }
import spray.http._
import spray.io._
import spray.can.Http
@@ -31,22 +30,23 @@ object ResponseChunkAggregation {
val initialEventPipeline: EPL = {
case Http.MessageEvent(ChunkedResponseStart(response))
- eventPipeline.become(aggregating(response))
+ eventPipeline.become(aggregating(response, HttpData.newBuilder += response.entity.data))
case ev eventPL(ev)
}
- def aggregating(response: HttpResponse, bb: ByteStringBuilder = ByteString.newBuilder): EPL = {
- case Http.MessageEvent(MessageChunk(body, _))
- if (bb.length + body.length <= limit) bb.putBytes(body)
+ def aggregating(response: HttpResponse, builder: HttpData.Builder): EPL = {
+ case Http.MessageEvent(MessageChunk(data, _))
+ if (builder.byteCount + data.length <= limit)
+ builder += data
else closeWithError()
case Http.MessageEvent(_: ChunkedMessageEnd)
val contentType = response.header[HttpHeaders.`Content-Type`] match {
case Some(x) x.contentType
case None ContentTypes.`application/octet-stream`
}
- eventPL(Http.MessageEvent(response.copy(entity = HttpEntity(contentType, bb.result().toArray[Byte]))))
+ eventPL(Http.MessageEvent(response.copy(entity = HttpEntity(contentType, builder.result()))))
eventPipeline.become(initialEventPipeline)
case ev eventPL(ev)
@@ -127,7 +127,7 @@ private[parsing] abstract class HttpMessagePartParser[Part <: HttpMessagePart](v
else if (bodyStart.toLong + length <= input.length) {
val intLength = length.toInt
parse = this
- val part = message(headers, entity(cth, input.iterator.slice(bodyStart, bodyStart + intLength).toArray[Byte]))
+ val part = message(headers, entity(cth, input.slice(bodyStart, bodyStart + intLength)))
Result.Ok(part, drop(input, bodyStart + intLength), closeAfterResponseCompletion)
} else {
parse = more parseFixedLengthBody(headers, input ++ more, bodyStart, length, cth, closeAfterResponseCompletion)
@@ -154,7 +154,7 @@ private[parsing] abstract class HttpMessagePartParser[Part <: HttpMessagePart](v
val chunkBodyEnd = cursor + chunkSize
def result(terminatorLen: Int) = {
parse = parseChunk(closeAfterResponseCompletion)
- val chunk = MessageChunk(input.iterator.slice(cursor, chunkBodyEnd).toArray[Byte], extension)
+ val chunk = MessageChunk(HttpData(input.slice(cursor, chunkBodyEnd)), extension)
Result.Ok(chunk.asInstanceOf[Part], drop(input, chunkBodyEnd + terminatorLen), closeAfterResponseCompletion)
}
byteChar(input, chunkBodyEnd) match {
@@ -202,7 +202,7 @@ private[parsing] abstract class HttpMessagePartParser[Part <: HttpMessagePart](v
}
val consumed = math.min(remainingBytes, input.size).toInt // safe conversion because input.size returns an Int
- val chunk = MessageChunk(input.take(consumed).toArray[Byte])
+ val chunk = MessageChunk(HttpData(input.take(consumed)))
val remaining =
consumed match {
@@ -223,12 +223,12 @@ private[parsing] abstract class HttpMessagePartParser[Part <: HttpMessagePart](v
Result.Ok(chunk.asInstanceOf[Part], remaining, closeAfterResponseCompletion)
}
- def entity(cth: Option[`Content-Type`], body: Array[Byte]): HttpEntity = {
+ def entity(cth: Option[`Content-Type`], body: ByteString): HttpEntity = {
val contentType = cth match {
case Some(x) x.contentType
case None ContentTypes.`application/octet-stream`
}
- HttpEntity(contentType, body)
+ HttpEntity(contentType, HttpData(body))
}
def closeAfterResponseCompletion(connectionHeader: Option[Connection]) =
@@ -115,7 +115,7 @@ class HttpRequestPartParser(_settings: ParserSettings)(_headerParser: HttpHeader
}
if (contentLength == 0) {
parse = this
- Result.Ok(message(headers, EmptyEntity), drop(input, bodyStart), closeAfterResponseCompletion)
+ Result.Ok(message(headers, HttpEntity.Empty), drop(input, bodyStart), closeAfterResponseCompletion)
} else if (contentLength <= settings.maxContentLength)
parseFixedLengthBody(headers, input, bodyStart, contentLength, cth, closeAfterResponseCompletion)
else fail(RequestEntityTooLarge, s"Request Content-Length $contentLength exceeds the configured limit of " +
@@ -127,5 +127,5 @@ class HttpRequestPartParser(_settings: ParserSettings)(_headerParser: HttpHeader
HttpRequest(method, uri, headers, entity, protocol)
def chunkStartMessage(headers: List[HttpHeader]) =
- ChunkedRequestStart(message(headers, EmptyEntity))
+ ChunkedRequestStart(message(headers, HttpEntity.Empty))
}
@@ -95,7 +95,7 @@ class HttpResponsePartParser(_settings: ParserSettings)(_headerParser: HttpHeade
case Some(te) if te.encodings.size == 1 && te.hasChunked
if (clh.isEmpty) {
parse = parseChunk(closeAfterResponseCompletion)
- Result.Ok(ChunkedResponseStart(message(headers, EmptyEntity)), drop(input, bodyStart), closeAfterResponseCompletion)
+ Result.Ok(ChunkedResponseStart(message(headers, HttpEntity.Empty)), drop(input, bodyStart), closeAfterResponseCompletion)
} else fail("A chunked request must not contain a Content-Length header.")
case Some(te) fail(te.toString + " is not supported by this client")
@@ -104,7 +104,7 @@ class HttpResponsePartParser(_settings: ParserSettings)(_headerParser: HttpHeade
case Some(`Content-Length`(contentLength))
if (contentLength == 0) {
parse = this
- Result.Ok(message(headers, EmptyEntity), drop(input, bodyStart), closeAfterResponseCompletion)
+ Result.Ok(message(headers, HttpEntity.Empty), drop(input, bodyStart), closeAfterResponseCompletion)
} else if (contentLength <= settings.maxContentLength)
parseFixedLengthBody(headers, input, bodyStart, contentLength, cth, closeAfterResponseCompletion)
else fail(s"Response Content-Length $contentLength exceeds the configured limit of " +
@@ -115,7 +115,7 @@ class HttpResponsePartParser(_settings: ParserSettings)(_headerParser: HttpHeade
}
} else {
parse = this
- Result.Ok(message(headers, EmptyEntity), drop(input, bodyStart), closeAfterResponseCompletion)
+ Result.Ok(message(headers, HttpEntity.Empty), drop(input, bodyStart), closeAfterResponseCompletion)
}
}
@@ -125,7 +125,7 @@ class HttpResponsePartParser(_settings: ParserSettings)(_headerParser: HttpHeade
parse = { more
if (more.isEmpty) {
parse = this
- val part = message(headers, entity(cth, input.iterator.drop(bodyStart).toArray[Byte]))
+ val part = message(headers, entity(cth, input drop bodyStart))
Result.Ok(part, CompactByteString.empty, closeAfterResponseCompletion = true)
} else parseToCloseBody(headers, input ++ more, bodyStart, cth)
}
@@ -137,5 +137,5 @@ class HttpResponsePartParser(_settings: ParserSettings)(_headerParser: HttpHeade
HttpResponse(statusCode, entity, headers, protocol)
def chunkStartMessage(headers: List[HttpHeader]): ChunkedResponseStart =
- ChunkedResponseStart(message(headers, EmptyEntity))
+ ChunkedResponseStart(message(headers, HttpEntity.Empty))
}
@@ -1,55 +0,0 @@
-/*
- * Copyright (C) 2011-2013 spray.io
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spray.can.rendering
-
-import spray.http.Rendering
-import akka.util.ByteString
-
-private[can] class ByteStringRendering(sizeHint: Int) extends Rendering {
- private[this] var array = new Array[Byte](sizeHint)
- private[this] var size = 0
-
- def ~~(char: Char): this.type = {
- val oldSize = growBy(1)
- array(oldSize) = char.toByte
- this
- }
-
- def ~~(bytes: Array[Byte]): this.type = {
- if (bytes.length > 0) {
- val oldSize = growBy(bytes.length)
- System.arraycopy(bytes, 0, array, oldSize, bytes.length)
- }
- this
- }
-
- private def growBy(delta: Int): Int = {
- val oldSize = size
- val neededSize = oldSize.toLong + delta
- if (array.length < neededSize)
- if (neededSize < Int.MaxValue) {
- val newLen = math.min(math.max(array.length.toLong * 2, neededSize), Int.MaxValue).toInt
- val newArray = new Array[Byte](newLen)
- System.arraycopy(array, 0, newArray, 0, array.length)
- array = newArray
- } else sys.error("Cannot create compact ByteString greater than 2GB in size")
- size = neededSize.toInt
- oldSize
- }
-
- def get: ByteString = akka.spray.createByteStringUnsafe(array, 0, size)
-}
@@ -31,9 +31,9 @@ private[rendering] object RenderSupport {
implicit object MessageChunkRenderer extends Renderer[MessageChunk] {
def render[R <: Rendering](r: R, chunk: MessageChunk): r.type = {
import chunk._
- r ~~ Integer.toHexString(body.length)
+ r ~~% data.length
if (!extension.isEmpty) r ~~ ';' ~~ extension
- r ~~ CrLf ~~ body ~~ CrLf
+ r ~~ CrLf ~~ data ~~ CrLf
}
}
Oops, something went wrong.

0 comments on commit ba1ae77

Please sign in to comment.