From faeed4db6c5e0023438ed0e75c6b36ff65ae8e04 Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Mon, 10 Nov 2025 16:18:05 +0100 Subject: [PATCH 1/3] feat: Performance in burst of update context Signed-off-by: Fabrizio Demaria --- Sources/OpenFeature/OpenFeatureAPI.swift | 51 ++++++-- .../ConcurrencyRaceConditionTests.swift | 119 ++++++++++++++++++ 2 files changed, 162 insertions(+), 8 deletions(-) diff --git a/Sources/OpenFeature/OpenFeatureAPI.swift b/Sources/OpenFeature/OpenFeatureAPI.swift index 736bc85..00886fd 100644 --- a/Sources/OpenFeature/OpenFeatureAPI.swift +++ b/Sources/OpenFeature/OpenFeatureAPI.swift @@ -1,19 +1,54 @@ import Combine import Foundation -/// Simple serial async task queue for serializing operations +/// Simple serial async task queue that coalesces operations. +/// Only the currently running task and at most one pending operation are kept. +/// Intermediate operations are skipped to avoid queue buildup. private actor AsyncSerialQueue { - private var last: Task? + private var currentTask: Task? + private var pendingOperation: (() async -> Void)? + private var pendingContinuations: [CheckedContinuation] = [] - /// Runs the given operation after previously enqueued work completes. + /// Runs the given operation serially. If an operation is already running, + /// this operation replaces any previously pending operation (which gets skipped). + /// All callers whose operations were replaced will wait for the latest operation to complete. func run(_ operation: @Sendable @escaping () async -> Void) async { - let previous = last - let task = Task { - _ = await previous?.result + await withCheckedContinuation { continuation in + // Replace any pending operation with this new one + pendingOperation = operation + pendingContinuations.append(continuation) + + // If nothing is currently running, start processing + if currentTask == nil { + processNext() + } + } + } + + private func processNext() { + guard let operation = pendingOperation else { + // No pending work + currentTask = nil + return + } + + // Clear pending state and capture continuations + pendingOperation = nil + let continuations = pendingContinuations + pendingContinuations = [] + + // Start the task + currentTask = Task { [weak self] in await operation() + + // Resume all waiting callers + for continuation in continuations { + continuation.resume() + } + + // Process next operation if any arrived while we were running + await self?.processNext() } - last = task - await task.value } } diff --git a/Tests/OpenFeatureTests/ConcurrencyRaceConditionTests.swift b/Tests/OpenFeatureTests/ConcurrencyRaceConditionTests.swift index fbbf65c..beae82d 100644 --- a/Tests/OpenFeatureTests/ConcurrencyRaceConditionTests.swift +++ b/Tests/OpenFeatureTests/ConcurrencyRaceConditionTests.swift @@ -167,4 +167,123 @@ class ConcurrencyRaceConditionTests: XCTestCase { XCTFail("Provider or Evaluation Context unexpectedly nil") } } + + func testAsyncSerialQueueCoalescence() async throws { + print("\n========== AsyncSerialQueue Coalescence Test ==========\n") + + // Track which operations actually executed + actor ExecutionTracker { + var executedOperations: [String] = [] + + func recordExecution(_ operation: String) { + executedOperations.append(operation) + print("πŸ“ Recorded execution: \(operation)") + } + + func getExecutions() -> [String] { + return executedOperations + } + } + + let tracker = ExecutionTracker() + + // Create a provider with a slow onContextSet to ensure operations overlap + let provider = MockProvider( + onContextSet: { oldContext, newContext in + // Add delay to simulate slow provider operation + // This ensures that when tasks 2 and 3 are queued, task 1 is still running + let targetingKey = newContext.getTargetingKey() + print("βš™οΈ Provider.onContextSet running for: \(targetingKey)") + await tracker.recordExecution(targetingKey) + try await Task.sleep(nanoseconds: 50_000_000) // 50ms + print("βœ“ Provider.onContextSet completed for: \(targetingKey)") + } + ) + + // Set up provider first + print("πŸ”§ Setting up provider...") + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + print("βœ… Provider ready\n") + + // Start three concurrent context updates + print("πŸš€ Starting three concurrent setEvaluationContext calls:\n") + + async let task1: Void = { + print("πŸ”΅ Task 1: STARTING - Creating context for user1") + let ctx1 = ImmutableContext( + targetingKey: "user1", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + print("πŸ”΅ Task 1: CALLING setEvaluationContextAndWait(user1)") + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx1) + print("πŸ”΅ Task 1: RETURNED from setEvaluationContextAndWait") + print("πŸ”΅ Task 1: COMPLETED\n") + }() + + // Small delay to ensure task1 starts first + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + + async let task2: Void = { + print("🟒 Task 2: STARTING - Creating context for user2") + let ctx2 = ImmutableContext( + targetingKey: "user2", + structure: ImmutableStructure(attributes: ["id": .integer(2)]) + ) + print("🟒 Task 2: CALLING setEvaluationContextAndWait(user2)") + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx2) + print("🟒 Task 2: RETURNED from setEvaluationContextAndWait") + print("🟒 Task 2: COMPLETED\n") + }() + + // Small delay to ensure task2 starts before task3 + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + + async let task3: Void = { + print("πŸ”΄ Task 3: STARTING - Creating context for user3") + let ctx3 = ImmutableContext( + targetingKey: "user3", + structure: ImmutableStructure(attributes: ["id": .integer(3)]) + ) + print("πŸ”΄ Task 3: CALLING setEvaluationContextAndWait(user3)") + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx3) + print("πŸ”΄ Task 3: RETURNED from setEvaluationContextAndWait") + print("πŸ”΄ Task 3: COMPLETED\n") + }() + + // Wait for all tasks to complete + await task1 + await task2 + await task3 + + let executedOperations = await tracker.getExecutions() + + print("========== RESULTS ==========") + print("Operations that actually executed: \(executedOperations)") + print("Total operations executed: \(executedOperations.count)") + print("Expected: 2 operations (user1 and user3)") + print("Expected skipped: user2 (replaced by user3)") + print("========================================\n") + + // Verify coalescence: if working correctly, should have executed at most 2 operations + // (first one + latest one), skipping the middle one + XCTAssertLessThanOrEqual( + executedOperations.count, + 2, + "Should execute at most 2 operations due to coalescence (first + latest), but executed: \(executedOperations)" + ) + + // Verify user2 was NOT executed (it should be coalesced/skipped) + XCTAssertFalse( + executedOperations.contains("user2"), + "user2 operation should have been skipped due to coalescence" + ) + + // Verify final context is from the last operation + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + XCTAssertEqual( + finalContext?.getTargetingKey(), + "user3", + "Final context should be from the last operation (user3)" + ) + } } From bbe1f957abed8fdc63db62c6c75141402a04c624 Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Tue, 11 Nov 2025 13:23:19 +0100 Subject: [PATCH 2/3] refactor: AsyncSerialQueueTests and verbose mode Signed-off-by: Fabrizio Demaria --- Sources/OpenFeature/AsyncSerialQueue.swift | 125 ++++ Sources/OpenFeature/OpenFeatureAPI.swift | 56 +- .../AsyncSerialQueueTests.swift | 641 ++++++++++++++++++ .../ConcurrencyRaceConditionTests.swift | 289 -------- 4 files changed, 770 insertions(+), 341 deletions(-) create mode 100644 Sources/OpenFeature/AsyncSerialQueue.swift create mode 100644 Tests/OpenFeatureTests/AsyncSerialQueueTests.swift delete mode 100644 Tests/OpenFeatureTests/ConcurrencyRaceConditionTests.swift diff --git a/Sources/OpenFeature/AsyncSerialQueue.swift b/Sources/OpenFeature/AsyncSerialQueue.swift new file mode 100644 index 0000000..355d3bd --- /dev/null +++ b/Sources/OpenFeature/AsyncSerialQueue.swift @@ -0,0 +1,125 @@ +import Foundation + +/// Simple serial async task queue that coalesces operations. +/// Only the currently running task and at most one pending operation are kept. +/// Intermediate operations are skipped to avoid queue buildup. +internal actor AsyncSerialQueue { + private var currentTask: Task? + private var pendingOperation: (() async -> Void)? + private var pendingContinuations: [CheckedContinuation] = [] + private var operationCounter: Int = 0 + + /// Verbose mode controls whether debug logging is enabled + private let verbose: Bool + + /// Initialize the queue with optional verbose logging + /// - Parameter verbose: If true, detailed debug logs will be printed with [ASQ] prefix + init(verbose: Bool = false) { + self.verbose = verbose + } + + /// Runs the given operation serially. If an operation is already running, + /// this operation replaces any previously pending operation (which gets skipped). + /// All callers whose operations were replaced will wait for the latest operation to complete. + func run(_ operation: @Sendable @escaping () async -> Void) async { + await withCheckedContinuation { continuation in + operationCounter += 1 + let operationId = operationCounter + + if verbose { + print("[ASQ] πŸ”΅ run() called - Operation #\(operationId)") + print("[ASQ] β”œβ”€ currentTask == nil: \(currentTask == nil)") + print("[ASQ] β”œβ”€ pendingOperation == nil (before): \(pendingOperation == nil)") + print("[ASQ] β”œβ”€ pendingContinuations.count (before): \(pendingContinuations.count)") + } + + // Replace any pending operation with this new one + let hadPendingOperation = pendingOperation != nil + pendingOperation = operation + pendingContinuations.append(continuation) + + if verbose { + if hadPendingOperation { + print("[ASQ] β”œβ”€ ⚠️ REPLACED previous pending operation with Operation #\(operationId)") + } else { + print("[ASQ] β”œβ”€ βœ“ Set Operation #\(operationId) as pending operation") + } + print("[ASQ] β”œβ”€ pendingContinuations.count (after): \(pendingContinuations.count)") + } + + // If nothing is currently running, start processing + if currentTask == nil { + if verbose { + print("[ASQ] └─ ▢️ No task running, calling processNext() for Operation #\(operationId)") + } + processNext() + } else { + if verbose { + print("[ASQ] └─ ⏸️ Task already running, Operation #\(operationId) will wait") + } + } + } + } + + private func processNext() { + if verbose { + print("[ASQ] 🟒 processNext() called") + print("[ASQ] β”œβ”€ pendingOperation == nil: \(pendingOperation == nil)") + print("[ASQ] β”œβ”€ pendingContinuations.count: \(pendingContinuations.count)") + } + + guard let operation = pendingOperation else { + // No pending work + if verbose { + print("[ASQ] β”œβ”€ β›” No pending operation, cleaning up") + } + currentTask = nil + if verbose { + print("[ASQ] └─ βœ“ currentTask set to nil, queue is now idle") + } + return + } + + // Clear pending state and capture continuations + pendingOperation = nil + let continuations = pendingContinuations + pendingContinuations = [] + + if verbose { + print("[ASQ] β”œβ”€ βœ“ Captured \(continuations.count) continuation(s) to resume") + print("[ASQ] β”œβ”€ βœ“ Cleared pendingOperation and pendingContinuations") + print("[ASQ] └─ πŸš€ Starting new Task to execute operation") + } + + // Start the task + currentTask = Task { [weak self, verbose] in + if verbose { + print("[ASQ] πŸ”„ Task execution started") + } + await operation() + if verbose { + print("[ASQ] βœ… Task execution completed") + } + + // Resume all waiting callers + if verbose { + print("[ASQ] πŸ“€ Resuming \(continuations.count) continuation(s)") + } + for (index, continuation) in continuations.enumerated() { + if verbose { + print("[ASQ] β”œβ”€ Resuming continuation #\(index + 1)") + } + continuation.resume() + } + if verbose { + print("[ASQ] βœ“ All continuations resumed") + } + + // Process next operation if any arrived while we were running + if verbose { + print("[ASQ] πŸ” Checking for next operation...") + } + await self?.processNext() + } + } +} diff --git a/Sources/OpenFeature/OpenFeatureAPI.swift b/Sources/OpenFeature/OpenFeatureAPI.swift index 00886fd..53b3c12 100644 --- a/Sources/OpenFeature/OpenFeatureAPI.swift +++ b/Sources/OpenFeature/OpenFeatureAPI.swift @@ -1,63 +1,12 @@ import Combine import Foundation -/// Simple serial async task queue that coalesces operations. -/// Only the currently running task and at most one pending operation are kept. -/// Intermediate operations are skipped to avoid queue buildup. -private actor AsyncSerialQueue { - private var currentTask: Task? - private var pendingOperation: (() async -> Void)? - private var pendingContinuations: [CheckedContinuation] = [] - - /// Runs the given operation serially. If an operation is already running, - /// this operation replaces any previously pending operation (which gets skipped). - /// All callers whose operations were replaced will wait for the latest operation to complete. - func run(_ operation: @Sendable @escaping () async -> Void) async { - await withCheckedContinuation { continuation in - // Replace any pending operation with this new one - pendingOperation = operation - pendingContinuations.append(continuation) - - // If nothing is currently running, start processing - if currentTask == nil { - processNext() - } - } - } - - private func processNext() { - guard let operation = pendingOperation else { - // No pending work - currentTask = nil - return - } - - // Clear pending state and capture continuations - pendingOperation = nil - let continuations = pendingContinuations - pendingContinuations = [] - - // Start the task - currentTask = Task { [weak self] in - await operation() - - // Resume all waiting callers - for continuation in continuations { - continuation.resume() - } - - // Process next operation if any arrived while we were running - await self?.processNext() - } - } -} - /// A global singleton which holds base configuration for the OpenFeature library. /// Configuration here will be shared across all ``Client``s. public class OpenFeatureAPI { private let eventHandler = EventHandler() private let stateQueue = DispatchQueue(label: "com.openfeature.state.queue") - private let atomicOperationsQueue = AsyncSerialQueue() + private let atomicOperationsQueue: AsyncSerialQueue private(set) var providerSubject = CurrentValueSubject(nil) private(set) var evaluationContext: EvaluationContext? @@ -68,6 +17,9 @@ public class OpenFeatureAPI { static public let shared = OpenFeatureAPI() public init() { + // Check for OPENFEATURE_ASQ_VERBOSE environment variable to enable verbose logging + let verboseMode = ProcessInfo.processInfo.environment["OPENFEATURE_ASQ_VERBOSE"] != nil + atomicOperationsQueue = AsyncSerialQueue(verbose: verboseMode) } /** diff --git a/Tests/OpenFeatureTests/AsyncSerialQueueTests.swift b/Tests/OpenFeatureTests/AsyncSerialQueueTests.swift new file mode 100644 index 0000000..9ed40ca --- /dev/null +++ b/Tests/OpenFeatureTests/AsyncSerialQueueTests.swift @@ -0,0 +1,641 @@ +import XCTest +import Combine +@testable import OpenFeature + +class AsyncSerialQueueTests: XCTestCase { + override func setUp() { + super.setUp() + OpenFeatureAPI.shared.clearProvider() + } + + func testConcurrentSetEvaluationContextRaceCondition() async throws { + let provider = MockProvider() + let readyExpectation = XCTestExpectation(description: "Ready") + let cancellable = OpenFeatureAPI.shared.observe().sink { event in + if case .ready = event { + readyExpectation.fulfill() + } + } + OpenFeatureAPI.shared.setProvider(provider: provider) + await fulfillment(of: [readyExpectation], timeout: 2.0) + + let concurrentOperations = 100 + let expectedTargetingKeys = Set((0.. [String] { + return executedOperations + } + } + + let tracker = ExecutionTracker() + + // Create a provider with a slow onContextSet to ensure operations overlap + let provider = MockProvider( + onContextSet: { oldContext, newContext in + // Add delay to simulate slow provider operation + // This ensures that when tasks 2 and 3 are queued, task 1 is still running + let targetingKey = newContext.getTargetingKey() + await tracker.recordExecution(targetingKey) + try await Task.sleep(nanoseconds: 50_000_000) // 50ms + } + ) + + // Set up provider first + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // Start three concurrent context updates + async let task1: Void = { + let ctx1 = ImmutableContext( + targetingKey: "user1", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx1) + }() + + // Small delay to ensure task1 starts first + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + + async let task2: Void = { + let ctx2 = ImmutableContext( + targetingKey: "user2", + structure: ImmutableStructure(attributes: ["id": .integer(2)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx2) + }() + + // Small delay to ensure task2 starts before task3 + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + + async let task3: Void = { + let ctx3 = ImmutableContext( + targetingKey: "user3", + structure: ImmutableStructure(attributes: ["id": .integer(3)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx3) + }() + + // Wait for all tasks to complete + await task1 + await task2 + await task3 + + let executedOperations = await tracker.getExecutions() + + // Verify coalescence: if working correctly, should have executed at most 2 operations + // (first one + latest one), skipping the middle one + XCTAssertLessThanOrEqual( + executedOperations.count, + 2, + "Should execute at most 2 operations due to coalescence (first + latest), but executed: \(executedOperations)" + ) + + // Verify user2 was NOT executed (it should be coalesced/skipped) + XCTAssertFalse( + executedOperations.contains("user2"), + "user2 operation should have been skipped due to coalescence" + ) + + // Verify final context is from the last operation + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + XCTAssertEqual( + finalContext?.getTargetingKey(), + "user3", + "Final context should be from the last operation (user3)" + ) + } + + // MARK: - Edge Case Tests for AsyncSerialQueue Coalescence + + func testAsyncSerialQueueSingleOperation() async throws { + // Test that a single operation executes normally without coalescence + actor ExecutionTracker { + var executedOperations: [String] = [] + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func getExecutions() -> [String] { executedOperations } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { oldContext, newContext in + await tracker.recordExecution(newContext.getTargetingKey()) + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + let ctx = ImmutableContext( + targetingKey: "single-user", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx) + + let executedOperations = await tracker.getExecutions() + XCTAssertEqual(executedOperations.count, 1, "Single operation should execute exactly once") + XCTAssertEqual(executedOperations.first, "single-user", "Should execute the single operation") + XCTAssertEqual(OpenFeatureAPI.shared.getEvaluationContext()?.getTargetingKey(), "single-user") + } + + func testAsyncSerialQueueTwoSequentialOperations() async throws { + // Test two operations that don't overlap - both should execute + actor ExecutionTracker { + var executedOperations: [String] = [] + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func getExecutions() -> [String] { executedOperations } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { oldContext, newContext in + await tracker.recordExecution(newContext.getTargetingKey()) + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // First operation + let ctx1 = ImmutableContext( + targetingKey: "user1", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx1) + + // Second operation - starts after first completes + let ctx2 = ImmutableContext( + targetingKey: "user2", + structure: ImmutableStructure(attributes: ["id": .integer(2)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx2) + + let executedOperations = await tracker.getExecutions() + XCTAssertEqual(executedOperations.count, 2, "Both sequential operations should execute") + XCTAssertEqual(executedOperations, ["user1", "user2"], "Operations should execute in order") + XCTAssertEqual(OpenFeatureAPI.shared.getEvaluationContext()?.getTargetingKey(), "user2") + } + + func testAsyncSerialQueueRapidBurstCoalescence() async throws { + // Test that rapid bursts of many operations get heavily coalesced + actor ExecutionTracker { + var executedOperations: [String] = [] + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func getExecutions() -> [String] { executedOperations } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { oldContext, newContext in + await tracker.recordExecution(newContext.getTargetingKey()) + try await Task.sleep(nanoseconds: 100_000_000) // 100ms - long enough for many to queue + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // Launch 10 operations rapidly + await withTaskGroup(of: Void.self) { group in + for i in 0..<10 { + group.addTask { + let ctx = ImmutableContext( + targetingKey: "user\(i)", + structure: ImmutableStructure(attributes: ["id": .integer(Int64(i))]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx) + } + } + } + + let executedOperations = await tracker.getExecutions() + + // Should execute far fewer than 10 operations due to coalescence + XCTAssertLessThanOrEqual( + executedOperations.count, + 3, + "Rapid burst should heavily coalesce, executed: \(executedOperations)" + ) + + // Final context should be from one of the last operations + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + let finalKey = finalContext?.getTargetingKey() ?? "" + XCTAssertTrue( + finalKey.hasPrefix("user"), + "Final context should be from one of the operations" + ) + } + + func testAsyncSerialQueueOperationsArrivingAfterCompletion() async throws { + // Test that operations arriving after the previous one completes still execute + actor ExecutionTracker { + var executedOperations: [String] = [] + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func getExecutions() -> [String] { executedOperations } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { oldContext, newContext in + await tracker.recordExecution(newContext.getTargetingKey()) + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // First operation + let ctx1 = ImmutableContext( + targetingKey: "batch1-user1", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx1) + + // Wait a bit to ensure first operation is completely done + try await Task.sleep(nanoseconds: 50_000_000) // 50ms + + // Now submit several operations that arrive after the queue is idle + async let task2: Void = { + let ctx2 = ImmutableContext( + targetingKey: "batch2-user2", + structure: ImmutableStructure(attributes: ["id": .integer(2)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx2) + }() + + try await Task.sleep(nanoseconds: 5_000_000) // 5ms + + async let task3: Void = { + let ctx3 = ImmutableContext( + targetingKey: "batch2-user3", + structure: ImmutableStructure(attributes: ["id": .integer(3)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx3) + }() + + await task2 + await task3 + + let executedOperations = await tracker.getExecutions() + + // Should have first operation, and at least one from the second batch + XCTAssertGreaterThanOrEqual(executedOperations.count, 2, "Should execute operations from both batches") + XCTAssertEqual(executedOperations.first, "batch1-user1", "First operation should execute") + + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + XCTAssertTrue( + finalContext?.getTargetingKey().hasPrefix("batch2") ?? false, + "Final context should be from second batch" + ) + } + + func testAsyncSerialQueueWithErrorHandling() async throws { + // Test that errors in operations don't break the queue + actor ExecutionTracker { + var executedOperations: [String] = [] + var errorOccurred = false + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func recordError() { + errorOccurred = true + } + func getExecutions() -> [String] { executedOperations } + func hasError() -> Bool { errorOccurred } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { oldContext, newContext in + let targetingKey = newContext.getTargetingKey() + await tracker.recordExecution(targetingKey) + + // Throw error for "error-user" + if targetingKey == "error-user" { + await tracker.recordError() + throw MockProvider.MockProviderError.message("Simulated error") + } + + try await Task.sleep(nanoseconds: 30_000_000) // 30ms + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // Launch operations including one that will error + async let task1: Void = { + let ctx1 = ImmutableContext( + targetingKey: "user1", + structure: ImmutableStructure(attributes: ["id": .integer(1)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx1) + }() + + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + + async let task2: Void = { + let ctx2 = ImmutableContext( + targetingKey: "error-user", + structure: ImmutableStructure(attributes: ["id": .integer(2)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx2) + }() + + try await Task.sleep(nanoseconds: 10_000_000) // 10ms + + async let task3: Void = { + let ctx3 = ImmutableContext( + targetingKey: "user3", + structure: ImmutableStructure(attributes: ["id": .integer(3)]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx3) + }() + + await task1 + await task2 + await task3 + + let executedOperations = await tracker.getExecutions() + + // Should execute at most 2 operations due to coalescence + XCTAssertLessThanOrEqual(executedOperations.count, 2, "Should coalesce despite errors") + + // Final context depends on which operations were coalesced + // But the queue should still be functional + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + XCTAssertNotNil(finalContext, "Queue should continue functioning after error") + + // Provider status should reflect the error + let status = OpenFeatureAPI.shared.getProviderStatus() + XCTAssertTrue( + [.ready, .error].contains(status), + "Status should be either ready or error depending on final operation" + ) + } + + func testAsyncSerialQueueContinuationResumeCorrectness() async throws { + // Test that all callers get resumed correctly, even those whose operations were skipped + actor CompletionTracker { + var completedTasks: Set = [] + func markCompleted(_ taskId: Int) { + completedTasks.insert(taskId) + } + func getCompleted() -> Set { completedTasks } + } + + let completionTracker = CompletionTracker() + let provider = MockProvider( + onContextSet: { oldContext, newContext in + try await Task.sleep(nanoseconds: 50_000_000) // 50ms + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // Launch multiple tasks and track that they all complete + await withTaskGroup(of: Void.self) { group in + for i in 0..<5 { + group.addTask { + let ctx = ImmutableContext( + targetingKey: "user\(i)", + structure: ImmutableStructure(attributes: ["id": .integer(Int64(i))]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx) + await completionTracker.markCompleted(i) + } + + // Small stagger to ensure they overlap + try? await Task.sleep(nanoseconds: 5_000_000) // 5ms + } + } + + let completedTasks = await completionTracker.getCompleted() + + // All 5 tasks should have completed (received their continuation resume) + XCTAssertEqual( + completedTasks.count, + 5, + "All tasks should complete even if their operations were coalesced" + ) + XCTAssertEqual( + completedTasks, + Set([0, 1, 2, 3, 4]), + "All task IDs should be marked as completed" + ) + } + + func testAsyncSerialQueueNoStarvation() async throws { + // Test that the queue doesn't cause starvation - operations eventually execute + actor ExecutionTracker { + var executedOperations: [String] = [] + func recordExecution(_ operation: String) { + executedOperations.append(operation) + } + func getExecutions() -> [String] { executedOperations } + } + + let tracker = ExecutionTracker() + let provider = MockProvider( + onContextSet: { oldContext, newContext in + await tracker.recordExecution(newContext.getTargetingKey()) + try await Task.sleep(nanoseconds: 20_000_000) // 20ms + } + ) + + await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) + + // Submit operations in waves to test that later waves aren't starved + for wave in 0..<3 { + await withTaskGroup(of: Void.self) { group in + for i in 0..<5 { + group.addTask { + let ctx = ImmutableContext( + targetingKey: "wave\(wave)-user\(i)", + structure: ImmutableStructure(attributes: ["wave": .integer(Int64(wave)), "id": .integer(Int64(i))]) + ) + await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx) + } + } + } + + // Small delay between waves + try await Task.sleep(nanoseconds: 30_000_000) // 30ms + } + + let executedOperations = await tracker.getExecutions() + + // Should have executed at least one operation from each wave + let hasWave0 = executedOperations.contains { $0.hasPrefix("wave0") } + let hasWave1 = executedOperations.contains { $0.hasPrefix("wave1") } + let hasWave2 = executedOperations.contains { $0.hasPrefix("wave2") } + + XCTAssertTrue(hasWave0, "Should execute operations from wave 0") + XCTAssertTrue(hasWave1, "Should execute operations from wave 1") + XCTAssertTrue(hasWave2, "Should execute operations from wave 2") + + // Final context should be from the last wave + let finalContext = OpenFeatureAPI.shared.getEvaluationContext() + XCTAssertTrue( + finalContext?.getTargetingKey().hasPrefix("wave2") ?? false, + "Final context should be from the last wave" + ) + } +} diff --git a/Tests/OpenFeatureTests/ConcurrencyRaceConditionTests.swift b/Tests/OpenFeatureTests/ConcurrencyRaceConditionTests.swift deleted file mode 100644 index beae82d..0000000 --- a/Tests/OpenFeatureTests/ConcurrencyRaceConditionTests.swift +++ /dev/null @@ -1,289 +0,0 @@ -import XCTest -import Combine -@testable import OpenFeature - -class ConcurrencyRaceConditionTests: XCTestCase { - override func setUp() { - super.setUp() - OpenFeatureAPI.shared.clearProvider() - } - - func testConcurrentSetEvaluationContextRaceCondition() async throws { - let provider = MockProvider() - let readyExpectation = XCTestExpectation(description: "Ready") - let cancellable = OpenFeatureAPI.shared.observe().sink { event in - if case .ready = event { - readyExpectation.fulfill() - } - } - OpenFeatureAPI.shared.setProvider(provider: provider) - await fulfillment(of: [readyExpectation], timeout: 2.0) - - let concurrentOperations = 100 - let expectedTargetingKeys = Set((0.. [String] { - return executedOperations - } - } - - let tracker = ExecutionTracker() - - // Create a provider with a slow onContextSet to ensure operations overlap - let provider = MockProvider( - onContextSet: { oldContext, newContext in - // Add delay to simulate slow provider operation - // This ensures that when tasks 2 and 3 are queued, task 1 is still running - let targetingKey = newContext.getTargetingKey() - print("βš™οΈ Provider.onContextSet running for: \(targetingKey)") - await tracker.recordExecution(targetingKey) - try await Task.sleep(nanoseconds: 50_000_000) // 50ms - print("βœ“ Provider.onContextSet completed for: \(targetingKey)") - } - ) - - // Set up provider first - print("πŸ”§ Setting up provider...") - await OpenFeatureAPI.shared.setProviderAndWait(provider: provider) - print("βœ… Provider ready\n") - - // Start three concurrent context updates - print("πŸš€ Starting three concurrent setEvaluationContext calls:\n") - - async let task1: Void = { - print("πŸ”΅ Task 1: STARTING - Creating context for user1") - let ctx1 = ImmutableContext( - targetingKey: "user1", - structure: ImmutableStructure(attributes: ["id": .integer(1)]) - ) - print("πŸ”΅ Task 1: CALLING setEvaluationContextAndWait(user1)") - await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx1) - print("πŸ”΅ Task 1: RETURNED from setEvaluationContextAndWait") - print("πŸ”΅ Task 1: COMPLETED\n") - }() - - // Small delay to ensure task1 starts first - try await Task.sleep(nanoseconds: 10_000_000) // 10ms - - async let task2: Void = { - print("🟒 Task 2: STARTING - Creating context for user2") - let ctx2 = ImmutableContext( - targetingKey: "user2", - structure: ImmutableStructure(attributes: ["id": .integer(2)]) - ) - print("🟒 Task 2: CALLING setEvaluationContextAndWait(user2)") - await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx2) - print("🟒 Task 2: RETURNED from setEvaluationContextAndWait") - print("🟒 Task 2: COMPLETED\n") - }() - - // Small delay to ensure task2 starts before task3 - try await Task.sleep(nanoseconds: 10_000_000) // 10ms - - async let task3: Void = { - print("πŸ”΄ Task 3: STARTING - Creating context for user3") - let ctx3 = ImmutableContext( - targetingKey: "user3", - structure: ImmutableStructure(attributes: ["id": .integer(3)]) - ) - print("πŸ”΄ Task 3: CALLING setEvaluationContextAndWait(user3)") - await OpenFeatureAPI.shared.setEvaluationContextAndWait(evaluationContext: ctx3) - print("πŸ”΄ Task 3: RETURNED from setEvaluationContextAndWait") - print("πŸ”΄ Task 3: COMPLETED\n") - }() - - // Wait for all tasks to complete - await task1 - await task2 - await task3 - - let executedOperations = await tracker.getExecutions() - - print("========== RESULTS ==========") - print("Operations that actually executed: \(executedOperations)") - print("Total operations executed: \(executedOperations.count)") - print("Expected: 2 operations (user1 and user3)") - print("Expected skipped: user2 (replaced by user3)") - print("========================================\n") - - // Verify coalescence: if working correctly, should have executed at most 2 operations - // (first one + latest one), skipping the middle one - XCTAssertLessThanOrEqual( - executedOperations.count, - 2, - "Should execute at most 2 operations due to coalescence (first + latest), but executed: \(executedOperations)" - ) - - // Verify user2 was NOT executed (it should be coalesced/skipped) - XCTAssertFalse( - executedOperations.contains("user2"), - "user2 operation should have been skipped due to coalescence" - ) - - // Verify final context is from the last operation - let finalContext = OpenFeatureAPI.shared.getEvaluationContext() - XCTAssertEqual( - finalContext?.getTargetingKey(), - "user3", - "Final context should be from the last operation (user3)" - ) - } -} From 68cd7fc7a516463077a988490955671437a71340 Mon Sep 17 00:00:00 2001 From: Fabrizio Demaria Date: Tue, 11 Nov 2025 13:36:32 +0100 Subject: [PATCH 3/3] refactor: renaming Signed-off-by: Fabrizio Demaria --- ...swift => AsyncCoalescingSerialQueue.swift} | 2 +- Sources/OpenFeature/OpenFeatureAPI.swift | 8 ++++---- .../AsyncSerialQueueTests.swift | 20 +++++++++---------- 3 files changed, 15 insertions(+), 15 deletions(-) rename Sources/OpenFeature/{AsyncSerialQueue.swift => AsyncCoalescingSerialQueue.swift} (99%) diff --git a/Sources/OpenFeature/AsyncSerialQueue.swift b/Sources/OpenFeature/AsyncCoalescingSerialQueue.swift similarity index 99% rename from Sources/OpenFeature/AsyncSerialQueue.swift rename to Sources/OpenFeature/AsyncCoalescingSerialQueue.swift index 355d3bd..82f4e49 100644 --- a/Sources/OpenFeature/AsyncSerialQueue.swift +++ b/Sources/OpenFeature/AsyncCoalescingSerialQueue.swift @@ -3,7 +3,7 @@ import Foundation /// Simple serial async task queue that coalesces operations. /// Only the currently running task and at most one pending operation are kept. /// Intermediate operations are skipped to avoid queue buildup. -internal actor AsyncSerialQueue { +internal actor AsyncCoalescingSerialQueue { private var currentTask: Task? private var pendingOperation: (() async -> Void)? private var pendingContinuations: [CheckedContinuation] = [] diff --git a/Sources/OpenFeature/OpenFeatureAPI.swift b/Sources/OpenFeature/OpenFeatureAPI.swift index 53b3c12..2a4a17b 100644 --- a/Sources/OpenFeature/OpenFeatureAPI.swift +++ b/Sources/OpenFeature/OpenFeatureAPI.swift @@ -6,7 +6,7 @@ import Foundation public class OpenFeatureAPI { private let eventHandler = EventHandler() private let stateQueue = DispatchQueue(label: "com.openfeature.state.queue") - private let atomicOperationsQueue: AsyncSerialQueue + private let contextQueue: AsyncCoalescingSerialQueue private(set) var providerSubject = CurrentValueSubject(nil) private(set) var evaluationContext: EvaluationContext? @@ -19,7 +19,7 @@ public class OpenFeatureAPI { public init() { // Check for OPENFEATURE_ASQ_VERBOSE environment variable to enable verbose logging let verboseMode = ProcessInfo.processInfo.environment["OPENFEATURE_ASQ_VERBOSE"] != nil - atomicOperationsQueue = AsyncSerialQueue(verbose: verboseMode) + contextQueue = AsyncCoalescingSerialQueue(verbose: verboseMode) } /** @@ -139,7 +139,7 @@ public class OpenFeatureAPI { } private func setProviderInternal(provider: FeatureProvider, initialContext: EvaluationContext? = nil) async { - await atomicOperationsQueue.run { [self] in + await contextQueue.run { [self] in // Set initial state atomically stateQueue.sync { self.providerStatus = .notReady @@ -176,7 +176,7 @@ public class OpenFeatureAPI { } private func updateContext(evaluationContext: EvaluationContext) async { - await atomicOperationsQueue.run { [self] in + await contextQueue.run { [self] in // Get old context and set new context atomically let (oldContext, provider) = stateQueue.sync { () -> (EvaluationContext?, FeatureProvider?) in let oldContext = self.evaluationContext diff --git a/Tests/OpenFeatureTests/AsyncSerialQueueTests.swift b/Tests/OpenFeatureTests/AsyncSerialQueueTests.swift index 9ed40ca..ba263ea 100644 --- a/Tests/OpenFeatureTests/AsyncSerialQueueTests.swift +++ b/Tests/OpenFeatureTests/AsyncSerialQueueTests.swift @@ -2,7 +2,7 @@ import XCTest import Combine @testable import OpenFeature -class AsyncSerialQueueTests: XCTestCase { +class AsyncCoalescingSerialQueueTests: XCTestCase { override func setUp() { super.setUp() OpenFeatureAPI.shared.clearProvider() @@ -168,7 +168,7 @@ class AsyncSerialQueueTests: XCTestCase { } } - func testAsyncSerialQueueCoalescence() async throws { + func testAsyncCoalescingSerialQueueCoalescence() async throws { // Track which operations actually executed actor ExecutionTracker { var executedOperations: [String] = [] @@ -259,9 +259,9 @@ class AsyncSerialQueueTests: XCTestCase { ) } - // MARK: - Edge Case Tests for AsyncSerialQueue Coalescence + // MARK: - Edge Case Tests for AsyncCoalescingSerialQueue Coalescence - func testAsyncSerialQueueSingleOperation() async throws { + func testAsyncCoalescingSerialQueueSingleOperation() async throws { // Test that a single operation executes normally without coalescence actor ExecutionTracker { var executedOperations: [String] = [] @@ -292,7 +292,7 @@ class AsyncSerialQueueTests: XCTestCase { XCTAssertEqual(OpenFeatureAPI.shared.getEvaluationContext()?.getTargetingKey(), "single-user") } - func testAsyncSerialQueueTwoSequentialOperations() async throws { + func testAsyncCoalescingSerialQueueTwoSequentialOperations() async throws { // Test two operations that don't overlap - both should execute actor ExecutionTracker { var executedOperations: [String] = [] @@ -332,7 +332,7 @@ class AsyncSerialQueueTests: XCTestCase { XCTAssertEqual(OpenFeatureAPI.shared.getEvaluationContext()?.getTargetingKey(), "user2") } - func testAsyncSerialQueueRapidBurstCoalescence() async throws { + func testAsyncCoalescingSerialQueueRapidBurstCoalescence() async throws { // Test that rapid bursts of many operations get heavily coalesced actor ExecutionTracker { var executedOperations: [String] = [] @@ -383,7 +383,7 @@ class AsyncSerialQueueTests: XCTestCase { ) } - func testAsyncSerialQueueOperationsArrivingAfterCompletion() async throws { + func testAsyncCoalescingSerialQueueOperationsArrivingAfterCompletion() async throws { // Test that operations arriving after the previous one completes still execute actor ExecutionTracker { var executedOperations: [String] = [] @@ -447,7 +447,7 @@ class AsyncSerialQueueTests: XCTestCase { ) } - func testAsyncSerialQueueWithErrorHandling() async throws { + func testAsyncCoalescingSerialQueueWithErrorHandling() async throws { // Test that errors in operations don't break the queue actor ExecutionTracker { var executedOperations: [String] = [] @@ -531,7 +531,7 @@ class AsyncSerialQueueTests: XCTestCase { ) } - func testAsyncSerialQueueContinuationResumeCorrectness() async throws { + func testAsyncCoalescingSerialQueueContinuationResumeCorrectness() async throws { // Test that all callers get resumed correctly, even those whose operations were skipped actor CompletionTracker { var completedTasks: Set = [] @@ -582,7 +582,7 @@ class AsyncSerialQueueTests: XCTestCase { ) } - func testAsyncSerialQueueNoStarvation() async throws { + func testAsyncCoalescingSerialQueueNoStarvation() async throws { // Test that the queue doesn't cause starvation - operations eventually execute actor ExecutionTracker { var executedOperations: [String] = []