diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt index 6bed38ceb..2f641fb83 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt @@ -30,6 +30,8 @@ import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.isActive import kotlinx.coroutines.job import kotlinx.coroutines.launch +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.serialization.json.buildJsonObject internal class RealtimeImpl(override val supabaseClient: SupabaseClient, override val config: Realtime.Config) : Realtime { @@ -41,6 +43,7 @@ internal class RealtimeImpl(override val supabaseClient: SupabaseClient, overrid override val subscriptions: Map get() = _subscriptions.toMap() private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob()) + private val mutex = Mutex() var heartbeatJob: Job? = null var messageJob: Job? = null var ref by atomic(0) @@ -56,7 +59,7 @@ internal class RealtimeImpl(override val supabaseClient: SupabaseClient, overrid override suspend fun connect() = connect(false) - suspend fun connect(reconnect: Boolean) { + suspend fun connect(reconnect: Boolean): Unit = mutex.withLock { if (reconnect) { delay(config.reconnectDelay) Logger.d("Realtime") { "Reconnecting..." }