diff --git a/Sources/OpenFeature/AsyncProviderOperationsQueue.swift b/Sources/OpenFeature/AsyncProviderOperationsQueue.swift new file mode 100644 index 0000000..f3377d1 --- /dev/null +++ b/Sources/OpenFeature/AsyncProviderOperationsQueue.swift @@ -0,0 +1,81 @@ +import Foundation + +/// Unified serial async task queue with operation-type-aware last-wins semantics. +/// - Non-last-wins operations always execute in order +/// - Consecutive last-wins operations: only the last one executes +/// - Order is always preserved +internal actor AsyncProviderOperationsQueue { + private var currentTask: Task? + + private struct QueuedOperation { + let operation: () async -> Void + let continuation: CheckedContinuation + let lastWins: Bool + } + + private var queue: [QueuedOperation] = [] + + /// Runs the given operation serially. + /// - If lastWins is false: operation always executes + /// - If lastWins is true: may be skipped if superseded by a later last-wins operation + func run(lastWins: Bool, operation: @Sendable @escaping () async -> Void) async { + await withCheckedContinuation { continuation in + queue.append(QueuedOperation(operation: operation, continuation: continuation, lastWins: lastWins)) + + if currentTask == nil { + processNext() + } + } + } + + private func processNext() { + guard !queue.isEmpty else { + currentTask = nil + return + } + + // Find the next batch to execute + // A batch is either: + // 1. A single non-last-wins operation, OR + // 2. Consecutive last-wins operations (we execute only the last one) + + let firstOp = queue[0] + + if !firstOp.lastWins { + // Non-last-wins operation: execute it immediately + let op = queue.removeFirst() + currentTask = Task { [weak self] in + await op.operation() + op.continuation.resume() + await self?.processNext() + } + } else { + // Last-wins operation: find all consecutive last-wins ops + var lastWinsCount = 0 + for op in queue { + if op.lastWins { + lastWinsCount += 1 + } else { + break + } + } + + // Execute only the last one in the last-wins batch + let toSkip = Array(queue.prefix(lastWinsCount - 1)) + let toExecute = queue[lastWinsCount - 1] + queue.removeFirst(lastWinsCount) + + currentTask = Task { [weak self] in + await toExecute.operation() + + // Resume all continuations (both skipped and executed) + for op in toSkip { + op.continuation.resume() + } + toExecute.continuation.resume() + + await self?.processNext() + } + } + } +} diff --git a/Sources/OpenFeature/OpenFeatureAPI.swift b/Sources/OpenFeature/OpenFeatureAPI.swift index fc785ab..77bbcdb 100644 --- a/Sources/OpenFeature/OpenFeatureAPI.swift +++ b/Sources/OpenFeature/OpenFeatureAPI.swift @@ -5,7 +5,10 @@ import Foundation /// Configuration here will be shared across all ``Client``s. public class OpenFeatureAPI { private let eventHandler = EventHandler() - private let queue = DispatchQueue(label: "com.openfeature.providerDescriptor.queue") + // Sync queue to change state atomically + private let stateQueue = DispatchQueue(label: "com.openfeature.state.queue") + // Queue for provider's initialize and onContextSet operations + private let unifiedQueue = AsyncProviderOperationsQueue() private(set) var providerSubject = CurrentValueSubject(nil) private(set) var evaluationContext: EvaluationContext? @@ -15,18 +18,15 @@ public class OpenFeatureAPI { /// The ``OpenFeatureAPI`` singleton static public let shared = OpenFeatureAPI() - public init() { - } + public init() {} /** Set provider and calls its `initialize` in a background thread. Readiness can be determined from `getState` or listening for `ready` event. */ public func setProvider(provider: FeatureProvider, initialContext: EvaluationContext?) { - queue.async { - Task { - await self.setProviderInternal(provider: provider, initialContext: initialContext) - } + Task { + await self.setProviderInternal(provider: provider, initialContext: initialContext) } } @@ -35,14 +35,7 @@ public class OpenFeatureAPI { This async function returns when the `initialize` from the provider is completed. */ public func setProviderAndWait(provider: FeatureProvider, initialContext: EvaluationContext?) async { - await withCheckedContinuation { continuation in - queue.async { - Task { - await self.setProviderInternal(provider: provider, initialContext: initialContext) - continuation.resume() - } - } - } + await self.setProviderInternal(provider: provider, initialContext: initialContext) } /** @@ -62,13 +55,31 @@ public class OpenFeatureAPI { } public func getProvider() -> FeatureProvider? { - return self.providerSubject.value + return stateQueue.sync { + self.providerSubject.value + } } public func clearProvider() { - queue.sync { - self.providerSubject.send(nil) - self.providerStatus = .notReady + Task { + await clearProviderInternal() + } + } + + /** + Clear provider. + This async function returns when the clear operation is completed. + */ + public func clearProviderAndWait() async { + await clearProviderInternal() + } + + private func clearProviderInternal() async { + await unifiedQueue.run(lastWins: false) { [self] in + stateQueue.sync { + self.providerSubject.send(nil) + self.providerStatus = .notReady + } } } @@ -77,10 +88,8 @@ public class OpenFeatureAPI { Readiness can be determined from `getState` or listening for `contextChanged` event. */ public func setEvaluationContext(evaluationContext: EvaluationContext) { - queue.async { - Task { - await self.updateContext(evaluationContext: evaluationContext) - } + Task { + await self.updateContext(evaluationContext: evaluationContext) } } @@ -89,22 +98,19 @@ public class OpenFeatureAPI { This async function returns when the `onContextSet` from the provider is completed. */ public func setEvaluationContextAndWait(evaluationContext: EvaluationContext) async { - await withCheckedContinuation { continuation in - queue.async { - Task { - await self.updateContext(evaluationContext: evaluationContext) - continuation.resume() - } - } - } + await updateContext(evaluationContext: evaluationContext) } public func getEvaluationContext() -> EvaluationContext? { - return self.evaluationContext + return stateQueue.sync { + self.evaluationContext + } } public func getProviderStatus() -> ProviderStatus { - return self.providerStatus + return stateQueue.sync { + self.providerStatus + } } public func getProviderMetadata() -> ProviderMetadata? { @@ -120,11 +126,21 @@ public class OpenFeatureAPI { } public func addHooks(hooks: (any Hook)...) { - self.hooks.append(contentsOf: hooks) + stateQueue.sync { + self.hooks.append(contentsOf: hooks) + } } public func clearHooks() { - self.hooks.removeAll() + stateQueue.sync { + self.hooks.removeAll() + } + } + + internal func getHooks() -> [any Hook] { + return stateQueue.sync { + self.hooks + } } public func observe() -> AnyPublisher { @@ -143,7 +159,7 @@ public class OpenFeatureAPI { } internal func getState() -> OpenFeatureState { - return queue.sync { + return stateQueue.sync { OpenFeatureState( provider: providerSubject.value, evaluationContext: evaluationContext, @@ -152,44 +168,81 @@ public class OpenFeatureAPI { } private func setProviderInternal(provider: FeatureProvider, initialContext: EvaluationContext? = nil) async { - self.providerStatus = .notReady - self.providerSubject.send(provider) - - if let initialContext = initialContext { - self.evaluationContext = initialContext - } + await unifiedQueue.run(lastWins: false) { [self] in + // Set initial state atomically + stateQueue.sync { + self.providerStatus = .notReady + self.providerSubject.send(provider) + if let initialContext = initialContext { + self.evaluationContext = initialContext + } + } - do { - try await provider.initialize(initialContext: initialContext) - self.providerStatus = .ready - self.eventHandler.send(.ready(nil)) - } catch { - switch error { - case OpenFeatureError.providerFatalError(let message): - self.providerStatus = .fatal - self.eventHandler.send(.error(ProviderEventDetails(message: message, errorCode: .providerFatal))) - default: - self.providerStatus = .error - self.eventHandler.send(.error(ProviderEventDetails(message: error.localizedDescription))) + // Initialize provider - this entire operation is atomic + do { + try await provider.initialize(initialContext: initialContext) + stateQueue.sync { + self.providerStatus = .ready + } + self.eventHandler.send(.ready(nil)) + } catch { + stateQueue.sync { + switch error { + case OpenFeatureError.providerFatalError(_): + self.providerStatus = .fatal + default: + self.providerStatus = .error + } + } + switch error { + case OpenFeatureError.providerFatalError(let message): + self.eventHandler.send(.error(ProviderEventDetails(message: message, errorCode: .providerFatal))) + default: + self.eventHandler.send(.error(ProviderEventDetails(message: error.localizedDescription))) + } } } } private func updateContext(evaluationContext: EvaluationContext) async { - do { - let oldContext = self.evaluationContext - self.evaluationContext = evaluationContext - self.providerStatus = .reconciling + await unifiedQueue.run(lastWins: true) { [self] in + // Get old context, set new context, and update status atomically + let (oldContext, provider) = stateQueue.sync { () -> (EvaluationContext?, FeatureProvider?) in + let oldContext = self.evaluationContext + self.evaluationContext = evaluationContext + + // Only update status if provider is set + if let provider = self.providerSubject.value { + self.providerStatus = .reconciling + return (oldContext, provider) + } + + return (oldContext, nil) + } + + // Early return if no provider is set - nothing to reconcile + guard let provider = provider else { + return + } + eventHandler.send(.reconciling(nil)) - try await self.providerSubject.value?.onContextSet( - oldContext: oldContext, - newContext: evaluationContext - ) - self.providerStatus = .ready - eventHandler.send(.contextChanged(nil)) - } catch { - self.providerStatus = .error - eventHandler.send(.error(ProviderEventDetails(message: error.localizedDescription))) + + // Call provider's onContextSet - this entire operation is atomic + do { + try await provider.onContextSet( + oldContext: oldContext, + newContext: evaluationContext + ) + stateQueue.sync { + self.providerStatus = .ready + } + eventHandler.send(.contextChanged(nil)) + } catch { + stateQueue.sync { + self.providerStatus = .error + } + eventHandler.send(.error(ProviderEventDetails(message: error.localizedDescription))) + } } } diff --git a/Sources/OpenFeature/OpenFeatureClient.swift b/Sources/OpenFeature/OpenFeatureClient.swift index b6507bf..70fd15d 100644 --- a/Sources/OpenFeature/OpenFeatureClient.swift +++ b/Sources/OpenFeature/OpenFeatureClient.swift @@ -117,7 +117,7 @@ extension OpenFeatureClient { clientMetadata: self.metadata, providerMetadata: provider.metadata) hookLock.lock() - let mergedHooks = provider.hooks + options.hooks + hooks + openFeatureApi.hooks + let mergedHooks = provider.hooks + options.hooks + hooks + openFeatureApi.getHooks() hookLock.unlock() do { hookSupport.beforeHooks(flagValueType: T.flagValueType, hookCtx: hookCtx, hooks: mergedHooks, hints: hints) diff --git a/Tests/OpenFeatureTests/FlagEvaluationTests.swift b/Tests/OpenFeatureTests/FlagEvaluationTests.swift index 9f78812..018d786 100644 --- a/Tests/OpenFeatureTests/FlagEvaluationTests.swift +++ b/Tests/OpenFeatureTests/FlagEvaluationTests.swift @@ -26,10 +26,10 @@ final class FlagEvaluationTests: XCTestCase { OpenFeatureAPI.shared.addHooks(hooks: hook1) - XCTAssertEqual(OpenFeatureAPI.shared.hooks.count, 1) + XCTAssertEqual(OpenFeatureAPI.shared.getHooks().count, 1) OpenFeatureAPI.shared.addHooks(hooks: hook2) - XCTAssertEqual(OpenFeatureAPI.shared.hooks.count, 2) + XCTAssertEqual(OpenFeatureAPI.shared.getHooks().count, 2) } func testNamedClient() { diff --git a/Tests/OpenFeatureTests/ProviderOperationsQueueTests.swift b/Tests/OpenFeatureTests/ProviderOperationsQueueTests.swift new file mode 100644 index 0000000..a19b040 --- /dev/null +++ b/Tests/OpenFeatureTests/ProviderOperationsQueueTests.swift @@ -0,0 +1,662 @@ +import XCTest +import Combine +@testable import OpenFeature + +// swiftlint:disable type_body_length file_length trailing_closure +class ProviderOperationsQueueTests: XCTestCase { + override func setUp() { + super.setUp() + OpenFeatureAPI.shared.clearProvider() + } + + func testConcurrentSetEvaluationContextRaceCondition() async throws { + let provider = MockProvider() + let readyExpectation = XCTestExpectation(description: "Ready") + let cancellable = OpenFeatureAPI.shared.observe().sink { event in + if case .ready = event { + readyExpectation.fulfill() + } + } + OpenFeatureAPI.shared.setProvider(provider: provider) + await fulfillment(of: [readyExpectation], timeout: 2.0) + + let concurrentOperations = 100 + let expectedTargetingKeys = Set((0.. [String] { + return executedOperations + } + } + + let tracker = ExecutionTracker() + + // Create a provider with a slow onContextSet to ensure operations overlap + let provider = MockProvider( + onContextSet: { _, newContext in + // Add delay to simulate slow provider operation + // This ensures that when tasks 2 and 3 are queued, task 1 is still running + let targetingKey = newContext.getTargetingKey() + await tracker.recordExecution(targetingKey) + try await Task.sleep(nanoseconds: 200_000_000) // 200ms + } + ) + + // Set up provider first + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // Start three concurrent context updates + async let task1: Void = { + let ctx1 = ImmutableContext( + targetingKey: "user1", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx1) + }() + + // Small delay to ensure task1 starts first + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + + async let task2: Void = { + let ctx2 = ImmutableContext( + targetingKey: "user2", + structure: ImmutableStructure(attributes: ["id": .integer(2)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx2) + }() + + // Small delay to ensure task2 starts before task3 + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + + async let task3: Void = { + let ctx3 = ImmutableContext( + targetingKey: "user3", + structure: ImmutableStructure(attributes: ["id": .integer(3)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx3) + }() + + // Wait for all tasks to complete + await task1 + await task2 + await task3 + + let executedOperations = await tracker.getExecutions() + + // Verify coalescence: if working correctly, should have executed at most 2 operations + // (first one + latest one), skipping the middle one + XCTAssertLessThanOrEqual( + executedOperations.count, + 2, + """ + Should execute at most 2 operations due to coalescence (first + latest), \ + but executed: \(executedOperations) + """ + ) + + // Verify user2 was NOT executed (it should be coalesced/skipped) + XCTAssertFalse( + executedOperations.contains("user2"), + "user2 operation should have been skipped due to coalescence" + ) + + // Verify final context is from the last operation + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + XCTAssertEqual( + finalContext?.getTargetingKey(), + "user3", + "Final context should be from the last operation (user3)" + ) + } + + // MARK: - Edge Case Tests for AsyncCoalescingSerialQueue Coalescence + + func testAsyncCoalescingSerialQueueSingleOperation() async throws { + // Test that a single operation executes normally without coalescence + actor ExecutionTracker { + var executedOperations: [String] = [] + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func getExecutions() -> [String] { executedOperations } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { _, newContext in + await tracker.recordExecution(newContext.getTargetingKey()) + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + let ctx = ImmutableContext( + targetingKey: "single-user", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx) + + let executedOperations = await tracker.getExecutions() + XCTAssertEqual(executedOperations.count, 1, "Single operation should execute exactly once") + XCTAssertEqual(executedOperations.first, "single-user", "Should execute the single operation") + XCTAssertEqual(OpenFeatureAPI.shared.getEvaluationContext()?.getTargetingKey(), "single-user") + } + + func testAsyncCoalescingSerialQueueTwoSequentialOperations() async throws { + // Test two operations that don't overlap - both should execute + actor ExecutionTracker { + var executedOperations: [String] = [] + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func getExecutions() -> [String] { executedOperations } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { _, newContext in + await tracker.recordExecution(newContext.getTargetingKey()) + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // First operation + let ctx1 = ImmutableContext( + targetingKey: "user1", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx1) + + // Second operation - starts after first completes + let ctx2 = ImmutableContext( + targetingKey: "user2", + structure: ImmutableStructure(attributes: ["id": .integer(2)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx2) + + let executedOperations = await tracker.getExecutions() + XCTAssertEqual(executedOperations.count, 2, "Both sequential operations should execute") + XCTAssertEqual(executedOperations, ["user1", "user2"], "Operations should execute in order") + XCTAssertEqual(OpenFeatureAPI.shared.getEvaluationContext()?.getTargetingKey(), "user2") + } + + func testAsyncCoalescingSerialQueueRapidBurstCoalescence() async throws { + // Test that rapid bursts of many operations get heavily coalesced + actor ExecutionTracker { + var executedOperations: [String] = [] + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func getExecutions() -> [String] { executedOperations } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { _, newContext in + await tracker.recordExecution(newContext.getTargetingKey()) + try await Task.sleep(nanoseconds: 100_000_000) // 100ms - long enough for many to queue + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // Launch 10 operations rapidly + await withTaskGroup(of: Void.self) { group in + for i in 0..<10 { + group.addTask { + let ctx = ImmutableContext( + targetingKey: "user\(i)", + structure: ImmutableStructure(attributes: ["id": .integer(Int64(i))]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx) + } + } + } + + let executedOperations = await tracker.getExecutions() + + // Should execute far fewer than 10 operations due to coalescence + XCTAssertLessThanOrEqual( + executedOperations.count, + 3, + "Rapid burst should heavily coalesce, executed: \(executedOperations)" + ) + + // Final context should be from one of the last operations + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + let finalKey = finalContext?.getTargetingKey() ?? "" + XCTAssertTrue( + finalKey.hasPrefix("user"), + "Final context should be from one of the operations" + ) + } + + func testAsyncCoalescingSerialQueueOperationsArrivingAfterCompletion() async throws { + // Test that operations arriving after the previous one completes still execute + actor ExecutionTracker { + var executedOperations: [String] = [] + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func getExecutions() -> [String] { executedOperations } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { _, newContext in + await tracker.recordExecution(newContext.getTargetingKey()) + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // First operation + let ctx1 = ImmutableContext( + targetingKey: "batch1-user1", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx1) + + // Wait a bit to ensure first operation is completely done + try await Task.sleep(nanoseconds: 50_000_000) // 50ms + + // Now submit several operations that arrive after the queue is idle + async let task2: Void = { + let ctx2 = ImmutableContext( + targetingKey: "batch2-user2", + structure: ImmutableStructure(attributes: ["id": .integer(2)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx2) + }() + + try await Task.sleep(nanoseconds: 5_000_000) // 5ms + + async let task3: Void = { + let ctx3 = ImmutableContext( + targetingKey: "batch2-user3", + structure: ImmutableStructure(attributes: ["id": .integer(3)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx3) + }() + + await task2 + await task3 + + let executedOperations = await tracker.getExecutions() + + // Should have first operation, and at least one from the second batch + XCTAssertGreaterThanOrEqual(executedOperations.count, 2, "Should execute operations from both batches") + XCTAssertEqual(executedOperations.first, "batch1-user1", "First operation should execute") + + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + XCTAssertTrue( + finalContext?.getTargetingKey().hasPrefix("batch2") ?? false, + "Final context should be from second batch" + ) + } + + func testAsyncCoalescingSerialQueueWithErrorHandling() async throws { + // Test that errors in operations don't break the queue + actor ExecutionTracker { + var executedOperations: [String] = [] + var errorOccurred = false + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func recordError() { + errorOccurred = true + } + func getExecutions() -> [String] { executedOperations } + func hasError() -> Bool { errorOccurred } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { _, newContext in + let targetingKey = newContext.getTargetingKey() + await tracker.recordExecution(targetingKey) + + // Throw error for "error-user" + if targetingKey == "error-user" { + await tracker.recordError() + throw MockProvider.MockProviderError.message("Simulated error") + } + + try await Task.sleep(nanoseconds: 200_000_000) // 200ms + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // Launch operations including one that will error + async let task1: Void = { + let ctx1 = ImmutableContext( + targetingKey: "user1", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx1) + }() + + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + + async let task2: Void = { + let ctx2 = ImmutableContext( + targetingKey: "error-user", + structure: ImmutableStructure(attributes: ["id": .integer(2)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx2) + }() + + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + + async let task3: Void = { + let ctx3 = ImmutableContext( + targetingKey: "user3", + structure: ImmutableStructure(attributes: ["id": .integer(3)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx3) + }() + + await task1 + await task2 + await task3 + + let executedOperations = await tracker.getExecutions() + + // Should execute at most 2 operations due to coalescence + XCTAssertLessThanOrEqual(executedOperations.count, 2, "Should coalesce despite errors") + + // Final context depends on which operations were coalesced + // But the queue should still be functional + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + XCTAssertNotNil(finalContext, "Queue should continue functioning after error") + + // Provider status should reflect the error + let status = OpenFeatureAPI.shared.getProviderStatus() + XCTAssertTrue( + [.ready, .error].contains(status), + "Status should be either ready or error depending on final operation" + ) + } + + func testAsyncCoalescingSerialQueueContinuationResumeCorrectness() async throws { + // Test that all callers get resumed correctly, even those whose operations were skipped + actor CompletionTracker { + var completedTasks: Set = [] + func markCompleted(_ taskId: Int) { + completedTasks.insert(taskId) + } + func getCompleted() -> Set { completedTasks } + } + + let completionTracker = CompletionTracker() + let provider = MockProvider( + onContextSet: { _, _ in + try await Task.sleep(nanoseconds: 50_000_000) // 50ms + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // Launch multiple tasks and track that they all complete + await withTaskGroup(of: Void.self) { group in + for i in 0..<5 { + group.addTask { + let ctx = ImmutableContext( + targetingKey: "user\(i)", + structure: ImmutableStructure(attributes: ["id": .integer(Int64(i))]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx) + await completionTracker.markCompleted(i) + } + + // Small stagger to ensure they overlap + try? await Task.sleep(nanoseconds: 5_000_000) // 5ms + } + } + + let completedTasks = await completionTracker.getCompleted() + + // All 5 tasks should have completed (received their continuation resume) + XCTAssertEqual( + completedTasks.count, + 5, + "All tasks should complete even if their operations were coalesced" + ) + XCTAssertEqual( + completedTasks, + Set([0, 1, 2, 3, 4]), + "All task IDs should be marked as completed" + ) + } + + func testAsyncCoalescingSerialQueueNoStarvation() async throws { + // Test that the queue doesn't cause starvation - operations eventually execute + actor ExecutionTracker { + var executedOperations: [String] = [] + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func getExecutions() -> [String] { executedOperations } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { _, newContext in + await tracker.recordExecution(newContext.getTargetingKey()) + try await Task.sleep(nanoseconds: 20_000_000) // 20ms + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // Submit operations in waves to test that later waves aren't starved + for wave in 0..<3 { + await withTaskGroup(of: Void.self) { group in + for i in 0..<5 { + group.addTask { + let ctx = ImmutableContext( + targetingKey: "wave\(wave)-user\(i)", + structure: ImmutableStructure(attributes: [ + "wave": .integer(Int64(wave)), + "id": .integer(Int64(i)), + ]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx) + } + } + } + + // Small delay between waves + try await Task.sleep(nanoseconds: 30_000_000) // 30ms + } + + let executedOperations = await tracker.getExecutions() + + // Should have executed at least one operation from each wave + let hasWave0 = executedOperations.contains { $0.hasPrefix("wave0") } + let hasWave1 = executedOperations.contains { $0.hasPrefix("wave1") } + let hasWave2 = executedOperations.contains { $0.hasPrefix("wave2") } + + XCTAssertTrue(hasWave0, "Should execute operations from wave 0") + XCTAssertTrue(hasWave1, "Should execute operations from wave 1") + XCTAssertTrue(hasWave2, "Should execute operations from wave 2") + + // Final context should be from the last wave + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + XCTAssertTrue( + finalContext?.getTargetingKey().hasPrefix("wave2") ?? false, + "Final context should be from the last wave" + ) + } +} +// swiftlint:enable type_body_length file_length trailing_closure diff --git a/docs/AsyncProviderOperationsQueue.md b/docs/AsyncProviderOperationsQueue.md new file mode 100644 index 0000000..e0e31c5 --- /dev/null +++ b/docs/AsyncProviderOperationsQueue.md @@ -0,0 +1,170 @@ +# AsyncProviderOperationsQueue + +## Overview + +`AsyncProviderOperationsQueue` is a specialized serial async task queue that provides **operation-type-aware last-wins semantics** for handling OpenFeature provider operations. It ensures thread-safe, ordered execution of async operations while optimizing performance by coalescing redundant operations. + +## Key Characteristics + +- **Serial Execution**: Operations execute one at a time, preserving order +- **Actor-based**: Thread-safe through Swift's actor isolation +- **Smart Coalescing**: Automatically skips redundant operations based on last-wins semantics +- **Continuation Management**: All callers receive completion notification, even if their operation was skipped + +## Core Concepts + +### Operation Types + +The queue distinguishes between two types of operations: + +1. **Non-Last-Wins (`lastWins: false`)** + - Always executes + - Processes in strict FIFO order + - Used for critical state changes that must not be skipped + - Examples: `setProvider()`, `clearProvider()` + +2. **Last-Wins (`lastWins: true`)** + - May be skipped if superseded by newer last-wins operations + - Optimizes away intermediate states + - Used for operations where only the final state matters + - Examples: `setEvaluationContext()` + +### Batching Logic + +When processing the queue, operations are grouped into "batches": + +- **Batch 1**: A single non-last-wins operation +- **Batch 2**: Consecutive last-wins operations → only the last one executes + +## How It Works + +### Architecture + +``` +┌─────────────────────────────────────────┐ +│ AsyncProviderOperationsQueue (Actor) │ +├─────────────────────────────────────────┤ +│ - queue: [QueuedOperation] │ +│ - currentTask: Task? │ +├─────────────────────────────────────────┤ +│ + run(lastWins:operation:) async │ +│ - processNext() │ +└─────────────────────────────────────────┘ + +QueuedOperation { + operation: () async -> Void + continuation: CheckedContinuation + lastWins: Bool +} +``` + +### Execution Flow + +``` +1. Caller invokes run(lastWins:operation:) + ↓ +2. Operation wrapped with continuation and enqueued + ↓ +3. If no task running → processNext() + ↓ +4. Determine batch type + ├─ Non-last-wins: Execute single operation + └─ Last-wins: Find consecutive last-wins ops + → Execute only the LAST one + → Skip all others + ↓ +5. Resume ALL continuations (skipped + executed) + ↓ +6. Recursively processNext() until queue empty +``` + +### Example Scenarios + +#### Scenario 1: Non-Last-Wins Operations + +```swift +// Queue: Empty, currentTask: nil + +await queue.run(lastWins: false) { setProvider(A) } // Op1 +await queue.run(lastWins: false) { setProvider(B) } // Op2 +await queue.run(lastWins: false) { clearProvider() } // Op3 + +// Execution order: +// 1. setProvider(A) ✓ Executed +// 2. setProvider(B) ✓ Executed +// 3. clearProvider() ✓ Executed +// All three operations execute in order +``` + +#### Scenario 2: Last-Wins Coalescing + +```swift +// Queue: Empty, currentTask: nil + +await queue.run(lastWins: true) { setContext(ctx1) } // Op1 +await queue.run(lastWins: true) { setContext(ctx2) } // Op2 +await queue.run(lastWins: true) { setContext(ctx3) } // Op3 + +// Assume Op1 starts executing before Op2/Op3 are enqueued: +// 1. setContext(ctx1) ✓ Executed (already running) +// 2. setContext(ctx2) ✗ Skipped (superseded by ctx3) +// 3. setContext(ctx3) ✓ Executed (last in batch) + +// Result: Only ctx1 and ctx3 execute +// Op2's continuation still resumes immediately when Op3 completes +``` + +#### Scenario 3: Mixed Operations + +```swift +// Queue: Empty, currentTask: nil + +await queue.run(lastWins: false) { setProvider(A) } // Op1 +await queue.run(lastWins: true) { setContext(ctx1) } // Op2 +await queue.run(lastWins: true) { setContext(ctx2) } // Op3 +await queue.run(lastWins: false) { setProvider(B) } // Op4 +await queue.run(lastWins: true) { setContext(ctx3) } // Op5 + +// Execution flow: +// Batch 1: [Op1] non-last-wins +// → setProvider(A) ✓ Executed + +// Batch 2: [Op2, Op3] consecutive last-wins +// → setContext(ctx1) ✗ Skipped +// → setContext(ctx2) ✓ Executed (last in batch) + +// Batch 3: [Op4] non-last-wins +// → setProvider(B) ✓ Executed + +// Batch 4: [Op5] last-wins +// → setContext(ctx3) ✓ Executed + +// Total executions: Op1, Op2(skipped), Op3, Op4, Op5 +``` + +## Implementation Details + +### Actor Isolation + +The queue is implemented as a Swift `actor`, providing: +- Automatic serialization of all property access +- Thread-safe state management +- No manual locking required + +### Continuation Management + +```swift +await withCheckedContinuation { continuation in + queue.append(QueuedOperation( + operation: operation, + continuation: continuation, + lastWins: lastWins + )) + // ... +} +``` + +**Key Points:** +- Each caller gets a continuation that suspends their async context +- Continuations resume when the operation completes OR is skipped +- This ensures all callers receive notification, preventing deadlocks \ No newline at end of file