# Introducing Kotlin **Flows**

- Kotlin **Flows** is introduced.
- **Transforming**, **Mapping** and **Filtering** are discussed along with other operators.
- **Collecting**, **Launching** and **Cancelling** are explained.
- **Buffering** and **Flattenng** are covered.


In [1]:
// Importing some neccessary libs in jupyter notebook
@file:DependsOn("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.5.2")
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.flow.*
import java.util.concurrent.Executors
import kotlin.system.measureTimeMillis

## Creating and Using Asynchronous **Flows**

- Kotlin `Flow<T>` is a part of Kotlin **Coroutines** and are designed to handle asynchronous streams of data in a **reactive way**.
- **Cold Stream**: A Flow is a **cold stream**, meaning that it doesn’t produce values until it is collected somewhere. 
- **Flow Emission**: The `emit()` function inside the `flow<T> {}` block allows you to emit values one by one.
- **Collecting**: The `collect()` function is used to collect the emitted values from the Flow. Collection is a **suspending function**, so *it needs to be called from a coroutine or another suspending function*.

In [2]:
// Creating a simple 'Flow' and collecting emitted values

// 'flow {}' is a 'Flow' builder and in '{...}' we can 'emit' data
fun simpleFlow(): Flow<Int> = flow<Int> { // This function returns a flow.
    for (i in 1 until 6) {
        emit(i)  // Emits values sequentially; it is also a suspend function.
    }
}

fun main() = runBlocking {
    println("Program started.")

    // Creating a flow
    val myFlow = simpleFlow(); // This 'emit's data only when it is collected.

    println("Collecting:")

    /*
    IMPORTANT:
    If we try to collect a single instance of a flow
    from multiple collectors AT THE SAME TIME (like by launching 'collect()'),
    it will result in an 'IllegalStateException'.
    */

    // IMPORTANT: Collect is a non-blocking suspend function.
    // Collecting the flow; 'collect' is a suspend function and must be called in a coroutine scope.
    myFlow.collect { value -> 
        println(value)
    }
    
    println("Collecting again:")
    
    // Every time 'collect' is called, the flow start emitting.
    myFlow.collect { value -> 
        println(value)
    }
    println("Program finished.")
}
main()

Program started.
Collecting:
1
2
3
4
5
Collecting again:
1
2
3
4
5
Program finished.


In [3]:
// Using corotuines and suspending functions in flow builder

fun simpleFlow(): Flow<Int> = flow {
    /*
    In 'flow {}' builder, corotuines and suspend functions can be used.
    IMPORTANT: the context of the coroutines in this flow builder is assigned and inherited from 
    where it is collected. 'flow {}' itself is neigher suspend function nor coroutine scope; it is just
    a 'Flow' builder.
    */
    for (i in 1 until 6) {
        emit(i)  // Emit values sequentially
        delay(500) // A suspending function which is attached to the collector context.
        
        coroutineScope { // Attached to the collector context.
            launch {
                println("emission {$i} done.")
            }
        }
        
    }
}

fun main() = runBlocking {
    println("Program started.")

    // Creating a flow
    val myFlow = simpleFlow(); // This 'emit's data only when it is collected.
    
    println("Collecting:")

    // Collect the flow; 'collect' is a suspend function and must be called in a coroutine scope.
    myFlow.collect { value -> 
        println(value)
    }
    
    println("Program finished.")
}
main()

Program started.
Collecting:
1
emission {1} done.
2
emission {2} done.
3
emission {3} done.
4
emission {4} done.
5
emission {5} done.
Program finished.


In [4]:
// Creating a flow from a collection using 'asFlow'

fun main() = runBlocking {
    val myList = listOf(1, 2, 3, 4, 5)  // A collection (list)
    
    // Convert the list to a Flow using asFlow()
    val myFlow = myList.asFlow()
    
    myFlow.collect { value -> 
        println("Value: $value")  // Collect and print each emitted value
    }

    // or
    (1 until 6).asFlow().collect { value -> 
        println("Value: $value")  // Collect and print each emitted value
    }
    
}

main()

Value: 1
Value: 2
Value: 3
Value: 4
Value: 5
Value: 1
Value: 2
Value: 3
Value: 4
Value: 5


## Using `transform`, `map`, `filter` and `take` operators on flows

In [5]:
// Mapping flow data

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1 until 6) {
        emit(i)  
        delay(500) 
    }
}

