Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ jobs:
if: runner.os == 'macOS'
uses: maxim-lobanov/setup-xcode@v1
with:
xcode-version: latest-stable
# TODO: Update to latest-stable once GH installs iOS 26 simulators
xcode-version: '^16.4.0'

- name: Build and run tests with Gradle
run: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import com.powersync.attachments.createAttachmentsTable
import com.powersync.db.getString
import com.powersync.db.schema.Schema
import com.powersync.db.schema.Table
import com.powersync.testutils.ActiveDatabaseTest
import com.powersync.testutils.MockedRemoteStorage
import com.powersync.testutils.UserRow
import com.powersync.testutils.databaseTest
Expand All @@ -27,7 +28,9 @@ import dev.mokkery.spy
import dev.mokkery.verifySuspend
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.io.files.Path
import kotlin.test.Test
import kotlin.time.Duration.Companion.seconds
Expand All @@ -53,6 +56,12 @@ class AttachmentsTest {
)
}

private fun ActiveDatabaseTest.watchAttachmentsTable(): Flow<List<Attachment>> =
database
.watch("SELECT * FROM attachments") {
Attachment.fromCursor(it)
}.onEach { logger.i { "attachments table results: $it" } }

suspend fun updateSchema(db: PowerSyncDatabase) {
db.updateSchema(
Schema(
Expand All @@ -76,11 +85,7 @@ class AttachmentsTest {
val remote = spy<RemoteStorage>(MockedRemoteStorage())

// Monitor the attachments table for testing
val attachmentQuery =
database
// language=SQL
.watch("SELECT * FROM attachments") { Attachment.fromCursor(it) }
.testIn(this)
val attachmentQuery = watchAttachmentsTable().testIn(this)

val queue =
AttachmentQueue(
Expand All @@ -93,6 +98,7 @@ class AttachmentsTest {
* immediately be deleted.
*/
archivedCacheLimit = 0,
logger = logger,
)

doOnCleanup {
Expand Down Expand Up @@ -175,7 +181,7 @@ class AttachmentsTest {
val exists = queue.localStorage.fileExists(localUri)
exists shouldBe false

attachmentQuery.cancel()
attachmentQuery.cancelAndIgnoreRemainingEvents()
}
}

Expand All @@ -187,11 +193,7 @@ class AttachmentsTest {
val remote = spy<RemoteStorage>(MockedRemoteStorage())

// Monitor the attachments table for testing
val attachmentQuery =
database
// language=SQL
.watch("SELECT * FROM attachments") { Attachment.fromCursor(it) }
.testIn(this)
val attachmentQuery = watchAttachmentsTable().testIn(this)

val queue =
AttachmentQueue(
Expand All @@ -204,6 +206,7 @@ class AttachmentsTest {
* immediately be deleted.
*/
archivedCacheLimit = 0,
logger = logger,
)

doOnCleanup {
Expand Down Expand Up @@ -284,7 +287,7 @@ class AttachmentsTest {
// The file should have been deleted from storage
queue.localStorage.fileExists(localUri) shouldBe false

attachmentQuery.cancel()
attachmentQuery.cancelAndIgnoreRemainingEvents()
}
}

Expand All @@ -296,11 +299,7 @@ class AttachmentsTest {
val remote = spy<RemoteStorage>(MockedRemoteStorage())

// Monitor the attachments table for testing
val attachmentQuery =
database
// language=SQL
.watch("SELECT * FROM attachments") { Attachment.fromCursor(it) }
.testIn(this)
val attachmentQuery = watchAttachmentsTable().testIn(this)

val queue =
AttachmentQueue(
Expand All @@ -314,6 +313,7 @@ class AttachmentsTest {
*/
archivedCacheLimit = 0,
syncThrottleDuration = 0.seconds,
logger = logger,
)

doOnCleanup {
Expand Down Expand Up @@ -382,7 +382,7 @@ class AttachmentsTest {
)
}

attachmentQuery.cancel()
attachmentQuery.cancelAndIgnoreRemainingEvents()
}
}

Expand All @@ -395,11 +395,7 @@ class AttachmentsTest {
val remote = spy<RemoteStorage>(MockedRemoteStorage())

// Monitor the attachments table for testing
val attachmentQuery =
database
// language=SQL
.watch("SELECT * FROM attachments") { Attachment.fromCursor(it) }
.testIn(this)
val attachmentQuery = watchAttachmentsTable().testIn(this)

val queue =
AttachmentQueue(
Expand All @@ -411,6 +407,7 @@ class AttachmentsTest {
* Keep some items in the cache
*/
archivedCacheLimit = 10,
logger = logger,
)

doOnCleanup {
Expand Down Expand Up @@ -493,7 +490,7 @@ class AttachmentsTest {
attachmentRecord = attachmentQuery.awaitItem().first()
attachmentRecord.state shouldBe AttachmentState.SYNCED

attachmentQuery.cancel()
attachmentQuery.cancelAndIgnoreRemainingEvents()
}
}

Expand All @@ -509,10 +506,7 @@ class AttachmentsTest {
}

// Monitor the attachments table for testing
val attachmentQuery =
database
.watch("SELECT * FROM attachments") { Attachment.fromCursor(it) }
.testIn(this)
val attachmentQuery = watchAttachmentsTable().testIn(this)

val queue =
AttachmentQueue(
Expand All @@ -537,6 +531,7 @@ class AttachmentsTest {
exception: Exception,
): Boolean = false
},
logger = logger,
)
doOnCleanup {
queue.stopSyncing()
Expand Down Expand Up @@ -574,7 +569,7 @@ class AttachmentsTest {

attachmentRecord.state shouldBe AttachmentState.ARCHIVED

attachmentQuery.cancel()
attachmentQuery.cancelAndIgnoreRemainingEvents()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -913,4 +913,19 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
query.cancelAndIgnoreRemainingEvents()
}
}

@Test
fun `ends iteration on http close`() =
databaseTest {
turbineScope(timeout = 10.0.seconds) {
val turbine = database.currentStatus.asFlow().testIn(this)
database.connect(TestConnector(), options = getOptions())
turbine.waitFor { it.connected }

syncLines.close()
turbine.waitFor { !it.connected }

turbine.cancelAndIgnoreRemainingEvents()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ public open class AttachmentQueue(
* Use a lock here to prevent conflicting state updates.
*/
attachmentsService.withContext { attachmentsContext ->
logger.v { "processWatchedAttachments($items)" }

/**
* Need to get all the attachments which are tracked in the DB.
* We might need to restore an archived attachment.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.powersync.attachments.implementation

import co.touchlab.kermit.Logger
import com.powersync.ExperimentalPowerSyncAPI
import com.powersync.PowerSyncDatabase
import com.powersync.attachments.Attachment
import com.powersync.attachments.AttachmentContext
import com.powersync.attachments.AttachmentState
import com.powersync.db.getString
import com.powersync.db.internal.ConnectionContext
import kotlinx.serialization.json.Json
import com.powersync.db.runWrapped
import kotlin.time.Clock

/**
Expand Down Expand Up @@ -65,6 +66,8 @@ public open class AttachmentContextImpl(
}

db.writeTransaction { tx ->
logger.v { "saveAttachments($attachments)" }

for (attachment in attachments) {
upsertAttachment(attachment, tx)
}
Expand Down Expand Up @@ -131,6 +134,7 @@ public open class AttachmentContextImpl(
* @returns true if all items have been deleted. Returns false if there might be more archived
* items remaining.
*/
@OptIn(ExperimentalPowerSyncAPI::class)
public override suspend fun deleteArchivedAttachments(callback: suspend (attachments: List<Attachment>) -> Unit): Boolean {
// First fetch the attachments in order to allow other cleanup
val limit = 1000
Expand All @@ -154,12 +158,21 @@ public open class AttachmentContextImpl(
),
) { Attachment.fromCursor(it) }
callback(attachments)
db.execute(
"DELETE FROM $table WHERE id IN (SELECT value FROM json_each(?));",
listOf(
Json.encodeToString(attachments.map { it.id }),
),
)

runWrapped {
db.useConnection(readOnly = false) { conn ->
conn.usePrepared("DELETE FROM $table WHERE id = ?") { stmt ->
for (attachment in attachments) {
stmt.bindText(1, attachment.id)
stmt.step()

stmt.reset()
stmt.clearBindings()
}
}
}
}

return attachments.size < limit
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public open class SyncingService(
merge(
// Handles manual triggers for sync events
syncTriggerFlow.asSharedFlow(),
// Triggers the sync process whenever an underlaying change to the
// Triggers the sync process whenever an underlying change to the
// attachments table happens
attachmentsService
.watchActiveAttachments(),
Expand Down
35 changes: 30 additions & 5 deletions core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import com.powersync.sync.Instruction
import com.powersync.sync.LegacySyncImplementation
import com.powersync.sync.SyncDataBatch
import com.powersync.sync.SyncLocalDatabaseResult
import com.powersync.utils.JsonUtil
import kotlinx.serialization.Serializable
import kotlinx.serialization.json.JsonObject

Expand Down Expand Up @@ -55,25 +56,49 @@ internal interface BucketStorage {
}

internal sealed interface PowerSyncControlArguments {
/**
* Returns the arguments for the `powersync_control` SQL invocation.
*/
val sqlArguments: Pair<String, Any?>

@Serializable
class Start(
val parameters: JsonObject,
val schema: SerializableSchema,
) : PowerSyncControlArguments
) : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?>
get() = "start" to JsonUtil.json.encodeToString(this)
}

data object Stop : PowerSyncControlArguments
data object Stop : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?> = "stop" to null
}

data class TextLine(
val line: String,
) : PowerSyncControlArguments
) : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?> = "line_text" to line
}

class BinaryLine(
val line: ByteArray,
line: ByteArray,
) : PowerSyncControlArguments {
override fun toString(): String = "BinaryLine"

override val sqlArguments: Pair<String, Any?> = "line_binary" to line
}

data object CompletedUpload : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?> = "completed_upload" to null
}

data object CompletedUpload : PowerSyncControlArguments
data object ConnectionEstablished : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?> = "connection" to "established"
}

data object ResponseStreamEnd : PowerSyncControlArguments {
override val sqlArguments: Pair<String, Any?> = "connection" to "end"
}
}

@Serializable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,17 +358,7 @@ internal class BucketStorageImpl(
db.writeTransaction { tx ->
logger.v { "powersync_control: $args" }

val (op: String, data: Any?) =
when (args) {
is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args)
PowerSyncControlArguments.Stop -> "stop" to null

PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null

is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line
is PowerSyncControlArguments.TextLine -> "line_text" to args.line
}

val (op: String, data: Any?) = args.sqlArguments
tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, data), ::handleControlResult)
}
}
Loading