From e6e49819ed03112d43ac74cbf35c16aaf6a98de0 Mon Sep 17 00:00:00 2001 From: George Barnett Date: Tue, 25 Nov 2025 10:05:56 +0000 Subject: [PATCH] Don't hold a lock over a continuation in test helpers Motivation: The various 'withMumbleContinuation' APIs are supposed to be invoked synchronously with the caller. This assumption allows a lock to be acquired before the call and released from the body of the 'withMumbleContinuation' after e.g. storing the continuation. However this isn't the case and the job may be re-enqueued on the executor meaning that this is pattern is vulnerable to deadlocks. Modifications: - Rework the test helpers to avoid holding a lock when a continuation is created. - Switch to using NIOLockedValue box Result: Lower chance of deadlock --- .../AsyncTestHelpers.swift | 190 ++++++++++++------ 1 file changed, 127 insertions(+), 63 deletions(-) diff --git a/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift b/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift index 4a5c8d486..5e063be81 100644 --- a/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift +++ b/Tests/AsyncHTTPClientTests/AsyncTestHelpers.swift @@ -43,12 +43,11 @@ final class AsyncSequenceWriter: AsyncSequence, @unchecked Se case failed(Error, CheckedContinuation?) } - private var _state = State.buffering(.init(), nil) - private let lock = NIOLock() + private let state = NIOLockedValueBox(.buffering([], nil)) public var hasDemand: Bool { - self.lock.withLock { - switch self._state { + self.state.withLockedValue { state in + switch state { case .failed, .finished, .buffering: return false case .waiting: @@ -59,67 +58,132 @@ final class AsyncSequenceWriter: AsyncSequence, @unchecked Se /// Wait until a downstream consumer has issued more demand by calling `next`. public func demand() async { - self.lock.lock() + let shouldBuffer = self.state.withLockedValue { state in + switch state { + case .buffering(_, .none): + return true + case .waiting: + return false + case .buffering(_, .some), .failed(_, .some): + preconditionFailure("Already waiting for demand. Invalid state: \(state)") + case .finished, .failed: + preconditionFailure("Invalid state: \(state)") + } + } - switch self._state { - case .buffering(let buffer, .none): + if shouldBuffer { await withCheckedContinuation { (continuation: CheckedContinuation) in - self._state = .buffering(buffer, continuation) - self.lock.unlock() + let shouldResumeContinuation = self.state.withLockedValue { state in + switch state { + case .buffering(let buffer, .none): + state = .buffering(buffer, continuation) + return false + case .waiting: + return true + case .buffering(_, .some), .failed(_, .some): + preconditionFailure("Already waiting for demand. Invalid state: \(state)") + case .finished, .failed: + preconditionFailure("Invalid state: \(state)") + } + } + + if shouldResumeContinuation { + continuation.resume() + } } - - case .waiting: - self.lock.unlock() - return - - case .buffering(_, .some), .failed(_, .some): - let state = self._state - self.lock.unlock() - preconditionFailure("Already waiting for demand. Invalid state: \(state)") - - case .finished, .failed: - let state = self._state - self.lock.unlock() - preconditionFailure("Invalid state: \(state)") } } + private enum NextAction { + /// Resume the continuation if present, and return the result if present. + case resumeAndReturn(CheckedContinuation?, Result?) + /// Suspend the current task and wait for the next value. + case suspend + } + private func next() async throws -> Element? { - self.lock.lock() - switch self._state { - case .buffering(let buffer, let demandContinuation) where buffer.isEmpty: - return try await withCheckedThrowingContinuation { continuation in - self._state = .waiting(continuation) - self.lock.unlock() - demandContinuation?.resume(returning: ()) - } + let action: NextAction = self.state.withLockedValue { state in + switch state { + case .buffering(var buffer, let demandContinuation): + if buffer.isEmpty { + return .suspend + } else { + let first = buffer.removeFirst() + if first != nil { + state = .buffering(buffer, demandContinuation) + } else { + state = .finished + } + return .resumeAndReturn(nil, .success(first)) + } + + case .failed(let error, let demandContinuation): + state = .finished + return .resumeAndReturn(demandContinuation, .failure(error)) + + case .finished: + return .resumeAndReturn(nil, .success(nil)) - case .buffering(var buffer, let demandContinuation): - let first = buffer.removeFirst() - if first != nil { - self._state = .buffering(buffer, demandContinuation) - } else { - self._state = .finished + case .waiting: + preconditionFailure( + "Expected that there is always only one concurrent call to next. Invalid state: \(state)" + ) } - self.lock.unlock() - return first + } - case .failed(let error, let demandContinuation): - self._state = .finished - self.lock.unlock() + switch action { + case .resumeAndReturn(let demandContinuation, let result): demandContinuation?.resume() - throw error - - case .finished: - self.lock.unlock() - return nil - - case .waiting: - let state = self._state - self.lock.unlock() - preconditionFailure( - "Expected that there is always only one concurrent call to next. Invalid state: \(state)" - ) + return try result?.get() + + case .suspend: + // Holding the lock here *should* be safe but because of a bug in the runtime + // it isn't, so drop the lock, create the continuation and then try again. + // + // See https://github.com/swiftlang/swift/issues/85668 + return try await withCheckedThrowingContinuation { + (continuation: CheckedContinuation) in + let action: NextAction = self.state.withLockedValue { state in + switch state { + case .buffering(var buffer, let demandContinuation): + if buffer.isEmpty { + state = .waiting(continuation) + return .resumeAndReturn(demandContinuation, nil) + } else { + let first = buffer.removeFirst() + if first != nil { + state = .buffering(buffer, demandContinuation) + } else { + state = .finished + } + return .resumeAndReturn(nil, .success(first)) + } + + case .failed(let error, let demandContinuation): + state = .finished + return .resumeAndReturn(demandContinuation, .failure(error)) + + case .finished: + return .resumeAndReturn(nil, .success(nil)) + + case .waiting: + preconditionFailure( + "Expected that there is always only one concurrent call to next. Invalid state: \(state)" + ) + } + } + + switch action { + case .resumeAndReturn(let demandContinuation, let result): + demandContinuation?.resume() + // Resume the continuation rather than returning th result. + if let result { + continuation.resume(with: result) + } + case .suspend: + preconditionFailure() // Not returned from the code above. + } + } } } @@ -137,19 +201,19 @@ final class AsyncSequenceWriter: AsyncSequence, @unchecked Se } private func writeBufferOrEnd(_ element: Element?) { - let writeAction = self.lock.withLock { () -> WriteAction in - switch self._state { + let writeAction = self.state.withLockedValue { state -> WriteAction in + switch state { case .buffering(var buffer, let continuation): buffer.append(element) - self._state = .buffering(buffer, continuation) + state = .buffering(buffer, continuation) return .none case .waiting(let continuation): - self._state = .buffering(.init(), nil) + state = .buffering(.init(), nil) return .succeedContinuation(continuation, element) case .finished, .failed: - preconditionFailure("Invalid state: \(self._state)") + preconditionFailure("Invalid state: \(state)") } } @@ -170,17 +234,17 @@ final class AsyncSequenceWriter: AsyncSequence, @unchecked Se /// Drops all buffered writes and emits an error on the waiting `next`. If there is no call to `next` /// waiting, will emit the error on the next call to `next`. public func fail(_ error: Error) { - let errorAction = self.lock.withLock { () -> ErrorAction in - switch self._state { + let errorAction = self.state.withLockedValue { state -> ErrorAction in + switch state { case .buffering(_, let demandContinuation): - self._state = .failed(error, demandContinuation) + state = .failed(error, demandContinuation) return .none case .failed, .finished: return .none case .waiting(let continuation): - self._state = .finished + state = .finished return .failContinuation(continuation, error) } }