Skip to content

Commit

Permalink
Add support for asynchronous body stream writing (#2939)
Browse files Browse the repository at this point in the history
* Add support for asynchronous body stream writing, thereby also fixing crashes when users try to write a body from within a task towards the ELF APIs

* Fix compilation for Swift 5.6 inference

* Add support for asynchronous body stream writing, thereby also fixing crashes when users try to write a body from within a task towards the ELF APIs

* Fix compilation for Swift 5.6 inference

* Fix older swift compilign tests

* Fix the test, as you cannot write a response while the request is still inbound

* Add docc comments, remove TODOs and converted them into an issue

* Be explicit in the return tpe that fails to infer on Swift 5.6

---------

Co-authored-by: Gwynne Raskind <gwynne@vapor.codes>
Co-authored-by: Tim Condon <0xTim@users.noreply.github.com>
  • Loading branch information
3 people committed Mar 9, 2023
1 parent 4889a7f commit 07b8b5d
Show file tree
Hide file tree
Showing 7 changed files with 183 additions and 8 deletions.
2 changes: 1 addition & 1 deletion Sources/Vapor/Concurrency/Request+Concurrency.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if compiler(>=5.7) && canImport(_Concurrency)
#if compiler(>=5.6) && canImport(_Concurrency)
import NIOCore
import NIOConcurrencyHelpers

Expand Down
20 changes: 20 additions & 0 deletions Sources/Vapor/HTTP/BodyStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,12 @@ extension BodyStreamResult: CustomDebugStringConvertible {
}
}

/// A type that represents the writable handle of a streamed ``Response`` body.
public protocol BodyStreamWriter {
/// The eventloop upon which writes must be sent
var eventLoop: EventLoop { get }

/// Writes an event to a streaming HTTP body. If the `result` is `.end` or `.error`, the stream ends.
func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?)
}

Expand All @@ -51,3 +55,19 @@ extension BodyStreamWriter {
return promise.futureResult
}
}

/// A type that represents the writable handle of a streamed ``Response`` body
public protocol AsyncBodyStreamWriter {
/// Writes an event to a streaming HTTP body. If the `result` is `.end` or `.error`, the stream ends.
func write(_ result: BodyStreamResult) async throws

/// Writes a `ByteBuffer` to the stream. Provides a default implementation that calls itself using `BodyStreamResult`
func writeBuffer(_ buffer: ByteBuffer) async throws
}

extension AsyncBodyStreamWriter {
/// Writes the buffer wrapped in a ``BodyStreamResult`` to `self`
public func writeBuffer(_ buffer: ByteBuffer) async throws {
try await write(.buffer(buffer))
}
}
25 changes: 23 additions & 2 deletions Sources/Vapor/HTTP/Server/HTTPServerResponseEncoder.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
status: response.status,
headers: response.headers
))), promise: nil)


if response.status == .noContent || response.forHeadRequest {
// don't send bodies for 204 (no content) responses
Expand Down Expand Up @@ -66,6 +65,17 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
count: stream.count == -1 ? nil : stream.count
)
stream.callback(channelStream)
case .asyncStream(let stream):
let channelStream = ChannelResponseBodyStream(
context: context,
handler: self,
promise: nil,
count: stream.count == -1 ? nil : stream.count
)

promise?.completeWithTask {
try await stream.callback(channelStream)
}
}
}
}
Expand All @@ -80,7 +90,7 @@ final class HTTPServerResponseEncoder: ChannelOutboundHandler, RemovableChannelH
}
}

