In [1]:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

In [11]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(500)
            println(value)
        }
    }

    println("Collected in $time ms")
}

1
2
3
Collected in 1815 ms


In [12]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().buffer() // 버퍼 추가해서 송신 측에서 기다리지 않게 구성함
            .collect{ value ->
                delay(500)
                println(value)
            }
    }

    println("Collected in $time ms")
}

1
2
3
Collected in 1608 ms


In [15]:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

runBlocking<Unit> {
    val time = measureTimeMillis {
        simple().conflate() // 처리보다 빨리 발생한 데이터의 중간 값들을 누락
            .collect { value ->
                delay(300)
                println(value)
            }
    }

    println("Collected in $time ms")
}

1
5
10
Collected in 1614 ms


In [17]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

runBlocking<Unit> {
    val time = measureTimeMillis {
        // conflate와 같이 방출되는 값을 누락할 수도 있지만 수집 측이 느릴 경우
        // 새로운 데이터가 있을 때 수집 측을 종료시키고 새로 시작
        simple().collectLatest { value ->
            println("값 $value 를 처리하기 시작합니다.")
            delay(300)
            println(value)
            println("처리를 완료했습니다.")
        }
    }

    println("Collected in $time ms")
}

값 1 를 처리하기 시작합니다.
값 2 를 처리하기 시작합니다.
값 3 를 처리하기 시작합니다.
3
처리를 완료했습니다.
Collected in 617 ms


In [19]:
// zip은 양쪽의 데이터를 한꺼번에 묶어 새로운 데이터를 만듦
runBlocking<Unit> {
    val nums = (1..3).asFlow()
    val strs = flowOf("일", "이", "삼")

    nums.zip(strs) { a, b -> "${a}은(는) ${b}" }
        .collect { println(it) }
}

1은(는) 일
2은(는) 이
3은(는) 삼


In [20]:
// combine은 양쪽의 데이터를 같은 시점에 묶지 않고 한 쪽이 갱신되면 새로 묶어 데이터를 만듦
runBlocking {
    val nums = (1..3).asFlow().onEach { delay(100L) }
    val strs = flowOf("일", "이", "삼").onEach { delay(200L) }

    nums.combine(strs) { a, b -> "${a}은(는) ${b}"}
        .collect{ println(it) }
}

1은(는) 일
2은(는) 일
3은(는) 일
3은(는) 이
3은(는) 삼


In [21]:

fun requestFlow(i: Int): Flow<String>  = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

runBlocking<Unit> {
    val startTime = System.currentTimeMillis()

    (1..3).asFlow()
                 .onEach { delay(100) }
                 .flatMapConcat { requestFlow(it) } // flatMapConcat은 첫번째 요소에 대해서 플레트닝을 하고 나서 두번째 요소를 처리함
                 .collect { value -> println("$value at ${System.currentTimeMillis() - startTime} ms from start")}
}

1: First at 106 ms from start
1: Second at 612 ms from start
2: First at 715 ms from start
2: Second at 1217 ms from start
3: First at 1321 ms from start
3: Second at 1823 ms from start


In [23]:
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

runBlocking<Unit> {
    val startTime = System.currentTimeMillis()

    (1..3).asFlow()
                .onEach { delay(100) }
                .flatMapMerge { requestFlow(it) }// flatMapMerge는 첫 요소의 프레트닝을 시작하며 이어 다음 요소의 플레트닝을 시작
                .collect{ value ->  println("$value at ${System.currentTimeMillis() - startTime} ms from start") }
}

1: First at 2 ms from start
2: First at 2 ms from start
3: First at 2 ms from start
1: Second at 505 ms from start
2: Second at 505 ms from start
3: Second at 505 ms from start


In [24]:
fun requestFlow(i: Int): Flow<String> = flow {
    emit("$i: First")
    delay(500)
    emit("$i: Second")
}

runBlocking<Unit> {
    val startTime = System.currentTimeMillis()

    (1..3).asFlow()
                .onEach { delay(100) }
                .flatMapLatest { requestFlow(it) } // 다음 요소의 플레트닝을 시작하며 이전에 진행 중이던 플레트닝을 취소
                .collect{ value ->  println("$value at ${System.currentTimeMillis() - startTime} ms from start") }
}

1: First at 104 ms from start
2: First at 206 ms from start
3: First at 310 ms from start
3: Second at 813 ms from start


In [25]:
// 예외는 collect을 하는 수집기 측에서도 try-catch 식을 이용할 수 있음

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

runBlocking<Unit> {
    try {
        simple().collect { value ->
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2


In [26]:
// 모든 예외는 처리가능
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }.map{value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

runBlocking {
    try {
        simple().collect { println(it) }
    } catch (e: Throwable) {
        println("Caught $e")
    }
}

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2


In [28]:
fun simple(): Flow<String> =
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i)
        }
    }.map{ value ->
        check(value <= 1) { "Crashed on $value" }
        "string $value"
    }

runBlocking {
    simple().catch { e -> emit("Caught $e") } //  플로우에서는 catch 연산자를 이용하는 것을 권함
        // catch 블록에서 예외를 새로운 데이터로 만들어 emit을 하거나, 다시 예외를 던지거나, 로그를 남길 수 있음
        .collect { println(it) }
}

Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2


In [30]:
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

runBlocking {
    simple().catch { e -> println("Caught $e") } // catch 연산자는 업스트림(catch 연산자를 쓰기 전의 코드)에만 영향을 미치고 다운스트림에는 영향을 미치지 않음
        // 이를 catch 투명성이라고함
        .collect { value ->
            check(value <= 1) { "Collected $value" }
            println(value)
        }
}

Emitting 1
1
Emitting 2


java.lang.IllegalStateException: Collected 2

In [31]:
fun simple(): Flow<Int> = (1..3).asFlow()

runBlocking<Unit> {
    try {
        simple().collect { println(it) }
    } finally {
        println("Done!!")
    }
}


1
2
3
Done!!


In [32]:
fun simple(): Flow<Int> = (1..3).asFlow()

runBlocking {
    simple().onCompletion { println("Done") } // onCompletion 연산자를 선언해서 완료를 처리할
            .collect { println(it) }
}

1
2
3
Done


In [34]:
// onCompletion은 종료 처리를 할 때 예외가 발생되었는지 여부를 알 수 있음
fun simple(): Flow<Int> = flow{
    emit(1)
    throw RuntimeException()
}

runBlocking {
    simple().onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
            .catch { cause -> println("Caught exception")  }
            .collect { println(it) }
}

1
Flow completed exceptionally
Caught exception


In [35]:
fun events(): Flow<Int>  = (1..3).asFlow().onEach { delay(100) }

runBlocking<Unit> {
    events().onEach { println("Event $it") }
            .collect() //  collect가 플로가 끝날 때 까지 기다리는 것이 문제

    println("Done!!")
}

Event 1
Event 2
Event 3
Done!!


In [36]:
// launchIn을 이용하면 별도의 코루틴에서 플로우를 런칭가능
fun evnets(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

runBlocking {
    events().onEach { println("Event $it") }
    println("Done!!")
}

Done!!
