Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions Sources/OpenFeature/AsyncProviderOperationsQueue.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import Foundation

/// Unified serial async task queue with operation-type-aware last-wins semantics.
/// - Non-last-wins operations always execute in order
/// - Consecutive last-wins operations: only the last one executes
/// - Order is always preserved
internal actor AsyncProviderOperationsQueue {
private var currentTask: Task<Void, Never>?

private struct QueuedOperation {
let operation: () async -> Void
let continuation: CheckedContinuation<Void, Never>
let lastWins: Bool
}

private var queue: [QueuedOperation] = []

/// Runs the given operation serially.
/// - If lastWins is false: operation always executes
/// - If lastWins is true: may be skipped if superseded by a later last-wins operation
func run(lastWins: Bool, operation: @Sendable @escaping () async -> Void) async {
await withCheckedContinuation { continuation in
queue.append(QueuedOperation(operation: operation, continuation: continuation, lastWins: lastWins))

if currentTask == nil {
processNext()
}
}
}

private func processNext() {
guard !queue.isEmpty else {
currentTask = nil
return
}

// Find the next batch to execute
// A batch is either:
// 1. A single non-last-wins operation, OR
// 2. Consecutive last-wins operations (we execute only the last one)

let firstOp = queue[0]

if !firstOp.lastWins {
// Non-last-wins operation: execute it immediately
let op = queue.removeFirst()

Check warning on line 46 in Sources/OpenFeature/AsyncProviderOperationsQueue.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Identifier Name Violation: Variable name 'op' should be between 3 and 40 characters long (identifier_name)
currentTask = Task { [weak self] in
await op.operation()
op.continuation.resume()
await self?.processNext()
}
} else {
// Last-wins operation: find all consecutive last-wins ops
var lastWinsCount = 0
for op in queue {

Check warning on line 55 in Sources/OpenFeature/AsyncProviderOperationsQueue.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Identifier Name Violation: Variable name 'op' should be between 3 and 40 characters long (identifier_name)
if op.lastWins {
lastWinsCount += 1
} else {
break
}
}

// Execute only the last one in the last-wins batch
let toSkip = Array(queue.prefix(lastWinsCount - 1))
let toExecute = queue[lastWinsCount - 1]
queue.removeFirst(lastWinsCount)

currentTask = Task { [weak self] in
await toExecute.operation()

// Resume all continuations (both skipped and executed)
for op in toSkip {

Check warning on line 72 in Sources/OpenFeature/AsyncProviderOperationsQueue.swift

View workflow job for this annotation

GitHub Actions / SwiftLint

Identifier Name Violation: Variable name 'op' should be between 3 and 40 characters long (identifier_name)
op.continuation.resume()
}
toExecute.continuation.resume()

await self?.processNext()
}
}
}
}
187 changes: 120 additions & 67 deletions Sources/OpenFeature/OpenFeatureAPI.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import Foundation
/// Configuration here will be shared across all ``Client``s.
public class OpenFeatureAPI {
private let eventHandler = EventHandler()
private let queue = DispatchQueue(label: "com.openfeature.providerDescriptor.queue")
// Sync queue to change state atomically
private let stateQueue = DispatchQueue(label: "com.openfeature.state.queue")
// Queue for provider's initialize and onContextSet operations
private let unifiedQueue = AsyncProviderOperationsQueue()

private(set) var providerSubject = CurrentValueSubject<FeatureProvider?, Never>(nil)
private(set) var evaluationContext: EvaluationContext?
Expand All @@ -15,18 +18,15 @@ public class OpenFeatureAPI {
/// The ``OpenFeatureAPI`` singleton
static public let shared = OpenFeatureAPI()

public init() {
}
public init() {}

/**
Set provider and calls its `initialize` in a background thread.
Readiness can be determined from `getState` or listening for `ready` event.
*/
public func setProvider(provider: FeatureProvider, initialContext: EvaluationContext?) {
queue.async {
Task {
await self.setProviderInternal(provider: provider, initialContext: initialContext)
}
Task {
await self.setProviderInternal(provider: provider, initialContext: initialContext)
}
}

Expand All @@ -35,14 +35,7 @@ public class OpenFeatureAPI {
This async function returns when the `initialize` from the provider is completed.
*/
public func setProviderAndWait(provider: FeatureProvider, initialContext: EvaluationContext?) async {
await withCheckedContinuation { continuation in
queue.async {
Task {
await self.setProviderInternal(provider: provider, initialContext: initialContext)
continuation.resume()
}
}
}
await self.setProviderInternal(provider: provider, initialContext: initialContext)
}

/**
Expand All @@ -62,13 +55,31 @@ public class OpenFeatureAPI {
}

public func getProvider() -> FeatureProvider? {
return self.providerSubject.value
return stateQueue.sync {
self.providerSubject.value
}
}

public func clearProvider() {
queue.sync {
self.providerSubject.send(nil)
self.providerStatus = .notReady
Task {
await clearProviderInternal()
}
}

/**
Clear provider.
This async function returns when the clear operation is completed.
*/
public func clearProviderAndWait() async {
await clearProviderInternal()
}

private func clearProviderInternal() async {
await unifiedQueue.run(lastWins: false) { [self] in
stateQueue.sync {
self.providerSubject.send(nil)
self.providerStatus = .notReady
}
}
}

Expand All @@ -77,10 +88,8 @@ public class OpenFeatureAPI {
Readiness can be determined from `getState` or listening for `contextChanged` event.
*/
public func setEvaluationContext(evaluationContext: EvaluationContext) {
queue.async {
Task {
await self.updateContext(evaluationContext: evaluationContext)
}
Task {
await self.updateContext(evaluationContext: evaluationContext)
}
}

Expand All @@ -89,22 +98,19 @@ public class OpenFeatureAPI {
This async function returns when the `onContextSet` from the provider is completed.
*/
public func setEvaluationContextAndWait(evaluationContext: EvaluationContext) async {
await withCheckedContinuation { continuation in
queue.async {
Task {
await self.updateContext(evaluationContext: evaluationContext)
continuation.resume()
}
}
}
await updateContext(evaluationContext: evaluationContext)
}

public func getEvaluationContext() -> EvaluationContext? {
return self.evaluationContext
return stateQueue.sync {
self.evaluationContext
}
}

public func getProviderStatus() -> ProviderStatus {
return self.providerStatus
return stateQueue.sync {
self.providerStatus
}
}

public func getProviderMetadata() -> ProviderMetadata? {
Expand All @@ -120,11 +126,21 @@ public class OpenFeatureAPI {
}

public func addHooks(hooks: (any Hook)...) {
self.hooks.append(contentsOf: hooks)
stateQueue.sync {
self.hooks.append(contentsOf: hooks)
}
}

public func clearHooks() {
self.hooks.removeAll()
stateQueue.sync {
self.hooks.removeAll()
}
}

internal func getHooks() -> [any Hook] {
return stateQueue.sync {
self.hooks
}
}

public func observe() -> AnyPublisher<ProviderEvent?, Never> {
Expand All @@ -143,7 +159,7 @@ public class OpenFeatureAPI {
}

internal func getState() -> OpenFeatureState {
return queue.sync {
return stateQueue.sync {
OpenFeatureState(
provider: providerSubject.value,
evaluationContext: evaluationContext,
Expand All @@ -152,44 +168,81 @@ public class OpenFeatureAPI {
}

private func setProviderInternal(provider: FeatureProvider, initialContext: EvaluationContext? = nil) async {
self.providerStatus = .notReady
self.providerSubject.send(provider)

if let initialContext = initialContext {
self.evaluationContext = initialContext
}
await unifiedQueue.run(lastWins: false) { [self] in
// Set initial state atomically
stateQueue.sync {
self.providerStatus = .notReady
self.providerSubject.send(provider)
if let initialContext = initialContext {
self.evaluationContext = initialContext
}
}

do {
try await provider.initialize(initialContext: initialContext)
self.providerStatus = .ready
self.eventHandler.send(.ready(nil))
} catch {
switch error {
case OpenFeatureError.providerFatalError(let message):
self.providerStatus = .fatal
self.eventHandler.send(.error(ProviderEventDetails(message: message, errorCode: .providerFatal)))
default:
self.providerStatus = .error
self.eventHandler.send(.error(ProviderEventDetails(message: error.localizedDescription)))
// Initialize provider - this entire operation is atomic
do {
try await provider.initialize(initialContext: initialContext)
stateQueue.sync {
self.providerStatus = .ready
}
self.eventHandler.send(.ready(nil))
} catch {
stateQueue.sync {
switch error {
case OpenFeatureError.providerFatalError(_):
self.providerStatus = .fatal
default:
self.providerStatus = .error
}
}
switch error {
case OpenFeatureError.providerFatalError(let message):
self.eventHandler.send(.error(ProviderEventDetails(message: message, errorCode: .providerFatal)))
default:
self.eventHandler.send(.error(ProviderEventDetails(message: error.localizedDescription)))
}
}
}
}

private func updateContext(evaluationContext: EvaluationContext) async {
do {
let oldContext = self.evaluationContext
self.evaluationContext = evaluationContext
self.providerStatus = .reconciling
await unifiedQueue.run(lastWins: true) { [self] in
// Get old context, set new context, and update status atomically
let (oldContext, provider) = stateQueue.sync { () -> (EvaluationContext?, FeatureProvider?) in
let oldContext = self.evaluationContext
self.evaluationContext = evaluationContext

// Only update status if provider is set
if let provider = self.providerSubject.value {
self.providerStatus = .reconciling
return (oldContext, provider)
}

return (oldContext, nil)
}

// Early return if no provider is set - nothing to reconcile
guard let provider = provider else {
return
}

eventHandler.send(.reconciling(nil))
try await self.providerSubject.value?.onContextSet(
oldContext: oldContext,
newContext: evaluationContext
)
self.providerStatus = .ready
eventHandler.send(.contextChanged(nil))
} catch {
self.providerStatus = .error
eventHandler.send(.error(ProviderEventDetails(message: error.localizedDescription)))

// Call provider's onContextSet - this entire operation is atomic
do {
try await provider.onContextSet(
oldContext: oldContext,
newContext: evaluationContext
)
stateQueue.sync {
self.providerStatus = .ready
}
eventHandler.send(.contextChanged(nil))
} catch {
stateQueue.sync {
self.providerStatus = .error
}
eventHandler.send(.error(ProviderEventDetails(message: error.localizedDescription)))
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/OpenFeature/OpenFeatureClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ extension OpenFeatureClient {
clientMetadata: self.metadata,
providerMetadata: provider.metadata)
hookLock.lock()
let mergedHooks = provider.hooks + options.hooks + hooks + openFeatureApi.hooks
let mergedHooks = provider.hooks + options.hooks + hooks + openFeatureApi.getHooks()
hookLock.unlock()
do {
hookSupport.beforeHooks(flagValueType: T.flagValueType, hookCtx: hookCtx, hooks: mergedHooks, hints: hints)
Expand Down
4 changes: 2 additions & 2 deletions Tests/OpenFeatureTests/FlagEvaluationTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ final class FlagEvaluationTests: XCTestCase {

OpenFeatureAPI.shared.addHooks(hooks: hook1)

XCTAssertEqual(OpenFeatureAPI.shared.hooks.count, 1)
XCTAssertEqual(OpenFeatureAPI.shared.getHooks().count, 1)

OpenFeatureAPI.shared.addHooks(hooks: hook2)
XCTAssertEqual(OpenFeatureAPI.shared.hooks.count, 2)
XCTAssertEqual(OpenFeatureAPI.shared.getHooks().count, 2)
}

func testNamedClient() {
Expand Down
Loading
Loading