fun main() = runBlocking {
    println("Program started.")

    // Creating a flow
    val myFlow = simpleFlow(); 
    
    println("Collecting:")

    myFlow.map { value -> // Applies a map on the flow's data
        value*2 // Returns this value to downstream.
    }.collect { value -> 
        // Mapped data are received here.
        println(value)
    }
    
    println("Program finished.")
}
main()

Program started.
Collecting:
2
4
6
8
10
Program finished.


In [6]:
// Filtering flow data

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1 until 11) {
        emit(i) 
        delay(500) 
    }
}

fun main() = runBlocking {
    println("Program started.")

    // Creating a flow
    val myFlow = simpleFlow(); 
    
    println("Collecting:")

    
    myFlow.filter { value -> // Applies a filter on the flow's data
        value%2 == 0  // Delivers it to downstream if the value is even otherwise ignore it.
    }.collect { value -> 
        // Filtered data are received here.
        println(value)
    }
    
    println("Program finished.")
}
main()

Program started.
Collecting:
2
4
6
8
10
Program finished.


In [7]:
// Transforming flow data

/*
The transform operator allows you to perform more complex operations on the emitted values, 
and it gives you the flexibility to 'emit' multiple values, modify the original values, 
or emit different types of values. 
*/
fun simpleFlow(): Flow<Int> = flow {
    for (i in 1 until 6) {
        emit(i) 
        delay(500) 
    }
}

fun main() = runBlocking {
    println("Program started.")

    // Creating a flow
    val myFlow = simpleFlow(); // This 'emit's data only when it is collected.
    
    println("Collecting:")
    
    myFlow.transform { value -> 
        emit(value)  // Returns this value 
        emit("The second emitted value is: $value")  // Returns this value too
    }.collect { value -> 
        // Transformed data are received here.
        println(value)
    }
    
    println("Program finished.")
}
main()

Program started.
Collecting:
1
The second emitted value is: 1
2
The second emitted value is: 2
3
The second emitted value is: 3
4
The second emitted value is: 4
5
The second emitted value is: 5
Program finished.


In [8]:
// Limiting the number of data from the flow

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1 until 11) {
        emit(i)  
        delay(500)
    }
}

fun main() = runBlocking {
    println("Program started.")

    // Creating a flow
    val myFlow = simpleFlow(); 
    
    println("Collecting:")

    
    myFlow.take(3).collect { value -> 
        println(value)
    } // Finishes collection after collecting 3 items.
    
    println("Program finished.")
}
main()

Program started.
Collecting:
1
2
3
Program finished.


## Using `onStart`, `onEach` and `onCompletion`

In [9]:
// 'onEach' is a way to apply a side-effect on emitted values.

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1 until 6) {
        emit(i) 
        delay(500)
    }
}

fun main() = runBlocking {
    println("Program started.")

    // Creating a flow
    val myFlow = simpleFlow();
    
    println("Collecting:")

    // We can use '.onEahc{}.collect()' combination.
    myFlow.onEach { value -> 
        // The 'onEach' operator in Kotlin Flow is designed for side effects.
        println("Doing some job in onEach: $value")
        // 'onEach' bypasses the 'value' to collect directly.
    }.collect() // Just starts collecting process.

    println("Collecting again:")
    
    // We can use '.onEach{}.collect{}' combination.
    myFlow.onEach { value -> 
        // The 'onEach' operator in Kotlin Flow is designed for side effects.
        println("Doing some job in onEach: $value")
        // 'onEach' bypasses the 'value' to collect directly.
    }.collect { value -> 
        println("Received value in collect: $value")
    }
    
    println("Program finished.")
}
main()

Program started.
Collecting:
Doing some job in onEach: 1
Doing some job in onEach: 2
Doing some job in onEach: 3
Doing some job in onEach: 4
Doing some job in onEach: 5
Collecting again:
Doing some job in onEach: 1
Received value in collect: 1
Doing some job in onEach: 2
Received value in collect: 2
Doing some job in onEach: 3
Received value in collect: 3
Doing some job in onEach: 4
Received value in collect: 4
Doing some job in onEach: 5
Received value in collect: 5
Program finished.


In [10]:
// Using 'onStart' and 'onCompletion'

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1 until 6) {
        emit(i)  
        delay(500) 
    }
}

