Skip to content

Commit

Permalink
[backport from master] =htp add size limit to decodeRequest directi…
Browse files Browse the repository at this point in the history
…ves, ref akka#2137
  • Loading branch information
raboof authored and jrudolph committed Sep 5, 2018
1 parent 6841af6 commit 3d8cac3
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 14 deletions.
Expand Up @@ -17,3 +17,9 @@ ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.settings.P

# Uri conversion additions https://github.com/akka/akka-http/pull/1950
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.javadsl.model.Uri.asScala")

# RoutingSettings is @DoNotInherit
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.http.scaladsl.settings.RoutingSettings.decodeMaxSize")
# Impl classes
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.settings.RoutingSettingsImpl.*")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.settings.RoutingSettingsImpl.*")
Expand Up @@ -17,6 +17,7 @@ private[http] final case class RoutingSettingsImpl(
rangeCountLimit: Int,
rangeCoalescingThreshold: Long,
decodeMaxBytesPerChunk: Int,
decodeMaxSize: Long,
fileIODispatcher: String) extends akka.http.scaladsl.settings.RoutingSettings {

override def productPrefix = "RoutingSettings"
Expand All @@ -30,5 +31,6 @@ object RoutingSettingsImpl extends SettingsCompanion[RoutingSettingsImpl]("akka.
c getInt "range-count-limit",
c getBytes "range-coalescing-threshold",
c getIntBytes "decode-max-bytes-per-chunk",
c getPossiblyInfiniteBytes "decode-max-size",
c getString "file-io-dispatcher")
}
Expand Up @@ -28,6 +28,7 @@ abstract class RoutingSettings private[akka] () { self: RoutingSettingsImpl ⇒
def withRangeCountLimit(rangeCountLimit: Int): RoutingSettings = self.copy(rangeCountLimit = rangeCountLimit)
def withRangeCoalescingThreshold(rangeCoalescingThreshold: Long): RoutingSettings = self.copy(rangeCoalescingThreshold = rangeCoalescingThreshold)
def withDecodeMaxBytesPerChunk(decodeMaxBytesPerChunk: Int): RoutingSettings = self.copy(decodeMaxBytesPerChunk = decodeMaxBytesPerChunk)
def withDecodeMaxSize(decodeMaxSize: Long): RoutingSettings = self.copy(decodeMaxSize = decodeMaxSize)
def withFileIODispatcher(fileIODispatcher: String): RoutingSettings = self.copy(fileIODispatcher = fileIODispatcher)
}

Expand Down
Expand Up @@ -19,6 +19,7 @@ abstract class RoutingSettings private[akka] () extends akka.http.javadsl.settin
def rangeCountLimit: Int
def rangeCoalescingThreshold: Long
def decodeMaxBytesPerChunk: Int
def decodeMaxSize: Long
def fileIODispatcher: String

/* Java APIs */
Expand All @@ -28,6 +29,7 @@ abstract class RoutingSettings private[akka] () extends akka.http.javadsl.settin
def getRangeCountLimit: Int = rangeCountLimit
def getRangeCoalescingThreshold: Long = rangeCoalescingThreshold
def getDecodeMaxBytesPerChunk: Int = decodeMaxBytesPerChunk
def getDecodeMaxSize: Long = decodeMaxSize
def getFileIODispatcher: String = fileIODispatcher

override def withVerboseErrorMessages(verboseErrorMessages: Boolean): RoutingSettings = self.copy(verboseErrorMessages = verboseErrorMessages)
Expand All @@ -36,6 +38,7 @@ abstract class RoutingSettings private[akka] () extends akka.http.javadsl.settin
override def withRangeCountLimit(rangeCountLimit: Int): RoutingSettings = self.copy(rangeCountLimit = rangeCountLimit)
override def withRangeCoalescingThreshold(rangeCoalescingThreshold: Long): RoutingSettings = self.copy(rangeCoalescingThreshold = rangeCoalescingThreshold)
override def withDecodeMaxBytesPerChunk(decodeMaxBytesPerChunk: Int): RoutingSettings = self.copy(decodeMaxBytesPerChunk = decodeMaxBytesPerChunk)
override def withDecodeMaxSize(decodeMaxSize: Long): RoutingSettings = self.copy(decodeMaxSize = decodeMaxSize)
override def withFileIODispatcher(fileIODispatcher: String): RoutingSettings = self.copy(fileIODispatcher = fileIODispatcher)
}

