-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add support for asynchronous body stream writing #2939
Conversation
Joannis
commented
Jan 19, 2023
- Fixes Crash during upload #2930 - a crash when users try to write a body from within a task towards the ELF APIs.
- Introduces a new API for writing chunked HTTP response bodies
- Adds a helper that automatically managed failing and closing streams
… crashes when users try to write a body from within a task towards the ELF APIs
… crashes when users try to write a body from within a task towards the ELF APIs
… into feature/jo-async-body-stream # Conflicts: # Tests/VaporTests/PipelineTests.swift
@gwynne what's your verdict on the crash inside Request.Body's |
@Joannis It is definitely a blocker. It's obviously a race condition, and not an easy one to hit. |
Have you been able to reproduce the failure with Thread Sanitizer enabled at all? |
Even though I didn't touch the crashing code? |
I'll check with TSAN later. |
Odds are very good you either 1) revealed an existing race condition we hadn't happened to trip over yet, or 2) introduced one in the new code that nonetheless happens to manifest deeper in the guts of things (a fairly strong probability pretty much by definition, given it's a precondition that's failing). |
(Or, even more painfully, both...) |
I suppose, it's an already existing issue, just had some trouble when using the Request.Body as an AsyncSequence. I'm very much looking forward for this PR & fix. Thank you. 🙏 |
Found it, it's really stupid and really just a bug in the test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for this! This looks in a good shape. There are a couple of left over TODO
s we need to resolve and all new public APIs will need DocC comments then we're good to merge
@@ -51,3 +51,14 @@ extension BodyStreamWriter { | |||
return promise.futureResult | |||
} | |||
} | |||
|
|||
public protocol AsyncBodyStreamWriter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add DocC comments to these as they're public? Thanks!
public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) { | ||
self.byteBufferAllocator = byteBufferAllocator | ||
self.storage = .asyncStream(.init(count: count, callback: asyncStream)) | ||
} | ||
|
||
public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) { | ||
self.init(asyncStream: asyncStream, count: -1, byteBufferAllocator: byteBufferAllocator) | ||
} | ||
|
||
public init(managedAsyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) { | ||
self.byteBufferAllocator = byteBufferAllocator | ||
self.storage = .asyncStream(.init(count: count) { writer in | ||
do { | ||
try await managedAsyncStream(writer) | ||
try await writer.write(.end) | ||
} catch { | ||
try await writer.write(.error(error)) | ||
} | ||
}) | ||
} | ||
|
||
public init(managedAsyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
New public APIs will need DocC comments
@@ -159,6 +169,31 @@ extension Response { | |||
self.init(stream: stream, count: -1, byteBufferAllocator: byteBufferAllocator) | |||
} | |||
|
|||
public init(asyncStream: @escaping @Sendable (AsyncBodyStreamWriter) async throws -> (), count: Int, byteBufferAllocator: ByteBufferAllocator = ByteBufferAllocator()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm interested to see how Sendable
works on 5.5 - but we're ready to bump to 5.6 if needed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sendable does work in 5.5.
Tests/VaporTests/PipelineTests.swift
Outdated
@@ -56,6 +57,73 @@ final class PipelineTests: XCTestCase { | |||
try XCTAssertEqual(channel.readOutbound(as: ByteBuffer.self)?.string, "0\r\n\r\n") | |||
try XCTAssertNil(channel.readOutbound(as: ByteBuffer.self)?.string) | |||
} | |||
|
|||
#if compiler(>=5.7) && canImport(_Concurrency) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should test on 5.6 as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks! Looks good to go, just a 5.6 compiler issue to fix
These changes are now available in 4.73.0 |
Hi all, I was looking forward to this feature... Any news on if/when it will be merged into main? Thanks! |
Same here. I'd love to get some updates about this feature. Thank you. 🙏 |
@Joannis, if you want to open a fresh PR from the branch (and update it to reflect 5.6 now being the minimum Swift version - don't need all the conditionals anymore!), we can re-review and make sure there aren't any outstanding concerns remaining. |