In [1]:
%use coroutines

In [2]:
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

runBlocking {
    val channel = Channel<Int>(1)
    val outputChannel = Channel<Int>(1)

    val inputJob = launch {
        println("starting producer")
//        delay(200)
        (1..10).forEach {
            println("sending $it")
            channel.send(it)
            println("sent $it")
        }
    }

    val processor =launch {
        println("launching")
        for(it in channel) {
//            delay(100)
            outputChannel.send(it)
        }
    }

    launch {
        println("final")
        for(it in outputChannel) {
            delay(100)
            println("processed $it")
            if(it == 2) {
                println("Cancelling")
                inputJob.cancel()
                processor.cancel()
                cancel()
                return@launch
            }
        }
    }


}
println("done blocking")

starting producer
sending 1
sent 1
sending 2
launching
final
sent 2
sending 3
sent 3
sending 4
sent 4
sending 5
processed 1
sent 5
sending 6
processed 2
Cancelling
done blocking


# Use a pool of workers to handle events, but publish them in order

When we have an event we publish it along with a Deferred to two channels:
- we hash the key and send it to the worker that corresponds to the hash, it will complete the Deferred when it is done
- we also publish the Deferred to a separate channel that is a proxy for the Kafka publisher, it will await the completion of event processing 

This means we maintain the original order, but allow the work to be fanned out to multiple workers in a consistent way.

In [4]:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.random.Random

data class Event(val key: String, val value: String)
data class ProcessEventResult(val event: Event, val processedValue: String)

fun CoroutineScope.launchEventWorker(workerId: Int, channel: Channel<Pair<Event, CompletableDeferred<ProcessEventResult>>>) =
    launch {
        for ((event, completion) in channel) {
            // random delay to simulate work
            delay(Random.nextLong(2_000L))
            val result = ProcessEventResult(event, "Processed by $workerId: ${event.value}")
            completion.complete(result)
            println("marked as completed: $result")
        }
    }

fun CoroutineScope.launchKafkaPublisher(channel: Channel<CompletableDeferred<ProcessEventResult>>) =
    launch {
        for (completion in channel) {
            val result = completion.await()
            println("Finished: $result")
        }
    }

fun main() = runBlocking {
    val numberOfWorkers = 4 // Number of workers
    val workersChannels = List(numberOfWorkers) { Channel<Pair<Event, CompletableDeferred<ProcessEventResult>>>(2) }
    val kafkaSendChannel = Channel<CompletableDeferred<ProcessEventResult>>(100)

    workersChannels.forEachIndexed { index, channel ->
        launchEventWorker(index, channel)
    }

    launchKafkaPublisher(kafkaSendChannel)
    
    // emulate kafka consumer which is getting via polling and will send values to the appropriate worker
    repeat(10) { i ->
        // dummy event with a key/value that shows what offset we're at
        val eventData = Event("key$i", "value$i")
        
        val deferred = CompletableDeferred<ProcessEventResult>()
        
        // spread the work across the pool of workers 
        val workerIndex = i % numberOfWorkers
        
        // send the event and the deferred to the worker for it to complete
        workersChannels[workerIndex].send(eventData to deferred)
        
        // also send the deferred to the kafka publisher, we're maintaining the original order of events
        // it will wait for the deferred to be completed by a worker
        kafkaSendChannel.send(deferred)
    }
    
    delay(10_000)
    println("Done")
    
    // Cleanup: close channels, etc.
    workersChannels.forEach { it.close() }
    kafkaSendChannel.close()
}


println("Notice that the Finished messages are in the same order as the orginal events\n")
main()

