Skip to content

Commit

Permalink
Fix HTTP request decoder race (#2132)
Browse files Browse the repository at this point in the history
* fix HTTPServerRequestDecoder stream race

* rm version change

* downstreamIsReady

* write(_:to:)
  • Loading branch information
tanner0101 committed Dec 26, 2019
1 parent 11ed61f commit adcce86
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 19 deletions.
5 changes: 4 additions & 1 deletion Package.swift
Expand Up @@ -85,6 +85,9 @@ let package = Package(

// Testing
.target(name: "XCTVapor", dependencies: ["Vapor"]),
.testTarget(name: "VaporTests", dependencies: ["XCTVapor"]),
.testTarget(name: "VaporTests", dependencies: [
"NIOTestUtils",
"XCTVapor"
]),
]
)
15 changes: 14 additions & 1 deletion Sources/Vapor/Request/Request+BodyStream.swift
Expand Up @@ -25,7 +25,6 @@ extension Request {
if case .end = chunk {
self.isClosed = true
}

if let handler = handler {
handler(chunk, promise)
} else {
Expand Down Expand Up @@ -53,3 +52,17 @@ extension Request {
}
}
}

extension BodyStreamResult: CustomStringConvertible {
public var description: String {
switch self {
case .buffer(let buffer):
let value = String(decoding: buffer.readableBytesView, as: UTF8.self)
return "buffer(\(value))"
case .error(let error):
return "error(\(error))"
case .end:
return "end"
}
}
}
29 changes: 12 additions & 17 deletions Sources/Vapor/Server/HTTPServerRequestDecoder.swift
Expand Up @@ -23,7 +23,7 @@ final class HTTPServerRequestDecoder: ChannelDuplexHandler, RemovableChannelHand

private let logger: Logger

var isWritable: Bool
var pendingWriteCount: Int
var hasReadPending: Bool
var application: Application

Expand All @@ -32,7 +32,7 @@ final class HTTPServerRequestDecoder: ChannelDuplexHandler, RemovableChannelHand
self.maxBodySize = maxBodySize
self.requestState = .ready
self.logger = Logger(label: "codes.vapor.server")
self.isWritable = true
self.pendingWriteCount = 0
self.hasReadPending = false
}

Expand Down Expand Up @@ -72,15 +72,11 @@ final class HTTPServerRequestDecoder: ChannelDuplexHandler, RemovableChannelHand
let stream = Request.BodyStream(on: context.eventLoop)
request.bodyStorage = .stream(stream)
context.fireChannelRead(self.wrapInboundOut(request))
let done = stream.write(.buffer(previousBuffer)).flatMap {
stream.write(.buffer(buffer))
}
self.updateReadability(done, context: context)
self.write(.buffer(previousBuffer), to: stream, context: context)
self.write(.buffer(buffer), to: stream, context: context)
self.requestState = .streamingBody(stream)
case .streamingBody(let stream):
self.isWritable = false
let done = stream.write(.buffer(buffer))
self.updateReadability(done, context: context)
self.write(.buffer(buffer), to: stream, context: context)
}
case .end(let tailHeaders):
assert(tailHeaders == nil, "Tail headers are not supported.")
Expand All @@ -92,28 +88,27 @@ final class HTTPServerRequestDecoder: ChannelDuplexHandler, RemovableChannelHand
request.bodyStorage = .collected(buffer)
context.fireChannelRead(self.wrapInboundOut(request))
case .streamingBody(let stream):
let done = stream.write(.end)
self.updateReadability(done, context: context)
self.write(.end, to: stream, context: context)
}
self.requestState = .ready
}
}

func read(context: ChannelHandlerContext) {
if self.isWritable {
if self.pendingWriteCount <= 0 {
context.read()
} else {
self.hasReadPending = true
}
}

func updateReadability(_ future: EventLoopFuture<Void>, context: ChannelHandlerContext) {
self.isWritable = false
future.whenComplete { result in
self.isWritable = true
func write(_ part: BodyStreamResult, to stream: Request.BodyStream, context: ChannelHandlerContext) {
self.pendingWriteCount += 1
stream.write(part).whenComplete { result in
self.pendingWriteCount -= 1
if self.hasReadPending {
self.hasReadPending = false
context.read()
self.read(context: context)
}
switch result {
case .failure(let error):
Expand Down
22 changes: 22 additions & 0 deletions Tests/VaporTests/ApplicationTests.swift
Expand Up @@ -1230,6 +1230,28 @@ final class ApplicationTests: XCTestCase {

XCTAssertEqual(res, decoded)
}

func testMultipleChunkBody() throws {
let app = Application(.testing)
defer { app.shutdown() }

let payload = [UInt8].random(count: 1 << 20)

app.post("payload") { req -> HTTPStatus in
guard let data = req.body.data else {
throw Abort(.internalServerError)
}
XCTAssertEqual(payload.count, data.readableBytes)
XCTAssertEqual([UInt8](data.readableBytesView), payload)
return .ok
}

var buffer = ByteBufferAllocator().buffer(capacity: payload.count)
buffer.writeBytes(payload)
try app.testable(method: .running).test(.POST, "payload", body: buffer) { res in
XCTAssertEqual(res.status, .ok)
}
}
}

private extension ByteBuffer {
Expand Down

0 comments on commit adcce86

Please sign in to comment.