diff --git a/kotlin-sdk/src/commonMain/kotlin/dev/openfeature/kotlin/sdk/OpenFeatureAPI.kt b/kotlin-sdk/src/commonMain/kotlin/dev/openfeature/kotlin/sdk/OpenFeatureAPI.kt index 36d7f58..6944f3c 100644 --- a/kotlin-sdk/src/commonMain/kotlin/dev/openfeature/kotlin/sdk/OpenFeatureAPI.kt +++ b/kotlin-sdk/src/commonMain/kotlin/dev/openfeature/kotlin/sdk/OpenFeatureAPI.kt @@ -10,8 +10,6 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancel -import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.MutableSharedFlow @@ -26,7 +24,6 @@ object OpenFeatureAPI { private var setProviderJob: Job? = null private var setEvaluationContextJob: Job? = null private var observeProviderEventsJob: Job? = null - private var providerEventObservationScope: CoroutineScope? = null private val NOOP_PROVIDER = NoOpProvider() private var provider: FeatureProvider = NOOP_PROVIDER @@ -242,8 +239,6 @@ object OpenFeatureAPI { observeProviderEventsJob?.cancel( CancellationException("Provider event observe job was cancelled due to shutdown") ) - providerEventObservationScope?.coroutineContext?.cancelChildren() - providerEventObservationScope?.coroutineContext?.cancel() clearProvider() } diff --git a/kotlin-sdk/src/commonMain/kotlin/dev/openfeature/kotlin/sdk/multiprovider/MultiProvider.kt b/kotlin-sdk/src/commonMain/kotlin/dev/openfeature/kotlin/sdk/multiprovider/MultiProvider.kt index b80de7b..83a4c2e 100644 --- a/kotlin-sdk/src/commonMain/kotlin/dev/openfeature/kotlin/sdk/multiprovider/MultiProvider.kt +++ b/kotlin-sdk/src/commonMain/kotlin/dev/openfeature/kotlin/sdk/multiprovider/MultiProvider.kt @@ -10,6 +10,10 @@ import dev.openfeature.kotlin.sdk.Value import dev.openfeature.kotlin.sdk.events.OpenFeatureProviderEvents import dev.openfeature.kotlin.sdk.events.toOpenFeatureStatusError import dev.openfeature.kotlin.sdk.exceptions.OpenFeatureError +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.coroutineScope @@ -21,6 +25,7 @@ import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.update +import kotlinx.coroutines.launch /** * Type alias for a function that evaluates a feature flag using a FeatureProvider. @@ -154,6 +159,8 @@ class MultiProvider( } } + private var observeProviderEventsJob: Job? = null + /** * @return Number of unique providers */ @@ -169,20 +176,27 @@ class MultiProvider( */ override suspend fun initialize(initialContext: EvaluationContext?) { coroutineScope { - // Listen to events emitted by providers to emit our own set of events - // according to https://openfeature.dev/specification/appendix-a/#status-and-event-handling - childFeatureProviders.forEach { provider -> - provider.observe() - .onEach { event -> - handleProviderEvent(provider, event) - } - .launchIn(this) + observeProviderEventsJob?.cancel( + cause = CancellationException("Observe provider events job cancelled due to new initialize call") + ) + observeProviderEventsJob = CoroutineScope(this.coroutineContext + SupervisorJob()).launch { + // Listen to events emitted by providers to emit our own set of events + // according to https://openfeature.dev/specification/appendix-a/#status-and-event-handling + childFeatureProviders.forEach { provider -> + provider.observe() + .onEach { event -> + handleProviderEvent(provider, event) + } + .launchIn(this) + } } - // State updates captured by observing individual Feature Flag providers - childFeatureProviders - .map { async { it.initialize(initialContext) } } - .awaitAll() + launch { + // State updates captured by observing individual Feature Flag providers + childFeatureProviders + .map { async { it.initialize(initialContext) } } + .awaitAll() + } } } @@ -221,6 +235,10 @@ class MultiProvider( * This allows providers to clean up resources and complete any pending operations. */ override fun shutdown() { + observeProviderEventsJob?.cancel( + cause = CancellationException("Observe provider events job cancelled due to shutdown") + ) + val shutdownErrors = mutableListOf>() childFeatureProviders.forEach { provider -> try { diff --git a/kotlin-sdk/src/commonTest/kotlin/dev/openfeature/kotlin/sdk/multiprovider/MultiProviderTests.kt b/kotlin-sdk/src/commonTest/kotlin/dev/openfeature/kotlin/sdk/multiprovider/MultiProviderTests.kt index e598f8b..2312848 100644 --- a/kotlin-sdk/src/commonTest/kotlin/dev/openfeature/kotlin/sdk/multiprovider/MultiProviderTests.kt +++ b/kotlin-sdk/src/commonTest/kotlin/dev/openfeature/kotlin/sdk/multiprovider/MultiProviderTests.kt @@ -18,6 +18,7 @@ import kotlinx.coroutines.flow.first import kotlinx.coroutines.launch import kotlinx.coroutines.test.advanceUntilIdle import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withTimeout import kotlin.test.Test import kotlin.test.assertEquals import kotlin.test.assertFailsWith @@ -368,6 +369,17 @@ class MultiProviderTests { assertTrue(suppressedMessages.any { it.contains("Provider 'bad1' shutdown failed: oops1") }) assertTrue(suppressedMessages.any { it.contains("Provider '' shutdown failed: oops2") }) } + + @Test + fun initializeFunctionCompletesWhenObservingNeverCompletingFlows() = runTest { + val fakeEventProvider = FakeEventProvider(name = "ok") + + // Should complete immediately + withTimeout(1000) { + val multi = MultiProvider(listOf(fakeEventProvider)) + multi.initialize(null) + } + } } // Helpers