From 4ec18ba77e8041d2f40d769dd312b40f9384c388 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 27 Mar 2025 15:59:33 +0200 Subject: [PATCH 1/8] Use single supervisor job for sync --- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 61 +++++++++---------- 1 file changed, 30 insertions(+), 31 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index ccde3ae7..33300e14 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -27,6 +27,7 @@ import com.powersync.utils.toJsonObject import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow @@ -96,8 +97,7 @@ internal class PowerSyncDatabaseImpl( private val mutex = Mutex() private var syncStream: SyncStream? = null - private var syncJob: Job? = null - private var uploadJob: Job? = null + private var syncSupervisorJob: Job? = null // This is set in the init private lateinit var powerSyncVersion: String @@ -164,9 +164,10 @@ internal class PowerSyncDatabaseImpl( this.syncStream = stream val db = this - - syncJob = - scope.launch { + val job = SupervisorJob() + syncSupervisorJob = job + scope.launch(job) { + launch { // Get a global lock for checking mutex maps val streamMutex = resource.group.syncMutex @@ -181,7 +182,7 @@ internal class PowerSyncDatabaseImpl( // (The tryLock should throw if this client already holds the lock). logger.w(streamConflictMessage) } - } catch (ex: IllegalStateException) { + } catch (_: IllegalStateException) { logger.e { "The streaming sync client did not disconnect before connecting" } } @@ -200,26 +201,25 @@ internal class PowerSyncDatabaseImpl( } } - scope.launch { - syncStream!!.status.asFlow().collect { - currentStatus.update( - connected = it.connected, - connecting = it.connecting, - uploading = it.uploading, - downloading = it.downloading, - lastSyncedAt = it.lastSyncedAt, - hasSynced = it.hasSynced, - uploadError = it.uploadError, - downloadError = it.downloadError, - clearDownloadError = it.downloadError == null, - clearUploadError = it.uploadError == null, - priorityStatusEntries = it.priorityStatusEntries, - ) + launch { + stream.status.asFlow().collect { + currentStatus.update( + connected = it.connected, + connecting = it.connecting, + uploading = it.uploading, + downloading = it.downloading, + lastSyncedAt = it.lastSyncedAt, + hasSynced = it.hasSynced, + uploadError = it.uploadError, + downloadError = it.downloadError, + clearDownloadError = it.downloadError == null, + clearUploadError = it.uploadError == null, + priorityStatusEntries = it.priorityStatusEntries, + ) + } } - } - uploadJob = - scope.launch { + launch { internalDb .updatesOnTables() .filter { it.contains(InternalTable.CRUD.toString()) } @@ -228,6 +228,7 @@ internal class PowerSyncDatabaseImpl( syncStream!!.triggerCrudUpload() } } + } } override suspend fun getCrudBatch(limit: Int): CrudBatch? { @@ -364,12 +365,10 @@ internal class PowerSyncDatabaseImpl( override suspend fun disconnect() = mutex.withLock { disconnectInternal() } private suspend fun disconnectInternal() { - if (syncJob != null && syncJob!!.isActive) { - syncJob?.cancelAndJoin() - } - - if (uploadJob != null && uploadJob!!.isActive) { - uploadJob?.cancelAndJoin() + val syncJob = syncSupervisorJob + if (syncJob != null && syncJob.isActive) { + syncJob.cancelAndJoin() + syncSupervisorJob = null } if (syncStream != null) { @@ -470,7 +469,7 @@ internal class PowerSyncDatabaseImpl( /** * Check that a supported version of the powersync extension is loaded. */ - private suspend fun checkVersion(powerSyncVersion: String) { + private fun checkVersion(powerSyncVersion: String) { // Parse version val versionInts: List = try { From 631db90707f33c87c5b94b0b01b32da73767b7d5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 27 Mar 2025 16:55:35 +0200 Subject: [PATCH 2/8] Use structured concurrency for sync jobs --- .../kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 33300e14..84083fd7 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -24,11 +24,11 @@ import com.powersync.utils.JsonParam import com.powersync.utils.JsonUtil import com.powersync.utils.throttle import com.powersync.utils.toJsonObject +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob -import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.filter @@ -164,7 +164,7 @@ internal class PowerSyncDatabaseImpl( this.syncStream = stream val db = this - val job = SupervisorJob() + val job = SupervisorJob(scope.coroutineContext[Job]) syncSupervisorJob = job scope.launch(job) { launch { @@ -367,7 +367,8 @@ internal class PowerSyncDatabaseImpl( private suspend fun disconnectInternal() { val syncJob = syncSupervisorJob if (syncJob != null && syncJob.isActive) { - syncJob.cancelAndJoin() + syncJob.cancel(CancellationException("disconnect() called")) + syncJob.join() syncSupervisorJob = null } From c331ce16528e8e487078ab8a90696322e3d37923 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 09:28:09 +0200 Subject: [PATCH 3/8] Fix some tests --- .../com/powersync/SyncIntegrationTest.kt | 29 +++++- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 25 +++--- .../kotlin/com/powersync/sync/SyncStatus.kt | 1 + .../com/powersync/sync/SyncStreamTest.kt | 2 +- .../powersync/testutils/MockSyncService.kt | 90 ++++++++++++------- 5 files changed, 97 insertions(+), 50 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 2bf2e4bb..d2fe0cbc 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -27,9 +27,9 @@ import com.powersync.utils.JsonUtil import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock -import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CompletableDeferred +import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.runTest import kotlinx.serialization.encodeToString @@ -99,8 +99,8 @@ class SyncIntegrationTest { dbFilename = "testdb", ) as PowerSyncDatabaseImpl - private fun CoroutineScope.syncStream(): SyncStream { - val client = MockSyncService.client(this, syncLines.receiveAsFlow()) + private fun syncStream(): SyncStream { + val client = MockSyncService(syncLines) return SyncStream( bucketStorage = database.bucketStorage, connector = connector, @@ -117,6 +117,27 @@ class SyncIntegrationTest { assertEquals(amount, users.size, "Expected $amount users, got $users") } + @Test + @OptIn(DelicateCoroutinesApi::class) + fun closesResponseStreamOnDisconnect() = runTest { + val syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected } + + database.close() + turbine.waitFor { !it.connected } + turbine.cancel() + } + + // Closing the database should close the channel + val channelClose = CompletableDeferred() + syncLines.invokeOnClose { channelClose.complete(Unit) } + channelClose.await() + } + @Test fun testPartialSync() = runTest { diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 84083fd7..3525d702 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -96,7 +96,6 @@ internal class PowerSyncDatabaseImpl( override val currentStatus: SyncStatus = SyncStatus() private val mutex = Mutex() - private var syncStream: SyncStream? = null private var syncSupervisorJob: Job? = null // This is set in the init @@ -123,7 +122,7 @@ internal class PowerSyncDatabaseImpl( override suspend fun updateSchema(schema: Schema) = runWrappedSuspending { mutex.withLock { - if (this.syncStream != null) { + if (this.syncSupervisorJob != null) { throw PowerSyncException( "Cannot update schema while connected", cause = Exception("PowerSync client is already connected"), @@ -161,8 +160,6 @@ internal class PowerSyncDatabaseImpl( stream: SyncStream, crudThrottleMs: Long, ) { - this.syncStream = stream - val db = this val job = SupervisorJob(scope.coroutineContext[Job]) syncSupervisorJob = job @@ -195,7 +192,7 @@ internal class PowerSyncDatabaseImpl( // We have a lock if we reached here try { ensureActive() - syncStream!!.streamingSync() + stream.streamingSync() } finally { streamMutex.unlock(db) } @@ -225,10 +222,16 @@ internal class PowerSyncDatabaseImpl( .filter { it.contains(InternalTable.CRUD.toString()) } .throttle(crudThrottleMs) .collect { - syncStream!!.triggerCrudUpload() + stream.triggerCrudUpload() } } } + + job.invokeOnCompletion { + if (it is DisconnectRequestedException) { + stream.invalidateCredentials() + } + } } override suspend fun getCrudBatch(limit: Int): CrudBatch? { @@ -367,16 +370,12 @@ internal class PowerSyncDatabaseImpl( private suspend fun disconnectInternal() { val syncJob = syncSupervisorJob if (syncJob != null && syncJob.isActive) { - syncJob.cancel(CancellationException("disconnect() called")) + // Using this exception type will also make the sync job invalidate credentials. + syncJob.cancel(DisconnectRequestedException) syncJob.join() syncSupervisorJob = null } - if (syncStream != null) { - syncStream?.invalidateCredentials() - syncStream = null - } - currentStatus.update( connected = false, connecting = false, @@ -488,3 +487,5 @@ internal class PowerSyncDatabaseImpl( } } } + +internal object DisconnectRequestedException: CancellationException("disconnect() called") diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index ce4fde57..d0300611 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -5,6 +5,7 @@ import com.powersync.connectors.PowerSyncBackendConnector import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.cancel import kotlinx.datetime.Instant @ConsistentCopyVisibility diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 30588d47..60a6b8ad 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -210,7 +210,7 @@ class SyncStreamTest { // TODO: It would be neat if we could use in-memory sqlite instances instead of mocking everything // Revisit https://github.com/powersync-ja/powersync-kotlin/pull/117/files at some point val syncLines = Channel() - val client = MockSyncService.client(this, syncLines.receiveAsFlow()) + val client = MockSyncService(syncLines) syncStream = SyncStream( diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index eb57a232..edd1bbea 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -4,53 +4,77 @@ import app.cash.turbine.ReceiveTurbine import com.powersync.sync.SyncLine import com.powersync.sync.SyncStatusData import com.powersync.utils.JsonUtil -import io.ktor.client.engine.HttpClientEngine -import io.ktor.client.engine.mock.MockEngine -import io.ktor.client.engine.mock.MockRequestHandleScope -import io.ktor.client.engine.mock.respond -import io.ktor.client.engine.mock.respondBadRequest +import io.ktor.client.engine.HttpClientEngineBase +import io.ktor.client.engine.HttpClientEngineCapability +import io.ktor.client.engine.HttpClientEngineConfig +import io.ktor.client.engine.callContext +import io.ktor.client.plugins.HttpTimeoutCapability import io.ktor.client.request.HttpRequestData import io.ktor.client.request.HttpResponseData -import io.ktor.utils.io.ByteChannel +import io.ktor.http.HttpProtocolVersion +import io.ktor.http.HttpStatusCode +import io.ktor.http.headersOf +import io.ktor.util.date.GMTDate +import io.ktor.utils.io.InternalAPI import io.ktor.utils.io.writeStringUtf8 +import io.ktor.utils.io.writer import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.launch +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.consumeEach import kotlinx.serialization.encodeToString -internal class MockSyncService private constructor( - private val scope: CoroutineScope, - private val lines: Flow, -) { - private fun handleRequest( - scope: MockRequestHandleScope, - request: HttpRequestData, - ): HttpResponseData = - if (request.url.encodedPath == "/sync/stream") { - val channel = ByteChannel(autoFlush = true) - this.scope.launch { - lines.collect { +/** + * A mock HTTP engine providing sync lines read from a coroutines [ReceiveChannel]. + * + * Note that we can't trivially use ktor's `MockEngine` here because that engine requires a non-suspending handler + * function which makes it very hard to cancel the channel when the sync client closes the request stream. That is + * precisely what we may want to test though. + */ +internal class MockSyncService( + private val lines: ReceiveChannel, +) : HttpClientEngineBase("sync-service") { + + override val config: HttpClientEngineConfig + get() = Config + + override val supportedCapabilities: Set> = setOf( + HttpTimeoutCapability, + ) + + @OptIn(InternalAPI::class) + override suspend fun execute(data: HttpRequestData): HttpResponseData { + val context = callContext() + val scope = CoroutineScope(context) + + return if (data.url.encodedPath == "/sync/stream") { + val job = scope.writer(autoFlush = true) { + lines.consumeEach { val serializedLine = JsonUtil.json.encodeToString(it) channel.writeStringUtf8("$serializedLine\n") } } - scope.respond(channel) + HttpResponseData( + HttpStatusCode.OK, + GMTDate(), + headersOf(), + HttpProtocolVersion.HTTP_1_1, + job.channel, + context, + ) } else { - scope.respondBadRequest() - } - - companion object { - fun client( - scope: CoroutineScope, - lines: Flow, - ): HttpClientEngine { - val service = MockSyncService(scope, lines) - return MockEngine { request -> - service.handleRequest(this, request) - } + HttpResponseData( + HttpStatusCode.BadRequest, + GMTDate(), + headersOf(), + HttpProtocolVersion.HTTP_1_1, + "", + context, + ) } } + + private object Config : HttpClientEngineConfig() } suspend inline fun ReceiveTurbine.waitFor(matcher: (SyncStatusData) -> Boolean) { From 6788119603ca1a612c2950bd91079cc0983afdce Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 09:42:21 +0200 Subject: [PATCH 4/8] Fix flushing --- .../com/powersync/SyncIntegrationTest.kt | 51 ++++++++++++++++--- .../powersync/testutils/MockSyncService.kt | 3 +- 2 files changed, 47 insertions(+), 7 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index d2fe0cbc..7d35469e 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -27,7 +27,7 @@ import com.powersync.utils.JsonUtil import dev.mokkery.answering.returns import dev.mokkery.everySuspend import dev.mokkery.mock -import kotlinx.coroutines.CompletableDeferred +import dev.mokkery.verify import kotlinx.coroutines.DelicateCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.runBlocking @@ -38,6 +38,7 @@ import kotlin.test.AfterTest import kotlin.test.BeforeTest import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertFailsWith import kotlin.test.assertFalse import kotlin.test.assertNotNull import kotlin.test.assertTrue @@ -119,7 +120,7 @@ class SyncIntegrationTest { @Test @OptIn(DelicateCoroutinesApi::class) - fun closesResponseStreamOnDisconnect() = runTest { + fun closesResponseStreamOnDatabaseClose() = runTest { val syncStream = syncStream() database.connectInternal(syncStream, 1000L) @@ -132,10 +133,48 @@ class SyncIntegrationTest { turbine.cancel() } - // Closing the database should close the channel - val channelClose = CompletableDeferred() - syncLines.invokeOnClose { channelClose.complete(Unit) } - channelClose.await() + // Closing the database should have closed the channel + assertTrue { syncLines.isClosedForSend } + } + + @Test + @OptIn(DelicateCoroutinesApi::class) + fun cleansResourcesOnDisconnect() = runTest { + val syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected } + + database.disconnect() + turbine.waitFor { !it.connected } + turbine.cancel() + } + + // Disconnecting should have closed the channel + assertTrue { syncLines.isClosedForSend } + + // And called invalidateCredentials on the connector + verify { connector.invalidateCredentials() } + } + + @Test + fun cannotUpdateSchemaWhileConnected() = runTest { + val syncStream = syncStream() + database.connectInternal(syncStream, 1000L) + + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected } + turbine.cancel() + } + + assertFailsWith("Cannot update schema while connected") { + database.updateSchema(Schema()) + } + + database.close() } @Test diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index edd1bbea..8e4ce6f1 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -47,10 +47,11 @@ internal class MockSyncService( val scope = CoroutineScope(context) return if (data.url.encodedPath == "/sync/stream") { - val job = scope.writer(autoFlush = true) { + val job = scope.writer { lines.consumeEach { val serializedLine = JsonUtil.json.encodeToString(it) channel.writeStringUtf8("$serializedLine\n") + channel.flush() } } From 29eb96852e954e422ce0d24f359d8818baefb997 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 09:47:52 +0200 Subject: [PATCH 5/8] Reformat --- .../com/powersync/SyncIntegrationTest.kt | 87 ++++++++++--------- .../com/powersync/db/PowerSyncDatabaseImpl.kt | 2 +- .../kotlin/com/powersync/sync/SyncStatus.kt | 1 - .../com/powersync/sync/SyncStreamTest.kt | 1 - .../powersync/testutils/MockSyncService.kt | 21 ++--- 5 files changed, 57 insertions(+), 55 deletions(-) diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt index 7d35469e..cfefbf7b 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt @@ -120,62 +120,65 @@ class SyncIntegrationTest { @Test @OptIn(DelicateCoroutinesApi::class) - fun closesResponseStreamOnDatabaseClose() = runTest { - val syncStream = syncStream() - database.connectInternal(syncStream, 1000L) + fun closesResponseStreamOnDatabaseClose() = + runTest { + val syncStream = syncStream() + database.connectInternal(syncStream, 1000L) - turbineScope(timeout = 10.0.seconds) { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { it.connected } + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected } - database.close() - turbine.waitFor { !it.connected } - turbine.cancel() - } + database.close() + turbine.waitFor { !it.connected } + turbine.cancel() + } - // Closing the database should have closed the channel - assertTrue { syncLines.isClosedForSend } - } + // Closing the database should have closed the channel + assertTrue { syncLines.isClosedForSend } + } @Test @OptIn(DelicateCoroutinesApi::class) - fun cleansResourcesOnDisconnect() = runTest { - val syncStream = syncStream() - database.connectInternal(syncStream, 1000L) + fun cleansResourcesOnDisconnect() = + runTest { + val syncStream = syncStream() + database.connectInternal(syncStream, 1000L) - turbineScope(timeout = 10.0.seconds) { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { it.connected } + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected } - database.disconnect() - turbine.waitFor { !it.connected } - turbine.cancel() - } + database.disconnect() + turbine.waitFor { !it.connected } + turbine.cancel() + } - // Disconnecting should have closed the channel - assertTrue { syncLines.isClosedForSend } + // Disconnecting should have closed the channel + assertTrue { syncLines.isClosedForSend } - // And called invalidateCredentials on the connector - verify { connector.invalidateCredentials() } - } + // And called invalidateCredentials on the connector + verify { connector.invalidateCredentials() } + } @Test - fun cannotUpdateSchemaWhileConnected() = runTest { - val syncStream = syncStream() - database.connectInternal(syncStream, 1000L) - - turbineScope(timeout = 10.0.seconds) { - val turbine = database.currentStatus.asFlow().testIn(this) - turbine.waitFor { it.connected } - turbine.cancel() - } + fun cannotUpdateSchemaWhileConnected() = + runTest { + val syncStream = syncStream() + database.connectInternal(syncStream, 1000L) - assertFailsWith("Cannot update schema while connected") { - database.updateSchema(Schema()) - } + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + turbine.waitFor { it.connected } + turbine.cancel() + } - database.close() - } + assertFailsWith("Cannot update schema while connected") { + database.updateSchema(Schema()) + } + + database.close() + } @Test fun testPartialSync() = diff --git a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt index 3525d702..244d9516 100644 --- a/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt @@ -488,4 +488,4 @@ internal class PowerSyncDatabaseImpl( } } -internal object DisconnectRequestedException: CancellationException("disconnect() called") +internal object DisconnectRequestedException : CancellationException("disconnect() called") diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt index d0300611..ce4fde57 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStatus.kt @@ -5,7 +5,6 @@ import com.powersync.connectors.PowerSyncBackendConnector import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.asSharedFlow -import kotlinx.coroutines.flow.cancel import kotlinx.datetime.Instant @ConsistentCopyVisibility diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 60a6b8ad..41573e3f 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -30,7 +30,6 @@ import dev.mokkery.verifySuspend import io.ktor.client.engine.mock.MockEngine import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.delay -import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index 8e4ce6f1..e9fcdde1 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -33,13 +33,13 @@ import kotlinx.serialization.encodeToString internal class MockSyncService( private val lines: ReceiveChannel, ) : HttpClientEngineBase("sync-service") { - override val config: HttpClientEngineConfig get() = Config - override val supportedCapabilities: Set> = setOf( - HttpTimeoutCapability, - ) + override val supportedCapabilities: Set> = + setOf( + HttpTimeoutCapability, + ) @OptIn(InternalAPI::class) override suspend fun execute(data: HttpRequestData): HttpResponseData { @@ -47,13 +47,14 @@ internal class MockSyncService( val scope = CoroutineScope(context) return if (data.url.encodedPath == "/sync/stream") { - val job = scope.writer { - lines.consumeEach { - val serializedLine = JsonUtil.json.encodeToString(it) - channel.writeStringUtf8("$serializedLine\n") - channel.flush() + val job = + scope.writer { + lines.consumeEach { + val serializedLine = JsonUtil.json.encodeToString(it) + channel.writeStringUtf8("$serializedLine\n") + channel.flush() + } } - } HttpResponseData( HttpStatusCode.OK, From 1db2f284a2d7a82d8ceb0aab21fffd70923b12f1 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 09:51:06 +0200 Subject: [PATCH 6/8] Wait for downstream listener --- .../com/powersync/testutils/MockSyncService.kt | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index e9fcdde1..d0ece657 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -16,10 +16,12 @@ import io.ktor.http.HttpStatusCode import io.ktor.http.headersOf import io.ktor.util.date.GMTDate import io.ktor.utils.io.InternalAPI +import io.ktor.utils.io.awaitFreeSpace import io.ktor.utils.io.writeStringUtf8 import io.ktor.utils.io.writer import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.consume import kotlinx.coroutines.channels.consumeEach import kotlinx.serialization.encodeToString @@ -49,10 +51,15 @@ internal class MockSyncService( return if (data.url.encodedPath == "/sync/stream") { val job = scope.writer { - lines.consumeEach { - val serializedLine = JsonUtil.json.encodeToString(it) - channel.writeStringUtf8("$serializedLine\n") - channel.flush() + lines.consume { + while (true) { + // Wait for a downstream listener being ready before requesting a sync line + channel.awaitFreeSpace() + val line = receive() + val serializedLine = JsonUtil.json.encodeToString(line) + channel.writeStringUtf8("$serializedLine\n") + channel.flush() + } } } From 3875b2d32926ce8c33b3882387393ddc76818888 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 10:00:48 +0200 Subject: [PATCH 7/8] Remove superfluous import --- .../commonTest/kotlin/com/powersync/testutils/MockSyncService.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt index d0ece657..56ab90ec 100644 --- a/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt +++ b/core/src/commonTest/kotlin/com/powersync/testutils/MockSyncService.kt @@ -22,7 +22,6 @@ import io.ktor.utils.io.writer import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.channels.consume -import kotlinx.coroutines.channels.consumeEach import kotlinx.serialization.encodeToString /** From ed2862829d205d97ae91d98133fd22b3e7fc7b87 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Mon, 31 Mar 2025 13:22:11 +0200 Subject: [PATCH 8/8] Add changelog entry --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2a393e0a..ca6dfc14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## 1.0.0-BETA29 (unreleased) + +* Fix potential race condition between jobs in `connect()` and `disconnect()`. + ## 1.0.0-BETA28 * Update PowerSync SQLite core extension to 0.3.12.