Skip to content

Commit

Permalink
Async stream initializers order matches existing
Browse files Browse the repository at this point in the history
Motivation:

When adding in the SPI methods for exposing async sequences of HTTP/2
streams we moved the stream initialization to a subtly different
location so that it was easier to exfiltrate the outputs of those
initialization functions (such as protocol negotiation outputs).

In some cases this broke ordering expectations.

Modifications:

Yield the (optionally wrapped) channels as initializer outputs to the async stream

Result:

Changes only exist within SPI. Async inbound stream channel
initialization now matches previous behavior.
  • Loading branch information
rnro committed Aug 11, 2023
1 parent 2bf5733 commit 53be8d4
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 111 deletions.
27 changes: 17 additions & 10 deletions Sources/NIOHTTP2/HTTP2ChannelHandler+InlineStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,29 @@ internal struct InlineStreamMultiplexer {
/// The delegate to be notified upon stream creation and close.
private var streamDelegate: NIOHTTP2StreamDelegate?

init(context: ChannelHandlerContext, outboundView: NIOHTTP2Handler.OutboundView, mode: NIOHTTP2Handler.ParserMode, inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer, targetWindowSize: Int, streamChannelOutboundBytesHighWatermark: Int, streamChannelOutboundBytesLowWatermark: Int, streamDelegate: NIOHTTP2StreamDelegate?) {
internal var streamInitializerProductContinuation: (any AnyContinuation)? {
self.commonStreamMultiplexer.streamInitializerProductContinuation
}

init(
context: ChannelHandlerContext,
outboundView: NIOHTTP2Handler.OutboundView,
mode: NIOHTTP2Handler.ParserMode,
inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer,
targetWindowSize: Int, streamChannelOutboundBytesHighWatermark: Int,
streamChannelOutboundBytesLowWatermark: Int,
streamDelegate: NIOHTTP2StreamDelegate?,
streamInitializerProductContinuation: (any AnyContinuation)?
) {
self.context = context
self.commonStreamMultiplexer = HTTP2CommonInboundStreamMultiplexer(
mode: mode,
channel: context.channel,
inboundStreamStateInitializer: inboundStreamStateInitializer,
targetWindowSize: targetWindowSize,
streamChannelOutboundBytesHighWatermark: streamChannelOutboundBytesHighWatermark,
streamChannelOutboundBytesLowWatermark: streamChannelOutboundBytesLowWatermark
streamChannelOutboundBytesLowWatermark: streamChannelOutboundBytesLowWatermark,
streamInitializerProductContinuation: streamInitializerProductContinuation
)
self.outboundView = outboundView
self.streamDelegate = streamDelegate
Expand Down Expand Up @@ -206,12 +220,6 @@ extension NIOHTTP2Handler {
}
}

extension InlineStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any ChannelContinuation) {
self.commonStreamMultiplexer.setChannelContinuation(streamChannels)
}
}

