In [1]:
@file:DependsOn("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.4")

In [2]:
:classpath

Current classpath (16 paths):
/Users/a06411/opt/anaconda3/envs/kotlin/lib/python3.10/site-packages/run_kotlin_kernel/jars/lib-0.11.0-170.jar
/Users/a06411/opt/anaconda3/envs/kotlin/lib/python3.10/site-packages/run_kotlin_kernel/jars/api-0.11.0-170.jar
/Users/a06411/opt/anaconda3/envs/kotlin/lib/python3.10/site-packages/run_kotlin_kernel/jars/kotlin-script-runtime-1.8.0-dev-3517.jar
/Users/a06411/opt/anaconda3/envs/kotlin/lib/python3.10/site-packages/run_kotlin_kernel/jars/kotlin-reflect-1.7.10.jar
/Users/a06411/opt/anaconda3/envs/kotlin/lib/python3.10/site-packages/run_kotlin_kernel/jars/kotlin-stdlib-1.7.10.jar
/Users/a06411/opt/anaconda3/envs/kotlin/lib/python3.10/site-packages/run_kotlin_kernel/jars/kotlin-stdlib-common-1.7.10.jar
/Users/a06411/opt/anaconda3/envs/kotlin/lib/python3.10/site-packages/run_kotlin_kernel/jars/annotations-13.0.jar
/Users/a06411/opt/anaconda3/envs/kotlin/lib/python3.10/site-packages/run_kotlin_kernel/jars/kotlinx-serialization-json-jvm-1.3.3.jar
/Users/a06

## 액터

- 액터는 코루틴에서 쓰레드간 동기화를 지원하기 위한 도구이다.

- 액터는 두 가지 강력한 도구의 조합이다.

> - 상태 엑세스를 단일 스레드로 한정한다.

> - 다른 쓰레드는 채널을 통해서 상태 수정을 요청한다.

In [3]:
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor

## 액터 함수 알아보기

In [4]:
// interface ActorScope<E> : CoroutineScope, ReceiveChannel<E>  // 액터스코프는 코루틴과 리스브채널


// public fun <E> CoroutineScope.actor(                         // 액터 빌더 
//    context: CoroutineContext = EmptyCoroutineContext,        // 컨텍스트
//    capacity: Int = 0,                                        // 용량 
//    start: CoroutineStart = CoroutineStart.DEFAULT,
//    onCompletion: CompletionHandler? = null,                  // 핸들러 
//    block: suspend ActorScope<E>.() -> Unit                   // 일시중단 람다표현식 
// ): SendChannel<E>                                            // 전송 채널 

## 액터에 데이터 전송하고 출력하기 

In [5]:
fun main() = runBlocking {                                      // 런블러킹 스코프 생성 
  val actor1 = actor<String>(capacity = 10) {                   // 액터 빌더 
    for (data in channel) {                                     // 액터 내부의 수신된 데이터 출력                     
      println(data +" Thread : " + Thread.currentThread().name)
    }
  }
  (1..5).forEach { 
      actor1.send(it.toString())                                // 액터에 데이터 전송
  }                                                      
  actor1.close()                                                // 액터 종료
  delay(500L)                                                   // 전체 지연 
  println(" closed ")
}

main()

1 Thread : Thread-17
2 Thread : Thread-17
3 Thread : Thread-17
4 Thread : Thread-17
5 Thread : Thread-17
 closed 


## 확장함수로 액터 생성하고 메시지 전송만 하기 

- Client 입장에서는 Actor는 채널과 똑같기 때문에 메시지를 보낼 때 send() 를 사용한다.

In [6]:
fun CoroutineScope.actorCounter() = actor<Int> {       // 코루틴 스코프에 액터를 정의 
    println(channel::class.simpleName)
    var count = 0                                      // actor 로컬 변수
    for (msg in channel) {                             // channel을 순회하는 For-loop문
        count++                                        // 수신마다 count 로컬변수 값 증가
        println("수신받은 메시지 : $msg")                  // 수신마다  출력
    }                                                   // channel이 닫히고 for-loop문을 벗어나면 출력
    println("result $count")
}