Notice that the Finished messages are in the same order as the orginal events
marked as completed: ProcessEventResult(event=Event(key=key0, value=value0), processedValue=Processed by 0: value0)
Finished: ProcessEventResult(event=Event(key=key0, value=value0), processedValue=Processed by 0: value0)
marked as completed: ProcessEventResult(event=Event(key=key4, value=value4), processedValue=Processed by 0: value4)
marked as completed: ProcessEventResult(event=Event(key=key3, value=value3), processedValue=Processed by 3: value3)
marked as completed: ProcessEventResult(event=Event(key=key1, value=value1), processedValue=Processed by 1: value1)
Finished: ProcessEventResult(event=Event(key=key1, value=value1), processedValue=Processed by 1: value1)
marked as completed: ProcessEventResult(event=Event(key=key2, value=value2), processedValue=Processed by 2: value2)
Finished: ProcessEventResult(event=Event(key=key2, value=value2), processedValue=Processed by 2: value2)
Finished: ProcessEventResul

true

# Refactored version that decouples the event source (kafka consumer) from what happens to those events

### This pattern can be extended for acknowledging the kafka publish

Could also use a ticker rendesvous channel to do the committing of offsets back to the kafka brokers

In [33]:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.ticker
import kotlinx.coroutines.future.asCompletableFuture
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import kotlin.random.Random

data class UpstreamEvent(val partition: Int, val offset: Int, val key: String, val value: String) {
    override fun toString(): String {
        return "$partition:$offset $key -> $value"
    }
}

data class DownstreamEvent(val key: String, val value: String) {
    override fun toString(): String {
        return "$key -> $value"
    }
}

sealed class TransformResult {
    data class Forward(val downstreamEvent: DownstreamEvent) : TransformResult()

    // allow for many events to be published downstream for a single upstream event
    data class ForwardMany(val downstreamEvents: List<DownstreamEvent>) : TransformResult()
    data class Drop(val reason: String) : TransformResult()
    data class Error(val error: Throwable) : TransformResult()
}

// mock RecordMetadata
data class RecordMetadata(val topic: String, val partition: Int, val offset: Long)

data class EventTransformDeferredResult(
    val upstreamEvent: UpstreamEvent,
    val deferred: CompletableDeferred<TransformResult>
)


// kafka publishing uses completable futures, not deferreds, this will be completed for
// publish many when the last event is published
data class TransformedEventPublishFuture(
    val upstreamEvent: UpstreamEvent,
    val publishFuture: CompletableFuture<RecordMetadata>
)

// does the per-event work and marks the deferred as completed when work is done
fun CoroutineScope.launchEventWorker(
    eventChannel: ReceiveChannel<EventTransformDeferredResult>,
    transformEvent: suspend (UpstreamEvent) -> TransformResult
) = launch(Dispatchers.Default) {// using Dispatchers.Default as this is constrained by CPU work and is limited to the number of CPUs, consider Dispatchers.IO if we expect users to block
    for ((upstreamEvent, completion) in eventChannel) {
        val result = transformEvent(upstreamEvent)
        completion.complete(result)
    }
}

