Skip to content

Commit

Permalink
feat: add manual flush capabilities (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklasl committed May 27, 2024
1 parent d4efefc commit 157eeb9
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 7 deletions.
4 changes: 4 additions & 0 deletions Confidence/src/main/java/com/spotify/confidence/Confidence.kt
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ class Confidence internal constructor(
eventSenderEngine.emit(eventName, message, getContext())
}

override fun flush() {
eventSenderEngine.flush()
}

private val networkExceptionHandler by lazy {
CoroutineExceptionHandler { _, _ ->
// network failed, provider is ready but with default/cache values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ interface EventSender : Contextual {

fun stop()

fun flush()

override fun withContext(context: Map<String, ConfidenceValue>): EventSender
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ import java.io.File
internal interface EventSenderEngine {
fun onLowMemoryChannel(): Channel<List<File>>
fun emit(eventName: String, message: ConfidenceFieldsType, context: Map<String, ConfidenceValue>)

fun flush()
fun stop()
}

internal class EventSenderEngineImpl(
private val eventStorage: EventStorage,
private val clientSecret: String,
private val uploader: EventSenderUploader,
private val flushPolicies: List<FlushPolicy> = listOf(),
private val flushPolicies: MutableList<FlushPolicy> = mutableListOf(),
private val clock: Clock = Clock.CalendarBacked.systemUTC(),
private val dispatcher: CoroutineDispatcher = Dispatchers.IO,
private val sdkMetadata: SdkMetadata
Expand All @@ -44,9 +44,13 @@ internal class EventSenderEngineImpl(
}

init {
flushPolicies.add(ManualFlushPolicy)
coroutineScope.launch(exceptionHandler) {
for (event in writeReqChannel) {
eventStorage.writeEvent(event)
if (event.eventDefinition != manualFlushEvent.eventDefinition) {
// skip storing manual flush event
eventStorage.writeEvent(event)
}
for (policy in flushPolicies) {
policy.hit(event)
}
Expand Down Expand Up @@ -109,6 +113,12 @@ internal class EventSenderEngineImpl(
}
}

override fun flush() {
coroutineScope.launch {
writeReqChannel.send(manualFlushEvent)
}
}

override fun stop() {
coroutineScope.cancel()
eventStorage.stop()
Expand All @@ -129,7 +139,7 @@ internal class EventSenderEngineImpl(
EventStorageImpl(context),
clientSecret,
uploader = EventSenderUploaderImpl(OkHttpClient(), dispatcher),
flushPolicies = flushPolicies,
flushPolicies = flushPolicies.toMutableList(),
dispatcher = dispatcher,
sdkMetadata = sdkMetadata
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.spotify.confidence

import java.util.Date

internal val manualFlushEvent = EngineEvent("confidence_manual_flush", Date(), mapOf())

internal object ManualFlushPolicy : FlushPolicy {
private var flushRequested = false
override fun reset() {
flushRequested = false
}

override fun hit(event: EngineEvent) {
flushRequested = event.eventDefinition == manualFlushEvent.eventDefinition
}

override fun shouldFlush(): Boolean = flushRequested
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class EventSenderIntegrationTest {
val engine = EventSenderEngineImpl(
eventStorage,
clientSecret,
flushPolicies = listOf(flushPolicy),
flushPolicies = mutableListOf(flushPolicy),
dispatcher = testDispatcher,
sdkMetadata = SdkMetadata("kotlin_test", ""),
uploader = uploader
Expand Down Expand Up @@ -175,7 +175,7 @@ class EventSenderIntegrationTest {
val engine = EventSenderEngineImpl(
eventStorage,
clientSecret,
flushPolicies = listOf(flushPolicy),
flushPolicies = mutableListOf(flushPolicy),
dispatcher = testDispatcher,
sdkMetadata = SdkMetadata("kotlin_test", ""),
uploader = uploader
Expand Down Expand Up @@ -232,7 +232,7 @@ class EventSenderIntegrationTest {
val engine = EventSenderEngineImpl(
eventStorage,
clientSecret,
flushPolicies = listOf(flushPolicy),
flushPolicies = mutableListOf(flushPolicy),
dispatcher = testDispatcher,
sdkMetadata = SdkMetadata("kotlin_test", ""),
uploader = uploader
Expand Down Expand Up @@ -268,4 +268,49 @@ class EventSenderIntegrationTest {
Assert.assertEquals(eventStorage.eventsFor(currentFile).size, 2)
}
}

@Test
fun running_flush_will_batch_and_upload() = runTest {
val eventStorage = EventStorageImpl(mockContext)
val testDispatcher = UnconfinedTestDispatcher(testScheduler)
val batchSize = 4
val flushPolicy = object : FlushPolicy {
private var size = 0
override fun reset() {
size = 0
}

override fun hit(event: EngineEvent) {
size++
}

override fun shouldFlush(): Boolean {
return size >= batchSize
}
}
val uploader = object : EventSenderUploader {
val requests: MutableList<EventBatchRequest> = mutableListOf()

override suspend fun upload(events: EventBatchRequest): Boolean {
requests.add(events)
return true
}
}
val engine = EventSenderEngineImpl(
eventStorage,
clientSecret,
flushPolicies = mutableListOf(flushPolicy),
dispatcher = testDispatcher,
sdkMetadata = SdkMetadata("kotlin_test", ""),
uploader = uploader
)

engine.emit("my_event", mapOf("a" to ConfidenceValue.Integer(0)), mapOf("a" to ConfidenceValue.Integer(1)))
engine.emit("my_event", mapOf("a" to ConfidenceValue.Integer(0)), mapOf("a" to ConfidenceValue.Integer(1)))
Assert.assertEquals(uploader.requests.size, 0)
engine.flush()
advanceUntilIdle()
Assert.assertEquals(1, uploader.requests.size)
Assert.assertEquals(2, uploader.requests[0].events.size)
}
}

0 comments on commit 157eeb9

Please sign in to comment.