Skip to content

Commit

Permalink
Merge pull request #2 from rcardin/moving_kbehavior_factory_methods
Browse files Browse the repository at this point in the history
Created a KBehaviorScope and moved the factory methods as its extension functions
  • Loading branch information
rcardin committed May 3, 2023
2 parents 3d3af24 + 82717b4 commit 416cdfe
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 34 deletions.
27 changes: 15 additions & 12 deletions core/src/main/kotlin/in/rcard/kactor/KActor.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import kotlin.coroutines.CoroutineContext
internal class KActor<T>(
name: String,
private val receiveChannel: Channel<T>,
scope: CoroutineScope
scope: CoroutineScope,
) {

private val ctx: KActorContext<T> =
Expand Down Expand Up @@ -60,7 +60,7 @@ internal class KActor<T>(

private suspend fun nextBehavior(
newBehavior: KBehavior<T>,
behavior: KBehavior<T>
behavior: KBehavior<T>,
) {
when (newBehavior) {
is KBehaviorSame -> {
Expand All @@ -77,28 +77,28 @@ internal class KActor<T>(
class KActorContext<T> internal constructor(
val self: KActorRef<T>,
name: String,
internal val scope: CoroutineScope = CoroutineScope(SupervisorJob())
internal val scope: CoroutineScope = CoroutineScope(SupervisorJob()),
) {
val log: Logger = LoggerFactory.getLogger(name)
}

fun <T> KActorContext<*>.spawn(
name: String,
behavior: KBehavior<T>,
finally: ((ex: Throwable?) -> Unit)? = null
finally: ((ex: Throwable?) -> Unit)? = null,
): KActorRef<T> {
return spawnKActor(
name,
behavior,
scope,
buildContext(name, behavior),
finally
finally,
)
}

private fun buildContext(
name: String,
behavior: KBehavior<*>
behavior: KBehavior<*>,
): CoroutineContext {
val job = resolveJob(behavior)
val dispatcher = resolveDispatcher(behavior)
Expand All @@ -118,15 +118,18 @@ private fun <T> resolveJob(behavior: KBehavior<T>): CoroutineContext =
}

fun <T> resolveDispatcher(behavior: KBehavior<T>): CoroutineContext =
if (behavior.blocking) Dispatchers.IO
else Dispatchers.Default
if (behavior.blocking) {
Dispatchers.IO
} else {
Dispatchers.Default
}

fun <T> CoroutineScope.kactorSystem(behavior: KBehavior<T>): KActorRef<T> {
return spawnKActor(
"kactor-system",
behavior,
this,
CoroutineName("kactor-system") + MDCContext(mapOf("kactor" to "kactor-system"))
CoroutineName("kactor-system") + MDCContext(mapOf("kactor" to "kactor-system")),
)
}

Expand All @@ -135,14 +138,14 @@ private fun <T> spawnKActor(
behavior: KBehavior<T>,
scope: CoroutineScope,
context: CoroutineContext,
finally: ((ex: Throwable?) -> Unit)? = null
finally: ((ex: Throwable?) -> Unit)? = null,
): KActorRef<T> {
val mailbox = Channel<T>(capacity = Channel.UNLIMITED)
val job = scope.launch(context) {
val actor = KActor(name, mailbox, this)
actor.run(behavior)
}
finally?.run { job.invokeOnCompletion(this) }
finally?.apply { job.invokeOnCompletion(this) }
return KActorRef(mailbox)
}

Expand Down Expand Up @@ -171,7 +174,7 @@ fun <T> KActorContext<*>.router(name: String, poolSize: Int, behavior: KBehavior
fun <T, R> CoroutineScope.ask(
toKActorRef: KActorRef<T>,
timeoutInMillis: Long = 1000L,
msgFactory: (ref: KActorRef<R>) -> T
msgFactory: (ref: KActorRef<R>) -> T,
): Deferred<R> {
val mailbox = Channel<R>(capacity = Channel.RENDEZVOUS)
val result = async {
Expand Down
37 changes: 20 additions & 17 deletions core/src/main/kotlin/in/rcard/kactor/KBehavior.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ internal object KBehaviorSame : KBehavior<Nothing>

internal object KBehaviorStop : KBehavior<Nothing>

internal class KBehaviorExtension<T>(private val receivedBehaviour: suspend (ctx: KActorContext<T>, msg: T) -> KBehavior<T>) :
internal class KBehaviorExtension<T>(private val receivedBehaviour: suspend KBehaviorScope.(ctx: KActorContext<T>, msg: T) -> KBehavior<T>) :
KBehavior<T> {
suspend fun receive(ctx: KActorContext<T>, msg: T): KBehavior<T> {
return receivedBehaviour(ctx, msg)
}
suspend fun receive(ctx: KActorContext<T>, msg: T): KBehavior<T> =
KBehaviorScope().receivedBehaviour(ctx, msg)
}

internal class KBehaviorSetup<T>(private val setupBehavior: suspend (ctx: KActorContext<T>) -> KBehavior<T>) :
Expand All @@ -30,43 +29,47 @@ internal abstract class KBehaviorDecorator<T>(internal val decorated: KBehavior<

internal class KBehaviorSupervised<T>(
supervisedBehavior: KBehavior<T>,
val strategy: SupervisorStrategy = SupervisorStrategy.ESCALATE
val strategy: SupervisorStrategy = SupervisorStrategy.ESCALATE,
) : KBehaviorDecorator<T>(supervisedBehavior)

enum class SupervisorStrategy {
STOP,
ESCALATE
ESCALATE,
}

internal class KBehaviorBlocking<T>(decorated: KBehavior<T>) : KBehaviorDecorator<T>(decorated) {
override val blocking: Boolean
get() = true
}

internal class KBehaviorWithTimers<T>(internal val timedBehavior: suspend (timer: TimerScheduler<T>) -> KBehavior<T>) : KBehavior<T>
internal class KBehaviorWithTimers<T>(internal val timedBehavior: suspend (timer: TimerScheduler<T>) -> KBehavior<T>) :
KBehavior<T>

fun <T> setup(behavior: suspend (ctx: KActorContext<T>) -> KBehavior<T>): KBehavior<T> =
fun <T> setup(behavior: suspend KBehaviorScope.(ctx: KActorContext<T>) -> KBehavior<T>): KBehavior<T> =
KBehaviorSetup { ctx ->
behavior(ctx)
KBehaviorScope().behavior(ctx)
}

fun <T> receive(receivedBehaviour: suspend (ctx: KActorContext<T>, msg: T) -> KBehavior<T>): KBehavior<T> =
fun <T> receive(receivedBehaviour: suspend KBehaviorScope.(ctx: KActorContext<T>, msg: T) -> KBehavior<T>): KBehavior<T> =
KBehaviorExtension(receivedBehaviour)

fun <T> receiveMessage(receivedBehaviour: suspend (msg: T) -> KBehavior<T>): KBehavior<T> =
fun <T> receiveMessage(receivedBehaviour: suspend KBehaviorScope.(msg: T) -> KBehavior<T>): KBehavior<T> =
KBehaviorExtension { _, msg ->
receivedBehaviour(msg)
}

@Suppress("UNCHECKED_CAST")
fun <T> same(): KBehavior<T> = KBehaviorSame as KBehavior<T>
class KBehaviorScope internal constructor()

@Suppress("UNCHECKED_CAST", "UnusedReceiverParameter")
fun <T> KBehaviorScope.same(): KBehavior<T> = KBehaviorSame as KBehavior<T>

@Suppress("UNCHECKED_CAST")
fun <T> stopped(): KBehavior<T> = KBehaviorStop as KBehavior<T>
@Suppress("UNCHECKED_CAST", "UnusedReceiverParameter")
fun <T> KBehaviorScope.stopped(): KBehavior<T> = KBehaviorStop as KBehavior<T>

fun <T> supervise(
@Suppress("UnusedReceiverParameter")
fun <T> KBehaviorScope.supervise(
supervisedBehavior: KBehavior<T>,
withStrategy: SupervisorStrategy
withStrategy: SupervisorStrategy,
): KBehavior<T> =
KBehaviorSupervised(supervisedBehavior, withStrategy)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object ExceptionHandling {
repeat(1000) {
val ref = ctx.spawn(
"kactor_$it",
supervise(PrintCount.behavior, withStrategy = SupervisorStrategy.STOP)
supervise(PrintCount.behavior, withStrategy = SupervisorStrategy.STOP),
)
ref `!` PrintCount.Count(it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ object FinallyPattern {
val kRef = ctx.spawn(
"resKactor",
ResourceKActor.behavior(res),
finally = { res.close() }
finally = { res.close() },
)
kRef `!` ResourceKActor.UseIt
same()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import `in`.rcard.kactor.KBehavior
import `in`.rcard.kactor.TimerScheduler
import `in`.rcard.kactor.kactorSystem
import `in`.rcard.kactor.receive
import `in`.rcard.kactor.setup
import `in`.rcard.kactor.stopped
import `in`.rcard.kactor.withTimers
import kotlinx.coroutines.coroutineScope
Expand All @@ -21,12 +20,11 @@ object TimersExample {
object TimerKey
object Tick

fun behavior(): KBehavior<Tick> = setup { _ ->
fun behavior(): KBehavior<Tick> =
withTimers { timers ->
timers.startSingleTimer(TimerKey, Tick, 1.seconds)
processTick(0, timers)
}
}

private fun processTick(counter: Int, timers: TimerScheduler<Tick>): KBehavior<Tick> =
receive { ctx, _ ->
Expand Down

0 comments on commit 416cdfe

Please sign in to comment.