// this function needs to react differently based on the type of result, we still want to send it downstream regardless
// of the type of results so that we can commit progress as we're done with processing that event (or we blow up if it is an error)
fun CoroutineScope.launchKafkaPublisher(
    channel: ReceiveChannel<EventTransformDeferredResult>,
    kafkaAcknowledgementChannel: SendChannel<TransformedEventPublishFuture>
) =
    launch {
        var publishedOffset: Long = 1
        for (eventTransformeDeferredResult in channel) {
            val result = eventTransformeDeferredResult.deferred.await()

            when (result) {
                is TransformResult.Forward -> {
                    println("${Thread.currentThread().name} Publishing: ${eventTransformeDeferredResult.upstreamEvent} -> ${result.downstreamEvent}")
                    kafkaAcknowledgementChannel.send(
                        TransformedEventPublishFuture(
                            eventTransformeDeferredResult.upstreamEvent,
                            CompletableFuture.completedFuture(RecordMetadata("topic", 0, publishedOffset++))
                        )
                    )
                }

                is TransformResult.ForwardMany -> {
                    // TODO would want to commit only after on the last event
                    // we could wrap all of the individual acks in a single future and send that downstream
                    result.downstreamEvents.forEach {
                        println("Publishing: ${eventTransformeDeferredResult.upstreamEvent} -> $it")
                        kafkaAcknowledgementChannel.send(
                            TransformedEventPublishFuture(
                                eventTransformeDeferredResult.upstreamEvent,
                                CompletableFuture.completedFuture(RecordMetadata("topic", 0, publishedOffset++))
                            )
                        )
                    }
                }

                is TransformResult.Drop -> {
                    println("${Thread.currentThread().name} Publishing: ${eventTransformeDeferredResult.upstreamEvent} -> Dropped: ${result.reason}")

                    // still need to send downstream to the kafka acknowledger for commit purposes, sending already completed
                    kafkaAcknowledgementChannel.send(
                        TransformedEventPublishFuture(
                            eventTransformeDeferredResult.upstreamEvent,
                            // TODO don't actually send RecordMetadata for dropped events, optional or null?
                            CompletableFuture.completedFuture(RecordMetadata("topic", 0, -1))
                        )
                    )
                }

                is TransformResult.Error -> {
                    // TODO blow up?  Still send downstream?
                    println("${Thread.currentThread().name} Publishing: ${eventTransformeDeferredResult.upstreamEvent} -> Error: ${result.error}")
                }
            }
        }
    }

fun CoroutineScope.launchKafkaAcknowledger(
    channel: ReceiveChannel<TransformedEventPublishFuture>,
    partitionToOffset: ConcurrentHashMap<Int, Long>
) =
    launch(Dispatchers.IO) { // using Dispatchers.IO as this will block on the kafka future
        for (transformedEventPublishFuture in channel) {
            val recordMetadata = transformedEventPublishFuture.publishFuture.join()
            println("${Thread.currentThread().name} Acked: ${transformedEventPublishFuture.upstreamEvent} -> $recordMetadata")
            if(recordMetadata.offset >= 0) {
                partitionToOffset[recordMetadata.partition] = recordMetadata.offset
            }
        }
    }

fun CoroutineScope.launchKafkaCommitter(
    tickerChannel: ReceiveChannel<Unit>,
    partitionToOffset: ConcurrentHashMap<Int, Long> // TODO should be a TopicPartition to Long
) = launch {
    for (unit in tickerChannel) {
        partitionToOffset.replaceAll { partition, offset ->
            // if we have a positive value, we've made progress on this partition since the last tick
            if (offset >= 0) {
                println("${Thread.currentThread().name} Tick - Committing $partition -> $offset")
            } else {
                println("${Thread.currentThread().name} Tick - nothing new to commit!")
            }
            -1L// replace with a negative value so we don't commit on this partition again till we've made real progress
        }
    }
}

