Skip to content

Waiting plugin #274

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ class StorageTests {
fun `system reset action removes system`() = runTest {
val action = object : Action<System> {
override fun reduce(state: System): System {
return System(state.configuration, null, state.running, state.initializedPlugins, state.enabled)
return System(state.configuration, null, state.running, state.initializedPlugins, state.waitingPlugins, state.enabled)
}
}
store.dispatch(action, System::class)
Expand Down
34 changes: 15 additions & 19 deletions core/src/main/java/com/segment/analytics/kotlin/core/Settings.kt
Original file line number Diff line number Diff line change
Expand Up @@ -84,31 +84,27 @@ suspend fun Analytics.checkSettings() {
val writeKey = configuration.writeKey
val cdnHost = configuration.cdnHost

store.currentState(System::class) ?: return
store.dispatch(System.ToggleRunningAction(running = false), System::class)
pauseEventProcessing()

withContext(networkIODispatcher) {
val settingsObj = withContext(networkIODispatcher) {
log("Fetching settings on ${Thread.currentThread().name}")
val settingsObj: Settings? = fetchSettings(writeKey, cdnHost)

withContext(analyticsDispatcher) {

settingsObj?.let {
log("Dispatching update settings on ${Thread.currentThread().name}")
store.dispatch(System.UpdateSettingsAction(settingsObj), System::class)
}
return@withContext fetchSettings(writeKey, cdnHost)
}

store.currentState(System::class)?.let { system ->
system.settings?.let { settings ->
log("Propagating settings on ${Thread.currentThread().name}")
update(settings)
}
}
settingsObj?.let {
log("Dispatching update settings on ${Thread.currentThread().name}")
store.dispatch(System.UpdateSettingsAction(settingsObj), System::class)
}

// we're good to go back to a running state.
store.dispatch(System.ToggleRunningAction(running = true), System::class)
store.currentState(System::class)?.let { system ->
system.settings?.let { settings ->
log("Propagating settings on ${Thread.currentThread().name}")
update(settings)
}
}

// we're good to go back to a running state.
resumeEventProcessing()
}

internal fun Analytics.fetchSettings(
Expand Down
58 changes: 55 additions & 3 deletions core/src/main/java/com/segment/analytics/kotlin/core/State.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ import java.util.*
data class System(
var configuration: Configuration = Configuration(""),
var settings: Settings?,
var running: Boolean,
var initializedPlugins: Set<Int>,
var enabled: Boolean
var running: Boolean = false,
var initializedPlugins: Set<Int> = emptySet(),
var waitingPlugins: Set<Int> = emptySet(),
var enabled: Boolean = true
) : State {

companion object {
Expand Down Expand Up @@ -62,6 +63,7 @@ data class System(
settings = settings,
running = false,
initializedPlugins = setOf(),
waitingPlugins = setOf(),
enabled = true
)
}
Expand All @@ -74,18 +76,37 @@ data class System(
settings,
state.running,
state.initializedPlugins,
state.waitingPlugins,
state.enabled
)
}
}

class ToggleRunningAction(var running: Boolean) : Action<System> {
override fun reduce(state: System): System {
if (running && state.waitingPlugins.isNotEmpty()) {
running = false
}

return System(
state.configuration,
state.settings,
running,
state.initializedPlugins,
state.waitingPlugins,
state.enabled
)
}
}

class ForceRunningAction : Action<System> {
override fun reduce(state: System): System {
return System(
state.configuration,
state.settings,
true,
state.initializedPlugins,
state.waitingPlugins,
state.enabled
)
}
Expand All @@ -105,6 +126,7 @@ data class System(
newSettings,
state.running,
state.initializedPlugins,
state.waitingPlugins,
state.enabled
)
}
Expand All @@ -120,6 +142,7 @@ data class System(
state.settings,
state.running,
initializedPlugins,
state.waitingPlugins,
state.enabled
)
}
Expand All @@ -132,10 +155,39 @@ data class System(
state.settings,
state.running,
state.initializedPlugins,
state.waitingPlugins,
enabled
)
}
}

class AddWaitingPlugin(val plugin: Int): Action<System> {
override fun reduce(state: System): System {
val waitingPlugins = state.waitingPlugins + plugin
return System(
state.configuration,
state.settings,
state.running,
state.initializedPlugins,
waitingPlugins,
state.enabled
)
}
}

class RemoveWaitingPlugin(val plugin: Int): Action<System> {
override fun reduce(state: System): System {
val waitingPlugins = state.waitingPlugins - plugin
return System(
state.configuration,
state.settings,
state.running,
state.initializedPlugins,
waitingPlugins,
state.enabled
)
}
}
}

/**
Expand Down
62 changes: 62 additions & 0 deletions core/src/main/java/com/segment/analytics/kotlin/core/Waiting.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.segment.analytics.kotlin.core

import com.segment.analytics.kotlin.core.platform.Plugin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

/**
* An interface that provides functionality of pausing and resuming event processing on Analytics.
*
* By default plugins that implement this interface pauses processing when it is added to
* analytics (via `setup()`) and resumes after 30s.
*
* To customize pausing and resuming, override `setup()` and call `pause()/resumes()` as needed
*/
interface WaitingPlugin: Plugin {
override fun setup(analytics: Analytics) {
super.setup(analytics)
pause()
}

fun pause() {
analytics.pauseEventProcessing(this)
}

fun resume() {
analytics.resumeEventProcessing(this)
}
}

fun Analytics.pauseEventProcessing(plugin: WaitingPlugin) = analyticsScope.launch(analyticsDispatcher) {
store.dispatch(System.AddWaitingPlugin(plugin.hashCode()), System::class)
pauseEventProcessing()
}


fun Analytics.resumeEventProcessing(plugin: WaitingPlugin) = analyticsScope.launch(analyticsDispatcher) {
store.dispatch(System.RemoveWaitingPlugin(plugin.hashCode()), System::class)
resumeEventProcessing()
}

internal suspend fun Analytics.running(): Boolean {
val system = store.currentState(System::class)
return system?.running ?: false
}

internal suspend fun Analytics.pauseEventProcessing(timeout: Long = 30_000) {
if (!running()) return

store.dispatch(System.ToggleRunningAction(false), System::class)
startProcessingAfterTimeout(timeout)
}

internal suspend fun Analytics.resumeEventProcessing() {
if (running()) return
store.dispatch(System.ToggleRunningAction(true), System::class)
}

internal fun Analytics.startProcessingAfterTimeout(timeout: Long) = analyticsScope.launch(analyticsDispatcher) {
delay(timeout)
store.dispatch(System.ForceRunningAction(), System::class)
}

Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ abstract class DestinationPlugin : EventPlugin {

final override fun execute(event: BaseEvent): BaseEvent? = process(event)

internal fun isDestinationEnabled(event: BaseEvent?): Boolean {
open fun isDestinationEnabled(event: BaseEvent?): Boolean {
// if event payload has integration marked false then its disabled by customer
val customerEnabled = event?.integrations?.getBoolean(key) ?: true // default to true when missing

Expand Down
Loading