Extensions to the Kotlin Flow library.
dependencies {
implementation "com.github.akarnokd:kotlin-flow-extensions:0.0.2"
}Table of contents
- Hot Flows
- Sources
rangetimer
- Intermediate Flow operators (
FlowExtensions)Flow.concatWithFlow.groupByFlow.parallelFlow.publishFlow.replayFlow.startCollectOnFlow.takeUntil
ParallelFlowoperators (FlowExtensions)ParallelFlow.concatMapParallelFlow.filterParallelFlow.mapParallelFlow.reduceParallelFlow.sequentialParallelFlow.transform
ConnectableFlow
Multicasts values to one or more flow collectors in a coordinated fashion, awaiting each collector to be ready to receive the next item or termination.
import hu.akarnokd.kotlin.flow.*
runBlocking {
val publishSubject = PublishSubject<Int>()
val job = launch(Dispatchers.IO) {
publishSubject.collect {
println(it)
}
println("Done")
}
// wait for the collector to arrive
while (!publishSubject.hasCollectors()) {
delay(1)
}
publishSubject.emit(1)
publishSubject.complete()
job.join()
}Caches and replays some or all items to collectors. Constructors for size-bound, time-bound and both size-and-time bound
replays are available. An additional constructor with a TimeUnit -> Long has been defined to allow virtualizing
the progression of time for testing purposes
import hu.akarnokd.kotlin.flow.*
runBlocking {
val replaySubject = ReplaySubject<Int>()
val job = launch(Dispatchers.IO) {
replaySubject.collect {
println(it)
}
println("Done")
}
// wait for the collector to arrive
while (!replaySubject.hasCollectors()) {
delay(1)
}
replaySubject.emit(1)
replaySubject.emit(2)
replaySubject.emit(3)
replaySubject.complete()
job.join()
replaySubject.collect {
println(it)
}
println("Done 2")
}Caches the last item received and multicasts it and subsequent items (continuously) to collectors, awaiting each collector to be ready to receive the next item or termination. It is possible to set an initial value to be sent to fresh collectors via a constructor.
import hu.akarnokd.kotlin.flow.*
runBlocking {
val behaviorSubject = BehaviorSubject<Int>()
behaviorSubject.emit(1)
// OR
// val behaviorSubject = BehaviorSubject<Int>(1)
val job = launch(Dispatchers.IO) {
behaviorSubject.collect {
println(it)
}
println("Done")
}
// wait for the collector to arrive
while (!behaviorSubject.hasCollectors()) {
delay(1)
}
behaviorSubject.emit(2)
behaviorSubject.emit(3)
behaviorSubject.complete()
job.join()
}