目次
コルーチンは、Dispatchers.Defaultのようなマルチスレッドディスパッチャーを使用して並行に実行できます。 これは、すべての通常の並行処理の問題を提起します。 主な問題は、共有ミュータブルステートへのアクセスの同期です。 コルーチンの世界でのこの問題に対するいくつかの解決策は、マルチスレッドの世界の解決策と似ていますが、他は独自のものです。
同じアクションを1000回実行する100個のコルーチンを起動しましょう。 さらに比較するために、完了時間を測定します。
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // 各コルーチンによってアクションが繰り返される回数
val time = measureTimeMillis {
coroutineScope { // コルーチンのスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
マルチスレッドのDispatchers.Defaultを使用して共有ミュータブル変数をインクリメントする非常に単純なアクションから始めます。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // 各コルーチンによってアクションが繰り返される回数
val time = measureTimeMillis {
coroutineScope { // コルーチンのスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
//sampleEnd
ここで完全なコードを取得できます
最後に何をプリントしますか? 100個のコルーチンが同期せずに複数のスレッドから並行して counter
をインクリメントするため、"Counter = 100000" を出力することはほとんどありません。
変数を volatile
にすると並行性の問題が解決されるという誤解が一般的です。 それを試してみましょう。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
coroutineScope { // scope for coroutines
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
@Volatile // Kotlinの `volatile` はアノテーション
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
//sampleEnd
ここで完全なコードを取得できます
このコードはより遅く動作しますが、volatile変数は対応する変数の線形(専門用語で「アトミック」)読み書きを保証するものの、より大きなアクション(この場合はインクリメント)のアトミック性を提供しないため、最後に「Counter = 100000」を得られません。
スレッドとコルーチンの両方で動作する一般的なソリューションは、共有状態で実行する操作に必要なすべての同期を提供するスレッドセーフ(別名、同期、線形化、またはアトミック)データ構造を使用することです。
単純なカウンタの場合、アトミックな incrementAndGet
操作を持つ AtomicInteger
クラスを使うことができます。
import kotlinx.coroutines.*
import java.util.concurrent.atomic.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // 各コルーチンによってアクションが繰り返される回数
val time = measureTimeMillis {
coroutineScope { // コルーチンのスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
//sampleEnd
ここで完全なコードを取得できます
これは、この特定の問題に対する最速の解決策です。 単純なカウンター、コレクション、キュー、その他の標準的なデータ構造とそれらの基本的な操作では機能します。 ただし、複雑な状態やすぐに使用できるスレッドセーフな実装を持たない複雑な操作には、容易に拡張できません。
スレッド制約 は、特定の共有状態へのすべてのアクセスが1つのスレッドに限定されている、共有ミュータブルステートの問題への提案です。 これは通常、すべてのUI状態が単一のイベントディスパッチ/アプリケーションスレッドに限定されるUIアプリケーションで使用されます。 単一スレッドのコンテキストを使用してコルーチンで簡単に適用できます。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // 各コルーチンによってアクションが繰り返される回数
val time = measureTimeMillis {
coroutineScope { // コルーチンのスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// それぞれのインクリメントをシングルスレッドコンテキストに限定する
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
//sampleEnd
ここで完全なコードを取得できます
このコードは 細粒度 のスレッド制約を行うため、非常にゆっくりと動作します。 個々のインクリメントはwithContext(counterContext)ブロックを使用してマルチスレッドのDispatchers.Defaultコンテキストからシングルスレッドのコンテキストに切り替わります。
現実にはスレッド制約は大きなチャンクで行われます。例えば、状態を更新するビジネスロジックの大きな部分は単一のスレッドに限定されます。 次の例では、そのようにしてシングルスレッドコンテキストで各コルーチンを起動して実行します。
import kotlinx.coroutines.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // 各コルーチンによってアクションが繰り返される回数
val time = measureTimeMillis {
coroutineScope { // コルーチンのスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// すべてをシングルスレッドコンテキストに限定する
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
//sampleEnd
ここで完全なコードを取得できます
これで、はるかに高速に動作し正しい結果が得られます。
この問題に対する排他制御の解決策は、決して並行に実行されない クリティカルセクション で共有状態のすべての変更を保護することです。
ブロックする世界では通常 synchronized
または ReentrantLock
を使用します。
コルーチンの代案はMutexと呼ばれています。
それはクリティカルセクションを区切るlockとunlock関数を持っています。
主な違いは、 Mutex.lock()
はサスペンド関数であることです。
これはスレッドをブロックしません。
mutex.lock(); try { ... } finally { mutex.unlock() }
パターンを表す便利なwithLock拡張関数もあります。
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // 各コルーチンによってアクションが繰り返される回数
val time = measureTimeMillis {
coroutineScope { // コルーチンのスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
//sampleStart
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// ロックで各インクリメントを保護する
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
//sampleEnd
ここで完全なコードを取得できます
この例でのロックは細粒度なので、代償を払っています。 ただし、一部の共有状態を定期的に変更する必要がある場合に適していますが、この状態に限定される自然なスレッドはありません。
actorは、コルーチン、このコルーチンに閉じ込められカプセル化された状態、および他のコルーチンと通信するためのチャンネルの組み合わせで構成されるエンティティです 。 単純なアクターは関数として記述できますが、複雑な状態のアクターはクラスに適しています。
actorコルーチンビルダーがアクターのメールボックスチャネルをメッセージを受信するスコープに結合し、 結果のジョブオブジェクトに送信チャネルを結合するので、アクターへの単一の参照をそのハンドルとして持ち運ぶことができます。
アクターを使用する最初のステップは、アクターが処理するメッセージのクラスを定義することです。
Kotlinのシールドクラスはその目的に適しています。
カウンタをインクリメントする IncCounter
メッセージと、その値を取得する GetCounter
メッセージを持つ CounterMsg
シールドクラスを定義します。
後で応答を送信する必要があります。 将来知られる(通信される)単一の値を表すCompletableDeferred通信プリミティブは、その目的のためにここで使用されます。
// counterActorのメッセージ型
sealed class CounterMsg
object IncCounter : CounterMsg() // カウンターをインクリメントする一方向のメッセージ
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 返信を持ったリクエスト
次に、actorコルーチンビルダーを使用してアクターを起動する関数を定義します。
// この関数は、新しいカウンタアクターを起動する
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // アクターの状態
for (msg in channel) { // 受信メッセージを反復処理する
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
メインコードは簡単です。
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlin.system.*
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 起動するコルーチンの数
val k = 1000 // 各コルーチンによってアクションが繰り返される回数
val time = measureTimeMillis {
coroutineScope { // コルーチンのスコープ
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
// counterActorのメッセージ型
sealed class CounterMsg
object IncCounter : CounterMsg() // カウンターをインクリメントする一方向メッセージ
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 返信付きのリクエスト
// この関数は、新しいカウンターアクターを起動する
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // アクターの状態
for (msg in channel) { // 着信メッセージを反復処理する
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
//sampleStart
fun main() = runBlocking<Unit> {
val counter = counterActor() // アクターを作る
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// アクターからカウンター値を得るためのメッセージを送る
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // アクターを終了する
}
//sampleEnd
ここで完全なコードを取得できます
アクター自体がどのようなコンテキストで実行されるかは(正確さにおいて)問題ではありません。 アクターはコルーチンであり、コルーチンはシーケンシャルに実行されるため、状態を特定のコルーチンに限定することは、共有ミュータブルステートの問題の解決策として機能します。 実際、アクターは自分のプライベートな状態を変更できますが、メッセージを介してのみ相互に影響を与えることができます(ロックの必要性を回避します)。
この場合、常に実行する作業があり別のコンテキストに切り替える必要がないため、負荷の下ではロックよりもアクターのほうが効率的です。
actorコルーチンビルダーは二重のproduceコルーチンビルダーであることに注意してください。 アクターはメッセージを受信するチャネルに関連付けられ、プロデューサーは要素を送信するチャネルに関連付けられます。