diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 230b1334..10da910f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -46,7 +46,8 @@ jobs: if: runner.os == 'macOS' uses: maxim-lobanov/setup-xcode@v1 with: - xcode-version: latest-stable + # TODO: Update to latest-stable once GH installs iOS 26 simulators + xcode-version: '^16.4.0' - name: Build and run tests with Gradle run: | diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt index db05a89b..4d063a2b 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/AttachmentsTest.kt @@ -12,6 +12,7 @@ import com.powersync.attachments.createAttachmentsTable import com.powersync.db.getString import com.powersync.db.schema.Schema import com.powersync.db.schema.Table +import com.powersync.testutils.ActiveDatabaseTest import com.powersync.testutils.MockedRemoteStorage import com.powersync.testutils.UserRow import com.powersync.testutils.databaseTest @@ -27,7 +28,9 @@ import dev.mokkery.spy import dev.mokkery.verifySuspend import io.kotest.matchers.shouldBe import io.kotest.matchers.shouldNotBe +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.flowOf +import kotlinx.coroutines.flow.onEach import kotlinx.io.files.Path import kotlin.test.Test import kotlin.time.Duration.Companion.seconds @@ -53,6 +56,12 @@ class AttachmentsTest { ) } + private fun ActiveDatabaseTest.watchAttachmentsTable(): Flow> = + database + .watch("SELECT * FROM attachments") { + Attachment.fromCursor(it) + }.onEach { logger.i { "attachments table results: $it" } } + suspend fun updateSchema(db: PowerSyncDatabase) { db.updateSchema( Schema( @@ -76,11 +85,7 @@ class AttachmentsTest { val remote = spy(MockedRemoteStorage()) // Monitor the attachments table for testing - val attachmentQuery = - database - // language=SQL - .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) } - .testIn(this) + val attachmentQuery = watchAttachmentsTable().testIn(this) val queue = AttachmentQueue( @@ -93,6 +98,7 @@ class AttachmentsTest { * immediately be deleted. */ archivedCacheLimit = 0, + logger = logger, ) doOnCleanup { @@ -175,7 +181,7 @@ class AttachmentsTest { val exists = queue.localStorage.fileExists(localUri) exists shouldBe false - attachmentQuery.cancel() + attachmentQuery.cancelAndIgnoreRemainingEvents() } } @@ -187,11 +193,7 @@ class AttachmentsTest { val remote = spy(MockedRemoteStorage()) // Monitor the attachments table for testing - val attachmentQuery = - database - // language=SQL - .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) } - .testIn(this) + val attachmentQuery = watchAttachmentsTable().testIn(this) val queue = AttachmentQueue( @@ -204,6 +206,7 @@ class AttachmentsTest { * immediately be deleted. */ archivedCacheLimit = 0, + logger = logger, ) doOnCleanup { @@ -284,7 +287,7 @@ class AttachmentsTest { // The file should have been deleted from storage queue.localStorage.fileExists(localUri) shouldBe false - attachmentQuery.cancel() + attachmentQuery.cancelAndIgnoreRemainingEvents() } } @@ -296,11 +299,7 @@ class AttachmentsTest { val remote = spy(MockedRemoteStorage()) // Monitor the attachments table for testing - val attachmentQuery = - database - // language=SQL - .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) } - .testIn(this) + val attachmentQuery = watchAttachmentsTable().testIn(this) val queue = AttachmentQueue( @@ -314,6 +313,7 @@ class AttachmentsTest { */ archivedCacheLimit = 0, syncThrottleDuration = 0.seconds, + logger = logger, ) doOnCleanup { @@ -382,7 +382,7 @@ class AttachmentsTest { ) } - attachmentQuery.cancel() + attachmentQuery.cancelAndIgnoreRemainingEvents() } } @@ -395,11 +395,7 @@ class AttachmentsTest { val remote = spy(MockedRemoteStorage()) // Monitor the attachments table for testing - val attachmentQuery = - database - // language=SQL - .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) } - .testIn(this) + val attachmentQuery = watchAttachmentsTable().testIn(this) val queue = AttachmentQueue( @@ -411,6 +407,7 @@ class AttachmentsTest { * Keep some items in the cache */ archivedCacheLimit = 10, + logger = logger, ) doOnCleanup { @@ -493,7 +490,7 @@ class AttachmentsTest { attachmentRecord = attachmentQuery.awaitItem().first() attachmentRecord.state shouldBe AttachmentState.SYNCED - attachmentQuery.cancel() + attachmentQuery.cancelAndIgnoreRemainingEvents() } } @@ -509,10 +506,7 @@ class AttachmentsTest { } // Monitor the attachments table for testing - val attachmentQuery = - database - .watch("SELECT * FROM attachments") { Attachment.fromCursor(it) } - .testIn(this) + val attachmentQuery = watchAttachmentsTable().testIn(this) val queue = AttachmentQueue( @@ -537,6 +531,7 @@ class AttachmentsTest { exception: Exception, ): Boolean = false }, + logger = logger, ) doOnCleanup { queue.stopSyncing() @@ -574,7 +569,7 @@ class AttachmentsTest { attachmentRecord.state shouldBe AttachmentState.ARCHIVED - attachmentQuery.cancel() + attachmentQuery.cancelAndIgnoreRemainingEvents() } } } diff --git a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt index 4d4db964..84e69e21 100644 --- a/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt +++ b/core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt @@ -913,4 +913,19 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) { query.cancelAndIgnoreRemainingEvents() } } + + @Test + fun `ends iteration on http close`() = + databaseTest { + turbineScope(timeout = 10.0.seconds) { + val turbine = database.currentStatus.asFlow().testIn(this) + database.connect(TestConnector(), options = getOptions()) + turbine.waitFor { it.connected } + + syncLines.close() + turbine.waitFor { !it.connected } + + turbine.cancelAndIgnoreRemainingEvents() + } + } } diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentQueue.kt b/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentQueue.kt index c0dbfa04..5a6fa3ab 100644 --- a/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentQueue.kt +++ b/core/src/commonMain/kotlin/com/powersync/attachments/AttachmentQueue.kt @@ -327,6 +327,8 @@ public open class AttachmentQueue( * Use a lock here to prevent conflicting state updates. */ attachmentsService.withContext { attachmentsContext -> + logger.v { "processWatchedAttachments($items)" } + /** * Need to get all the attachments which are tracked in the DB. * We might need to restore an archived attachment. diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentContextImpl.kt b/core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentContextImpl.kt index ecf52d37..6bf04c56 100644 --- a/core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentContextImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/attachments/implementation/AttachmentContextImpl.kt @@ -1,13 +1,14 @@ package com.powersync.attachments.implementation import co.touchlab.kermit.Logger +import com.powersync.ExperimentalPowerSyncAPI import com.powersync.PowerSyncDatabase import com.powersync.attachments.Attachment import com.powersync.attachments.AttachmentContext import com.powersync.attachments.AttachmentState import com.powersync.db.getString import com.powersync.db.internal.ConnectionContext -import kotlinx.serialization.json.Json +import com.powersync.db.runWrapped import kotlin.time.Clock /** @@ -65,6 +66,8 @@ public open class AttachmentContextImpl( } db.writeTransaction { tx -> + logger.v { "saveAttachments($attachments)" } + for (attachment in attachments) { upsertAttachment(attachment, tx) } @@ -131,6 +134,7 @@ public open class AttachmentContextImpl( * @returns true if all items have been deleted. Returns false if there might be more archived * items remaining. */ + @OptIn(ExperimentalPowerSyncAPI::class) public override suspend fun deleteArchivedAttachments(callback: suspend (attachments: List) -> Unit): Boolean { // First fetch the attachments in order to allow other cleanup val limit = 1000 @@ -154,12 +158,21 @@ public open class AttachmentContextImpl( ), ) { Attachment.fromCursor(it) } callback(attachments) - db.execute( - "DELETE FROM $table WHERE id IN (SELECT value FROM json_each(?));", - listOf( - Json.encodeToString(attachments.map { it.id }), - ), - ) + + runWrapped { + db.useConnection(readOnly = false) { conn -> + conn.usePrepared("DELETE FROM $table WHERE id = ?") { stmt -> + for (attachment in attachments) { + stmt.bindText(1, attachment.id) + stmt.step() + + stmt.reset() + stmt.clearBindings() + } + } + } + } + return attachments.size < limit } diff --git a/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt b/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt index 5678d626..cf031027 100644 --- a/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt +++ b/core/src/commonMain/kotlin/com/powersync/attachments/sync/SyncingService.kt @@ -78,7 +78,7 @@ public open class SyncingService( merge( // Handles manual triggers for sync events syncTriggerFlow.asSharedFlow(), - // Triggers the sync process whenever an underlaying change to the + // Triggers the sync process whenever an underlying change to the // attachments table happens attachmentsService .watchActiveAttachments(), diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt index 0937cdac..bf4fa151 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt @@ -8,6 +8,7 @@ import com.powersync.sync.Instruction import com.powersync.sync.LegacySyncImplementation import com.powersync.sync.SyncDataBatch import com.powersync.sync.SyncLocalDatabaseResult +import com.powersync.utils.JsonUtil import kotlinx.serialization.Serializable import kotlinx.serialization.json.JsonObject @@ -55,25 +56,49 @@ internal interface BucketStorage { } internal sealed interface PowerSyncControlArguments { + /** + * Returns the arguments for the `powersync_control` SQL invocation. + */ + val sqlArguments: Pair + @Serializable class Start( val parameters: JsonObject, val schema: SerializableSchema, - ) : PowerSyncControlArguments + ) : PowerSyncControlArguments { + override val sqlArguments: Pair + get() = "start" to JsonUtil.json.encodeToString(this) + } - data object Stop : PowerSyncControlArguments + data object Stop : PowerSyncControlArguments { + override val sqlArguments: Pair = "stop" to null + } data class TextLine( val line: String, - ) : PowerSyncControlArguments + ) : PowerSyncControlArguments { + override val sqlArguments: Pair = "line_text" to line + } class BinaryLine( - val line: ByteArray, + line: ByteArray, ) : PowerSyncControlArguments { override fun toString(): String = "BinaryLine" + + override val sqlArguments: Pair = "line_binary" to line + } + + data object CompletedUpload : PowerSyncControlArguments { + override val sqlArguments: Pair = "completed_upload" to null } - data object CompletedUpload : PowerSyncControlArguments + data object ConnectionEstablished : PowerSyncControlArguments { + override val sqlArguments: Pair = "connection" to "established" + } + + data object ResponseStreamEnd : PowerSyncControlArguments { + override val sqlArguments: Pair = "connection" to "end" + } } @Serializable diff --git a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt index 00331c37..b9c9e58a 100644 --- a/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt +++ b/core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt @@ -358,17 +358,7 @@ internal class BucketStorageImpl( db.writeTransaction { tx -> logger.v { "powersync_control: $args" } - val (op: String, data: Any?) = - when (args) { - is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args) - PowerSyncControlArguments.Stop -> "stop" to null - - PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null - - is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line - is PowerSyncControlArguments.TextLine -> "line_text" to args.line - } - + val (op: String, data: Any?) = args.sqlArguments tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, data), ::handleControlResult) } } diff --git a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt index 56b2c4b4..00ead40f 100644 --- a/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt +++ b/core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt @@ -299,7 +299,6 @@ internal class SyncStream( throw RuntimeException("Received error when connecting to sync stream: ${httpResponse.bodyAsText()}") } - status.update { copy(connected = true, connecting = false) } block(isBson, httpResponse) } } @@ -307,6 +306,7 @@ internal class SyncStream( private fun receiveTextLines(req: JsonElement): Flow = flow { connectToSyncEndpoint(req, supportBson = false) { isBson, response -> + status.update { copy(connected = true, connecting = false) } check(!isBson) emitAll(response.body().lines()) @@ -316,6 +316,7 @@ internal class SyncStream( private fun receiveTextOrBinaryLines(req: JsonElement): Flow = flow { connectToSyncEndpoint(req, supportBson = false) { isBson, response -> + emit(PowerSyncControlArguments.ConnectionEstablished) val body = response.body() if (isBson) { @@ -323,6 +324,8 @@ internal class SyncStream( } else { emitAll(body.lines().map { PowerSyncControlArguments.TextLine(it) }) } + + emit(PowerSyncControlArguments.ResponseStreamEnd) } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 9224ed7f..6a0ed505 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -16,7 +16,7 @@ kotlinx-datetime = "0.7.1" kotlinx-io = "0.8.0" ktor = "3.2.3" uuid = "0.8.4" -powersync-core = "0.4.5" +powersync-core = "0.4.6" turbine = "1.2.1" kotest = "5.9.1" # we can't upgrade to 6.x because that requires Java 11 or above (we need Java 8 support)