Skip to content

Commit

Permalink
ActorRegistry :: Using kotlinx.coroutines.sync.Mutex to prevent concu…
Browse files Browse the repository at this point in the history
…rrent operations in the registry.
  • Loading branch information
smyrgeorge committed Jun 7, 2024
1 parent c3864d0 commit 86b9a93
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 35 deletions.
5 changes: 5 additions & 0 deletions actor4k/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ java {
withSourcesJar()
}

// Disable this task, because the protobuf plugin generates too many warnings.
tasks.withType<Javadoc> {
enabled = false
}

publishing {
repositories {
maven {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.time.Instant
import kotlin.reflect.KClass

Expand Down Expand Up @@ -57,15 +58,15 @@ object ActorRegistry {

// Limit the concurrent access to one at a time.
// This is critical, because we need to ensure that only one Actor (with the same key) will be created.
mutex.lock()
val ref: Actor.Ref = mutex.withLock {

val address = Actor.addressOf(actor, key)
// Calculate the actor address.
val address: String = Actor.addressOf(actor, key)

// Check if the actor already exists in the local storage.
local[address]?.let { return it.ref() }
// Check if the actor already exists in the local storage.
local[address]?.let { return it.ref() }

// Create Local/Remote actor.
val ref: Actor.Ref =
// Create Local/Remote actor.
if (ActorSystem.clusterMode
&& ActorSystem.cluster.nodeOf(shard).dc != ActorSystem.cluster.conf.alias
) {
Expand All @@ -76,9 +77,11 @@ object ActorRegistry {
// Case Remote.
// Forward the [Envelope.Spawn] message to the correct cluster node.
val msg = Envelope.GetActor(shard, actor.name, key)
ActorSystem.cluster.msg(msg).getOrThrow<Envelope.GetActor.Ref>().toRef(shard).also {
remote[address] = it
}
ActorSystem.cluster
.msg(msg)
.getOrThrow<Envelope.GetActor.Ref>()
.toRef(shard)
.also { remote[address] = it }
} else {
// Case Local.
// Spawn the actor.
Expand All @@ -97,9 +100,9 @@ object ActorRegistry {

a.ref()
}
}

// Drop the lock here.
mutex.unlock()
log.debug { "Actor $ref created." }

return ref
}
Expand All @@ -123,37 +126,45 @@ object ActorRegistry {
?: get(ref.actor, ref.key).let { local[ref.address]!! }
}

fun <A : Actor> unregister(actor: Class<A>, key: String) {
suspend fun <A : Actor> unregister(actor: Class<A>, key: String) {
val address = Actor.addressOf(actor, key)
local[address]?.let {
if (it.status() != Actor.Status.FINISHED) error("Cannot unregister $address while is ${it.status()}.")
local.remove(address)
ShardManager.operation(ShardManager.Op.UNREGISTER, it.shard)
mutex.withLock {
local[address]?.let {
if (it.status() != Actor.Status.FINISHED) error("Cannot unregister $address while is ${it.status()}.")
local.remove(address)
ShardManager.operation(ShardManager.Op.UNREGISTER, it.shard)
log.debug { "Unregistered actor $address." }
}
}
}

suspend fun stopAll(): Unit = mutex.withLock {
log.debug { "Stop all local actors. Total: ${local.size}" }
local.values.chunked(local.size, 4).forEachParallel { l ->
l.forEach { it.stop() }
}
}

suspend fun stopAll(): Unit =
local.values.chunked(local.size, 4)
.forEachParallel { l -> l.forEach { it.stop() } }

private suspend fun stopLocalExpired(): Unit =
local.values.chunked(local.size, 4)
.forEachParallel { l ->
l.forEach {
val df = Instant.now().epochSecond - it.stats().last.epochSecond
if (df > ActorSystem.Conf.actorExpiration.inWholeSeconds) {
log.debug { "Closing ${it.address()} (expired)." }
it.stop()
}
private suspend fun stopLocalExpired(): Unit = mutex.withLock {
log.debug { "Stop all local expired actors. Total: ${local.size}" }
local.values.chunked(local.size, 4).forEachParallel { l ->
l.forEach {
val df = Instant.now().epochSecond - it.stats().last.epochSecond
if (df > ActorSystem.Conf.actorExpiration.inWholeSeconds) {
log.debug { "Closing ${it.address()} (expired)." }
it.stop()
}
}
}
}

private suspend fun removeRemoteExpired(): Unit =
remote.values
.chunked(remote.size, 4)
.forEachParallel { l -> l.forEach { if (Instant.now().isAfter(it.exp)) remote.remove(it.address) } }
private suspend fun removeRemoteExpired(): Unit = mutex.withLock {
log.debug { "Removing all remote expired actors. Total: ${remote.size}" }
remote.values.chunked(remote.size, 4).forEachParallel { l ->
l.forEach { if (Instant.now().isAfter(it.exp)) remote.remove(it.address) }
}
}

fun count(): Int = local.count()

fun asJava(): JActorRegistry = JActorRegistry
}
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
group = "io.github.smyrgeorge"
version = "0.5.0"
version = "0.5.1"

// https://mvnrepository.com/artifact/io.grpc/grpc-api
val grpcVersion: String by extra { "1.60.1" }
Expand Down

0 comments on commit 86b9a93

Please sign in to comment.