From 4f6a832ebde8db75e81bf4fe1bdc1526346b27aa Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Tue, 14 Jan 2025 11:12:57 +0200 Subject: [PATCH 1/5] feat: do not invalidate credentials on io error --- .../kotlin/com/powersync/sync/SyncStream.kt | 10 +- .../com/powersync/sync/SyncStreamTest.kt | 189 ++++++++++++++++++ 2 files changed, 197 insertions(+), 2 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index bd4dfd64..283a7f78 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -31,6 +31,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.datetime.Clock +import kotlinx.io.IOException import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject @@ -98,9 +99,14 @@ internal class SyncStream( } catch (e: Exception) { // If the coroutine was cancelled, don't log an error if (e !is CancellationException) { - logger.e(e) { "Error in streamingSync" } + logger.e(e) { "Error in streamingSync: $e" } } - invalidCredentials = true + + // If there is a network issue don't invalidate the credentials + if (e !is IOException) { + invalidCredentials = true + } + status.update( downloadError = e, ) diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 04b743a9..2e9c634a 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -6,13 +6,21 @@ import co.touchlab.kermit.TestConfig import co.touchlab.kermit.TestLogWriter import com.powersync.bucket.BucketStorage import com.powersync.connectors.PowerSyncBackendConnector +import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import dev.mokkery.answering.returns +import dev.mokkery.answering.throws import dev.mokkery.everySuspend import dev.mokkery.mock import dev.mokkery.verify +import dev.mokkery.verify.VerifyMode +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest +import kotlinx.coroutines.withTimeout +import kotlinx.io.IOException import kotlinx.serialization.json.JsonObject import kotlin.test.BeforeTest import kotlin.test.Test @@ -112,4 +120,185 @@ class SyncStreamTest { assertEquals(Severity.Error, severity) } } + + @Test + fun testStreamingSyncBasicFlow() = + runTest { + bucketStorage = + mock { + everySuspend { getClientId() } returns "test-client-id" + everySuspend { getBucketStates() } returns emptyList() + } + + connector = + mock { + everySuspend { getCredentialsCached() } returns + PowerSyncCredentials( + token = "test-token", + userId = "test-user", + endpoint = "https://test.com", + ) + } + + syncStream = + SyncStream( + bucketStorage = bucketStorage, + connector = connector, + uploadCrud = { }, + retryDelayMs = 10, + logger = logger, + params = JsonObject(emptyMap()), + ) + + // Launch streaming sync in a coroutine that we'll cancel after verification + val job = + launch { + syncStream.streamingSync() + } + + // Wait for status to update + withTimeout(1000) { + while (!syncStream.status.connecting) { + delay(10) + } + } + + // Verify initial state + assertEquals(true, syncStream.status.connecting) + assertEquals(false, syncStream.status.connected) + + // Clean up + job.cancel() + } + + @Test + fun testStreamingSyncHandlesCancellation() = + runTest { + // Setup mocks + bucketStorage = + mock { + everySuspend { getClientId() } returns "test-client-id" + everySuspend { getBucketStates() } throws CancellationException("Test cancellation") + } + + connector = + mock { + everySuspend { getCredentialsCached() } returns null + everySuspend { invalidateCredentials() } returns Unit + } + + syncStream = + SyncStream( + bucketStorage = bucketStorage, + connector = connector, + uploadCrud = { }, + retryDelayMs = 10, + logger = logger, + params = JsonObject(emptyMap()), + ) + + val job = + launch { + syncStream.streamingSync() + } + + delay(100) // Give time for error handling + + // Verify no error was logged for CancellationException + assertEquals(0, testLogWriter.logs.count { it.severity == Severity.Error }) + + // Verify credentials were invalidated + verify(VerifyMode.exactly(1)) { syncStream.invalidateCredentials() } + + job.cancel() + } + + @Test + fun testStreamingSyncHandlesIOException() = + runTest { + bucketStorage = + mock { + everySuspend { getClientId() } returns "test-client-id" + everySuspend { getBucketStates() } throws IOException("Test IO error") + } + + connector = + mock { + everySuspend { getCredentialsCached() } returns null + everySuspend { invalidateCredentials() } returns Unit + } + + syncStream = + SyncStream( + bucketStorage = bucketStorage, + connector = connector, + uploadCrud = { }, + retryDelayMs = 10, + logger = logger, + params = JsonObject(emptyMap()), + ) + + val job = + launch { + syncStream.streamingSync() + } + + delay(9) + + // Verify error was logged + assertContains( + testLogWriter.logs + .first { + it.severity == Severity.Error + }.message, + "Error in streamingSync: java.io.IOException: Test IO error", + ) + + // Verify credentials weren't invalidated for IOException + verify(VerifyMode.exactly(0)) { syncStream.invalidateCredentials() } + + job.cancel() + } + + @Test + fun testStreamingSyncHandlesOtherException() = + runTest { + bucketStorage = + mock { + everySuspend { getClientId() } returns "test-client-id" + everySuspend { getBucketStates() } throws IllegalStateException("Test error") + } + + connector = + mock { + everySuspend { getCredentialsCached() } returns null + everySuspend { invalidateCredentials() } returns Unit + } + + syncStream = + SyncStream( + bucketStorage = bucketStorage, + connector = connector, + uploadCrud = { }, + retryDelayMs = 10, + logger = logger, + params = JsonObject(emptyMap()), + ) + + val job = + launch { + syncStream.streamingSync() + } + + delay(9) // Give time for error handling + + // Verify error was logged for non-CancellationException + assertEquals(1, testLogWriter.logs.count { it.severity == Severity.Error }) + assertContains(testLogWriter.logs.first { it.severity == Severity.Error }.message, "Error in streamingSync") + + // Verify credentials were invalidated for non-IOException + verify(VerifyMode.exactly(1)) { syncStream.invalidateCredentials() } + + job.cancel() + } } From 5d1bad2a8748e2b377582c06fdc203015e34d92c Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Tue, 14 Jan 2025 11:36:06 +0200 Subject: [PATCH 2/5] chore: remove tests as they don't really work --- .../kotlin/com/powersync/sync/SyncStream.kt | 6 +- .../com/powersync/sync/SyncStreamTest.kt | 135 ------------------ 2 files changed, 3 insertions(+), 138 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 283a7f78..cbdefeb2 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -97,11 +97,11 @@ internal class SyncStream( // break; // } } catch (e: Exception) { - // If the coroutine was cancelled, don't log an error - if (e !is CancellationException) { - logger.e(e) { "Error in streamingSync: $e" } + if (e is CancellationException) { + throw e } + logger.e(e) { "Error in streamingSync: $e" } // If there is a network issue don't invalidate the credentials if (e !is IOException) { invalidCredentials = true diff --git a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt index 2e9c634a..8284dce8 100644 --- a/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt +++ b/core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt @@ -10,17 +10,13 @@ import com.powersync.connectors.PowerSyncCredentials import com.powersync.db.crud.CrudEntry import com.powersync.db.crud.UpdateType import dev.mokkery.answering.returns -import dev.mokkery.answering.throws import dev.mokkery.everySuspend import dev.mokkery.mock import dev.mokkery.verify -import dev.mokkery.verify.VerifyMode -import kotlinx.coroutines.CancellationException import kotlinx.coroutines.delay import kotlinx.coroutines.launch import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withTimeout -import kotlinx.io.IOException import kotlinx.serialization.json.JsonObject import kotlin.test.BeforeTest import kotlin.test.Test @@ -170,135 +166,4 @@ class SyncStreamTest { // Clean up job.cancel() } - - @Test - fun testStreamingSyncHandlesCancellation() = - runTest { - // Setup mocks - bucketStorage = - mock { - everySuspend { getClientId() } returns "test-client-id" - everySuspend { getBucketStates() } throws CancellationException("Test cancellation") - } - - connector = - mock { - everySuspend { getCredentialsCached() } returns null - everySuspend { invalidateCredentials() } returns Unit - } - - syncStream = - SyncStream( - bucketStorage = bucketStorage, - connector = connector, - uploadCrud = { }, - retryDelayMs = 10, - logger = logger, - params = JsonObject(emptyMap()), - ) - - val job = - launch { - syncStream.streamingSync() - } - - delay(100) // Give time for error handling - - // Verify no error was logged for CancellationException - assertEquals(0, testLogWriter.logs.count { it.severity == Severity.Error }) - - // Verify credentials were invalidated - verify(VerifyMode.exactly(1)) { syncStream.invalidateCredentials() } - - job.cancel() - } - - @Test - fun testStreamingSyncHandlesIOException() = - runTest { - bucketStorage = - mock { - everySuspend { getClientId() } returns "test-client-id" - everySuspend { getBucketStates() } throws IOException("Test IO error") - } - - connector = - mock { - everySuspend { getCredentialsCached() } returns null - everySuspend { invalidateCredentials() } returns Unit - } - - syncStream = - SyncStream( - bucketStorage = bucketStorage, - connector = connector, - uploadCrud = { }, - retryDelayMs = 10, - logger = logger, - params = JsonObject(emptyMap()), - ) - - val job = - launch { - syncStream.streamingSync() - } - - delay(9) - - // Verify error was logged - assertContains( - testLogWriter.logs - .first { - it.severity == Severity.Error - }.message, - "Error in streamingSync: java.io.IOException: Test IO error", - ) - - // Verify credentials weren't invalidated for IOException - verify(VerifyMode.exactly(0)) { syncStream.invalidateCredentials() } - - job.cancel() - } - - @Test - fun testStreamingSyncHandlesOtherException() = - runTest { - bucketStorage = - mock { - everySuspend { getClientId() } returns "test-client-id" - everySuspend { getBucketStates() } throws IllegalStateException("Test error") - } - - connector = - mock { - everySuspend { getCredentialsCached() } returns null - everySuspend { invalidateCredentials() } returns Unit - } - - syncStream = - SyncStream( - bucketStorage = bucketStorage, - connector = connector, - uploadCrud = { }, - retryDelayMs = 10, - logger = logger, - params = JsonObject(emptyMap()), - ) - - val job = - launch { - syncStream.streamingSync() - } - - delay(9) // Give time for error handling - - // Verify error was logged for non-CancellationException - assertEquals(1, testLogWriter.logs.count { it.severity == Severity.Error }) - assertContains(testLogWriter.logs.first { it.severity == Severity.Error }.message, "Error in streamingSync") - - // Verify credentials were invalidated for non-IOException - verify(VerifyMode.exactly(1)) { syncStream.invalidateCredentials() } - - job.cancel() - } } From 64cc88167e274974f0c59d5ad2c0e7118935fc80 Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Tue, 14 Jan 2025 16:20:30 +0200 Subject: [PATCH 3/5] chore: update changelog --- CHANGELOG.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d7e7c24d..b41b0019 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,8 @@ ## 1.0.0-BETA16 -* Add `close` method to database methods +* Add `close` method to database methods +* Throw when error is a `CancellationError` in syncStream and only invalidate credentials when error is not IOException. ## 1.0.0-BETA15 From 07fba210566f051126ed78e8248b1189991886cf Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Tue, 14 Jan 2025 16:33:37 +0200 Subject: [PATCH 4/5] chore: remove invalidation --- core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt | 6 ------ 1 file changed, 6 deletions(-) diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index cbdefeb2..c16025a9 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -31,7 +31,6 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flow import kotlinx.datetime.Clock -import kotlinx.io.IOException import kotlinx.serialization.encodeToString import kotlinx.serialization.json.JsonElement import kotlinx.serialization.json.JsonObject @@ -102,11 +101,6 @@ internal class SyncStream( } logger.e(e) { "Error in streamingSync: $e" } - // If there is a network issue don't invalidate the credentials - if (e !is IOException) { - invalidCredentials = true - } - status.update( downloadError = e, ) From df30de8c2e534f18ab6eb6bc34116a4a7d41215b Mon Sep 17 00:00:00 2001 From: DominicGBauer Date: Tue, 14 Jan 2025 16:36:01 +0200 Subject: [PATCH 5/5] chore: update changelog --- CHANGELOG.md | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b41b0019..6008d19b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,8 +3,7 @@ ## 1.0.0-BETA16 * Add `close` method to database methods -* Throw when error is a `CancellationError` in syncStream and only invalidate credentials when error is not IOException. - +* Throw when error is a `CancellationError` and remove invalidation for all errors in `streamingSync` catch. ## 1.0.0-BETA15 * Update powersync-sqlite-core to 0.3.8