fun main() = runBlocking {
    println("Program started.")

    // Creating a flow
    val myFlow = simpleFlow(); 
    
    println("Collecting:")

    // 'onStart' executed only on start of the flow
    myFlow.onStart { 
        println("Flow Started")
        emit(0) // onStart can 'emit' the data of the same type
    }.onEach { value -> 
        println("Doing some job in onEach: $value")
    }.onCompletion { cause ->
        /*
        'cause' is the cause of completion.

        If the Flow completed normally, meaning all values were successfully emitted
        and the Flow reached its end, it will be null.
        
        If the Flow completed due to an exception, it will be non-null and 
        hold the exception that caused the Flow to terminate.
        */
        println("Flow completed: $cause")
        
        // onCompletion can also 'emit' the data of the same type then we can collect it in 'collect {}'
        emit(100)  
        
    }.collect { value ->
        println("Received value in collect: $value")
    }

    println("Program finished.")
}
main()

Program started.
Collecting:
Flow Started
Doing some job in onEach: 0
Received value in collect: 0
Doing some job in onEach: 1
Received value in collect: 1
Doing some job in onEach: 2
Received value in collect: 2
Doing some job in onEach: 3
Received value in collect: 3
Doing some job in onEach: 4
Received value in collect: 4
Doing some job in onEach: 5
Received value in collect: 5
Flow completed: null
Received value in collect: 100
Program finished.


## Error Handling

In [11]:
// Catching exception

fun simpleFlow(): Flow<Int> = flow {
    for (i in 1 until 6) {
        // Throwing an exception
        if (i > 3) throw RuntimeException() 
        
        emit(i) 
        delay(500)
    }
}

fun main() = runBlocking {
    println("Program started.")

    // Creating a flow
    val myFlow = simpleFlow(); 
    
    println("Collecting:")

    myFlow.onEach { value -> 
        println("Doing some job in onEach: $value")
    }.onCompletion { cause -> // 'onCompletion' is somehow similar to 'finally'
        // If we use 'onCompletion' after 'catch', it will be executed but 'cause' will be null.
        println("Flow completed: $cause")
        emit(101) // Won't be received in 'collect' because 'catch' has caught an error.
    }.catch { 
        // if 'catch' is used above 'onCompletion', then 'onCompletion' won't receive the 'cause' of exception.
        e -> println("Caught exception: $e") 
        emit(102) // 'catch' can also 'emit' extra data
    }.collect { value ->
        println("Received value in collect: $value")
    } // Or we can use 'collect()' to just starts getting data; in that case we'll miss the emited data in 'catch'

    println("Program finished.")
}
main()

Program started.
Collecting:
Doing some job in onEach: 1
Received value in collect: 1
Doing some job in onEach: 2
Received value in collect: 2
Doing some job in onEach: 3
Received value in collect: 3
Flow completed: java.lang.RuntimeException
Caught exception: java.lang.RuntimeException
Received value in collect: 102
Program finished.


## Flow Cancellation

In [12]:
// Cancelling a flow is similar to coroutines

fun simple(): Flow<Int> = flow { 
    for (i in 1 until 5) {
        println("Emitting $i")
        delay(500) // Comment to see what will happen         
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().onCompletion { cause ->
             println("Completed: $cause")
        }.collect { value -> println(value) } 
        
        /*
        When there is a managed cancelling, 'catch {}' won't catch 'CancellationException' unless
        we directly 'throw' a 'CancellationException'.
        
        Here, catch will not catch 'TimeoutCancellationException':
        simple().catch { e -> println("Exception happened: $e")}.collect { value -> println(value) } 
        */
    }
    println("Done")
}

main()

Emitting 1
Completed: kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 250 ms
Done


## Changing Flow Context

- By default, code in the `flow { ... }` builder runs in the context that is provided by a collector of the corresponding flow.
- It is better to use `log` for coroutine debugging by enabling `kotlinx.coroutines.debug` in your codes; here, we haven't used it.

In [13]:
// Unsing 'withContxet' at call site

val context: CoroutineContext = Dispatchers.IO + CoroutineName("The Flow's Context")

fun simple(): Flow<Int> = flow { 
    for (i in 1 until 5) {
        delay(500) 
        emit(i)
    }
}

fun main() = runBlocking {
    println("${coroutineContext},\n ${Thread.currentThread().toString()}: program started.\n") 
    withContext(context) { 
        simple().collect { value -> 
            println("${coroutineContext},\n ${Thread.currentThread().toString()}: $value\n\n") 
        } 
    }
    println("coroutineContext},\n ${Thread.currentThread().toString()}: program finished.\n") 
}