fun main() = runBlocking {
        val counter = actorCounter()                   // 액터 공유 채널 만들기 
        val workA = async {                            // 코루틴에서 전달 
                repeat(2) {
                counter.send(it)                       // 액터에 전송 
            }
        }
        val workB = async {                            // 코루틴에서 전달 
               repeat(3) {
                    counter.send(it)                   // 액터에 전송 
            }
        }
        workA.await()                                  // 반환값 없음                       
        workB.await()
        counter.close()                                // 액터 종료
}
    
main()

ActorCoroutine
수신받은 메시지 : 0
수신받은 메시지 : 1
수신받은 메시지 : 0
수신받은 메시지 : 1
수신받은 메시지 : 2
result 5


true

## actor() - SendChannel<E> 반환

     : 채널을 통해 코루틴 블럭(Scope)와 외부에서 통신을 통해 전송 / 처리의 루틴을 실행하는 빌더이다.

       actor 빌더는 SendChannel<E>를 반환, Send채널을 통해 actor()블록으로 채널을 통해 전송을 할 수 있다. 

       즉, actor{} 블록 내부는 수신자(Receiver)가 되고 / 반환된 SendChannel이 송신자(Sender)라고 보면 된다

In [7]:
fun CoroutineScope.basicActor() = actor<Int> {
    var count = 0                                  // actor 로컬 변수
    for (msg in channel) {                         // channel을 순회하는 For-loop문
        count++                                    // 수신마다 count 로컬변수 값 증가
        println("수신받은 메시지 : $msg")              // 수신마다  출력
    }
                                                   // channel이 닫히고 for-loop문을 벗어나면 출력
    println("result $count")
    
}
                                                   // 0.5초 딜레이를 갖고 3번 채널에 send(송신) 반복
val rb = runBlocking {                             // 런블러킹 스코프 정의
    val chan = basicActor()                        // 액터 빌더 실행 
    repeat(3) {
        delay(500)
        println("송신한 메시지 : $it")
        chan.send(it)                              // 액터에 데이터 전송 
    }

    chan.close()                                    // 액터 종료
}

송신한 메시지 : 0
수신받은 메시지 : 0
송신한 메시지 : 1
수신받은 메시지 : 1
송신한 메시지 : 2
수신받은 메시지 : 2
result 3


## 액터로 송수신하기

In [8]:
sealed class CounterMsg                                       // 모든 메시지 타입의 부모 클래스
object IncCounter : CounterMsg()                              // 변수를 1 증가시키라는 메시지
class GetCounter(val response: CompletableDeferred<Int>) 
                            : CounterMsg()                   // 변수의 값을 돌려달라는 메시지

In [9]:
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0                                          // 변수 (state)
    for (msg in channel) {                                   // 들어오는 메시지를 처리한다.
        println("수신메시지 : " + msg::class.simpleName)
        when (msg) {
            is IncCounter -> { counter++                      // 수신처리
                               println("수신 " + Thread.currentThread().name) 
                             }
            is GetCounter -> { msg.response.complete(counter)  // 송신처리
                               println("송신 " + Thread.currentThread().name) 
                              }
            else  -> {println("else " + Thread.currentThread().name)  }   // 버전변경으로 추가 
        }
    }
}

fun main() = runBlocking<Unit> {
    val counter = counterActor()                              // actor 생성
    launch(Dispatchers.Default) {
        launch {
            counter.send(IncCounter)
        }
    }
                                                             // actor로부터 값을 받는다.
    delay(500)
    val response = CompletableDeferred<Int>()                // deferred 객체 생성 
    counter.send(GetCounter(response))                       // 수신받기 위해 액터에 전송 
    println("Counter = ${response.await()}")                 // 수신결과를 확인
    delay(1000)
    counter.close()                                           // actor 종료
}
main()

수신메시지 : IncCounter
수신 Thread-33
수신메시지 : GetCounter
송신 Thread-33
Counter = 1