extension NIOHTTP2Handler {
/// A variant of `NIOHTTP2Handler.StreamMultiplexer` which creates a child channel for each HTTP/2 stream and
/// provides access to inbound HTTP/2 streams.
Expand All @@ -232,9 +240,8 @@ extension NIOHTTP2Handler {
public let inbound: NIOHTTP2InboundStreamChannels<InboundStreamOutput>

// Cannot be created by users.
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
internal init(_ inlineStreamMultiplexer: InlineStreamMultiplexer, inboundStreamChannels: NIOHTTP2InboundStreamChannels<InboundStreamOutput>) {
self.inlineStreamMultiplexer = inlineStreamMultiplexer
self.inlineStreamMultiplexer.setChannelContinuation(continuation)
self.inbound = inboundStreamChannels
}

Expand Down
46 changes: 40 additions & 6 deletions Sources/NIOHTTP2/HTTP2ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
/// - `InboundStreamMultiplexer`: The component responsible for (de)multiplexing inbound streams.
private enum InboundStreamMultiplexerState {
case uninitializedLegacy
case uninitializedInline(StreamConfiguration, StreamInitializer, NIOHTTP2StreamDelegate?)
case uninitializedInline(StreamConfiguration, StreamInitializer, NIOHTTP2StreamDelegate?, (any AnyContinuation)?)
case initialized(InboundStreamMultiplexer)
case deinitialized

Expand All @@ -139,7 +139,7 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
case .uninitializedLegacy:
self = .initialized(.legacy(LegacyInboundStreamMultiplexer(context: context)))

case .uninitializedInline(let streamConfiguration, let inboundStreamInitializer, let streamDelegate):
case .uninitializedInline(let streamConfiguration, let inboundStreamInitializer, let streamDelegate, let streamInitializerProductContinuation):
self = .initialized(.inline(
InlineStreamMultiplexer(
context: context,
Expand All @@ -149,7 +149,8 @@ public final class NIOHTTP2Handler: ChannelDuplexHandler {
targetWindowSize: max(0, min(streamConfiguration.targetWindowSize, Int(Int32.max))),
streamChannelOutboundBytesHighWatermark: streamConfiguration.outboundBufferSizeHighWatermark,
streamChannelOutboundBytesLowWatermark: streamConfiguration.outboundBufferSizeLowWatermark,
streamDelegate: streamDelegate
streamDelegate: streamDelegate,
streamInitializerProductContinuation: streamInitializerProductContinuation
)
))

Expand Down Expand Up @@ -1022,7 +1023,40 @@ extension NIOHTTP2Handler {
maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames,
maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames
)
self.inboundStreamMultiplexerState = .uninitializedInline(streamConfiguration, inboundStreamInitializer, streamDelegate)
self.inboundStreamMultiplexerState = .uninitializedInline(streamConfiguration, inboundStreamInitializer, streamDelegate, nil)
}

/// Creates a new ``NIOHTTP2Handler`` with a local multiplexer which yields inbound stream initializer products to
/// an async stream.
///
/// Frames on the root stream will continue to be passed down the main channel, whereas those intended for
/// other streams will be forwarded to the appropriate child channel.
///
/// To create streams using the local multiplexer, first obtain it via the computed property (`multiplexer`)
/// and then invoke one of the `multiplexer.createStreamChannel` methods. If possible the multiplexer should be
/// stored and used across multiple invocations because obtaining it requires synchronizing on the event loop.
///
/// the `streamInitializerProductContinuation` will be used to asynchronously stream inbound stream initializer products.
///
/// The optional `streamDelegate` will be notified of stream creation and close events.
internal convenience init(
mode: ParserMode,
eventLoop: EventLoop,
connectionConfiguration: ConnectionConfiguration = .init(),
streamConfiguration: StreamConfiguration = .init(),
streamDelegate: NIOHTTP2StreamDelegate? = nil,
streamInitializerProductContinuation: any AnyContinuation,
inboundStreamInitializer: @escaping StreamInitializer
) {
self.init(mode: mode,
eventLoop: eventLoop,
initialSettings: connectionConfiguration.initialSettings,
headerBlockValidation: connectionConfiguration.headerBlockValidation,
contentLengthValidation: connectionConfiguration.contentLengthValidation,
maximumSequentialEmptyDataFrames: connectionConfiguration.maximumSequentialEmptyDataFrames,
maximumBufferedControlFrames: connectionConfiguration.maximumBufferedControlFrames
)
self.inboundStreamMultiplexerState = .uninitializedInline(streamConfiguration, inboundStreamInitializer, streamDelegate, streamInitializerProductContinuation)
}

/// Connection-level configuration.
Expand Down Expand Up @@ -1101,12 +1135,12 @@ extension NIOHTTP2Handler {
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
internal func syncAsyncStreamMultiplexer<Output>(continuation: any ChannelContinuation, inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
internal func syncAsyncStreamMultiplexer<Output>(inboundStreamChannels: NIOHTTP2InboundStreamChannels<Output>) throws -> AsyncStreamMultiplexer<Output> {
self.eventLoop!.preconditionInEventLoop()

switch self.inboundStreamMultiplexer {
case let .some(.inline(multiplexer)):
return AsyncStreamMultiplexer(multiplexer, continuation: continuation, inboundStreamChannels: inboundStreamChannels)
return AsyncStreamMultiplexer(multiplexer, inboundStreamChannels: inboundStreamChannels)
case .some(.legacy), .none:
throw NIOHTTP2Errors.missingMultiplexer()
}
Expand Down
151 changes: 67 additions & 84 deletions Sources/NIOHTTP2/HTTP2CommonInboundStreamMultiplexer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,25 @@ internal class HTTP2CommonInboundStreamMultiplexer {
private var isReading = false
private var flushPending = false

var streamChannelContinuation: (any ChannelContinuation)?
var streamInitializerProductContinuation: (any AnyContinuation)?

init(
mode: NIOHTTP2Handler.ParserMode,
channel: Channel,
inboundStreamStateInitializer: MultiplexerAbstractChannel.InboundStreamStateInitializer,
targetWindowSize: Int,
streamChannelOutboundBytesHighWatermark: Int,
streamChannelOutboundBytesLowWatermark: Int
streamChannelOutboundBytesLowWatermark: Int,
streamInitializerProductContinuation: (any AnyContinuation)?
) {
self.channel = channel
self.inboundStreamStateInitializer = inboundStreamStateInitializer
self.targetWindowSize = targetWindowSize
self.connectionFlowControlManager = InboundWindowManager(targetSize: Int32(targetWindowSize))
self.streamChannelOutboundBytesHighWatermark = streamChannelOutboundBytesHighWatermark
self.streamChannelOutboundBytesLowWatermark = streamChannelOutboundBytesLowWatermark
self.streamInitializerProductContinuation = streamInitializerProductContinuation

self.mode = mode
switch mode {
case .client:
Expand Down Expand Up @@ -103,12 +106,6 @@ extension HTTP2CommonInboundStreamMultiplexer {

self.streams[streamID] = channel

// If we have an async sequence of inbound stream channels yield the channel to it
// This also implicitly performs the stream initialization step.
// Note that in this case the API is constructed such that `self.inboundStreamStateInitializer`
// does no actual work.
self.streamChannelContinuation?.yield(channel: channel.baseChannel)

// Note: Firing the initial (header) frame before calling `HTTP2StreamChannel.configureInboundStream(initializer:)`
// is crucial to preserve frame order, since the initialization process might trigger another read on the parent
// channel which in turn might cause further frames to be processed synchronously.
Expand Down Expand Up @@ -198,7 +195,7 @@ extension HTTP2CommonInboundStreamMultiplexer {
channel.receiveStreamClosed(nil)
}
// there cannot be any more inbound streams now that the connection channel is inactive
self.streamChannelContinuation?.finish()
self.streamInitializerProductContinuation?.finish()
}

internal func propagateChannelWritabilityChanged(context: ChannelHandlerContext) {
Expand Down Expand Up @@ -365,7 +362,8 @@ extension HTTP2CommonInboundStreamMultiplexer {

internal func createStreamChannel(
multiplexer: HTTP2StreamChannel.OutboundStreamMultiplexer,
_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>) -> EventLoopFuture<Channel> {
_ streamStateInitializer: @escaping (Channel) -> EventLoopFuture<Void>
) -> EventLoopFuture<Channel> {
let promise = self.channel.eventLoop.makePromise(of: Channel.self)
self.createStreamChannel(multiplexer: multiplexer, promise: promise, streamStateInitializer)
return promise.futureResult
Expand Down Expand Up @@ -409,85 +407,17 @@ extension HTTP2CommonInboundStreamMultiplexer {
}
}

extension HTTP2CommonInboundStreamMultiplexer {
func setChannelContinuation(_ streamChannels: any ChannelContinuation) {
self.channel.eventLoop.assertInEventLoop()
self.streamChannelContinuation = streamChannels
}
}

/// `ChannelContinuation` is used to generic async-sequence-like objects to deal with `Channel`s. This is so that they may be held
/// by the `HTTP2ChannelHandler` without causing it to become generic itself.
internal protocol ChannelContinuation {
func yield(channel: Channel)
/// `AnyContinuation` is used to describe generic async-sequence-like objects which deal with the products of inbound stream initializers.
/// This is so that they may be held by the `HTTP2ChannelHandler` without causing it to become generic itself.
internal protocol AnyContinuation {
func yield(_ any: Any)
func finish()
func finish(throwing error: Error)
}


/// `StreamChannelContinuation` is a wrapper for a generic `AsyncThrowingStream` which holds the inbound HTTP2 stream channels.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct StreamChannelContinuation<Output>: ChannelContinuation {
private var continuation: AsyncThrowingStream<Output, Error>.Continuation
private let inboundStreamInititializer: NIOChannelInitializerWithOutput<Output>

private init(
continuation: AsyncThrowingStream<Output, Error>.Continuation,
inboundStreamInititializer: @escaping NIOChannelInitializerWithOutput<Output>
) {
self.continuation = continuation
self.inboundStreamInititializer = inboundStreamInititializer
}

/// `initialize` creates a new `StreamChannelContinuation` object and returns it along with its backing `AsyncThrowingStream`.
/// The `StreamChannelContinuation` provides access to the inbound HTTP2 stream channels.
///
/// - Parameters:
/// - inboundStreamInititializer: A closure which initializes the newly-created inbound stream channel and returns a generic.
/// The returned type corresponds to the output of the channel once the operations in the initializer have been performed.
/// For example an `inboundStreamInititializer` which inserts handlers before wrapping the channel in a `NIOAsyncChannel` would
/// have a `Output` corresponding to that `NIOAsyncChannel` type. Another example is in cases where there is
/// per-stream protocol negotiation where `Output` would be some form of `NIOProtocolNegotiationResult`.
static func initialize(
with inboundStreamInititializer: @escaping NIOChannelInitializerWithOutput<Output>
) -> (StreamChannelContinuation<Output>, NIOHTTP2InboundStreamChannels<Output>) {
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self)
return (StreamChannelContinuation(continuation: continuation, inboundStreamInititializer: inboundStreamInititializer), NIOHTTP2InboundStreamChannels(stream))
}

/// `yield` takes a channel, executes the stored `streamInitializer` upon it and then yields the *derived* type to
/// the wrapped `AsyncThrowingStream`.
func yield(channel: Channel) {
channel.eventLoop.assertInEventLoop()
self.inboundStreamInititializer(channel).whenSuccess { output in
let yieldResult = self.continuation.yield(output)
switch yieldResult {
case .enqueued:
break // success, nothing to do
case .dropped:
preconditionFailure("Attempted to yield channel when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.")
case .terminated:
channel.close(mode: .all, promise: nil)
preconditionFailure("Attempted to yield channel to AsyncThrowingStream in terminated state.")
default:
channel.close(mode: .all, promise: nil)
preconditionFailure("Attempt to yield channel to AsyncThrowingStream failed for unhandled reason.")
}
}
}

/// `finish` marks the continuation as finished.
func finish() {
self.continuation.finish()
}

/// `finish` marks the continuation as finished with the supplied error.
func finish(throwing error: Error) {
self.continuation.finish(throwing: error)
}
}

/// `NIOHTTP2InboundStreamChannels` provides access to inbound stream channels as an `AsyncSequence`.
/// `NIOHTTP2InboundStreamChannels` provides access to the products of inbound stream channel initializers as an `AsyncSequence`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
@_spi(AsyncChannel)
public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence {
Expand All @@ -509,7 +439,7 @@ public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence {

private let asyncThrowingStream: AsyncThrowingStream<Output, Error>

init(_ asyncThrowingStream: AsyncThrowingStream<Output, Error>) {
private init(_ asyncThrowingStream: AsyncThrowingStream<Output, Error>) {
self.asyncThrowingStream = asyncThrowingStream
}

Expand All @@ -518,6 +448,59 @@ public struct NIOHTTP2InboundStreamChannels<Output>: AsyncSequence {
}
}

@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
extension NIOHTTP2InboundStreamChannels {
/// `Continuation` provides an async stream interface for accessing initialized inbound HTTP/2 stream channels.
///
/// This is defined to operate on a generic `Output` type rather than `Channel` to allow the stream channel initializer
/// to wrap the underlying channel, for example as `NIOAsyncChannel`s or protocol negotiation objects.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
struct Continuation: AnyContinuation {
private var continuation: AsyncThrowingStream<Output, Error>.Continuation

internal init(
continuation: AsyncThrowingStream<Output, Error>.Continuation
) {
self.continuation = continuation
}

/// `yield` supplies the provided object to the .
func yield(_ any: Any) {
let output = any as! Output
let yieldResult = self.continuation.yield(output)
switch yieldResult {
case .enqueued:
break // success, nothing to do
case .dropped:
preconditionFailure("Attempted to yield when AsyncThrowingStream is over capacity. This shouldn't be possible for an unbounded stream.")
case .terminated:
preconditionFailure("Attempted to yield to AsyncThrowingStream in terminated state.")
default:
preconditionFailure("Attempt to yield to AsyncThrowingStream failed for unhandled reason.")
}
}

/// `finish` marks the continuation as finished.
func finish() {
self.continuation.finish()
}

/// `finish` marks the continuation as finished with the supplied error.
func finish(throwing error: Error) {
self.continuation.finish(throwing: error)
}
}

/// `initialize` creates a new `NIOHTTP2InboundStreamChannels` object and returns it along with its `StreamChannelContinuation`.
///
/// This is defined to operate on a generic `Output` type rather than `Channel` to allow the stream channel initializer
/// to wrap the underlying channel, for example as `NIOAsyncChannel`s or protocol negotiation objects.
static func initialize(inboundStreamInitializerOutput: Output.Type = Output.self) -> (NIOHTTP2InboundStreamChannels<Output>, Continuation) {
let (stream, continuation) = AsyncThrowingStream.makeStream(of: Output.self)
return (.init(stream), Continuation(continuation: continuation))
}
}

#if swift(>=5.7)
// This doesn't compile on 5.6 but the omission of Sendable is sufficient in any case
@available(*, unavailable)
Expand Down

0 comments on commit 53be8d4

Please sign in to comment.