Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
Expand All @@ -26,7 +24,6 @@ object OpenFeatureAPI {
private var setProviderJob: Job? = null
private var setEvaluationContextJob: Job? = null
private var observeProviderEventsJob: Job? = null
private var providerEventObservationScope: CoroutineScope? = null

private val NOOP_PROVIDER = NoOpProvider()
private var provider: FeatureProvider = NOOP_PROVIDER
Expand Down Expand Up @@ -242,8 +239,6 @@ object OpenFeatureAPI {
observeProviderEventsJob?.cancel(
CancellationException("Provider event observe job was cancelled due to shutdown")
)
providerEventObservationScope?.coroutineContext?.cancelChildren()
providerEventObservationScope?.coroutineContext?.cancel()
clearProvider()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ import dev.openfeature.kotlin.sdk.Value
import dev.openfeature.kotlin.sdk.events.OpenFeatureProviderEvents
import dev.openfeature.kotlin.sdk.events.toOpenFeatureStatusError
import dev.openfeature.kotlin.sdk.exceptions.OpenFeatureError
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
Expand All @@ -21,6 +25,7 @@ import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch

/**
* Type alias for a function that evaluates a feature flag using a FeatureProvider.
Expand Down Expand Up @@ -154,6 +159,8 @@ class MultiProvider(
}
}

private var observeProviderEventsJob: Job? = null

/**
* @return Number of unique providers
*/
Expand All @@ -169,20 +176,27 @@ class MultiProvider(
*/
override suspend fun initialize(initialContext: EvaluationContext?) {
coroutineScope {
// Listen to events emitted by providers to emit our own set of events
// according to https://openfeature.dev/specification/appendix-a/#status-and-event-handling
childFeatureProviders.forEach { provider ->
provider.observe()
.onEach { event ->
handleProviderEvent(provider, event)
}
.launchIn(this)
observeProviderEventsJob?.cancel(
cause = CancellationException("Observe provider events job cancelled due to new initialize call")
)
observeProviderEventsJob = CoroutineScope(this.coroutineContext + SupervisorJob()).launch {
// Listen to events emitted by providers to emit our own set of events
// according to https://openfeature.dev/specification/appendix-a/#status-and-event-handling
childFeatureProviders.forEach { provider ->
provider.observe()
.onEach { event ->
handleProviderEvent(provider, event)
}
.launchIn(this)
}
}

// State updates captured by observing individual Feature Flag providers
childFeatureProviders
.map { async { it.initialize(initialContext) } }
.awaitAll()
launch {
// State updates captured by observing individual Feature Flag providers
childFeatureProviders
.map { async { it.initialize(initialContext) } }
.awaitAll()
}
}
}

Expand Down Expand Up @@ -221,6 +235,10 @@ class MultiProvider(
* This allows providers to clean up resources and complete any pending operations.
*/
override fun shutdown() {
observeProviderEventsJob?.cancel(
cause = CancellationException("Observe provider events job cancelled due to shutdown")
)

val shutdownErrors = mutableListOf<Pair<String, Throwable>>()
childFeatureProviders.forEach { provider ->
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import kotlinx.coroutines.flow.first
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
Expand Down Expand Up @@ -368,6 +369,17 @@ class MultiProviderTests {
assertTrue(suppressedMessages.any { it.contains("Provider 'bad1' shutdown failed: oops1") })
assertTrue(suppressedMessages.any { it.contains("Provider '<unnamed>' shutdown failed: oops2") })
}

@Test
fun initializeFunctionCompletesWhenObservingNeverCompletingFlows() = runTest {
val fakeEventProvider = FakeEventProvider(name = "ok")

// Should complete immediately
withTimeout(1000) {
val multi = MultiProvider(listOf(fakeEventProvider))
multi.initialize(null)
}
}
}

// Helpers
Expand Down