Expand Down
Expand Up @@ -22,6 +22,7 @@ public void testCreateWithActorSystem() {
" range-coalescing-threshold = 80\n" +
" range-count-limit = 16\n" +
" decode-max-bytes-per-chunk = 1m\n" +
" decode-max-size = 8m\n" +
" file-io-dispatcher = \"test-only\"\n" +
"}";
Config config = ConfigFactory.parseString(testConfig);
Expand Down
Expand Up @@ -38,7 +38,7 @@ abstract class RequestParserSpec(mode: String, newLine: String) extends FreeSpec
akka.loglevel = WARNING
akka.http.parsing.max-header-value-length = 32
akka.http.parsing.max-uri-length = 40
akka.http.parsing.max-content-length = 4000000000""")
akka.http.parsing.max-content-length = infinite""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
import system.dispatcher

Expand Down
Expand Up @@ -19,6 +19,7 @@ class SettingsEqualitySpec extends WordSpec with Matchers {
range-coalescing-threshold = 80
range-count-limit = 16
decode-max-bytes-per-chunk = 1m
decode-max-size = 8m
file-io-dispatcher = ${akka.stream.blocking-io-dispatcher}
}
""").withFallback(ConfigFactory.load).resolve
Expand Down
@@ -0,0 +1,241 @@
/*
* Copyright (C) 2009-2018 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.http.scaladsl.server

import akka.NotUsed

import scala.collection.immutable
import akka.actor.ActorSystem
import akka.http.scaladsl.client.RequestBuilding
import akka.http.scaladsl.coding.{ Decoder, Gzip }
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.HttpEntity.Chunk
import akka.http.scaladsl.model.headers.{ HttpEncoding, HttpEncodings, `Content-Encoding` }
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Flow, Source }
import akka.testkit.{ EventFilter, TestKit }
import akka.util.ByteString
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpec }
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Millis, Seconds, Span }

class SizeLimitSpec extends WordSpec with Matchers with RequestBuilding with BeforeAndAfterAll with ScalaFutures {

val maxContentLength = 800
// Protect network more than memory:
val decodeMaxSize = 1600

val testConf: Config = ConfigFactory.parseString(s"""
akka.loggers = ["akka.testkit.TestEventListener"]
akka.loglevel = ERROR
akka.stdout-loglevel = ERROR
akka.http.parsing.max-content-length = $maxContentLength
akka.http.routing.decode-max-size = $decodeMaxSize
""")
implicit val system = ActorSystem(getClass.getSimpleName, testConf)
import system.dispatcher
implicit val materializer = ActorMaterializer()
val random = new scala.util.Random(42)

implicit val defaultPatience = PatienceConfig(timeout = Span(2, Seconds), interval = Span(5, Millis))

"a normal route" should {
val route = path("noDirective") {
post {
entity(as[String]) { _
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
}
}
}

val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue

"accept small POST requests" in {
Http().singleRequest(Post(s"http:/${binding.localAddress}/noDirective", entityOfSize(maxContentLength)))
.futureValue.status shouldEqual StatusCodes.OK
}

"not accept entities bigger than configured with akka.http.parsing.max-content-length" in {
Http().singleRequest(Post(s"http:/${binding.localAddress}/noDirective", entityOfSize(maxContentLength + 1)))
.futureValue.status shouldEqual StatusCodes.BadRequest
}
}

"a route with decodeRequest" should {
val route = path("noDirective") {
decodeRequest {
post {
entity(as[String]) { e
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, s"Got request with entity of ${e.length} characters"))
}
}
}
}

val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue

"accept a small request" in {
val response = Http().singleRequest(Post(s"http:/${binding.localAddress}/noDirective", entityOfSize(maxContentLength))).futureValue
response.status shouldEqual StatusCodes.OK
response.entity.dataBytes.runReduce(_ ++ _).futureValue.utf8String shouldEqual (s"Got request with entity of $maxContentLength characters")
}

"reject a small request that decodes into a large entity" in {
val data = ByteString.fromString("0" * (decodeMaxSize + 1))
val zippedData = Gzip.encode(data)
val request = HttpRequest(
HttpMethods.POST,
s"http:/${binding.localAddress}/noDirective",
immutable.Seq(`Content-Encoding`(HttpEncodings.gzip)),
HttpEntity(ContentTypes.`text/plain(UTF-8)`, zippedData))

zippedData.size should be <= maxContentLength
data.size should be > decodeMaxSize

Http().singleRequest(request)
.futureValue.status shouldEqual StatusCodes.BadRequest
}
}

"a route with decodeRequest that results in a large chunked entity" should {
val decoder = decodeTo(chunkedEntityOfSize(decodeMaxSize + 1))

val route = path("noDirective") {
decodeRequestWith(decoder) {
post {
entity(as[String]) { e
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, s"Got request with entity of ${e.length} characters"))
}
}
}
}

val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue

"reject a small request that decodes into a large chunked entity" in {
val request = Post(s"http:/${binding.localAddress}/noDirective", "x").withHeaders(`Content-Encoding`(HttpEncoding("custom")))
val response = Http().singleRequest(request).futureValue
response.status shouldEqual StatusCodes.BadRequest
}
}

"a route with decodeRequest that results in a large non-chunked streaming entity" should {
val decoder = decodeTo(nonChunkedEntityOfSize(decodeMaxSize + 1))

val route = path("noDirective") {
decodeRequestWith(decoder) {
post {
entity(as[String]) { e
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, s"Got request with entity of ${e.length} characters"))
}
}
}
}

val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue

"reject a small request that decodes into a large non-chunked streaming entity" in {
val request = Post(s"http:/${binding.localAddress}/noDirective", "x").withHeaders(`Content-Encoding`(HttpEncoding("custom")))
val response = Http().singleRequest(request).futureValue
response.status shouldEqual StatusCodes.BadRequest
}
}

"a route with decodeRequest followed by withoutSizeLimit" should {
val route = path("noDirective") {
decodeRequest {
withoutSizeLimit {
post {
entity(as[String]) { e
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, s"Got request with entity of ${e.length} characters"))
}
}
}
}
}

val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue

"accept a small request" in {
Http().singleRequest(Post(s"http:/${binding.localAddress}/noDirective", entityOfSize(maxContentLength)))
.futureValue.status shouldEqual StatusCodes.OK
}

"accept a small request that decodes into a large entity" in {
val data = ByteString.fromString("0" * (decodeMaxSize + 1))
val zippedData = Gzip.encode(data)
val request = HttpRequest(
HttpMethods.POST,
s"http:/${binding.localAddress}/noDirective",
immutable.Seq(`Content-Encoding`(HttpEncodings.gzip)),
HttpEntity(ContentTypes.`text/plain(UTF-8)`, zippedData))

zippedData.size should be <= maxContentLength
data.size should be > decodeMaxSize

val response = Http().singleRequest(request).futureValue
response.status shouldEqual StatusCodes.OK
response.entity.dataBytes.runReduce(_ ++ _).futureValue.utf8String shouldEqual (s"Got request with entity of ${decodeMaxSize + 1} characters")
}

"accept a large request that decodes into a large entity" in {
val data = new Array[Byte](decodeMaxSize)
random.nextBytes(data)
val zippedData = Gzip.encode(ByteString(data))
val request = HttpRequest(
HttpMethods.POST,
s"http:/${binding.localAddress}/noDirective",
immutable.Seq(`Content-Encoding`(HttpEncodings.gzip)),
HttpEntity(ContentTypes.`text/plain(UTF-8)`, zippedData))

zippedData.size should be > maxContentLength
data.length should be <= decodeMaxSize

Http().singleRequest(request)
.futureValue.status shouldEqual StatusCodes.OK
}
}

"the withoutSizeLimit directive" should {
val route = path("withoutSizeLimit") {
post {
withoutSizeLimit {
entity(as[String]) { _
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
}
}
}
}

val binding = Http().bindAndHandle(route, "localhost", port = 0).futureValue

"accept entities bigger than configured with akka.http.parsing.max-content-length" in {
Http().singleRequest(Post(s"http:/${binding.localAddress}/withoutSizeLimit", entityOfSize(maxContentLength + 1)))
.futureValue.status shouldEqual StatusCodes.OK
}
}

override def afterAll() = TestKit.shutdownActorSystem(system)

private def byteSource(size: Int): Source[ByteString, Any] = Source(Array.fill[ByteString](size)(ByteString("0")).toVector)

private def chunkedEntityOfSize(size: Int) = HttpEntity.Chunked(ContentTypes.`text/plain(UTF-8)`, byteSource(size).map(Chunk(_)))
private def nonChunkedEntityOfSize(size: Int): MessageEntity = HttpEntity.Default(ContentTypes.`text/plain(UTF-8)`, size, byteSource(size))
private def entityOfSize(size: Int) = HttpEntity(ContentTypes.`text/plain(UTF-8)`, "0" * size)

private def decodeTo(result: MessageEntity): Decoder = new Decoder {
override def encoding: HttpEncoding = HttpEncoding("custom")

override def maxBytesPerChunk: Int = 1000
override def withMaxBytesPerChunk(maxBytesPerChunk: Int): Decoder = this

override def decoderFlow: Flow[ByteString, ByteString, NotUsed] = ???

override def decodeMessage(message: HttpMessage) = message.withEntity(result)
}
}
9 changes: 9 additions & 0 deletions akka-http/src/main/resources/reference.conf
Expand Up @@ -37,6 +37,15 @@ akka.http.routing {
# for an entity data stream.
decode-max-bytes-per-chunk = 1m

# Maximum content length after applying a decoding directive. When the directive
# decompresses, for example, an entity compressed with gzip, the resulting stream can be much
# larger than the max-content-length. Like with max-content-length, this is not necessarilly a
# problem when consuming the entity in a streaming fashion, but does risk high memory use
# when the entity is made strict or marshalled into an in-memory object.
# This limit (like max-content-length) can be overridden on a case-by-case basis using the
# withSizeLimit directive.
decode-max-size = 8m

# Fully qualified config path which holds the dispatcher configuration
# to be used by FlowMaterialiser when creating Actors for IO operations.
file-io-dispatcher = ${akka.stream.blocking-io-dispatcher}
Expand Down
Expand Up @@ -13,13 +13,19 @@ import headers.HttpEncoding
import akka.stream.scaladsl.{ Flow, Sink, Source }

import scala.concurrent.Future
import scala.util.control.NonFatal

trait Decoder {
def encoding: HttpEncoding

def decodeMessage(message: HttpMessage): message.Self =
if (message.headers exists Encoder.isContentEncodingHeader)
message.transformEntityDataBytes(decoderFlow).withHeaders(message.headers filterNot Encoder.isContentEncodingHeader)
message.transformEntityDataBytes(decoderFlow.recover {
case NonFatal(e)
throw IllegalRequestException(
StatusCodes.BadRequest,
ErrorInfo("The request's encoding is corrupt", e.getMessage))
}).withHeaders(message.headers filterNot Encoder.isContentEncodingHeader)
else message.self

@deprecated("Use Decoder#decodeMessage instead. No need for implicit mapper.", since = "10.0.6")
Expand Down
Expand Up @@ -80,16 +80,7 @@ trait CodingDirectives {
else
extractSettings flatMap { settings
val effectiveDecoder = decoder.withMaxBytesPerChunk(settings.decodeMaxBytesPerChunk)
mapRequest { request
effectiveDecoder.decodeMessage(request).mapEntity { entity
entity.transformDataBytes(Flow[ByteString].recover {
case NonFatal(e)
throw IllegalRequestException(
StatusCodes.BadRequest,
ErrorInfo("The request's encoding is corrupt", e.getMessage))
})
}
}
mapRequest(effectiveDecoder.decodeMessage(_)) & withSizeLimit(settings.decodeMaxSize)
}

requestEntityEmpty | (
Expand Down
Expand Up @@ -10,7 +10,11 @@

## Description

Decompresses the incoming request if it is `gzip` or `deflate` compressed. Uncompressed requests are passed through untouched. If the request encoded with another encoding the request is rejected with an @unidoc[UnsupportedRequestEncodingRejection].
Decompresses the incoming request if it is `gzip` or `deflate` compressed. Uncompressed requests are passed through untouched.
If the request encoded with another encoding the request is rejected with an @unidoc[UnsupportedRequestEncodingRejection].
If the request entity after decoding exceeds `akka.http.routing.decode-max-size` the stream fails with an
@unidoc[akka.http.scaladsl.model.EntityStreamSizeException].


## Example

Expand Down

0 comments on commit 3d8cac3

Please sign in to comment.