Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions Sources/OpenFeature/OpenFeatureAPI.swift
Original file line number Diff line number Diff line change
@@ -1,19 +1,36 @@
import Combine
import Foundation

/// Simple serial async task queue for serializing operations
private actor AsyncSerialQueue {
private var last: Task<Void, Never>?

/// Runs the given operation after previously enqueued work completes.
/// Async queue that only executes the latest operation, cancelling pending ones
/// This implements "last wins" semantics where intermediate operations that haven't
/// started yet will be skipped in favor of the most recently queued operation.
internal actor AsyncLastWinsQueue {
private var currentTask: Task<Void, Never>?
private var pendingOperation: (() async -> Void)?

/// Runs the given operation, but only the latest one will execute.
/// Any pending operations that haven't started yet will be skipped.
func run(_ operation: @Sendable @escaping () async -> Void) async {
let previous = last
let task = Task {
_ = await previous?.result
await operation()
// Store this as the pending operation
pendingOperation = operation

// If there's already a task running, it will pick up this new operation when done
if currentTask == nil {
await executeNext()
}
}

private func executeNext() async {
while let operation = pendingOperation {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we have while here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a pendingOperation has been added while the current operation is running, the while ensures we pick it up

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but if we wait for nextExecute in run, and we only get run happening sequentially because it's in an actor, then we can't have another operation running?

pendingOperation = nil // Clear pending before starting

let task = Task {
await operation()
}
currentTask = task
await task.value
currentTask = nil
}
last = task
await task.value
}
}

Expand All @@ -22,7 +39,7 @@ private actor AsyncSerialQueue {
public class OpenFeatureAPI {
private let eventHandler = EventHandler()
private let stateQueue = DispatchQueue(label: "com.openfeature.state.queue")
private let atomicOperationsQueue = AsyncSerialQueue()
private let atomicOperationsQueue = AsyncLastWinsQueue()

private(set) var providerSubject = CurrentValueSubject<FeatureProvider?, Never>(nil)
private(set) var evaluationContext: EvaluationContext?
Expand Down
256 changes: 256 additions & 0 deletions Tests/OpenFeatureTests/AsyncLastWinsQueueTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
import XCTest
@testable import OpenFeature

class AsyncLastWinsQueueTests: XCTestCase {

// MARK: - Basic Behavior Tests

func testSingleOperationExecutes() async throws {
let queue = AsyncLastWinsQueue()
let executed = ActorBox<Bool>(false)

await queue.run {
await executed.set(true)
}

let result = await executed.get()
XCTAssertTrue(result, "Single operation should execute")
}

func testSequentialOperationsAllExecute() async throws {
let queue = AsyncLastWinsQueue()
let counter = ActorBox<Int>(0)

// Execute 3 operations sequentially (one at a time)
await queue.run {
await counter.increment()
}

await queue.run {
await counter.increment()
}

await queue.run {
await counter.increment()
}

let result = await counter.get()
XCTAssertEqual(result, 3, "All sequential operations should execute")
}

// MARK: - Core "Last Wins" Tests

func testConcurrentOperationsSkipIntermediate() async throws {
let queue = AsyncLastWinsQueue()
let executionOrder = ActorBox<[Int]>([])
let blockFirstOperation = ActorBox<Bool>(true)

// Start 5 operations concurrently
// The first one will block, the middle ones should be skipped,
// only the last one should execute after the first completes
await withTaskGroup(of: Void.self) { group in
for i in 0..<5 {
group.addTask {
await queue.run {
// First operation blocks until we release it
if i == 0 {
while await blockFirstOperation.get() {
try? await Task.sleep(nanoseconds: 10_000_000) // 10ms
}
}
await executionOrder.append(i)
}
}
}

// Give time for all operations to be queued
try? await Task.sleep(nanoseconds: 50_000_000) // 50ms

// Release the first operation
await blockFirstOperation.set(false)
}

let order = await executionOrder.get()

// Should execute: operation 0 (first, was running) and operation 4 (last wins)
XCTAssertEqual(order.count, 2, "Should only execute 2 operations: first and last")
XCTAssertEqual(order[0], 0, "First operation should execute first")
XCTAssertEqual(order[1], 4, "Last operation should execute second")
}

func testRapidFireOnlyExecutesFirstAndLast() async throws {
let queue = AsyncLastWinsQueue()
let executed = ActorBox<Set<Int>>([])

await withTaskGroup(of: Void.self) { group in
// Launch 100 operations that all try to start simultaneously
for i in 0..<100 {
group.addTask {
await queue.run {
// Simulate some work
try? await Task.sleep(nanoseconds: 1_000_000) // 1ms
await executed.insert(i)
}
}
}
}

let executedSet = await executed.get()

// Should execute much fewer than 100 operations
XCTAssertLessThan(executedSet.count, 100, "Should skip many intermediate operations")

// First operation should execute (it started immediately)
XCTAssertTrue(executedSet.contains(0), "First operation should execute")

// Last operation should execute (last wins)
XCTAssertTrue(executedSet.contains(99), "Last operation should execute")

// Total executed should be small (first + maybe a few more + last)
XCTAssertLessThan(executedSet.count, 10, "Should execute very few operations in rapid fire")
}

// MARK: - Ordering and Consistency Tests

func testOperationsNeverRunConcurrently() async throws {
let queue = AsyncLastWinsQueue()
let concurrentExecutions = ActorBox<Int>(0)
let maxConcurrent = ActorBox<Int>(0)
let errors = ActorBox<[String]>([])

await withTaskGroup(of: Void.self) { group in
for i in 0..<50 {
group.addTask {
await queue.run {
let current = await concurrentExecutions.increment()

if current > 1 {
await errors.append("Concurrent execution detected at operation \(i)")
}

await maxConcurrent.updateMax(current)

// Simulate work
try? await Task.sleep(nanoseconds: 5_000_000) // 5ms

await concurrentExecutions.decrement()
}
}
}
}

let max = await maxConcurrent.get()
let errorList = await errors.get()

XCTAssertEqual(max, 1, "Should never have more than 1 concurrent execution")
XCTAssertTrue(errorList.isEmpty, "Should have no concurrent execution errors: \(errorList)")
}

func testFinalStateReflectsLastOperation() async throws {
let queue = AsyncLastWinsQueue()
let finalValue = ActorBox<String?>(nil)
let slowOperationStarted = ActorBox<Bool>(false)
let slowOperationCanProceed = ActorBox<Bool>(false)

await withTaskGroup(of: Void.self) { group in
// Start a slow operation
group.addTask {
await queue.run {
await slowOperationStarted.set(true)
// Wait for signal
while !(await slowOperationCanProceed.get()) {
try? await Task.sleep(nanoseconds: 10_000_000)
}
await finalValue.set("slow")
}
}

// Wait for slow operation to start
while !(await slowOperationStarted.get()) {
try? await Task.sleep(nanoseconds: 10_000_000)
}

// Queue multiple operations while slow one is running
group.addTask {
await queue.run {
await finalValue.set("middle1")
}
}

group.addTask {
await queue.run {
await finalValue.set("middle2")
}
}

group.addTask {
await queue.run {
await finalValue.set("last")
}
}

// Give time for all to be queued
try? await Task.sleep(nanoseconds: 50_000_000)

// Release slow operation
await slowOperationCanProceed.set(true)
}

let result = await finalValue.get()
XCTAssertEqual(result, "last", "Final state should reflect the last queued operation")
}
}

// MARK: - Helper Actor for Thread-Safe State

actor ActorBox<T> {
private var value: T

init(_ initialValue: T) {
self.value = initialValue
}

func get() -> T {
return value
}

func set(_ newValue: T) {
self.value = newValue
}
}

extension ActorBox where T == Int {
@discardableResult
func increment() -> Int {
value += 1
return value
}

func decrement() {
value -= 1
}

func updateMax(_ candidate: Int) {
if candidate > value {
value = candidate
}
}
}

extension ActorBox where T == [Int] {
func append(_ element: Int) {
value.append(element)
}
}

extension ActorBox where T == [String] {
func append(_ element: String) {
value.append(element)
}
}

extension ActorBox where T == Set<Int> {
func insert(_ element: Int) {
value.insert(element)
}
}