private final class ChannelResponseBodyStream: BodyStreamWriter {
private final class ChannelResponseBodyStream: BodyStreamWriter, AsyncBodyStreamWriter {
let context: ChannelHandlerContext
let handler: HTTPServerResponseEncoder
let promise: EventLoopPromise<Void>?
Expand Down Expand Up @@ -111,16 +121,27 @@ private final class ChannelResponseBodyStream: BodyStreamWriter {
self.isComplete = false
}

func write(_ result: BodyStreamResult) async throws {
// Explicitly adds the ELF because Swift 5.6 fails to infer the return type
try await self.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
let promise = self.eventLoop.makePromise(of: Void.self)
self.write(result, promise: promise)
return promise.futureResult
}.get()
}

func write(_ result: BodyStreamResult, promise: EventLoopPromise<Void>?) {
switch result {
case .buffer(let buffer):
// See: https://github.com/vapor/vapor/issues/2976
self.context.writeAndFlush(self.handler.wrapOutboundOut(.body(.byteBuffer(buffer))), promise: promise)
self.currentCount += buffer.readableBytes
if let count = self.count, self.currentCount > count {
self.promise?.fail(Error.tooManyBytes)
promise?.fail(Error.notEnoughBytes)
}
case .end:
// See: https://github.com/vapor/vapor/issues/2976
self.isComplete = true
if let count = self.count, self.currentCount != count {
self.promise?.fail(Error.notEnoughBytes)
Expand Down
11 changes: 10 additions & 1 deletion Sources/Vapor/Request/Request+BodyStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import NIOCore
import NIOConcurrencyHelpers

extension Request {
final class BodyStream: BodyStreamWriter {
final class BodyStream: BodyStreamWriter, AsyncBodyStreamWriter {
let eventLoop: EventLoop

var isBeingRead: Bool {
Expand All @@ -28,6 +28,15 @@ extension Request {
}
self.buffer = []
}

func write(_ result: BodyStreamResult) async throws {
// Explicitly adds the ELF because Swift 5.6 fails to infer the return type
try await self.eventLoop.flatSubmit { () -> EventLoopFuture<Void> in
let promise = self.eventLoop.makePromise(of: Void.self)
self.write0(result, promise: promise)
return promise.futureResult
}.get()
}

func write(_ chunk: BodyStreamResult, promise: EventLoopPromise<Void>?) {
// See https://github.com/vapor/vapor/issues/2906
Expand Down
58 changes: 58 additions & 0 deletions Sources/Vapor/Response/Response+Body.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ extension Response {
let count: Int
let callback: (BodyStreamWriter) -> ()
}

struct AsyncBodyStream {
let count: Int
let callback: @Sendable (AsyncBodyStreamWriter) async throws -> ()
}

/// Represents a `Response`'s body.
///
Expand All @@ -23,6 +28,7 @@ extension Response {
case staticString(StaticString)
case string(String)
case stream(BodyStream)
case asyncStream(AsyncBodyStream)
}

/// An empty `Response.Body`.
Expand Down Expand Up @@ -50,6 +56,7 @@ extension Response {
case .buffer(let buffer): return buffer.readableBytes
case .none: return 0
case .stream(let stream): return stream.count
case .asyncStream(let stream): return stream.count
}
}

Expand All @@ -63,6 +70,7 @@ extension Response {
case .string(let string): return Data(string.utf8)
case .none: return nil
case .stream: return nil
case .asyncStream: return nil
}
}

Expand All @@ -83,6 +91,7 @@ extension Response {
return buffer
case .none: return nil
case .stream: return nil
case .asyncStream: return nil
}
}

Expand All @@ -108,6 +117,7 @@ extension Response {
case .staticString(let string): return string.description
case .string(let string): return string
case .stream: return "<stream>"
case .asyncStream: return "<async stream>"
}
}

Expand Down Expand Up @@ -159,6 +169,54 @@ extension Response {
self.init(stream: stream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

/// Creates a chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter``.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results. **MUST** call `.end` or `.error` when terminating the stream
/// - count: The amount of bytes that will be written. The `asyncStream` **MUST** produce exactly `count` bytes.
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.byteBufferAllocator = byteBufferAllocator
self.storage = .asyncStream(.init(count: count, callback: asyncStream))
}

/// Creates a chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter``.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results. **MUST** call `.end` or `.error` when terminating the stream
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.init(asyncStream: asyncStream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

/// Creates a _managed_ chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter`` that automtically closes or fails based if the closure throws an error or returns.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results, which **MUST NOT** call `.end` or `.error` when terminating the stream.
/// - count: The amount of bytes that will be written. The `asyncStream` **MUST** produce exactly `count` bytes.
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(managedAsyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.byteBufferAllocator = byteBufferAllocator
self.storage = .asyncStream(.init(count: count) { writer in
do {
try await managedAsyncStream(writer)
try await writer.write(.end)
} catch {
try await writer.write(.error(error))
}
})
}

/// Creates a _managed_ chunked HTTP ``Response`` steam using ``AsyncBodyStreamWriter`` that automtically closes or fails based if the closure throws an error or returns.
///
/// - Parameters:
/// - asyncStream: The closure that will generate the results, which **MUST NOT** call `.end` or `.error` when terminating the stream.
/// - count: The amount of bytes that will be written
/// - byteBufferAllocator: The allocator that is preferred when writing data to SwiftNIO
public init(managedAsyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) {
self.init(managedAsyncStream: managedAsyncStream, count: -1, byteBufferAllocator: byteBufferAllocator)
}

/// `ExpressibleByStringLiteral` conformance.
public init(stringLiteral value: String) {
self.byteBufferAllocator = ByteBufferAllocator()
Expand Down
7 changes: 3 additions & 4 deletions Tests/AsyncTests/AsyncRequestTests.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#if compiler(>=5.7) && canImport(_Concurrency)
#if compiler(>=5.6) && canImport(_Concurrency)
import XCTVapor
import XCTest
import Vapor
Expand All @@ -23,15 +23,14 @@ final class AsyncRequestTests: XCTestCase {

let testValue = String.randomDigits()

app.on(.POST, "stream", body: .stream) { req in
app.on(.POST, "stream", body: .stream) { req -> String in
var recievedBuffer = ByteBuffer()
for try await part in req.body {
XCTAssertNotNil(part)
var part = part
recievedBuffer.writeBuffer(&part)
}
let string = String(buffer: recievedBuffer)
return string
return String(buffer: recievedBuffer)
}

try app.testable().test(.POST, "/stream", beforeRequest: { req in
Expand Down
68 changes: 68 additions & 0 deletions Tests/VaporTests/PipelineTests.swift
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
@testable import Vapor
import XCTest
import AsyncHTTPClient
import NIOEmbedded
import NIOCore

Expand Down Expand Up @@ -56,6 +57,73 @@ final class PipelineTests: XCTestCase {
try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "0\r\n\r\n")
try XCTAssertNil(channel.readOutbound(as: ByteBuffer.self)?.string)
}

#if compiler(>=5.6) && canImport(_Concurrency)
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
func testAsyncEchoHandlers() async throws {
let app = Application(.testing)
defer { app.shutdown() }


app.on(.POST, "echo", body: .stream) { request async throws -> Response in
var buffers = [ByteBuffer]()

for try await buffer in request.body {
buffers.append(buffer)
}

return Response(body: .init(managedAsyncStream: { [buffers] writer in
for buffer in buffers {
try await writer.writeBuffer(buffer)
}
}))
}

try app.start()

guard
let localAddress = app.http.server.shared.localAddress,
let port = localAddress.port
else {
XCTFail("couldn't get port from \(app.http.server.shared.localAddress.debugDescription)")
return
}

let client = HTTPClient(eventLoopGroupProvider: .createNew)

let chunks = [
"1\r\n",
"a",
"\r\n",
"1\r\n",
"b",
"\r\n",
"1\r\n",
"c",
"\r\n",
]

let response = try await client.post(url: "http://localhost:\(port)/echo", body: .stream { writer in
@Sendable func write(chunks: [String]) -> EventLoopFuture<Void> {
var chunks = chunks
let chunk = chunks.removeFirst()

if chunks.isEmpty {
return writer.write(.byteBuffer(ByteBuffer(string: chunk)))
} else {
return writer.write(.byteBuffer(ByteBuffer(string: chunk))).flatMap { [chunks] in
return write(chunks: chunks)
}
}
}

return write(chunks: chunks)
}).get()

XCTAssertEqual(response.body?.string, chunks.joined(separator: ""))
try await client.shutdown()
}
#endif

func testEOFFraming() throws {
let app = Application(.testing)
Expand Down

0 comments on commit 07b8b5d

Please sign in to comment.