main()
context.cancel()

[BlockingCoroutine{Active}@1b00218, BlockingEventLoop@74a703c5],
 Thread[#28,Execution of code '// Unsing 'withContx...',5,main]: program started.

[CoroutineName(The Flow's Context), DispatchedCoroutine{Active}@2736afd5, Dispatchers.IO],
 Thread[#73,DefaultDispatcher-worker-1,5,main]: 1


[CoroutineName(The Flow's Context), DispatchedCoroutine{Active}@2736afd5, Dispatchers.IO],
 Thread[#73,DefaultDispatcher-worker-1,5,main]: 2


[CoroutineName(The Flow's Context), DispatchedCoroutine{Active}@2736afd5, Dispatchers.IO],
 Thread[#73,DefaultDispatcher-worker-1,5,main]: 3


[CoroutineName(The Flow's Context), DispatchedCoroutine{Active}@2736afd5, Dispatchers.IO],
 Thread[#73,DefaultDispatcher-worker-1,5,main]: 4


coroutineContext},
 Thread[#28,Execution of code '// Unsing 'withContx...',5,main]: program finished.



In [14]:
// But there is a standard way to do it using 'flowOn' operator

fun simple(): Flow<Int> = flow { 
    for (i in 1 until 5) {
        delay(500)       
        emit(i) // Emits on IO dispatcher.
        println("${Thread.currentThread().toString()}: emitting.") 
    }
}.flowOn(Dispatchers.IO) // Here, 'flowOn' operator is used to change the upstream context, e.g. 'flow {}'.

fun main() = runBlocking {
    println("${coroutineContext},\n ${Thread.currentThread().toString()}: program started.\n") 
    
    simple().collect { value ->
        println("${Thread.currentThread().toString()}: $value\n") // Collected on 'runBlocking' context 
    } 
    println("coroutineContext},\n ${Thread.currentThread().toString()}: program finished.\n") 
    
}

main()

[BlockingCoroutine{Active}@c4c8c0b, BlockingEventLoop@4a9c1596],
 Thread[#28,Execution of code '// But there is a st...',5,main]: program started.

Thread[#73,DefaultDispatcher-worker-1,5,main]: emitting.
Thread[#28,Execution of code '// But there is a st...',5,main]: 1

Thread[#73,DefaultDispatcher-worker-1,5,main]: emitting.
Thread[#28,Execution of code '// But there is a st...',5,main]: 2

Thread[#73,DefaultDispatcher-worker-1,5,main]: emitting.
Thread[#28,Execution of code '// But there is a st...',5,main]: 3

Thread[#73,DefaultDispatcher-worker-1,5,main]: emitting.
Thread[#28,Execution of code '// But there is a st...',5,main]: 4

coroutineContext},
 Thread[#28,Execution of code '// But there is a st...',5,main]: program finished.



In [15]:
// Usig multiple 'flowOn'

val threadNo = 1
val context: CoroutineContext = Executors.newFixedThreadPool(threadNo).asCoroutineDispatcher() +
    CoroutineName("The Flow's Context")

fun simple(): Flow<Int> = flow { 
    for (i in 1 until 5) {
        delay(500)       
        emit(i)
        println("Emitting on `Custom Dispatcher`: $i")
    }
}.flowOn(context)  // changes the upstream context

fun main() = runBlocking {
    println("program started.\n") 

    simple()
        .onEach { v -> println("On `IO Dispatcher`: $v")}
        .flowOn(Dispatchers.IO)  // changes the upstream context
        .onEach { v -> println("On `Default Dispatcher`: $v")}
        .flowOn(Dispatchers.Default)  // changes the upstream context
        .collect { v -> 
            println("Collected: $v")
        }
        
    println("/nprogram finished.") 
}

main()
context.cancel()

program started.

Emitting on `Custom Dispatcher`: 1
On `IO Dispatcher`: 1
On `Default Dispatcher`: 1
Collected: 1
Emitting on `Custom Dispatcher`: 2
On `IO Dispatcher`: 2
On `Default Dispatcher`: 2
Collected: 2
Emitting on `Custom Dispatcher`: 3
On `IO Dispatcher`: 3
On `Default Dispatcher`: 3
Collected: 3
Emitting on `Custom Dispatcher`: 4
On `IO Dispatcher`: 4
On `Default Dispatcher`: 4
Collected: 4
/nprogram finished.


## Using `launchIn`

- `Flow`s can also be launched in a given `CoroutineScope`.
- This is specifically useful when using default coroutine scopes in Android programming.

In [16]:
// Using 'launchIn()' with custom context

val threadNo = 1
val context: CoroutineContext = 
    Executors.newFixedThreadPool(threadNo).asCoroutineDispatcher() +
    CoroutineName("The Flow's Context")
val myScope = CoroutineScope(context)
    
fun simple(): Flow<Int> = flow { 
    for (i in 1 until 5) {
        delay(500)       
        emit(i)
    }
}

fun main() = runBlocking {
    println("program started.\n") 

    val job = simple().onEach { v -> 
        println("Getting emitted value: $v")
    }.launchIn(myScope)  // Launches a coroutine in the given scope and returns the realted 'Job'
    
    job.join()
    
    println("\nprogram finished.") 
}

main()
myScope.cancel()

program started.

Getting emitted value: 1
Getting emitted value: 2
Getting emitted value: 3
Getting emitted value: 4

program finished.


## Buffering

- **Buffering** in flows is useful because it helps decouple the emission of values from their collection, allowing the flow to emit values **faster than they are collected**. This is particularly helpful when:

    1. The **flow emits items faster** than the collector can process them.
    2. You want to avoid backpressure i.e., slowing down the flow's emission **due to slow collection**.

- There are different types of buffers: **Default Buffer**, **Buffer with Custom Size**, **Conflated Buffer** and **Unlimited Buffer**

- **Buffering Guarantees**: **Except** for `conflate()`, using any buffer size guarantees that all emitted values will be collected. The flow will manage the rate of emission based on the collector's pace, **ensuring no values are lost**.


In [17]:
// Experiment without buffer

fun simple(): Flow<Int> = flow { 
    for (i in 0 until 10) {
        delay(500)       
        emit(i)
    }
}

fun main() = runBlocking {
    println("program started.\n") 

    var startTimestamp = 0L
    var endTimestamp = 0L
    simple().onStart {
        startTimestamp = System.currentTimeMillis()
    }.onEach { v -> 
        delay(1_000)
        println("Getting emitted value: $v")
    }.onCompletion {
        endTimestamp = System.currentTimeMillis()
    }.collect()
    
    val duration = (endTimestamp - startTimestamp)/1000L
    println("\nprogram finished in: $duration seconds") 
}

main() // It will take approximately 15 seconds = 10 * (1s + 0.5s)

program started.

Getting emitted value: 0
Getting emitted value: 1
Getting emitted value: 2
Getting emitted value: 3
Getting emitted value: 4
Getting emitted value: 5
Getting emitted value: 6
Getting emitted value: 7
Getting emitted value: 8
Getting emitted value: 9

program finished in: 15 seconds


In [18]:
// Using default size buffer

/*
Default Buffer: The default buffer size is 64 items, 
meaning up to 64 items can be stored in the buffer before backpressure kicks in.
*/

fun simple(): Flow<Int> = flow { 
    for (i in 0 until 10) {
        delay(500)       
        emit(i)
    }
}

fun main() = runBlocking {
    println("program started.\n") 

    var startTimestamp = 0L
    var endTimestamp = 0L

    // Defining a buffer after getting a flow instance.
    simple().buffer().onStart {
        startTimestamp = System.currentTimeMillis()
    }.onEach { v -> 
        delay(1_000)
        println("Getting emitted value: $v")
    }.onCompletion {
        endTimestamp = System.currentTimeMillis()
    }.collect()
    
    val duration = (endTimestamp - startTimestamp)/1000L
    println("\nprogram finished in: $duration seconds") 
}

main() // It will take approximately 10 seconds (10 * 1s) because emissions are faster and are gathered in the buffer

program started.

Getting emitted value: 0
Getting emitted value: 1
Getting emitted value: 2
Getting emitted value: 3
Getting emitted value: 4
Getting emitted value: 5
Getting emitted value: 6
Getting emitted value: 7
Getting emitted value: 8
Getting emitted value: 9

program finished in: 10 seconds


In [19]:
// Using fixed-size and unlimited-size buffer

/*
Fixed-Size Buffer : It has a given size and used as: [flow].buffer([size])
Unlimited-Size Buffer: It size is unlimited and used as: [flow].buffer(capacity = Channel.UNLIMITED)
*/

fun simple(): Flow<Int> = flow { 
    for (i in 0 until 10) {
        delay(500)       
        emit(i)
    }
}

fun main() = runBlocking {
    println("program started.\n") 

    var startTimestamp = 0L
    var endTimestamp = 0L

    /*
    A fixed-size buffer can only buffer a limited counts of item.
    To use unlimited size buffer, we can simply replace size with 'capacity = Channel.UNLIMITED'
    */
    
    simple().buffer(3).onStart {
        startTimestamp = System.currentTimeMillis()
    }.onEach { v -> 
        delay(1_000)
        println("Getting emitted value: $v")
    }.onCompletion {
        endTimestamp = System.currentTimeMillis()
    }.collect()
    
    val duration = (endTimestamp - startTimestamp)/1000L
    println("\nprogram finished in: $duration seconds") 
}

main() 

program started.

Getting emitted value: 0
Getting emitted value: 1
Getting emitted value: 2
Getting emitted value: 3
Getting emitted value: 4
Getting emitted value: 5
Getting emitted value: 6
Getting emitted value: 7
Getting emitted value: 8
Getting emitted value: 9

program finished in: 10 seconds


In [20]:
// Using 'Conflate Buffer'

/*
If you use conflate(), the flow will only keep the latest value in the buffer, 
dropping any previous ones. 
This is useful when you're only interested in the most recent value, and older ones don't matter.
*/

// IMPORTAMT: Collecting by 'Conflate Buffer' might lead to lose of some emitted values. 

fun simple(): Flow<Int> = flow { 
    for (i in 0 until 10) {
        delay(500)       
        emit(i)
    }
}

fun main() = runBlocking {
    println("program started.\n") 

    // Using 'measureTimeMillis' built-in function to measure calc. time.
    val timeMillis = measureTimeMillis { 
        simple().conflate().onEach { v -> 
            delay(1_000)
            println("Getting emitted value: $v")
        }.collect()
    }

    
    
    val duration = timeMillis/1000L
    println("\nprogram finished in: $duration seconds") 
}

main() // As it can be seen some values are missing.

program started.

Getting emitted value: 0
Getting emitted value: 1
Getting emitted value: 3
Getting emitted value: 5
Getting emitted value: 7
Getting emitted value: 9

program finished in: 6 seconds


## Combining flows

- There are some ways like `zip` and `combine` to create a single flow out of them.
- The `zip` operator in Kotlin's `Flow` takes elements from two flows and combines them pairwise. If one flow `emit`s more elements than the other, the zip operator will stop when the shorter flow runs out of elements, effectively **ignoring any extra elements** from the longer flow.
-  Unlike `zip`, the `combine` operator in Kotlin **does not stop** when one of the flows is exhausted. Instead, it `emit`s a new value whenever **any** of the flows emits a value.
-  When flows are **asynchrone**:
    > `zip` always **waits** so that **both flows emit new data**, and then emits value.
     
    > `combine` emits new data when **ONE** of the flows `emit` new data; it keeps other flows' *latest emitted data* and combines them. `combine` starts emitting when all flows have already emitted atleast once. `combine` **ignores** emitted data **before ALL flows emit for the first time**.

In [21]:
// Combining two flows using 'zip'

fun main() = runBlocking {
    // Using 'flowOf' to create a flow of the given elements
    val nums = flowOf(1, 2, 3)
    val strs = flowOf("one", "two", "three", "four")

    // Using 'zip' to combine two flows and get a new flow
    /*
    'zip' takes equal count of elements of two flows and ignores extra elements.
    In this example, 'strs' has an extra element.
    */
    val combFlow = nums.zip(strs) { a, b ->
        "$a " + b
    }
    combFlow.collect { v ->
        println("value: $v")
    }
}

main()                   

value: 1 one
value: 2 two
value: 3 three


In [22]:
// Using 'combine' to create a flow from multiple flows

fun main() = runBlocking {
    // Using 'flowOf' to create a flow of the given elements
    val nums = flowOf(1, 2, 3)
    val strs = flowOf("one", "two", "three", "four")
    val chr = flowOf('A', 'B', 'C', 'D', 'E')

    // Using 'combine' to combine three flows and get a new flow
    /*
    On the contrary, 'combine' keeps using the last element of the flow that has been
    exhausted and repeats it whenever another flow emits new item.
    */
    val combFlow = combine(nums, strs, chr) { a, b, c ->
        "$a, " + b + ", " + c
    }
    combFlow.collect { v ->
        println("value: $v")
    }
}

main() 

value: 1, one, A
value: 2, two, B
value: 3, three, C
value: 3, four, D
value: 3, four, E


In [23]:
// Asynchronous flows and 'zip'

fun main() = runBlocking {
    // Using 'map' to add a delay
    val nums = flowOf(1, 2, 3).onEach {
        delay(500)
    }
    
    val strs = flowOf("one", "two", "three", "four").onEach { 
        delay(2_000)
    }
    
    val combFlow = nums.zip(strs) { a, b ->
        "$a " + b // Waits for both flows to emit.
    }
    val timeMillis = measureTimeMillis {   
        combFlow.collect { v ->
            println("value: $v")
        }
    }
    println("finished in: ${timeMillis/1000} seconds.")
}

main() // Collecting will take 6s = 3*2s

value: 1 one
value: 2 two
value: 3 three
finished in: 6 seconds.


In [24]:
// Asynchronous flows and 'combine'

fun main() = runBlocking {

    val nums = flowOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).onEach {
        delay(500)
    }
    
    val strs = flowOf("one", "two", "three").onEach {
        delay(1_000)
    }

    val chr = flowOf('A', 'B', 'C', 'D', 'E').onEach {
        delay(2_000)
    }
    
    val combFlow = combine(nums, strs, chr) { a, b, c ->
        /*
        IMPORTANT:
        
        * Initial Emission: For combine to start emitting values, each flow involved in the combination
        must have emitted at least one value.
        
        * Reactive Updates: After the initial emission, combine will 'emit' a new value
        whenever any of the flows emits a new value. 
        It takes the most recent value from each flow at the time of emission.
        */
        
        "$a, " + b + ", " + c
    }
    val timeMillis = measureTimeMillis {   
        combFlow.collect { v ->
            println("value: $v")
        }
    }
    println("finished in: ${timeMillis/1000} seconds.")
}

main() 

value: 3, one, A
value: 4, two, A
value: 5, two, A
value: 6, three, A
value: 7, three, A
value: 7, three, B
value: 8, three, B
value: 9, three, B
value: 10, three, B
value: 10, three, C
value: 10, three, D
value: 10, three, E
finished in: 10 seconds.


## Falettening flows of flows

- Flattening is used when we have nested flows, like when a flow initiates another flow.
- Flows can emit flows. Example:
  ```kotlin
  fun getFlow(i: Int): Flow<String> = flow {
      delay(500) 
      emit("message from $i")
  }

  val flw = (1 until 5).asFlow().map {it -> getFlow(it)}
  ```
- The type of `flw` is `Flow<Flow<String>>`. So we need to collect nested flow data when it is emitted.
- To better process data we need to **flatten** these flows.
- We can use `flatMapConcat` operator or other similar ones to do it.
- `flatMapConcat` waits until the inner flows data collected, **then**:
  1. Emits the data to the downstream.
  2. Request the upstream (outer flow) to emit new data.
- There are other options that you can find it in [the documentation](https://kotlinlang.org/docs/flow.html#flatmapconcat).

In [25]:
// Using 'flatMapConcat'

fun getFlow(i: Int): Flow<String> = flow {
    delay(500) 
    emit("message 1 from $i")
    delay(500) 
    emit("message 2 from $i")
}


fun main() = runBlocking {
    // We use 'flatMapConcat' here.
    val flw = (1 until 6).asFlow().onEach { it ->
        println("New emission requested $it")
        delay(2000)
    }.flatMapConcat { it -> 
        getFlow(it)
    }
    val timeMillis = measureTimeMillis {   
        flw.collect { v -> 
            println("Collected value: $v")
        }
    }
    println("Finished in: ${timeMillis/1000} seconds.")
}

main() 

New emission requested 1
Collected value: message 1 from 1
Collected value: message 2 from 1
New emission requested 2
Collected value: message 1 from 2
Collected value: message 2 from 2
New emission requested 3
Collected value: message 1 from 3
Collected value: message 2 from 3
New emission requested 4
Collected value: message 1 from 4
Collected value: message 2 from 4
New emission requested 5
Collected value: message 1 from 5
Collected value: message 2 from 5
Finished in: 15 seconds.


## **DYE**: Do Your Experiment

In [26]:
// Ready for your codes