In [10]:
// interface CompletableDeferred<T> : Deferred<T>         // 비동기 처리의 결과 

sealed class Message                                      // 봉인 클래스로 상속관계 명확화 
object Receive : Message()                                // 수신 메시지 처리
class  Send (val response: CompletableDeferred<Int>)      // 송신 메시지 처리
                            : Message()                   // 인자로 deferred 객체 수신 필요

In [11]:
fun CoroutineScope.numsActor() = actor<Message> {
    var counter = 0                                         // 내부 상태 관리
    for (msg in channel) {                                  // 들어오는 메시지를 처리한다.
        println("수신메시지 : " + msg::class.simpleName)
        println(coroutineContext)
        when (msg) {
            is Receive -> {  counter++                      // 내부 값 변경 
                           println("수신 " + Thread.currentThread().name) }
                                                            //  송신값 처리 
            is Send ->     {  msg.response.complete(counter * 100)
                            println("송신 " + Thread.currentThread().name) }
            else  -> {println("else " + Thread.currentThread().name)  }   // 버전변경으로 추가 
        }
    }
}

fun main() {
    GlobalScope.launch(Dispatchers.Default) {
        val counter = numsActor()                               // actor 생성
        withContext(Dispatchers.Default) {
            launch {
                counter.send(Receive)                           // 메시지 전송 
            }
        }
        launch {  
            delay(300)                                          // actor로부터 값을 받는다.
            val response = CompletableDeferred<Int>()
            counter.send(Send(response))                        // 반환값을 받기 위한 메시지 전송 
            println("송신되는 값 = ${response.await()}")           // Deferred 처리이므로 await로 결과 수신처리
        }
        delay(2000)
        counter.close()                                          // actor 종료
    }
    Thread.sleep(3000)
}

main()

수신메시지 : Receive
[ActorCoroutine{Active}@1cbed399, Dispatchers.Default]
수신 DefaultDispatcher-worker-2
수신메시지 : Send
[ActorCoroutine{Active}@1cbed399, Dispatchers.Default]
송신 DefaultDispatcher-worker-2
송신되는 값 = 100


## 봉인클래스를 정의해서 메시지 전수송 처리하기

In [12]:
sealed class Message {                                          // 봉인클래스 정의
    class Increment(val value: Int) : Message()                 // 수신처리 클래스 정의
    class GetValue(val deferred: CompletableDeferred<String>)   // 송신처리 클래스 정의  
                                                : Message()     // 송신을 위한 deferred 객체 전닯
}

In [13]:
fun CoroutineScope.channelActor() = actor<Message> {            // 코루틴스코프 확장함수로 액터 빌더 정의
    var counter = 0
    for (message in channel) {
        println("전송 받은 메시지 : " + message::class.simpleName)   // 수신받은 메시지 출력
        when(message) {
            is Message.Increment                                 // 메시지 수신처리
                         -> counter += message.value
            is Message.GetValue                                  // 메시지 송신처리
                         -> message.deferred
                                .complete(counter.toString()
                                          +" 액터")
        }
    }
}

fun main() = runBlocking<Unit> {
    val msgchannel = channelActor()                                 // 액터 빌드하기 
    
    GlobalScope.launch {                                            // 전역스코프에서 액터에 전송                      
        msgchannel.send(Message.Increment(1))
        msgchannel.send(Message.Increment(2))
        msgchannel.send(Message.Increment(200))
    }

    launch {                                       
        val deferred = CompletableDeferred<String>()               // 메소드 받기 위한 deferred 처리
        msgchannel.send(Message.GetValue(deferred))                // 액터로 메시지 전송해서 수신처리
        println("전송받은 메시지 : " + deferred.await())               // 전송을 받은 메시지 처리
    }

    delay(3000)
    msgchannel.close()
}

main()

전송 받은 메시지 : Increment
전송 받은 메시지 : Increment
전송 받은 메시지 : Increment
전송 받은 메시지 : GetValue
전송받은 메시지 : 203 액터