// 1. eventSource submits events that are sent to the appropriate worker, and sends an uncompleted deferred to the kafka publisher
// 2. the worker calls the supplied `transformEvent` and completes the deferred when done with the work
// 3. the kafka publisher awaits the completion of the transformed event and maintains the original order of events
//       it then does an async publish of the event and sends the future to the kafka acknowledger
// 4. the kafka acknowledger awaits the completion of the publish and then updates the partitionToOffset map
suspend fun CoroutineScope.withActors(
    numberOfWorkers: Int,
    transformEvent: suspend (UpstreamEvent) -> TransformResult,
    eventSource: suspend (suspend (UpstreamEvent) -> Unit) -> Unit
) {
    val partitionToOffset = ConcurrentHashMap<Int, Long>()

    // in real code these should have much larger buffer sizes, the ack one especially needs to be large enough so it
    // aren't blocking the publishing of events
    val workersChannels =
        List(numberOfWorkers) { Channel<EventTransformDeferredResult>(2) }
    val readyToPublishChannel = Channel<EventTransformDeferredResult>(10)
    val kafkaAcknowledgementChannel = Channel<TransformedEventPublishFuture>(10)
    val tickerChannel = ticker(delayMillis = 2000)

    workersChannels.forEach { channel ->
        launchEventWorker(channel, transformEvent)
    }

    launchKafkaPublisher(readyToPublishChannel, kafkaAcknowledgementChannel)

    launchKafkaAcknowledger(kafkaAcknowledgementChannel, partitionToOffset)

    launchKafkaCommitter(tickerChannel, partitionToOffset)

    suspend fun submitEvent(event: UpstreamEvent) {
        val deferred = CompletableDeferred<TransformResult>()
        val eventTransformDeferredResult = EventTransformDeferredResult(event, deferred)
        val workerIndex = event.key.hashCode() % numberOfWorkers
        workersChannels[workerIndex].send(eventTransformDeferredResult)
        readyToPublishChannel.send(eventTransformDeferredResult)
    }

    eventSource(::submitEvent)

    delay(10_000)
    println("Done")

    // Cleanup: close channels, etc.
    workersChannels.forEach { it.close() }
    readyToPublishChannel.close()
    kafkaAcknowledgementChannel.close()
    tickerChannel.cancel()
}

suspend fun transformEvent(event: UpstreamEvent): TransformResult {
    // simulate work, random delay up to 2 seconds
    println("${Thread.currentThread().name} Transforming event: $event")
    delay(Random.nextLong(2_000L))
    if (event.offset % 3 == 0) {
        return TransformResult.Drop("Dropping every 3rd event")
    }
    return TransformResult.Forward(DownstreamEvent(event.key, "Processed by worker: ${event.value}"))
}

// event from poll -> submitEvent -> worker[<for hashed key>] -> kafkaPublisher -> kafkaAcknowledger && kafkaCommitter
// "Polled" -> "Transforming" -> "Publishing" -> "Acked" &&  asynchronous "Tick" will commit updated progress every 2 seconds

In [34]:
// this is the "main" loop that would use a real kafka consumer to poll for events and submit them to the actors
runBlocking {
    Thread.currentThread().name = "main"
    val numberOfWorkers = 4

    // emulate kafka consumer which is getting via polling and will send values to the appropriate worker
    withActors(numberOfWorkers, ::transformEvent) { submitEvent ->
        val partition = 0

        println("${Thread.currentThread().name} polling for events to submit")
        // emulate kafka consumer which is getting via polling and will send values to the appropriate worker
        (1..10).forEach { i ->
            val eventData = UpstreamEvent(partition, i, "key$i", "value$i")
            println("${Thread.currentThread().name} Polled upstream event: $eventData")
            submitEvent(eventData)
        }
    }
}

main polling for events to submit
main Polled upstream event: 0:1 key1 -> value1
main Polled upstream event: 0:2 key2 -> value2
main Polled upstream event: 0:3 key3 -> value3
main Polled upstream event: 0:4 key4 -> value4
main Polled upstream event: 0:5 key5 -> value5
main Polled upstream event: 0:6 key6 -> value6
DefaultDispatcher-worker-4 Transforming event: 0:1 key1 -> value1
DefaultDispatcher-worker-7 Transforming event: 0:4 key4 -> value4
DefaultDispatcher-worker-8 Transforming event: 0:2 key2 -> value2
DefaultDispatcher-worker-6 Transforming event: 0:3 key3 -> value3
main Polled upstream event: 0:7 key7 -> value7
main Polled upstream event: 0:8 key8 -> value8
main Polled upstream event: 0:9 key9 -> value9
main Polled upstream event: 0:10 key10 -> value10
DefaultDispatcher-worker-6 Transforming event: 0:5 key5 -> value5
main Publishing: 0:1 key1 -> value1 -> key1 -> Processed by worker: value1
DefaultDispatcher-worker-6 Acked: 0:1 key1 -> value1 -> RecordMetadata(topic=topic, part