diff --git a/.github/workflows/build_and_test.yaml b/.github/workflows/build_and_test.yaml index 6b1bbbd..52dbf5e 100644 --- a/.github/workflows/build_and_test.yaml +++ b/.github/workflows/build_and_test.yaml @@ -19,3 +19,20 @@ jobs: xcodebuild test -scheme PowerSync -destination "platform=iOS Simulator,name=iPhone 16" xcodebuild test -scheme PowerSync -destination "platform=macOS,arch=arm64,name=My Mac" xcodebuild test -scheme PowerSync -destination "platform=watchOS Simulator,arch=arm64,name=Apple Watch Ultra 2 (49mm)" + + buildSwift6: + name: Build and test with Swift 6 + runs-on: macos-latest + steps: + - uses: actions/checkout@v4 + - name: Set up XCode + uses: maxim-lobanov/setup-xcode@v1 + with: + xcode-version: latest-stable + - name: Use Swift 6 + run: | + sed -i '' 's|^// swift-tools-version:.*$|// swift-tools-version:6.1|' Package.swift + - name: Build and Test + run: | + swift build -Xswiftc -strict-concurrency=complete + swift test -Xswiftc -strict-concurrency=complete diff --git a/.gitignore b/.gitignore index fb8464f..79542bb 100644 --- a/.gitignore +++ b/.gitignore @@ -69,6 +69,7 @@ DerivedData/ .swiftpm/configuration/registries.json .swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata .netrc - +.vscode +.sourcekit-lsp Secrets.swift \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index c62bdeb..1f92db5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,13 @@ ## Unreleased * Fix null values in CRUD entries being reported as strings. +* Added support for Swift 6 strict concurrency checking. + - Accepted query parameter types have been updated from `[Any]` to `[Sendable]`. This should cover all supported query parameter types. + - Query and lock methods' return `Result` generic types now should extend `Sendable`. + - Deprecated default `open class PowerSyncBackendConnector`. Devs should preferably implement the `PowerSyncBackendConnectorProtocol` + +* *Potential Breaking Change*: Attachment helpers have been updated to better support Swift 6 strict concurrency checking. `Actor` isolation is improved, but developers who customize or extend `AttachmentQueue` will need to update their implementations. The default instantiation of `AttachmentQueue` remains unchanged. +`AttachmentQueueProtocol` now defines the basic requirements for an attachment queue, with most base functionality provided via an extension. Custom implementations should extend `AttachmentQueueProtocol`. * [Internal] Instantiate Kotlin Kermit logger directly. * [Internal] Improved connection context error handling. diff --git a/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved b/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved index 4dfb9de..547bdd5 100644 --- a/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved +++ b/Demo/PowerSyncExample.xcodeproj/project.xcworkspace/xcshareddata/swiftpm/Package.resolved @@ -1,5 +1,5 @@ { - "originHash" : "2d885a1b46f17f9239b7876e3889168a6de98024718f2d7af03aede290c8a86a", + "originHash" : "33297127250b66812faa920958a24bae46bf9e9d1c38ea6b84ca413efaf16afd", "pins" : [ { "identity" : "anycodable", @@ -15,8 +15,8 @@ "kind" : "remoteSourceControl", "location" : "https://github.com/powersync-ja/powersync-sqlite-core-swift.git", "state" : { - "revision" : "21057135ce8269b43582022aa4ca56407332e6a8", - "version" : "0.4.2" + "revision" : "3396dd7eb9d4264b19e3d95bfe0d77347826f4c2", + "version" : "0.4.4" } }, { diff --git a/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift b/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift index 6024923..4bad48e 100644 --- a/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift +++ b/Demo/PowerSyncExample/PowerSync/SupabaseConnector.swift @@ -38,7 +38,7 @@ private enum PostgresFatalCodes { } @Observable -class SupabaseConnector: PowerSyncBackendConnector { +final class SupabaseConnector: PowerSyncBackendConnectorProtocol { let powerSyncEndpoint: String = Secrets.powerSyncEndpoint let client: SupabaseClient = .init( supabaseURL: Secrets.supabaseURL, @@ -50,8 +50,7 @@ class SupabaseConnector: PowerSyncBackendConnector { @ObservationIgnored private var observeAuthStateChangesTask: Task? - override init() { - super.init() + init() { session = client.auth.currentSession observeAuthStateChangesTask = Task { [weak self] in guard let self = self else { return } @@ -80,7 +79,7 @@ class SupabaseConnector: PowerSyncBackendConnector { return client.storage.from(bucket) } - override func fetchCredentials() async throws -> PowerSyncCredentials? { + func fetchCredentials() async throws -> PowerSyncCredentials? { session = try await client.auth.session if session == nil { @@ -92,7 +91,7 @@ class SupabaseConnector: PowerSyncBackendConnector { return PowerSyncCredentials(endpoint: powerSyncEndpoint, token: token) } - override func uploadData(database: PowerSyncDatabaseProtocol) async throws { + func uploadData(database: PowerSyncDatabaseProtocol) async throws { guard let transaction = try await database.getNextCrudTransaction() else { return } var lastEntry: CrudEntry? diff --git a/Demo/PowerSyncExample/PowerSync/SystemManager.swift b/Demo/PowerSyncExample/PowerSync/SystemManager.swift index 4737c52..89b3641 100644 --- a/Demo/PowerSyncExample/PowerSync/SystemManager.swift +++ b/Demo/PowerSyncExample/PowerSync/SystemManager.swift @@ -225,17 +225,17 @@ class SystemManager { if let attachments, let photoId = todo.photoId { try await attachments.deleteFile( attachmentId: photoId - ) { tx, _ in + ) { transaction, _ in try self.deleteTodoInTX( id: todo.id, - tx: tx + tx: transaction ) } } else { - try await db.writeTransaction { tx in + try await db.writeTransaction { transaction in try self.deleteTodoInTX( id: todo.id, - tx: tx + tx: transaction ) } } diff --git a/Sources/PowerSync/Kotlin/DatabaseLogger.swift b/Sources/PowerSync/Kotlin/DatabaseLogger.swift index 852dc07..ed49d5a 100644 --- a/Sources/PowerSync/Kotlin/DatabaseLogger.swift +++ b/Sources/PowerSync/Kotlin/DatabaseLogger.swift @@ -6,7 +6,7 @@ import PowerSyncKotlin private class KermitLogWriterAdapter: Kermit_coreLogWriter { /// The underlying Swift log writer to forward log messages to. let logger: any LoggerProtocol - + /// Initializes a new adapter. /// /// - Parameter logger: A Swift log writer that will handle log output. @@ -14,7 +14,7 @@ private class KermitLogWriterAdapter: Kermit_coreLogWriter { self.logger = logger super.init() } - + /// Called by Kermit to log a message. /// /// - Parameters: @@ -22,7 +22,7 @@ private class KermitLogWriterAdapter: Kermit_coreLogWriter { /// - message: The content of the log message. /// - tag: A string categorizing the log. /// - throwable: An optional Kotlin exception (ignored here). - override func log(severity: Kermit_coreSeverity, message: String, tag: String, throwable: KotlinThrowable?) { + override func log(severity: Kermit_coreSeverity, message: String, tag: String, throwable _: KotlinThrowable?) { switch severity { case PowerSyncKotlin.Kermit_coreSeverity.verbose: return logger.debug(message, tag: tag) @@ -43,7 +43,7 @@ private class KermitLogWriterAdapter: Kermit_coreLogWriter { class KotlinKermitLoggerConfig: PowerSyncKotlin.Kermit_coreLoggerConfig { var logWriterList: [Kermit_coreLogWriter] var minSeverity: PowerSyncKotlin.Kermit_coreSeverity - + init(logWriterList: [Kermit_coreLogWriter], minSeverity: PowerSyncKotlin.Kermit_coreSeverity) { self.logWriterList = logWriterList self.minSeverity = minSeverity @@ -54,18 +54,18 @@ class KotlinKermitLoggerConfig: PowerSyncKotlin.Kermit_coreLoggerConfig { /// /// This class bridges Swift log writers with the Kotlin logging system and supports /// runtime configuration of severity levels and writer lists. -class DatabaseLogger: LoggerProtocol { +final class DatabaseLogger: LoggerProtocol { /// The underlying Kermit logger instance provided by the PowerSyncKotlin SDK. public let kLogger: PowerSyncKotlin.KermitLogger public let logger: any LoggerProtocol - + /// Initializes a new logger with an optional list of writers. /// /// - Parameter logger: A logger which will be called for each internal log operation init(_ logger: any LoggerProtocol) { self.logger = logger // Set to the lowest severity. The provided logger should filter by severity - self.kLogger = PowerSyncKotlin.KermitLogger( + kLogger = PowerSyncKotlin.KermitLogger( config: KotlinKermitLoggerConfig( logWriterList: [KermitLogWriterAdapter(logger: logger)], minSeverity: Kermit_coreSeverity.verbose @@ -73,27 +73,27 @@ class DatabaseLogger: LoggerProtocol { tag: "PowerSync" ) } - + /// Logs a debug-level message. public func debug(_ message: String, tag: String?) { logger.debug(message, tag: tag) } - + /// Logs an info-level message. public func info(_ message: String, tag: String?) { logger.info(message, tag: tag) } - + /// Logs a warning-level message. public func warning(_ message: String, tag: String?) { logger.warning(message, tag: tag) } - + /// Logs an error-level message. public func error(_ message: String, tag: String?) { logger.error(message, tag: tag) } - + /// Logs a fault (assert-level) message, typically used for critical issues. public func fault(_ message: String, tag: String?) { logger.fault(message, tag: tag) diff --git a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift index 931df1b..83cdf6c 100644 --- a/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift +++ b/Sources/PowerSync/Kotlin/KotlinPowerSyncDatabaseImpl.swift @@ -1,9 +1,11 @@ import Foundation import PowerSyncKotlin -final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { +final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol, + // `PowerSyncKotlin.PowerSyncDatabase` cannot be marked as Sendable + @unchecked Sendable +{ let logger: any LoggerProtocol - private let kotlinDatabase: PowerSyncKotlin.PowerSyncDatabase private let encoder = JSONEncoder() let currentStatus: SyncStatus @@ -43,7 +45,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } func connect( - connector: PowerSyncBackendConnector, + connector: PowerSyncBackendConnectorProtocol, options: ConnectOptions? ) async throws { let connectorAdapter = PowerSyncBackendConnectorAdapter( @@ -98,7 +100,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } @discardableResult - func execute(sql: String, parameters: [Any?]?) async throws -> Int64 { + func execute(sql: String, parameters: [Sendable?]?) async throws -> Int64 { try await writeTransaction { ctx in try ctx.execute( sql: sql, @@ -107,10 +109,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func get( + func get( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) -> RowType ) async throws -> RowType { try await readLock { ctx in try ctx.get( @@ -121,10 +123,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func get( + func get( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType { try await readLock { ctx in try ctx.get( @@ -135,10 +137,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func getAll( + func getAll( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) -> RowType ) async throws -> [RowType] { try await readLock { ctx in try ctx.getAll( @@ -149,10 +151,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func getAll( + func getAll( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> [RowType] { try await readLock { ctx in try ctx.getAll( @@ -163,10 +165,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func getOptional( + func getOptional( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) -> RowType ) async throws -> RowType? { try await readLock { ctx in try ctx.getOptional( @@ -177,10 +179,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func getOptional( + func getOptional( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType? { try await readLock { ctx in try ctx.getOptional( @@ -191,10 +193,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func watch( + func watch( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) -> RowType ) throws -> AsyncThrowingStream<[RowType], any Error> { try watch( options: WatchOptions( @@ -205,10 +207,10 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { ) } - func watch( + func watch( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> AsyncThrowingStream<[RowType], any Error> { try watch( options: WatchOptions( @@ -219,7 +221,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { ) } - func watch( + func watch( options: WatchOptions ) throws -> AsyncThrowingStream<[RowType], Error> { AsyncThrowingStream { continuation in @@ -269,8 +271,8 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func writeLock( - callback: @escaping (any ConnectionContext) throws -> R + func writeLock( + callback: @Sendable @escaping (any ConnectionContext) throws -> R ) async throws -> R { return try await wrapPowerSyncException { try safeCast( @@ -282,8 +284,8 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func writeTransaction( - callback: @escaping (any Transaction) throws -> R + func writeTransaction( + callback: @Sendable @escaping (any Transaction) throws -> R ) async throws -> R { return try await wrapPowerSyncException { try safeCast( @@ -295,8 +297,8 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func readLock( - callback: @escaping (any ConnectionContext) throws -> R + func readLock( + callback: @Sendable @escaping (any ConnectionContext) throws -> R ) async throws -> R { @@ -310,8 +312,8 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } } - func readTransaction( - callback: @escaping (any Transaction) throws -> R + func readTransaction( + callback: @Sendable @escaping (any Transaction) throws -> R ) async throws -> R { return try await wrapPowerSyncException { try safeCast( @@ -328,7 +330,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { } /// Tries to convert Kotlin PowerSyncExceptions to Swift Exceptions - private func wrapPowerSyncException( + private func wrapPowerSyncException( handler: () async throws -> R) async throws -> R { @@ -348,7 +350,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { private func getQuerySourceTables( sql: String, - parameters: [Any?] + parameters: [Sendable?] ) async throws -> Set { let rows = try await getAll( sql: "EXPLAIN \(sql)", @@ -385,7 +387,7 @@ final class KotlinPowerSyncDatabaseImpl: PowerSyncDatabaseProtocol { let tableRows = try await getAll( sql: "SELECT tbl_name FROM sqlite_master WHERE rootpage IN (SELECT json_each.value FROM json_each(?))", parameters: [ - pagesString + pagesString, ] ) { try $0.getString(index: 0) } @@ -417,7 +419,7 @@ extension Error { } func wrapLockContext( - callback: @escaping (any ConnectionContext) throws -> Any + callback: @Sendable @escaping (any ConnectionContext) throws -> Any ) throws -> PowerSyncKotlin.ThrowableLockCallback { PowerSyncKotlin.wrapContextHandler { kotlinContext in do { @@ -436,7 +438,7 @@ func wrapLockContext( } func wrapTransactionContext( - callback: @escaping (any Transaction) throws -> Any + callback: @Sendable @escaping (any Transaction) throws -> Any ) throws -> PowerSyncKotlin.ThrowableTransactionCallback { PowerSyncKotlin.wrapTransactionContextHandler { kotlinContext in do { diff --git a/Sources/PowerSync/Kotlin/KotlinTypes.swift b/Sources/PowerSync/Kotlin/KotlinTypes.swift index 18edcbd..8d0acc8 100644 --- a/Sources/PowerSync/Kotlin/KotlinTypes.swift +++ b/Sources/PowerSync/Kotlin/KotlinTypes.swift @@ -3,3 +3,8 @@ import PowerSyncKotlin typealias KotlinPowerSyncBackendConnector = PowerSyncKotlin.PowerSyncBackendConnector typealias KotlinPowerSyncCredentials = PowerSyncKotlin.PowerSyncCredentials typealias KotlinPowerSyncDatabase = PowerSyncKotlin.PowerSyncDatabase + +extension KotlinPowerSyncBackendConnector: @retroactive @unchecked Sendable {} +extension KotlinPowerSyncCredentials: @retroactive @unchecked Sendable {} +extension PowerSyncKotlin.KermitLogger: @retroactive @unchecked Sendable {} +extension PowerSyncKotlin.SyncStatus: @retroactive @unchecked Sendable {} diff --git a/Sources/PowerSync/Kotlin/PowerSyncBackendConnectorAdapter.swift b/Sources/PowerSync/Kotlin/PowerSyncBackendConnectorAdapter.swift index 8e8da4c..6507610 100644 --- a/Sources/PowerSync/Kotlin/PowerSyncBackendConnectorAdapter.swift +++ b/Sources/PowerSync/Kotlin/PowerSyncBackendConnectorAdapter.swift @@ -1,12 +1,15 @@ import OSLog -class PowerSyncBackendConnectorAdapter: KotlinPowerSyncBackendConnector { - let swiftBackendConnector: PowerSyncBackendConnector +final class PowerSyncBackendConnectorAdapter: KotlinPowerSyncBackendConnector, + // We need to declare this since we declared KotlinPowerSyncBackendConnector as @unchecked Sendable + @unchecked Sendable +{ + let swiftBackendConnector: PowerSyncBackendConnectorProtocol let db: any PowerSyncDatabaseProtocol let logTag = "PowerSyncBackendConnector" init( - swiftBackendConnector: PowerSyncBackendConnector, + swiftBackendConnector: PowerSyncBackendConnectorProtocol, db: any PowerSyncDatabaseProtocol ) { self.swiftBackendConnector = swiftBackendConnector @@ -26,7 +29,7 @@ class PowerSyncBackendConnectorAdapter: KotlinPowerSyncBackendConnector { } } - override func __uploadData(database: KotlinPowerSyncDatabase) async throws { + override func __uploadData(database _: KotlinPowerSyncDatabase) async throws { do { // Pass the Swift DB protocal to the connector return try await swiftBackendConnector.uploadData(database: db) diff --git a/Sources/PowerSync/Kotlin/SafeCastError.swift b/Sources/PowerSync/Kotlin/SafeCastError.swift index 35ef8cb..bd18664 100644 --- a/Sources/PowerSync/Kotlin/SafeCastError.swift +++ b/Sources/PowerSync/Kotlin/SafeCastError.swift @@ -1,7 +1,7 @@ import Foundation enum SafeCastError: Error, CustomStringConvertible { - case typeMismatch(expected: Any.Type, actual: Any?) + case typeMismatch(expected: String, actual: String?) var description: String { switch self { @@ -25,6 +25,6 @@ func safeCast(_ value: Any?, to type: T.Type) throws -> T { if let castedValue = value as? T { return castedValue } else { - throw SafeCastError.typeMismatch(expected: type, actual: value) + throw SafeCastError.typeMismatch(expected: "\(type)", actual: "\(value ?? "nil")") } } diff --git a/Sources/PowerSync/Kotlin/db/KotlinConnectionContext.swift b/Sources/PowerSync/Kotlin/db/KotlinConnectionContext.swift index dbfca2d..0b7f314 100644 --- a/Sources/PowerSync/Kotlin/db/KotlinConnectionContext.swift +++ b/Sources/PowerSync/Kotlin/db/KotlinConnectionContext.swift @@ -10,17 +10,17 @@ protocol KotlinConnectionContextProtocol: ConnectionContext { /// Implements most of `ConnectionContext` using the `ctx` provided. extension KotlinConnectionContextProtocol { - func execute(sql: String, parameters: [Any?]?) throws -> Int64 { + func execute(sql: String, parameters: [Sendable?]?) throws -> Int64 { try ctx.execute( sql: sql, parameters: mapParameters(parameters) ) } - func getOptional( + func getOptional( sql: String, - parameters: [Any?]?, - mapper: @escaping (any SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (any SqlCursor) throws -> RowType ) throws -> RowType? { return try wrapQueryCursorTyped( mapper: mapper, @@ -35,10 +35,10 @@ extension KotlinConnectionContextProtocol { ) } - func getAll( + func getAll( sql: String, - parameters: [Any?]?, - mapper: @escaping (any SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (any SqlCursor) throws -> RowType ) throws -> [RowType] { return try wrapQueryCursorTyped( mapper: mapper, @@ -53,10 +53,10 @@ extension KotlinConnectionContextProtocol { ) } - func get( + func get( sql: String, - parameters: [Any?]?, - mapper: @escaping (any SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (any SqlCursor) throws -> RowType ) throws -> RowType { return try wrapQueryCursorTyped( mapper: mapper, @@ -72,7 +72,10 @@ extension KotlinConnectionContextProtocol { } } -class KotlinConnectionContext: KotlinConnectionContextProtocol { +final class KotlinConnectionContext: KotlinConnectionContextProtocol, + // The Kotlin ConnectionContext is technically sendable, but we cannot annotate that + @unchecked Sendable +{ let ctx: PowerSyncKotlin.ConnectionContext init(ctx: PowerSyncKotlin.ConnectionContext) { @@ -80,7 +83,10 @@ class KotlinConnectionContext: KotlinConnectionContextProtocol { } } -class KotlinTransactionContext: Transaction, KotlinConnectionContextProtocol { +final class KotlinTransactionContext: Transaction, KotlinConnectionContextProtocol, + // The Kotlin ConnectionContext is technically sendable, but we cannot annotate that + @unchecked Sendable +{ let ctx: PowerSyncKotlin.ConnectionContext init(ctx: PowerSyncKotlin.PowerSyncTransaction) { diff --git a/Sources/PowerSync/Kotlin/sync/KotlinSyncStatus.swift b/Sources/PowerSync/Kotlin/sync/KotlinSyncStatus.swift index b71615f..db8ce2d 100644 --- a/Sources/PowerSync/Kotlin/sync/KotlinSyncStatus.swift +++ b/Sources/PowerSync/Kotlin/sync/KotlinSyncStatus.swift @@ -2,7 +2,7 @@ import Combine import Foundation import PowerSyncKotlin -class KotlinSyncStatus: KotlinSyncStatusDataProtocol, SyncStatus { +final class KotlinSyncStatus: KotlinSyncStatusDataProtocol, SyncStatus { private let baseStatus: PowerSyncKotlin.SyncStatus var base: any PowerSyncKotlin.SyncStatusData { diff --git a/Sources/PowerSync/Kotlin/sync/KotlinSyncStatusData.swift b/Sources/PowerSync/Kotlin/sync/KotlinSyncStatusData.swift index 0d2d759..df64951 100644 --- a/Sources/PowerSync/Kotlin/sync/KotlinSyncStatusData.swift +++ b/Sources/PowerSync/Kotlin/sync/KotlinSyncStatusData.swift @@ -6,7 +6,10 @@ protocol KotlinSyncStatusDataProtocol: SyncStatusData { var base: PowerSyncKotlin.SyncStatusData { get } } -struct KotlinSyncStatusData: KotlinSyncStatusDataProtocol { +struct KotlinSyncStatusData: KotlinSyncStatusDataProtocol, + // We can't override the PowerSyncKotlin.SyncStatusData's Sendable status + @unchecked Sendable +{ let base: PowerSyncKotlin.SyncStatusData } @@ -15,19 +18,19 @@ extension KotlinSyncStatusDataProtocol { var connected: Bool { base.connected } - + var connecting: Bool { base.connecting } - + var downloading: Bool { base.downloading } - + var uploading: Bool { base.uploading } - + var lastSyncedAt: Date? { guard let lastSyncedAt = base.lastSyncedAt else { return nil } return Date( @@ -36,32 +39,32 @@ extension KotlinSyncStatusDataProtocol { ) ) } - + var downloadProgress: (any SyncDownloadProgress)? { guard let kotlinProgress = base.downloadProgress else { return nil } return KotlinSyncDownloadProgress(progress: kotlinProgress) } - + var hasSynced: Bool? { base.hasSynced?.boolValue } - + var uploadError: Any? { base.uploadError } - + var downloadError: Any? { base.downloadError } - + var anyError: Any? { base.anyError } - + public var priorityStatusEntries: [PriorityStatusEntry] { base.priorityStatusEntries.map { mapPriorityStatus($0) } } - + public func statusForPriority(_ priority: BucketPriority) -> PriorityStatusEntry { mapPriorityStatus( base.statusForPriority( @@ -69,7 +72,7 @@ extension KotlinSyncStatusDataProtocol { ) ) } - + private func mapPriorityStatus(_ status: PowerSyncKotlin.PriorityStatusEntry) -> PriorityStatusEntry { var lastSyncedAt: Date? if let syncedAt = status.lastSyncedAt { @@ -77,7 +80,7 @@ extension KotlinSyncStatusDataProtocol { timeIntervalSince1970: Double(syncedAt.epochSeconds) ) } - + return PriorityStatusEntry( priority: BucketPriority(status.priority), lastSyncedAt: lastSyncedAt, @@ -94,7 +97,7 @@ extension KotlinProgressWithOperationsProtocol { var totalOperations: Int32 { return base.totalOperations } - + var downloadedOperations: Int32 { return base.downloadedOperations } @@ -106,11 +109,11 @@ struct KotlinProgressWithOperations: KotlinProgressWithOperationsProtocol { struct KotlinSyncDownloadProgress: KotlinProgressWithOperationsProtocol, SyncDownloadProgress { let progress: PowerSyncKotlin.SyncDownloadProgress - + var base: any PowerSyncKotlin.ProgressWithOperations { progress } - + func untilPriority(priority: BucketPriority) -> any ProgressWithOperations { return KotlinProgressWithOperations(base: progress.untilPriority(priority: priority.priorityCode)) } diff --git a/Sources/PowerSync/Kotlin/wrapQueryCursor.swift b/Sources/PowerSync/Kotlin/wrapQueryCursor.swift index 05a99ca..ebdd33a 100644 --- a/Sources/PowerSync/Kotlin/wrapQueryCursor.swift +++ b/Sources/PowerSync/Kotlin/wrapQueryCursor.swift @@ -14,9 +14,9 @@ import PowerSyncKotlin /// and other functionality—without modifying the public `PowerSyncDatabase` API to include /// Swift-specific logic. func wrapQueryCursor( - mapper: @escaping (SqlCursor) throws -> RowType, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType, // The Kotlin APIs return the results as Any, we can explicitly cast internally - executor: @escaping (_ wrappedMapper: @escaping (PowerSyncKotlin.SqlCursor) -> RowType?) throws -> ReturnType + executor: @Sendable @escaping (_ wrappedMapper: @escaping (PowerSyncKotlin.SqlCursor) -> RowType?) throws -> ReturnType ) throws -> ReturnType { var mapperException: Error? @@ -36,7 +36,7 @@ func wrapQueryCursor( } let executionResult = try executor(wrappedMapper) - + if let mapperException { // Allow propagating the error throw mapperException @@ -45,15 +45,14 @@ func wrapQueryCursor( return executionResult } - -func wrapQueryCursorTyped( - mapper: @escaping (SqlCursor) throws -> RowType, +func wrapQueryCursorTyped( + mapper: @Sendable @escaping (SqlCursor) throws -> RowType, // The Kotlin APIs return the results as Any, we can explicitly cast internally - executor: @escaping (_ wrappedMapper: @escaping (PowerSyncKotlin.SqlCursor) -> RowType?) throws -> Any?, + executor: @Sendable @escaping (_ wrappedMapper: @escaping (PowerSyncKotlin.SqlCursor) -> Any?) throws -> Any?, resultType: ReturnType.Type ) throws -> ReturnType { return try safeCast( - wrapQueryCursor( + wrapQueryCursor( mapper: mapper, executor: executor ), to: @@ -61,7 +60,6 @@ func wrapQueryCursorTyped( ) } - /// Throws a `PowerSyncException` using a helper provided by the Kotlin SDK. /// We can't directly throw Kotlin `PowerSyncException`s from Swift, but we can delegate the throwing /// to the Kotlin implementation. @@ -73,7 +71,7 @@ func wrapQueryCursorTyped( /// to any calling Kotlin stack. /// This only works for SKIEE methods which have an associated completion handler which handles annotated errors. /// This seems to only apply for Kotlin suspending function bindings. -func throwKotlinPowerSyncError (message: String, cause: String? = nil) throws { +func throwKotlinPowerSyncError(message: String, cause: String? = nil) throws { try throwPowerSyncException( exception: PowerSyncKotlin.PowerSyncException( message: message, diff --git a/Sources/PowerSync/Logger.swift b/Sources/PowerSync/Logger.swift index 988d013..cd1c06c 100644 --- a/Sources/PowerSync/Logger.swift +++ b/Sources/PowerSync/Logger.swift @@ -4,7 +4,6 @@ import OSLog /// /// This writer uses `os.Logger` on iOS/macOS/tvOS/watchOS 14+ and falls back to `print` for earlier versions. public class PrintLogWriter: LogWriterProtocol { - private let subsystem: String private let category: String private lazy var logger: Any? = { @@ -13,17 +12,18 @@ public class PrintLogWriter: LogWriterProtocol { } return nil }() - + /// Creates a new PrintLogWriter /// - Parameters: /// - subsystem: The subsystem identifier (typically reverse DNS notation of your app) /// - category: The category within your subsystem public init(subsystem: String = Bundle.main.bundleIdentifier ?? "com.powersync.logger", - category: String = "default") { + category: String = "default") + { self.subsystem = subsystem self.category = category } - + /// Logs a message with a given severity and optional tag. /// - Parameters: /// - severity: The severity level of the message. @@ -32,10 +32,10 @@ public class PrintLogWriter: LogWriterProtocol { public func log(severity: LogSeverity, message: String, tag: String?) { let tagPrefix = tag.map { !$0.isEmpty ? "[\($0)] " : "" } ?? "" let formattedMessage = "\(tagPrefix)\(message)" - + if #available(iOS 14.0, macOS 11.0, tvOS 14.0, watchOS 7.0, *) { guard let logger = logger as? Logger else { return } - + switch severity { case .info: logger.info("\(formattedMessage, privacy: .public)") @@ -54,58 +54,65 @@ public class PrintLogWriter: LogWriterProtocol { } } +/// A default logger configuration that uses `PrintLogWriter` and filters messages by minimum severity. +public final class DefaultLogger: LoggerProtocol, + // The shared state is guarded by the DispatchQueue + @unchecked Sendable +{ + private var minSeverity: LogSeverity + private var writers: [any LogWriterProtocol] + private let queue = DispatchQueue(label: "DefaultLogger.queue") - -/// A default logger configuration that uses `PrintLogWritter` and filters messages by minimum severity. -public class DefaultLogger: LoggerProtocol { - public var minSeverity: LogSeverity - public var writers: [any LogWriterProtocol] - /// Initializes the default logger with an optional minimum severity level. /// /// - Parameters /// - minSeverity: The minimum severity level to log. Defaults to `.debug`. /// - writers: Optional writers which logs should be written to. Defaults to a `PrintLogWriter`. - public init(minSeverity: LogSeverity = .debug, writers: [any LogWriterProtocol]? = nil ) { - self.writers = writers ?? [ PrintLogWriter() ] + public init(minSeverity: LogSeverity = .debug, writers: [any LogWriterProtocol]? = nil) { + self.writers = writers ?? [PrintLogWriter()] self.minSeverity = minSeverity } - - public func setWriters(_ writters: [any LogWriterProtocol]) { - self.writers = writters + + public func setWriters(_ writers: [any LogWriterProtocol]) { + queue.sync { + self.writers = writers + } } - + public func setMinSeverity(_ severity: LogSeverity) { - self.minSeverity = severity + queue.sync { + minSeverity = severity + } } - - + public func debug(_ message: String, tag: String? = nil) { - self.writeLog(message, severity: LogSeverity.debug, tag: tag) + writeLog(message, severity: LogSeverity.debug, tag: tag) } - + public func error(_ message: String, tag: String? = nil) { - self.writeLog(message, severity: LogSeverity.error, tag: tag) + writeLog(message, severity: LogSeverity.error, tag: tag) } - + public func info(_ message: String, tag: String? = nil) { - self.writeLog(message, severity: LogSeverity.info, tag: tag) + writeLog(message, severity: LogSeverity.info, tag: tag) } - + public func warning(_ message: String, tag: String? = nil) { - self.writeLog(message, severity: LogSeverity.warning, tag: tag) + writeLog(message, severity: LogSeverity.warning, tag: tag) } - + public func fault(_ message: String, tag: String? = nil) { - self.writeLog(message, severity: LogSeverity.fault, tag: tag) + writeLog(message, severity: LogSeverity.fault, tag: tag) } - + private func writeLog(_ message: String, severity: LogSeverity, tag: String?) { - if (severity.rawValue < self.minSeverity.rawValue) { + let currentSeverity = queue.sync { minSeverity } + + if severity.rawValue < currentSeverity.rawValue { return } - - for writer in self.writers { + + for writer in writers { writer.log(severity: severity, message: message, tag: tag) } } diff --git a/Sources/PowerSync/PowerSyncCredentials.swift b/Sources/PowerSync/PowerSyncCredentials.swift index b1e1d27..1de8bae 100644 --- a/Sources/PowerSync/PowerSyncCredentials.swift +++ b/Sources/PowerSync/PowerSyncCredentials.swift @@ -1,10 +1,9 @@ import Foundation - /// /// Temporary credentials to connect to the PowerSync service. /// -public struct PowerSyncCredentials: Codable { +public struct PowerSyncCredentials: Codable, Sendable { /// PowerSync endpoint, e.g. "https://myinstance.powersync.co". public let endpoint: String @@ -14,17 +13,18 @@ public struct PowerSyncCredentials: Codable { /// User ID. @available(*, deprecated, message: "This value is not used anymore.") public let userId: String? = nil - + enum CodingKeys: String, CodingKey { - case endpoint - case token - } + case endpoint + case token + } @available(*, deprecated, message: "Use init(endpoint:token:) instead. `userId` is no longer used.") public init( endpoint: String, token: String, - userId: String? = nil) { + userId _: String? = nil + ) { self.endpoint = endpoint self.token = token } @@ -34,12 +34,12 @@ public struct PowerSyncCredentials: Codable { self.token = token } - internal init(kotlin: KotlinPowerSyncCredentials) { - self.endpoint = kotlin.endpoint - self.token = kotlin.token + init(kotlin: KotlinPowerSyncCredentials) { + endpoint = kotlin.endpoint + token = kotlin.token } - internal var kotlinCredentials: KotlinPowerSyncCredentials { + var kotlinCredentials: KotlinPowerSyncCredentials { return KotlinPowerSyncCredentials(endpoint: endpoint, token: token, userId: nil) } diff --git a/Sources/PowerSync/Protocol/LoggerProtocol.swift b/Sources/PowerSync/Protocol/LoggerProtocol.swift index f2c3396..2169f86 100644 --- a/Sources/PowerSync/Protocol/LoggerProtocol.swift +++ b/Sources/PowerSync/Protocol/LoggerProtocol.swift @@ -1,4 +1,4 @@ -public enum LogSeverity: Int, CaseIterable { +public enum LogSeverity: Int, CaseIterable, Sendable { /// Detailed information typically used for debugging. case debug = 0 @@ -47,35 +47,35 @@ public protocol LogWriterProtocol { /// A protocol defining the interface for a logger that supports severity filtering and multiple writers. /// /// Conformers provide logging APIs and manage attached log writers. -public protocol LoggerProtocol { +public protocol LoggerProtocol: Sendable { /// Logs an informational message. /// /// - Parameters: /// - message: The content of the log message. /// - tag: An optional tag to categorize the message. func info(_ message: String, tag: String?) - + /// Logs an error message. /// /// - Parameters: /// - message: The content of the log message. /// - tag: An optional tag to categorize the message. func error(_ message: String, tag: String?) - + /// Logs a debug message. /// /// - Parameters: /// - message: The content of the log message. /// - tag: An optional tag to categorize the message. func debug(_ message: String, tag: String?) - + /// Logs a warning message. /// /// - Parameters: /// - message: The content of the log message. /// - tag: An optional tag to categorize the message. func warning(_ message: String, tag: String?) - + /// Logs a fault message, typically used for critical system-level failures. /// /// - Parameters: diff --git a/Sources/PowerSync/Protocol/PowerSyncBackendConnector.swift b/Sources/PowerSync/Protocol/PowerSyncBackendConnector.swift index 87fda9a..7879620 100644 --- a/Sources/PowerSync/Protocol/PowerSyncBackendConnector.swift +++ b/Sources/PowerSync/Protocol/PowerSyncBackendConnector.swift @@ -1,4 +1,11 @@ -public protocol PowerSyncBackendConnectorProtocol { + +/// Implement this to connect an app backend. +/// +/// The connector is responsible for: +/// 1. Creating credentials for connecting to the PowerSync service. +/// 2. Applying local changes against the backend application server. +/// +public protocol PowerSyncBackendConnectorProtocol: Sendable { /// /// Get credentials for PowerSync. /// @@ -22,19 +29,16 @@ public protocol PowerSyncBackendConnectorProtocol { func uploadData(database: PowerSyncDatabaseProtocol) async throws } -/// Implement this to connect an app backend. -/// -/// The connector is responsible for: -/// 1. Creating credentials for connecting to the PowerSync service. -/// 2. Applying local changes against the backend application server. -/// -/// -open class PowerSyncBackendConnector: PowerSyncBackendConnectorProtocol { +@available(*, deprecated, message: "PowerSyncBackendConnector is deprecated. Please implement PowerSyncBackendConnectorProtocol directly in your own class.") +open class PowerSyncBackendConnector: PowerSyncBackendConnectorProtocol, + // This class is non-final, implementations should strictly conform to Sendable + @unchecked Sendable +{ public init() {} open func fetchCredentials() async throws -> PowerSyncCredentials? { return nil } - open func uploadData(database: PowerSyncDatabaseProtocol) async throws {} + open func uploadData(database _: PowerSyncDatabaseProtocol) async throws {} } diff --git a/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift b/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift index 0edde56..8493504 100644 --- a/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift +++ b/Sources/PowerSync/Protocol/PowerSyncDatabaseProtocol.swift @@ -12,7 +12,7 @@ public struct SyncClientConfiguration { /// /// - SeeAlso: `SyncRequestLoggerConfiguration` for configuration options public let requestLogger: SyncRequestLoggerConfiguration? - + /// Creates a new sync client configuration. /// - Parameter requestLogger: Optional network logger configuration public init(requestLogger: SyncRequestLoggerConfiguration? = nil) { @@ -26,10 +26,10 @@ public struct SyncClientConfiguration { public struct ConnectOptions { /// Defaults to 1 second public static let DefaultCrudThrottle: TimeInterval = 1 - + /// Defaults to 5 seconds public static let DefaultRetryDelay: TimeInterval = 5 - + /// TimeInterval (in seconds) between CRUD (Create, Read, Update, Delete) operations. /// /// Default is ``ConnectOptions/DefaultCrudThrottle``. @@ -54,14 +54,14 @@ public struct ConnectOptions { /// ] /// ``` public var params: JsonParam - + /// Uses a new sync client implemented in Rust instead of the one implemented in Kotlin. /// /// The new client is more efficient and will become the default in the future, but is still marked as experimental for now. /// We encourage interested users to try the new client. @_spi(PowerSyncExperimental) public var newClientImplementation: Bool - + /// Configuration for the sync client used for PowerSync requests. /// /// Provides options to customize network behavior including logging of HTTP @@ -73,7 +73,7 @@ public struct ConnectOptions { /// /// - SeeAlso: `SyncClientConfiguration` for available configuration options public var clientConfiguration: SyncClientConfiguration? - + /// Initializes a `ConnectOptions` instance with optional values. /// /// - Parameters: @@ -90,10 +90,10 @@ public struct ConnectOptions { self.crudThrottle = crudThrottle self.retryDelay = retryDelay self.params = params - self.newClientImplementation = false + newClientImplementation = false self.clientConfiguration = clientConfiguration } - + /// Initializes a ``ConnectOptions`` instance with optional values, including experimental options. @_spi(PowerSyncExperimental) public init( @@ -118,25 +118,25 @@ public struct ConnectOptions { /// Use `PowerSyncDatabase.connect` to connect to the PowerSync service, to keep the local database in sync with the remote database. /// /// All changes to local tables are automatically recorded, whether connected or not. Once connected, the changes are uploaded. -public protocol PowerSyncDatabaseProtocol: Queries { +public protocol PowerSyncDatabaseProtocol: Queries, Sendable { /// The current sync status. var currentStatus: SyncStatus { get } - + /// Logger used for PowerSync operations var logger: any LoggerProtocol { get } - + /// Wait for the first sync to occur func waitForFirstSync() async throws - + /// Replace the schema with a new version. This is for advanced use cases - typically the schema /// should just be specified once in the constructor. /// /// Cannot be used while connected - this should only be called before connect. func updateSchema(schema: SchemaProtocol) async throws - + /// Wait for the first (possibly partial) sync to occur that contains all buckets in the given priority. func waitForFirstSync(priority: Int32) async throws - + /// Connects to the PowerSync service and keeps the local database in sync with the remote database. /// /// The connection is automatically re-opened if it fails for any reason. @@ -169,10 +169,10 @@ public protocol PowerSyncDatabaseProtocol: Queries { /// /// - Throws: An error if the connection fails or if the database is not properly configured. func connect( - connector: PowerSyncBackendConnector, + connector: PowerSyncBackendConnectorProtocol, options: ConnectOptions? ) async throws - + /// Get a batch of crud data to upload. /// /// Returns nil if there is no data to upload. @@ -188,7 +188,7 @@ public protocol PowerSyncDatabaseProtocol: Queries { /// data by transaction. One batch may contain data from multiple transactions, /// and a single transaction may be split over multiple batches. func getCrudBatch(limit: Int32) async throws -> CrudBatch? - + /// Get the next recorded transaction to upload. /// /// Returns nil if there is no data to upload. @@ -201,15 +201,15 @@ public protocol PowerSyncDatabaseProtocol: Queries { /// Unlike `getCrudBatch`, this only returns data from a single transaction at a time. /// All data for the transaction is loaded into memory. func getNextCrudTransaction() async throws -> CrudTransaction? - + /// Convenience method to get the current version of PowerSync. func getPowerSyncVersion() async throws -> String - + /// Close the sync connection. /// /// Use `connect` to connect again. func disconnect() async throws - + /// Disconnect and clear the database. /// Use this when logging out. /// The database can still be queried after this is called, but the tables @@ -217,7 +217,7 @@ public protocol PowerSyncDatabaseProtocol: Queries { /// /// - Parameter clearLocal: Set to false to preserve data in local-only tables. Defaults to `true`. func disconnectAndClear(clearLocal: Bool) async throws - + /// Close the database, releasing resources. /// Also disconnects any active connection. /// @@ -250,7 +250,7 @@ public extension PowerSyncDatabaseProtocol { /// params: params /// ) func connect( - connector: PowerSyncBackendConnector, + connector: PowerSyncBackendConnectorProtocol, crudThrottle: TimeInterval = 1, retryDelay: TimeInterval = 5, params: JsonParam = [:] @@ -264,11 +264,11 @@ public extension PowerSyncDatabaseProtocol { ) ) } - + func disconnectAndClear() async throws { try await disconnectAndClear(clearLocal: true) } - + func getCrudBatch(limit: Int32 = 100) async throws -> CrudBatch? { try await getCrudBatch( limit: limit diff --git a/Sources/PowerSync/Protocol/QueriesProtocol.swift b/Sources/PowerSync/Protocol/QueriesProtocol.swift index 1e94702..0f315f4 100644 --- a/Sources/PowerSync/Protocol/QueriesProtocol.swift +++ b/Sources/PowerSync/Protocol/QueriesProtocol.swift @@ -3,16 +3,16 @@ import Foundation public let DEFAULT_WATCH_THROTTLE: TimeInterval = 0.03 // 30ms -public struct WatchOptions { +public struct WatchOptions: Sendable { public var sql: String - public var parameters: [Any?] + public var parameters: [Sendable?] public var throttle: TimeInterval - public var mapper: (SqlCursor) throws -> RowType + public var mapper: @Sendable (SqlCursor) throws -> RowType public init( - sql: String, parameters: [Any?]? = [], + sql: String, parameters: [Sendable?]? = [], throttle: TimeInterval? = DEFAULT_WATCH_THROTTLE, - mapper: @escaping (SqlCursor) throws -> RowType + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) { self.sql = sql self.parameters = parameters ?? [] @@ -25,37 +25,37 @@ public protocol Queries { /// Execute a write query (INSERT, UPDATE, DELETE) /// Using `RETURNING *` will result in an error. @discardableResult - func execute(sql: String, parameters: [Any?]?) async throws -> Int64 + func execute(sql: String, parameters: [Sendable?]?) async throws -> Int64 /// Execute a read-only (SELECT) query and return a single result. /// If there is no result, throws an IllegalArgumentException. /// See `getOptional` for queries where the result might be empty. func get( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType /// Execute a read-only (SELECT) query and return the results. func getAll( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> [RowType] /// Execute a read-only (SELECT) query and return a single optional result. func getOptional( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType? /// Execute a read-only (SELECT) query every time the source tables are modified /// and return the results as an array in a Publisher. func watch( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> AsyncThrowingStream<[RowType], Error> func watch( @@ -66,7 +66,7 @@ public protocol Queries { /// /// In most cases, [writeTransaction] should be used instead. func writeLock( - callback: @escaping (any ConnectionContext) throws -> R + callback: @Sendable @escaping (any ConnectionContext) throws -> R ) async throws -> R /// Takes a read lock, without starting a transaction. @@ -74,17 +74,17 @@ public protocol Queries { /// The lock only applies to a single connection, and multiple /// connections may hold read locks at the same time. func readLock( - callback: @escaping (any ConnectionContext) throws -> R + callback: @Sendable @escaping (any ConnectionContext) throws -> R ) async throws -> R /// Execute a write transaction with the given callback func writeTransaction( - callback: @escaping (any Transaction) throws -> R + callback: @Sendable @escaping (any Transaction) throws -> R ) async throws -> R /// Execute a read transaction with the given callback func readTransaction( - callback: @escaping (any Transaction) throws -> R + callback: @Sendable @escaping (any Transaction) throws -> R ) async throws -> R } @@ -96,29 +96,29 @@ public extension Queries { func get( _ sql: String, - mapper: @escaping (SqlCursor) throws -> RowType + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType { return try await get(sql: sql, parameters: [], mapper: mapper) } func getAll( _ sql: String, - mapper: @escaping (SqlCursor) throws -> RowType + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> [RowType] { return try await getAll(sql: sql, parameters: [], mapper: mapper) } func getOptional( _ sql: String, - mapper: @escaping (SqlCursor) throws -> RowType + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) async throws -> RowType? { return try await getOptional(sql: sql, parameters: [], mapper: mapper) } func watch( _ sql: String, - mapper: @escaping (SqlCursor) throws -> RowType + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> AsyncThrowingStream<[RowType], Error> { - return try watch(sql: sql, parameters: [Any?](), mapper: mapper) + return try watch(sql: sql, parameters: [Sendable?](), mapper: mapper) } } diff --git a/Sources/PowerSync/Protocol/db/ConnectionContext.swift b/Sources/PowerSync/Protocol/db/ConnectionContext.swift index 13dd939..4f904d1 100644 --- a/Sources/PowerSync/Protocol/db/ConnectionContext.swift +++ b/Sources/PowerSync/Protocol/db/ConnectionContext.swift @@ -1,71 +1,71 @@ import Foundation -public protocol ConnectionContext { +public protocol ConnectionContext: Sendable { /** Executes a SQL statement with optional parameters. - + - Parameters: - sql: The SQL statement to execute - parameters: Optional list of parameters for the SQL statement - + - Returns: A value indicating the number of rows affected - + - Throws: PowerSyncError if execution fails */ @discardableResult - func execute(sql: String, parameters: [Any?]?) throws -> Int64 - + func execute(sql: String, parameters: [Sendable?]?) throws -> Int64 + /** Retrieves an optional value from the database using the provided SQL query. - + - Parameters: - sql: The SQL query to execute - parameters: Optional list of parameters for the SQL query - mapper: A closure that maps the SQL cursor result to the desired type - + - Returns: An optional value of type RowType or nil if no result - + - Throws: PowerSyncError if the query fails */ - func getOptional( + func getOptional( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> RowType? - + /** Retrieves all matching rows from the database using the provided SQL query. - + - Parameters: - sql: The SQL query to execute - parameters: Optional list of parameters for the SQL query - mapper: A closure that maps each SQL cursor result to the desired type - + - Returns: An array of RowType objects - + - Throws: PowerSyncError if the query fails */ - func getAll( + func getAll( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> [RowType] - + /** Retrieves a single value from the database using the provided SQL query. - + - Parameters: - sql: The SQL query to execute - parameters: Optional list of parameters for the SQL query - mapper: A closure that maps the SQL cursor result to the desired type - + - Returns: A value of type RowType - + - Throws: PowerSyncError if the query fails or no result is found */ - func get( + func get( sql: String, - parameters: [Any?]?, - mapper: @escaping (SqlCursor) throws -> RowType + parameters: [Sendable?]?, + mapper: @Sendable @escaping (SqlCursor) throws -> RowType ) throws -> RowType } diff --git a/Sources/PowerSync/Protocol/sync/BucketPriority.swift b/Sources/PowerSync/Protocol/sync/BucketPriority.swift index 0ff8a1d..6b0f677 100644 --- a/Sources/PowerSync/Protocol/sync/BucketPriority.swift +++ b/Sources/PowerSync/Protocol/sync/BucketPriority.swift @@ -1,7 +1,7 @@ import Foundation /// Represents the priority of a bucket, used for sorting and managing operations based on priority levels. -public struct BucketPriority: Comparable { +public struct BucketPriority: Comparable, Sendable { /// The priority code associated with the bucket. Higher values indicate lower priority. public let priorityCode: Int32 diff --git a/Sources/PowerSync/Protocol/sync/SyncStatusData.swift b/Sources/PowerSync/Protocol/sync/SyncStatusData.swift index 66b836a..f4b5a4d 100644 --- a/Sources/PowerSync/Protocol/sync/SyncStatusData.swift +++ b/Sources/PowerSync/Protocol/sync/SyncStatusData.swift @@ -1,7 +1,7 @@ import Foundation /// A protocol representing the synchronization status of a system, providing various indicators and error states. -public protocol SyncStatusData { +public protocol SyncStatusData: Sendable { /// Indicates whether the system is currently connected. var connected: Bool { get } @@ -12,7 +12,7 @@ public protocol SyncStatusData { var downloading: Bool { get } /// Realtime progress information about downloaded operations during an active sync. - /// + /// /// For more information on what progress is reported, see ``SyncDownloadProgress``. /// This value will be non-null only if ``downloading`` is `true`. var downloadProgress: SyncDownloadProgress? { get } @@ -50,7 +50,7 @@ public protocol SyncStatusData { } /// A protocol extending `SyncStatusData` to include flow-based updates for synchronization status. -public protocol SyncStatus: SyncStatusData { +public protocol SyncStatus: SyncStatusData, Sendable { /// Provides a flow of synchronization status updates. /// - Returns: An `AsyncStream` that emits updates whenever the synchronization status changes. func asFlow() -> AsyncStream diff --git a/Sources/PowerSync/attachments/Attachment.swift b/Sources/PowerSync/attachments/Attachment.swift index 42ad8f5..a14a27f 100644 --- a/Sources/PowerSync/attachments/Attachment.swift +++ b/Sources/PowerSync/attachments/Attachment.swift @@ -1,5 +1,5 @@ /// Enum representing the state of an attachment -public enum AttachmentState: Int { +public enum AttachmentState: Int, Sendable { /// The attachment has been queued for download from the cloud storage case queuedDownload /// The attachment has been queued for upload to the cloud storage @@ -24,7 +24,7 @@ public enum AttachmentState: Int { } /// Struct representing an attachment -public struct Attachment { +public struct Attachment: Sendable { /// Unique identifier for the attachment public let id: String @@ -91,7 +91,7 @@ public struct Attachment { func with( filename: String? = nil, state: AttachmentState? = nil, - timestamp : Int = 0, + timestamp: Int = 0, hasSynced: Bool? = nil, localUri: String?? = .none, mediaType: String?? = .none, @@ -110,20 +110,19 @@ public struct Attachment { metaData: resolveOverride(metaData, current: self.metaData) ) } - + /// Resolves double optionals /// if a non nil value is provided: the override will be used /// if .some(nil) is provided: The value will be set to nil /// // if nil is provided: the current value will be preserved private func resolveOverride(_ override: T??, current: T?) -> T? { if let value = override { - return value // could be nil (explicit clear) or a value + return value // could be nil (explicit clear) or a value } else { - return current // not provided, use current + return current // not provided, use current } } - /// Constructs an `Attachment` from a `SqlCursor`. /// /// - Parameter cursor: The `SqlCursor` containing the attachment data. diff --git a/Sources/PowerSync/attachments/AttachmentContext.swift b/Sources/PowerSync/attachments/AttachmentContext.swift index 394d028..6ee3b53 100644 --- a/Sources/PowerSync/attachments/AttachmentContext.swift +++ b/Sources/PowerSync/attachments/AttachmentContext.swift @@ -1,66 +1,80 @@ import Foundation -/// Context which performs actions on the attachment records -open class AttachmentContext { - private let db: any PowerSyncDatabaseProtocol - private let tableName: String - private let logger: any LoggerProtocol - private let logTag = "AttachmentService" - private let maxArchivedCount: Int64 - - /// Table used for storing attachments in the attachment queue. - private var table: String { - return tableName - } - - /// Initializes a new `AttachmentContext`. - public init( - db: PowerSyncDatabaseProtocol, - tableName: String, - logger: any LoggerProtocol, - maxArchivedCount: Int64 - ) { - self.db = db - self.tableName = tableName - self.logger = logger - self.maxArchivedCount = maxArchivedCount - } +public protocol AttachmentContextProtocol: Sendable { + var db: any PowerSyncDatabaseProtocol { get } + var tableName: String { get } + var logger: any LoggerProtocol { get } + var maxArchivedCount: Int64 { get } /// Deletes the attachment from the attachment queue. - public func deleteAttachment(id: String) async throws { + func deleteAttachment(id: String) async throws + + /// Sets the state of the attachment to ignored (archived). + func ignoreAttachment(id: String) async throws + + /// Gets the attachment from the attachment queue using an ID. + func getAttachment(id: String) async throws -> Attachment? + + /// Saves the attachment to the attachment queue. + func saveAttachment(attachment: Attachment) async throws -> Attachment + + /// Saves multiple attachments to the attachment queue. + func saveAttachments(attachments: [Attachment]) async throws + + /// Gets all the IDs of attachments in the attachment queue. + func getAttachmentIds() async throws -> [String] + + /// Gets all attachments in the attachment queue. + func getAttachments() async throws -> [Attachment] + + /// Gets all active attachments that require an operation to be performed. + func getActiveAttachments() async throws -> [Attachment] + + /// Deletes attachments that have been archived. + /// + /// - Parameter callback: A callback invoked with the list of archived attachments before deletion. + /// - Returns: `true` if all items have been deleted, `false` if there may be more archived items remaining. + func deleteArchivedAttachments( + callback: @Sendable @escaping ([Attachment]) async throws -> Void + ) async throws -> Bool + + /// Clears the attachment queue. + /// + /// - Note: Currently only used for testing purposes. + func clearQueue() async throws +} + +public extension AttachmentContextProtocol { + func deleteAttachment(id: String) async throws { _ = try await db.execute( - sql: "DELETE FROM \(table) WHERE id = ?", + sql: "DELETE FROM \(tableName) WHERE id = ?", parameters: [id] ) } - /// Sets the state of the attachment to ignored (archived). - public func ignoreAttachment(id: String) async throws { + func ignoreAttachment(id: String) async throws { _ = try await db.execute( - sql: "UPDATE \(table) SET state = ? WHERE id = ?", + sql: "UPDATE \(tableName) SET state = ? WHERE id = ?", parameters: [AttachmentState.archived.rawValue, id] ) } - /// Gets the attachment from the attachment queue using an ID. - public func getAttachment(id: String) async throws -> Attachment? { + func getAttachment(id: String) async throws -> Attachment? { return try await db.getOptional( - sql: "SELECT * FROM \(table) WHERE id = ?", + sql: "SELECT * FROM \(tableName) WHERE id = ?", parameters: [id] ) { cursor in try Attachment.fromCursor(cursor) } } - /// Saves the attachment to the attachment queue. - public func saveAttachment(attachment: Attachment) async throws -> Attachment { + func saveAttachment(attachment: Attachment) async throws -> Attachment { return try await db.writeTransaction { ctx in try self.upsertAttachment(attachment, context: ctx) } } - /// Saves multiple attachments to the attachment queue. - public func saveAttachments(attachments: [Attachment]) async throws { + func saveAttachments(attachments: [Attachment]) async throws { if attachments.isEmpty { return } @@ -72,24 +86,22 @@ open class AttachmentContext { } } - /// Gets all the IDs of attachments in the attachment queue. - public func getAttachmentIds() async throws -> [String] { + func getAttachmentIds() async throws -> [String] { return try await db.getAll( - sql: "SELECT id FROM \(table) WHERE id IS NOT NULL", + sql: "SELECT id FROM \(tableName) WHERE id IS NOT NULL", parameters: [] ) { cursor in try cursor.getString(name: "id") } } - /// Gets all attachments in the attachment queue. - public func getAttachments() async throws -> [Attachment] { + func getAttachments() async throws -> [Attachment] { return try await db.getAll( sql: """ SELECT * FROM - \(table) + \(tableName) WHERE id IS NOT NULL ORDER BY @@ -101,14 +113,13 @@ open class AttachmentContext { } } - /// Gets all active attachments that require an operation to be performed. - public func getActiveAttachments() async throws -> [Attachment] { + func getActiveAttachments() async throws -> [Attachment] { return try await db.getAll( sql: """ SELECT * FROM - \(table) + \(tableName) WHERE state = ? OR state = ? @@ -126,25 +137,18 @@ open class AttachmentContext { } } - /// Clears the attachment queue. - /// - /// - Note: Currently only used for testing purposes. - public func clearQueue() async throws { - _ = try await db.execute("DELETE FROM \(table)") + func clearQueue() async throws { + _ = try await db.execute("DELETE FROM \(tableName)") } - /// Deletes attachments that have been archived. - /// - /// - Parameter callback: A callback invoked with the list of archived attachments before deletion. - /// - Returns: `true` if all items have been deleted, `false` if there may be more archived items remaining. - public func deleteArchivedAttachments(callback: @escaping ([Attachment]) async throws -> Void) async throws -> Bool { + func deleteArchivedAttachments(callback: @Sendable @escaping ([Attachment]) async throws -> Void) async throws -> Bool { let limit = 1000 let attachments = try await db.getAll( sql: """ SELECT * FROM - \(table) + \(tableName) WHERE state = ? ORDER BY @@ -154,7 +158,7 @@ open class AttachmentContext { parameters: [ AttachmentState.archived.rawValue, limit, - maxArchivedCount, + maxArchivedCount ] ) { cursor in try Attachment.fromCursor(cursor) @@ -166,7 +170,7 @@ open class AttachmentContext { let idsString = String(data: ids, encoding: .utf8)! _ = try await db.execute( - sql: "DELETE FROM \(table) WHERE id IN (SELECT value FROM json_each(?));", + sql: "DELETE FROM \(tableName) WHERE id IN (SELECT value FROM json_each(?));", parameters: [idsString] ) @@ -179,7 +183,7 @@ open class AttachmentContext { /// - attachment: The attachment to upsert. /// - context: The database transaction context. /// - Returns: The original attachment. - public func upsertAttachment( + func upsertAttachment( _ attachment: Attachment, context: ConnectionContext ) throws -> Attachment { @@ -198,7 +202,7 @@ open class AttachmentContext { try context.execute( sql: """ INSERT OR REPLACE INTO - \(table) (id, timestamp, filename, local_uri, media_type, size, state, has_synced, meta_data) + \(tableName) (id, timestamp, filename, local_uri, media_type, size, state, has_synced, meta_data) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, @@ -218,3 +222,23 @@ open class AttachmentContext { return attachment } } + +/// Context which performs actions on the attachment records +public actor AttachmentContext: AttachmentContextProtocol { + public let db: any PowerSyncDatabaseProtocol + public let tableName: String + public let logger: any LoggerProtocol + public let maxArchivedCount: Int64 + + public init( + db: PowerSyncDatabaseProtocol, + tableName: String, + logger: any LoggerProtocol, + maxArchivedCount: Int64 + ) { + self.db = db + self.tableName = tableName + self.logger = logger + self.maxArchivedCount = maxArchivedCount + } +} diff --git a/Sources/PowerSync/attachments/AttachmentQueue.swift b/Sources/PowerSync/attachments/AttachmentQueue.swift index 857d998..64c607e 100644 --- a/Sources/PowerSync/attachments/AttachmentQueue.swift +++ b/Sources/PowerSync/attachments/AttachmentQueue.swift @@ -1,12 +1,235 @@ import Combine import Foundation +/// Default name of the attachments table +public let defaultAttachmentsTableName = "attachments" + +public protocol AttachmentQueueProtocol: Sendable { + var db: any PowerSyncDatabaseProtocol { get } + var attachmentsService: any AttachmentServiceProtocol { get } + var localStorage: any LocalStorageAdapter { get } + var downloadAttachments: Bool { get } + + /// Starts the attachment sync process + func startSync() async throws + + /// Stops active syncing tasks. Syncing can be resumed with ``startSync()`` + func stopSyncing() async throws + + /// Closes the attachment queue and cancels all sync tasks + func close() async throws + + /// Resolves the filename for a new attachment + /// - Parameters: + /// - attachmentId: Attachment ID + /// - fileExtension: File extension + /// - Returns: Resolved filename + func resolveNewAttachmentFilename( + attachmentId: String, + fileExtension: String? + ) async -> String + + /// Processes watched attachment items and updates sync state + /// - Parameter items: List of watched attachment items + func processWatchedAttachments(items: [WatchedAttachmentItem]) async throws + + /// Saves a new file and schedules it for upload + /// - Parameters: + /// - data: File data + /// - mediaType: MIME type + /// - fileExtension: File extension + /// - updateHook: Hook to assign attachment relationships in the same transaction + /// - Returns: The created attachment + @discardableResult + func saveFile( + data: Data, + mediaType: String, + fileExtension: String?, + updateHook: @Sendable @escaping (ConnectionContext, Attachment) throws -> Void + ) async throws -> Attachment + + /// Queues a file for deletion + /// - Parameters: + /// - attachmentId: ID of the attachment to delete + /// - updateHook: Hook to perform additional DB updates in the same transaction + @discardableResult + func deleteFile( + attachmentId: String, + updateHook: @Sendable @escaping (ConnectionContext, Attachment) throws -> Void + ) async throws -> Attachment + + /// Returns the local URI where a file is stored based on filename + /// - Parameter filename: The name of the file + /// - Returns: The file path + @Sendable func getLocalUri(_ filename: String) async -> String + + /// Removes all archived items + func expireCache() async throws + + /// Clears the attachment queue and deletes all attachment files + func clearQueue() async throws +} + +public extension AttachmentQueueProtocol { + func resolveNewAttachmentFilename( + attachmentId: String, + fileExtension: String? + ) -> String { + return "\(attachmentId).\(fileExtension ?? "attachment")" + } + + @discardableResult + func saveFile( + data: Data, + mediaType: String, + fileExtension: String?, + updateHook: @Sendable @escaping (ConnectionContext, Attachment) throws -> Void + ) async throws -> Attachment { + let id = try await db.get(sql: "SELECT uuid() as id", parameters: [], mapper: { cursor in + try cursor.getString(name: "id") + }) + + let filename = await resolveNewAttachmentFilename(attachmentId: id, fileExtension: fileExtension) + let localUri = await getLocalUri(filename) + + // Write the file to the filesystem + let fileSize = try await localStorage.saveFile(filePath: localUri, data: data) + + return try await attachmentsService.withContext { context in + // Start a write transaction. The attachment record and relevant local relationship + // assignment should happen in the same transaction. + try await db.writeTransaction { tx in + let attachment = Attachment( + id: id, + filename: filename, + state: AttachmentState.queuedUpload, + localUri: localUri, + mediaType: mediaType, + size: fileSize + ) + + // Allow consumers to set relationships to this attachment id + try updateHook(tx, attachment) + + return try context.upsertAttachment(attachment, context: tx) + } + } + } + + @discardableResult + func deleteFile( + attachmentId: String, + updateHook: @Sendable @escaping (ConnectionContext, Attachment) throws -> Void + ) async throws -> Attachment { + try await attachmentsService.withContext { context in + guard let attachment = try await context.getAttachment(id: attachmentId) else { + throw PowerSyncAttachmentError.notFound("Attachment record with id \(attachmentId) was not found.") + } + + let result = try await self.db.writeTransaction { transaction in + try updateHook(transaction, attachment) + + let updatedAttachment = Attachment( + id: attachment.id, + filename: attachment.filename, + state: AttachmentState.queuedDelete, + hasSynced: attachment.hasSynced, + localUri: attachment.localUri, + mediaType: attachment.mediaType, + size: attachment.size + ) + + return try context.upsertAttachment(updatedAttachment, context: transaction) + } + return result + } + } + + func processWatchedAttachments(items: [WatchedAttachmentItem]) async throws { + // Need to get all the attachments which are tracked in the DB. + // We might need to restore an archived attachment. + try await attachmentsService.withContext { context in + let currentAttachments = try await context.getAttachments() + var attachmentUpdates = [Attachment]() + + for item in items { + guard let existingQueueItem = currentAttachments.first(where: { $0.id == item.id }) else { + // Item is not present in the queue + + if !downloadAttachments { + continue + } + + // This item should be added to the queue + let filename = await resolveNewAttachmentFilename( + attachmentId: item.id, + fileExtension: item.fileExtension + ) + + attachmentUpdates.append( + Attachment( + id: item.id, + filename: filename, + state: .queuedDownload, + hasSynced: false + ) + ) + continue + } + + if existingQueueItem.state == AttachmentState.archived { + // The attachment is present again. Need to queue it for sync. + // We might be able to optimize this in future + if existingQueueItem.hasSynced == true { + // No remote action required, we can restore the record (avoids deletion) + attachmentUpdates.append( + existingQueueItem.with(state: AttachmentState.synced) + ) + } else { + // The localURI should be set if the record was meant to be downloaded + // and has been synced. If it's missing and hasSynced is false then + // it must be an upload operation + let newState = existingQueueItem.localUri == nil ? + AttachmentState.queuedDownload : + AttachmentState.queuedUpload + + attachmentUpdates.append( + existingQueueItem.with(state: newState) + ) + } + } + } + + for attachment in currentAttachments { + let notInWatchedItems = items.first(where: { $0.id == attachment.id }) == nil + if notInWatchedItems { + switch attachment.state { + case .queuedDelete, .queuedUpload: + // Only archive if it has synced + if attachment.hasSynced == true { + attachmentUpdates.append( + attachment.with(state: .archived) + ) + } + default: + // Archive other states such as QUEUED_DOWNLOAD + attachmentUpdates.append( + attachment.with(state: .archived) + ) + } + } + } + + if !attachmentUpdates.isEmpty { + try await context.saveAttachments(attachments: attachmentUpdates) + } + } + } +} + /// Class used to implement the attachment queue /// Requires a PowerSyncDatabase, a RemoteStorageAdapter implementation, and a directory name for attachments. -open class AttachmentQueue { - /// Default name of the attachments table - public static let defaultTableName = "attachments" - +public actor AttachmentQueue: AttachmentQueueProtocol { let logTag = "AttachmentQueue" /// PowerSync database client @@ -19,7 +242,7 @@ open class AttachmentQueue { private let attachmentsDirectory: String /// Closure which creates a Stream of ``WatchedAttachmentItem`` - private let watchAttachments: () throws -> AsyncThrowingStream<[WatchedAttachmentItem], Error> + private let watchAttachments: @Sendable () throws -> AsyncThrowingStream<[WatchedAttachmentItem], Error> /// Local file system adapter public let localStorage: LocalStorageAdapter @@ -43,7 +266,7 @@ open class AttachmentQueue { private let subdirectories: [String]? /// Whether to allow downloading of attachments - private let downloadAttachments: Bool + public let downloadAttachments: Bool /** * Logging interface used for all log operations @@ -51,7 +274,7 @@ open class AttachmentQueue { public let logger: any LoggerProtocol /// Attachment service for interacting with attachment records - public let attachmentsService: AttachmentService + public let attachmentsService: AttachmentServiceProtocol private var syncStatusTask: Task? private var cancellables = Set() @@ -60,20 +283,9 @@ open class AttachmentQueue { public private(set) var closed: Bool = false /// Syncing service instance - private(set) lazy var syncingService: SyncingService = .init( - remoteStorage: self.remoteStorage, - localStorage: self.localStorage, - attachmentsService: self.attachmentsService, - logger: self.logger, - getLocalUri: { [weak self] filename in - guard let self = self else { return filename } - return self.getLocalUri(filename) - }, - errorHandler: self.errorHandler, - syncThrottle: self.syncThrottleDuration - ) - - private let lock: LockActor + public let syncingService: SyncingService + + private let _getLocalUri: @Sendable (_ filename: String) async -> String /// Initializes the attachment queue /// - Parameters match the stored properties @@ -81,16 +293,17 @@ open class AttachmentQueue { db: PowerSyncDatabaseProtocol, remoteStorage: RemoteStorageAdapter, attachmentsDirectory: String, - watchAttachments: @escaping () throws -> AsyncThrowingStream<[WatchedAttachmentItem], Error>, + watchAttachments: @Sendable @escaping () throws -> AsyncThrowingStream<[WatchedAttachmentItem], Error>, localStorage: LocalStorageAdapter = FileManagerStorageAdapter(), - attachmentsQueueTableName: String = defaultTableName, + attachmentsQueueTableName: String = defaultAttachmentsTableName, errorHandler: SyncErrorHandler? = nil, syncInterval: TimeInterval = 30.0, archivedCacheLimit: Int64 = 100, syncThrottleDuration: TimeInterval = 1.0, subdirectories: [String]? = nil, downloadAttachments: Bool = true, - logger: (any LoggerProtocol)? = nil + logger: (any LoggerProtocol)? = nil, + getLocalUri: (@Sendable (_ filename: String) async -> String)? = nil ) { self.db = db self.remoteStorage = remoteStorage @@ -105,81 +318,93 @@ open class AttachmentQueue { self.subdirectories = subdirectories self.downloadAttachments = downloadAttachments self.logger = logger ?? db.logger - self.attachmentsService = AttachmentService( + _getLocalUri = getLocalUri ?? { filename in + URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(filename).path + } + attachmentsService = AttachmentServiceImpl( db: db, tableName: attachmentsQueueTableName, logger: self.logger, maxArchivedCount: archivedCacheLimit ) - self.lock = LockActor() + syncingService = SyncingService( + remoteStorage: self.remoteStorage, + localStorage: self.localStorage, + attachmentsService: attachmentsService, + logger: self.logger, + getLocalUri: _getLocalUri, + errorHandler: self.errorHandler, + syncThrottle: self.syncThrottleDuration + ) + } + + public func getLocalUri(_ filename: String) async -> String { + return await _getLocalUri(filename) } - /// Starts the attachment sync process public func startSync() async throws { - try await lock.withLock { - try guardClosed() + try guardClosed() - // Stop any active syncing before starting new Tasks - try await _stopSyncing() + // Stop any active syncing before starting new Tasks + try await _stopSyncing() - // Ensure the directory where attachments are downloaded exists - try await localStorage.makeDir(path: attachmentsDirectory) + // Ensure the directory where attachments are downloaded exists + try await localStorage.makeDir(path: attachmentsDirectory) - if let subdirectories = subdirectories { - for subdirectory in subdirectories { - let path = URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(subdirectory).path - try await localStorage.makeDir(path: path) - } + if let subdirectories = subdirectories { + for subdirectory in subdirectories { + let path = URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(subdirectory).path + try await localStorage.makeDir(path: path) } + } - // Verify initial state - try await attachmentsService.withContext { context in - try await self.verifyAttachments(context: context) - } + // Verify initial state + try await attachmentsService.withContext { context in + try await self.verifyAttachments(context: context) + } - try await syncingService.startSync(period: syncInterval) - - syncStatusTask = Task { - do { - try await withThrowingTaskGroup(of: Void.self) { group in - // Add connectivity monitoring task - group.addTask { - var previousConnected = self.db.currentStatus.connected - for await status in self.db.currentStatus.asFlow() { - try Task.checkCancellation() - if !previousConnected && status.connected { - try await self.syncingService.triggerSync() - } - previousConnected = status.connected - } - } + try await syncingService.startSync(period: syncInterval) + _startSyncTask() + } - // Add attachment watching task - group.addTask { - for try await items in try self.watchAttachments() { - try await self.processWatchedAttachments(items: items) + public func stopSyncing() async throws { + try await _stopSyncing() + } + + private func _startSyncTask() { + syncStatusTask = Task { + do { + try await withThrowingTaskGroup(of: Void.self) { group in + // Add connectivity monitoring task + group.addTask { + var previousConnected = self.db.currentStatus.connected + for await status in self.db.currentStatus.asFlow() { + try Task.checkCancellation() + if !previousConnected, status.connected { + try await self.syncingService.triggerSync() } + previousConnected = status.connected } - - // Wait for any task to complete (which should only happen on cancellation) - try await group.next() } - } catch { - if !(error is CancellationError) { - logger.error("Error in attachment sync job: \(error.localizedDescription)", tag: logTag) + + // Add attachment watching task + group.addTask { + for try await items in try self.watchAttachments() { + try await self.processWatchedAttachments(items: items) + } } + + // Wait for any task to complete (which should only happen on cancellation) + try await group.next() + } + } catch { + if !(error is CancellationError) { + logger.error("Error in attachment sync job: \(error.localizedDescription)", tag: logTag) } } } } - /// Stops active syncing tasks. Syncing can be resumed with ``startSync()`` - public func stopSyncing() async throws { - try await lock.withLock { - try await _stopSyncing() - } - } - private func _stopSyncing() async throws { try guardClosed() @@ -196,198 +421,23 @@ open class AttachmentQueue { try await syncingService.stopSync() } - /// Closes the attachment queue and cancels all sync tasks public func close() async throws { - try await lock.withLock { - try guardClosed() - - try await _stopSyncing() - try await syncingService.close() - closed = true - } - } - - /// Resolves the filename for a new attachment - /// - Parameters: - /// - attachmentId: Attachment ID - /// - fileExtension: File extension - /// - Returns: Resolved filename - public func resolveNewAttachmentFilename( - attachmentId: String, - fileExtension: String? - ) -> String { - return "\(attachmentId).\(fileExtension ?? "attachment")" - } - - /// Processes watched attachment items and updates sync state - /// - Parameter items: List of watched attachment items - public func processWatchedAttachments(items: [WatchedAttachmentItem]) async throws { - // Need to get all the attachments which are tracked in the DB. - // We might need to restore an archived attachment. - try await attachmentsService.withContext { context in - let currentAttachments = try await context.getAttachments() - var attachmentUpdates = [Attachment]() - - for item in items { - guard let existingQueueItem = currentAttachments.first(where: { $0.id == item.id }) else { - // Item is not present in the queue - - if !self.downloadAttachments { - continue - } - - // This item should be added to the queue - let filename = self.resolveNewAttachmentFilename( - attachmentId: item.id, - fileExtension: item.fileExtension - ) - - attachmentUpdates.append( - Attachment( - id: item.id, - filename: filename, - state: .queuedDownload, - hasSynced: false - ) - ) - continue - } - - if existingQueueItem.state == AttachmentState.archived { - // The attachment is present again. Need to queue it for sync. - // We might be able to optimize this in future - if existingQueueItem.hasSynced == true { - // No remote action required, we can restore the record (avoids deletion) - attachmentUpdates.append( - existingQueueItem.with(state: AttachmentState.synced) - ) - } else { - // The localURI should be set if the record was meant to be downloaded - // and has been synced. If it's missing and hasSynced is false then - // it must be an upload operation - let newState = existingQueueItem.localUri == nil ? - AttachmentState.queuedDownload : - AttachmentState.queuedUpload - - attachmentUpdates.append( - existingQueueItem.with(state: newState) - ) - } - } - } - - for attachment in currentAttachments { - let notInWatchedItems = items.first(where: { $0.id == attachment.id }) == nil - if notInWatchedItems { - switch attachment.state { - case .queuedDelete, .queuedUpload: - // Only archive if it has synced - if attachment.hasSynced == true { - attachmentUpdates.append( - attachment.with(state: .archived) - ) - } - default: - // Archive other states such as QUEUED_DOWNLOAD - attachmentUpdates.append( - attachment.with(state: .archived) - ) - } - } - } + try guardClosed() - if !attachmentUpdates.isEmpty { - try await context.saveAttachments(attachments: attachmentUpdates) - } - } + try await _stopSyncing() + try await syncingService.close() + closed = true } - /// Saves a new file and schedules it for upload - /// - Parameters: - /// - data: File data - /// - mediaType: MIME type - /// - fileExtension: File extension - /// - updateHook: Hook to assign attachment relationships in the same transaction - /// - Returns: The created attachment - @discardableResult - public func saveFile( - data: Data, - mediaType: String, - fileExtension: String?, - updateHook: @escaping (ConnectionContext, Attachment) throws -> Void - ) async throws -> Attachment { - let id = try await db.get(sql: "SELECT uuid() as id", parameters: [], mapper: { cursor in - try cursor.getString(name: "id") - }) - - let filename = resolveNewAttachmentFilename(attachmentId: id, fileExtension: fileExtension) - let localUri = getLocalUri(filename) - - // Write the file to the filesystem - let fileSize = try await localStorage.saveFile(filePath: localUri, data: data) - - return try await attachmentsService.withContext { context in - // Start a write transaction. The attachment record and relevant local relationship - // assignment should happen in the same transaction. - try await self.db.writeTransaction { tx in - let attachment = Attachment( - id: id, - filename: filename, - state: AttachmentState.queuedUpload, - localUri: localUri, - mediaType: mediaType, - size: fileSize - ) - - // Allow consumers to set relationships to this attachment id - try updateHook(tx, attachment) - - return try context.upsertAttachment(attachment, context: tx) - } - } - } - - /// Queues a file for deletion - /// - Parameters: - /// - attachmentId: ID of the attachment to delete - /// - updateHook: Hook to perform additional DB updates in the same transaction - @discardableResult - public func deleteFile( - attachmentId: String, - updateHook: @escaping (ConnectionContext, Attachment) throws -> Void - ) async throws -> Attachment { + /// Clears the attachment queue and deletes all attachment files + public func clearQueue() async throws { try await attachmentsService.withContext { context in - guard let attachment = try await context.getAttachment(id: attachmentId) else { - throw PowerSyncAttachmentError.notFound("Attachment record with id \(attachmentId) was not found.") - } - - let result = try await self.db.writeTransaction { tx in - try updateHook(tx, attachment) - - let updatedAttachment = Attachment( - id: attachment.id, - filename: attachment.filename, - state: AttachmentState.queuedDelete, - hasSynced: attachment.hasSynced, - localUri: attachment.localUri, - mediaType: attachment.mediaType, - size: attachment.size - ) - - return try context.upsertAttachment(updatedAttachment, context: tx) - } - return result + try await context.clearQueue() + // Remove the attachments directory + try await self.localStorage.rmDir(path: self.attachmentsDirectory) } } - /// Returns the local URI where a file is stored based on filename - /// - Parameter filename: The name of the file - /// - Returns: The file path - public func getLocalUri(_ filename: String) -> String { - return URL(fileURLWithPath: attachmentsDirectory).appendingPathComponent(filename).path - } - - /// Removes all archived items public func expireCache() async throws { try await attachmentsService.withContext { context in var done = false @@ -397,15 +447,6 @@ open class AttachmentQueue { } } - /// Clears the attachment queue and deletes all attachment files - public func clearQueue() async throws { - try await attachmentsService.withContext { context in - try await context.clearQueue() - // Remove the attachments directory - try await self.localStorage.rmDir(path: self.attachmentsDirectory) - } - } - /// Verifies attachment records are present in the filesystem private func verifyAttachments(context: AttachmentContext) async throws { let attachments = try await context.getAttachments() @@ -421,7 +462,7 @@ open class AttachmentQueue { // The file exists, this is correct continue } - + if attachment.state == AttachmentState.queuedUpload { // The file must have been removed from the local storage before upload was completed updates.append(attachment.with( diff --git a/Sources/PowerSync/attachments/AttachmentService.swift b/Sources/PowerSync/attachments/AttachmentService.swift index 3690439..a4db2a7 100644 --- a/Sources/PowerSync/attachments/AttachmentService.swift +++ b/Sources/PowerSync/attachments/AttachmentService.swift @@ -1,14 +1,23 @@ import Foundation +public protocol AttachmentServiceProtocol: Sendable { + /// Watches for changes to the attachments table. + func watchActiveAttachments() async throws -> AsyncThrowingStream<[String], Error> + + /// Executes a callback with exclusive access to the attachment context. + func withContext( + callback: @Sendable @escaping (AttachmentContext) async throws -> R + ) async throws -> R +} + /// Service which manages attachment records. -open class AttachmentService { +actor AttachmentServiceImpl: AttachmentServiceProtocol { private let db: any PowerSyncDatabaseProtocol private let tableName: String private let logger: any LoggerProtocol private let logTag = "AttachmentService" private let context: AttachmentContext - private let lock: LockActor /// Initializes the attachment service with the specified database, table name, logger, and max archived count. public init( @@ -26,10 +35,8 @@ open class AttachmentService { logger: logger, maxArchivedCount: maxArchivedCount ) - lock = LockActor() } - /// Watches for changes to the attachments table. public func watchActiveAttachments() throws -> AsyncThrowingStream<[String], Error> { logger.info("Watching attachments...", tag: logTag) @@ -49,17 +56,16 @@ open class AttachmentService { parameters: [ AttachmentState.queuedUpload.rawValue, AttachmentState.queuedDownload.rawValue, - AttachmentState.queuedDelete.rawValue, + AttachmentState.queuedDelete.rawValue ] ) { cursor in try cursor.getString(name: "id") } } - /// Executes a callback with exclusive access to the attachment context. - public func withContext(callback: @Sendable @escaping (AttachmentContext) async throws -> R) async throws -> R { - try await lock.withLock { - try await callback(context) - } + public func withContext( + callback: @Sendable @escaping (AttachmentContext) async throws -> R + ) async throws -> R { + try await callback(context) } } diff --git a/Sources/PowerSync/attachments/FileManagerLocalStorage.swift b/Sources/PowerSync/attachments/FileManagerLocalStorage.swift index cc3915e..d2ba944 100644 --- a/Sources/PowerSync/attachments/FileManagerLocalStorage.swift +++ b/Sources/PowerSync/attachments/FileManagerLocalStorage.swift @@ -3,10 +3,14 @@ import Foundation /** * Implementation of LocalStorageAdapter using FileManager */ -public class FileManagerStorageAdapter: LocalStorageAdapter { - private let fileManager = FileManager.default +public actor FileManagerStorageAdapter: LocalStorageAdapter { + private let fileManager: FileManager - public init() {} + public init( + fileManager: FileManager? = nil + ) { + self.fileManager = fileManager ?? FileManager.default + } public func saveFile(filePath: String, data: Data) async throws -> Int64 { return try await Task { diff --git a/Sources/PowerSync/attachments/LocalStorage.swift b/Sources/PowerSync/attachments/LocalStorage.swift index 071e522..a2187c6 100644 --- a/Sources/PowerSync/attachments/LocalStorage.swift +++ b/Sources/PowerSync/attachments/LocalStorage.swift @@ -4,7 +4,7 @@ import Foundation public enum PowerSyncAttachmentError: Error { /// A general error with an associated message case generalError(String) - + /// Indicates no matching attachment record could be found case notFound(String) @@ -16,13 +16,13 @@ public enum PowerSyncAttachmentError: Error { /// The given file or directory path was invalid case invalidPath(String) - + /// The attachments queue or sub services have been closed case closed(String) } /// Protocol defining an adapter interface for local file storage -public protocol LocalStorageAdapter { +public protocol LocalStorageAdapter: Sendable { /// Saves data to a file at the specified path. /// /// - Parameters: diff --git a/Sources/PowerSync/attachments/LockActor.swift b/Sources/PowerSync/attachments/LockActor.swift deleted file mode 100644 index 94f41db..0000000 --- a/Sources/PowerSync/attachments/LockActor.swift +++ /dev/null @@ -1,48 +0,0 @@ -import Foundation - -actor LockActor { - private var isLocked = false - private var waiters: [(id: UUID, continuation: CheckedContinuation)] = [] - - func withLock(_ operation: @Sendable () async throws -> T) async throws -> T { - try await waitUntilUnlocked() - - isLocked = true - defer { unlockNext() } - - try Task.checkCancellation() // cancellation check after acquiring lock - return try await operation() - } - - private func waitUntilUnlocked() async throws { - if !isLocked { return } - - let id = UUID() - - // Use withTaskCancellationHandler to manage cancellation - await withTaskCancellationHandler { - await withCheckedContinuation { continuation in - waiters.append((id: id, continuation: continuation)) - } - } onCancel: { - // Cancellation logic: remove the waiter when cancelled - Task { - await self.removeWaiter(id: id) - } - } - } - - private func removeWaiter(id: UUID) async { - // Safely remove the waiter from the actor's waiters list - waiters.removeAll { $0.id == id } - } - - private func unlockNext() { - if let next = waiters.first { - waiters.removeFirst() - next.continuation.resume() - } else { - isLocked = false - } - } -} diff --git a/Sources/PowerSync/attachments/README.md b/Sources/PowerSync/attachments/README.md index 9d85fca..47d9f7f 100644 --- a/Sources/PowerSync/attachments/README.md +++ b/Sources/PowerSync/attachments/README.md @@ -47,7 +47,7 @@ let schema = Schema( ) ``` -2. Create an `AttachmentQueue` instance. This class provides default syncing utilities and implements a default sync strategy. It can be subclassed for custom functionality: +2. Create an `AttachmentQueue` instance. This class provides default syncing utilities and implements a default sync strategy. ```swift func getAttachmentsDirectoryPath() throws -> String { @@ -78,6 +78,9 @@ let queue = AttachmentQueue( ) } ) ``` + +Note: `AttachmentQueue` is an Actor which implements `AttachmentQueueProtocol`. The `AttachmentQueueProtocol` can be subclassed for custom queue functionality if required. + - The `attachmentsDirectory` specifies where local attachment files should be stored. `FileManager.default.urls(for: .documentDirectory, in: .userDomainMask).first?.appendingPathComponent("attachments")` is a good choice. - The `remoteStorage` is responsible for connecting to the attachments backend. See the `RemoteStorageAdapter` protocol definition. - `watchAttachments` is closure which generates a publisher of `WatchedAttachmentItem`. These items represent the attachments that should be present in the application. diff --git a/Sources/PowerSync/attachments/RemoteStorage.swift b/Sources/PowerSync/attachments/RemoteStorage.swift index bd94a42..8779655 100644 --- a/Sources/PowerSync/attachments/RemoteStorage.swift +++ b/Sources/PowerSync/attachments/RemoteStorage.swift @@ -1,7 +1,7 @@ import Foundation /// Adapter for interfacing with remote attachment storage. -public protocol RemoteStorageAdapter { +public protocol RemoteStorageAdapter: Sendable { /// Uploads a file to remote storage. /// /// - Parameters: diff --git a/Sources/PowerSync/attachments/SyncErrorHandler.swift b/Sources/PowerSync/attachments/SyncErrorHandler.swift index b9a2b55..e7feb31 100644 --- a/Sources/PowerSync/attachments/SyncErrorHandler.swift +++ b/Sources/PowerSync/attachments/SyncErrorHandler.swift @@ -6,7 +6,7 @@ import Foundation /// operations (download, upload, delete) should be retried upon failure. /// /// If an operation fails and should not be retried, the attachment record is archived. -public protocol SyncErrorHandler { +public protocol SyncErrorHandler: Sendable { /// Handles a download error for a specific attachment. /// /// - Parameters: @@ -44,7 +44,7 @@ public protocol SyncErrorHandler { /// Default implementation of `SyncErrorHandler`. /// /// By default, all operations return `false`, indicating no retry. -public class DefaultSyncErrorHandler: SyncErrorHandler { +public final class DefaultSyncErrorHandler: SyncErrorHandler, Sendable { public init() {} public func onDownloadError(attachment _: Attachment, error _: Error) async -> Bool { diff --git a/Sources/PowerSync/attachments/SyncingService.swift b/Sources/PowerSync/attachments/SyncingService.swift index 3c3a551..1fdf22f 100644 --- a/Sources/PowerSync/attachments/SyncingService.swift +++ b/Sources/PowerSync/attachments/SyncingService.swift @@ -6,18 +6,37 @@ import Foundation /// This watches for changes to active attachments and performs queued /// download, upload, and delete operations. Syncs can be triggered manually, /// periodically, or based on database changes. -open class SyncingService { +public protocol SyncingServiceProtocol: Sendable { + /// Starts periodic syncing of attachments. + /// + /// - Parameter period: The time interval in seconds between each sync. + func startSync(period: TimeInterval) async throws + + func stopSync() async throws + + /// Cleans up internal resources and cancels any ongoing syncing. + func close() async throws + + /// Triggers a sync operation. Can be called manually. + func triggerSync() async throws + + /// Deletes attachments marked as archived that exist on local storage. + /// + /// - Returns: `true` if any deletions occurred, `false` otherwise. + func deleteArchivedAttachments(_ context: AttachmentContext) async throws -> Bool +} + +public actor SyncingService: SyncingServiceProtocol { private let remoteStorage: RemoteStorageAdapter private let localStorage: LocalStorageAdapter - private let attachmentsService: AttachmentService - private let getLocalUri: (String) async -> String + private let attachmentsService: AttachmentServiceProtocol + private let getLocalUri: @Sendable (String) async -> String private let errorHandler: SyncErrorHandler? private let syncThrottle: TimeInterval private var cancellables = Set() private let syncTriggerSubject = PassthroughSubject() private var periodicSyncTimer: Timer? private var syncTask: Task? - private let lock: LockActor let logger: any LoggerProtocol let logTag = "AttachmentSync" @@ -32,12 +51,12 @@ open class SyncingService { /// - getLocalUri: Callback used to resolve a local path for saving downloaded attachments. /// - errorHandler: Optional handler to determine if sync errors should be retried. /// - syncThrottle: Throttle interval to control frequency of sync triggers. - init( + public init( remoteStorage: RemoteStorageAdapter, localStorage: LocalStorageAdapter, - attachmentsService: AttachmentService, + attachmentsService: AttachmentServiceProtocol, logger: any LoggerProtocol, - getLocalUri: @escaping (String) async -> String, + getLocalUri: @Sendable @escaping (String) async -> String, errorHandler: SyncErrorHandler? = nil, syncThrottle: TimeInterval = 5.0 ) { @@ -48,29 +67,24 @@ open class SyncingService { self.errorHandler = errorHandler self.syncThrottle = syncThrottle self.logger = logger - self.closed = false - self.lock = LockActor() + closed = false } /// Starts periodic syncing of attachments. /// /// - Parameter period: The time interval in seconds between each sync. public func startSync(period: TimeInterval) async throws { - try await lock.withLock { - try guardClosed() + try guardClosed() - // Close any active sync operations - try await _stopSync() + // Close any active sync operations + try await _stopSync() - setupSyncFlow(period: period) - } + setupSyncFlow(period: period) } public func stopSync() async throws { - try await lock.withLock { - try guardClosed() - try await _stopSync() - } + try guardClosed() + try await _stopSync() } private func _stopSync() async throws { @@ -92,17 +106,15 @@ open class SyncingService { } /// Cleans up internal resources and cancels any ongoing syncing. - func close() async throws { - try await lock.withLock { - try guardClosed() + public func close() async throws { + try guardClosed() - try await _stopSync() - closed = true - } + try await _stopSync() + closed = true } /// Triggers a sync operation. Can be called manually. - func triggerSync() async throws { + public func triggerSync() async throws { try guardClosed() syncTriggerSubject.send(()) } @@ -110,7 +122,7 @@ open class SyncingService { /// Deletes attachments marked as archived that exist on local storage. /// /// - Returns: `true` if any deletions occurred, `false` otherwise. - func deleteArchivedAttachments(_ context: AttachmentContext) async throws -> Bool { + public func deleteArchivedAttachments(_ context: AttachmentContext) async throws -> Bool { return try await context.deleteArchivedAttachments { pendingDelete in for attachment in pendingDelete { guard let localUri = attachment.localUri else { continue } @@ -136,9 +148,6 @@ open class SyncingService { ) .sink { _ in continuation.yield(()) } - continuation.onTermination = { _ in - cancellable.cancel() - } self.cancellables.insert(cancellable) } } @@ -150,7 +159,7 @@ open class SyncingService { try await withThrowingTaskGroup(of: Void.self) { group in // Handle sync trigger events group.addTask { - let syncTrigger = self.createSyncTrigger() + let syncTrigger = await self.createSyncTrigger() for await _ in syncTrigger { try Task.checkCancellation() @@ -165,9 +174,9 @@ open class SyncingService { // Watch attachment records. Trigger a sync on change group.addTask { - for try await _ in try self.attachmentsService.watchActiveAttachments() { + for try await _ in try await self.attachmentsService.watchActiveAttachments() { try Task.checkCancellation() - self.syncTriggerSubject.send(()) + await self._triggerSyncSubject() } } @@ -178,7 +187,7 @@ open class SyncingService { try await self.triggerSync() } } - + // Wait for any task to complete try await group.next() } @@ -270,6 +279,11 @@ open class SyncingService { } } + /// Small actor isolated method to trigger the sync subject + private func _triggerSyncSubject() { + syncTriggerSubject.send(()) + } + /// Deletes an attachment from remote and local storage. /// /// - Parameter attachment: The attachment to delete. diff --git a/Sources/PowerSync/attachments/WatchedAttachmentItem.swift b/Sources/PowerSync/attachments/WatchedAttachmentItem.swift index b4cddc7..ff415dd 100644 --- a/Sources/PowerSync/attachments/WatchedAttachmentItem.swift +++ b/Sources/PowerSync/attachments/WatchedAttachmentItem.swift @@ -4,7 +4,7 @@ import Foundation /// A watched attachment record item. /// This is usually returned from watching all relevant attachment IDs. -public struct WatchedAttachmentItem { +public struct WatchedAttachmentItem: Sendable { /// Id for the attachment record public let id: String diff --git a/Tests/PowerSyncTests/AttachmentTests.swift b/Tests/PowerSyncTests/AttachmentTests.swift index b4427e4..46cb114 100644 --- a/Tests/PowerSyncTests/AttachmentTests.swift +++ b/Tests/PowerSyncTests/AttachmentTests.swift @@ -1,4 +1,3 @@ - @testable import PowerSync import XCTest @@ -29,7 +28,7 @@ final class AttachmentTests: XCTestCase { database = nil try await super.tearDown() } - + func getAttachmentDirectory() -> String { URL(fileURLWithPath: NSTemporaryDirectory()).appendingPathComponent("attachments").path } @@ -40,37 +39,39 @@ final class AttachmentTests: XCTestCase { remoteStorage: { struct MockRemoteStorage: RemoteStorageAdapter { func uploadFile( - fileData: Data, - attachment: Attachment + fileData _: Data, + attachment _: Attachment ) async throws {} - + /** * Download a file from remote storage */ - func downloadFile(attachment: Attachment) async throws -> Data { + func downloadFile(attachment _: Attachment) async throws -> Data { return Data([1, 2, 3]) } - + /** * Delete a file from remote storage */ - func deleteFile(attachment: Attachment) async throws {} + func deleteFile(attachment _: Attachment) async throws {} } - + return MockRemoteStorage() }(), attachmentsDirectory: getAttachmentDirectory(), - watchAttachments: { try self.database.watch(options: WatchOptions( - sql: "SELECT photo_id FROM users WHERE photo_id IS NOT NULL", - mapper: { cursor in try WatchedAttachmentItem( - id: cursor.getString(name: "photo_id"), - fileExtension: "jpg" - ) } - )) } + watchAttachments: { [database = database!] in + try database.watch(options: WatchOptions( + sql: "SELECT photo_id FROM users WHERE photo_id IS NOT NULL", + mapper: { cursor in try WatchedAttachmentItem( + id: cursor.getString(name: "photo_id"), + fileExtension: "jpg" + ) } + )) + } ) - + try await queue.startSync() - + // Create a user which has a photo_id associated. // This will be treated as a download since no attachment record was created. // saveFile creates the attachment record before the updates are made. @@ -78,58 +79,60 @@ final class AttachmentTests: XCTestCase { sql: "INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), 'steven', 'steven@example.com', uuid())", parameters: [] ) - - let attachmentsWatch = try database.watch( - options: WatchOptions( - sql: "SELECT * FROM attachments", - mapper: { cursor in try Attachment.fromCursor(cursor) } - )).makeAsyncIterator() - + let attachmentRecord = try await waitForMatch( - iterator: attachmentsWatch, + iteratorGenerator: { [database = database!] in try database.watch( + options: WatchOptions( + sql: "SELECT * FROM attachments", + mapper: { cursor in try Attachment.fromCursor(cursor) } + )) }, where: { results in results.first?.state == AttachmentState.synced }, timeout: 5 ).first - - // The file should exist + +// The file should exist let localData = try await queue.localStorage.readFile(filePath: attachmentRecord!.localUri!) XCTAssertEqual(localData.count, 3) - + try await queue.clearQueue() try await queue.close() } - + func testAttachmentUpload() async throws { - class MockRemoteStorage: RemoteStorageAdapter { + actor MockRemoteStorage: RemoteStorageAdapter { public var uploadCalled = false - + + func wasUploadCalled() -> Bool { + return uploadCalled + } + func uploadFile( - fileData: Data, - attachment: Attachment + fileData _: Data, + attachment _: Attachment ) async throws { uploadCalled = true } - + /** * Download a file from remote storage */ - func downloadFile(attachment: Attachment) async throws -> Data { + func downloadFile(attachment _: Attachment) async throws -> Data { return Data([1, 2, 3]) } - + /** * Delete a file from remote storage */ - func deleteFile(attachment: Attachment) async throws {} + func deleteFile(attachment _: Attachment) async throws {} } let mockedRemote = MockRemoteStorage() - + let queue = AttachmentQueue( db: database, remoteStorage: mockedRemote, attachmentsDirectory: getAttachmentDirectory(), - watchAttachments: { try self.database.watch(options: WatchOptions( + watchAttachments: { [database = database!] in try database.watch(options: WatchOptions( sql: "SELECT photo_id FROM users WHERE photo_id IS NOT NULL", mapper: { cursor in try WatchedAttachmentItem( id: cursor.getString(name: "photo_id"), @@ -137,35 +140,36 @@ final class AttachmentTests: XCTestCase { ) } )) } ) - + try await queue.startSync() - - let attachmentsWatch = try database.watch( - options: WatchOptions( - sql: "SELECT * FROM attachments", - mapper: { cursor in try Attachment.fromCursor(cursor) } - )).makeAsyncIterator() - + _ = try await queue.saveFile( data: Data([3, 4, 5]), mediaType: "image/jpg", fileExtension: "jpg" - ) { tx, attachment in - _ = try tx.execute( + ) { transaction, attachment in + _ = try transaction.execute( sql: "INSERT INTO users (id, name, email, photo_id) VALUES (uuid(), 'john', 'j@j.com', ?)", parameters: [attachment.id] ) } - + _ = try await waitForMatch( - iterator: attachmentsWatch, + iteratorGenerator: { [database = database!] in + try database.watch( + options: WatchOptions( + sql: "SELECT * FROM attachments", + mapper: { cursor in try Attachment.fromCursor(cursor) } + )) + }, where: { results in results.first?.state == AttachmentState.synced }, timeout: 5 ).first - + + let uploadCalled = await mockedRemote.wasUploadCalled() // Upload should have been called - XCTAssertTrue(mockedRemote.uploadCalled) - + XCTAssertTrue(uploadCalled) + try await queue.clearQueue() try await queue.close() } @@ -176,21 +180,18 @@ public enum WaitForMatchError: Error { case predicateFail(message: String) } -public func waitForMatch( - iterator: AsyncThrowingStream.Iterator, - where predicate: @escaping (T) -> Bool, +public func waitForMatch( + iteratorGenerator: @Sendable @escaping () throws -> AsyncThrowingStream, + where predicate: @Sendable @escaping (T) -> Bool, timeout: TimeInterval ) async throws -> T { let timeoutNanoseconds = UInt64(timeout * 1_000_000_000) return try await withThrowingTaskGroup(of: T.self) { group in // Task to wait for a matching value - group.addTask { - var localIterator = iterator - while let value = try await localIterator.next() { - if predicate(value) { - return value - } + group.addTask { [iteratorGenerator] in + for try await value in try iteratorGenerator() where predicate(value) { + return value } throw WaitForMatchError.timeout() // stream ended before match } @@ -214,13 +215,13 @@ func waitFor( predicate: () async throws -> Void ) async throws { let intervalNanoseconds = UInt64(interval * 1_000_000_000) - + let timeoutDate = Date( timeIntervalSinceNow: timeout ) - + var lastError: Error? - + while Date() < timeoutDate { do { try await predicate() @@ -230,7 +231,7 @@ func waitFor( } try await Task.sleep(nanoseconds: intervalNanoseconds) } - + throw WaitForMatchError.timeout( lastError: lastError ) diff --git a/Tests/PowerSyncTests/ConnectTests.swift b/Tests/PowerSyncTests/ConnectTests.swift index 3c80cb4..ab373a0 100644 --- a/Tests/PowerSyncTests/ConnectTests.swift +++ b/Tests/PowerSyncTests/ConnectTests.swift @@ -38,7 +38,7 @@ final class ConnectTests: XCTestCase { /// This is an example of specifying JSON client params. /// The test here just ensures that the Kotlin SDK accepts these params and does not crash try await database.connect( - connector: PowerSyncBackendConnector(), + connector: MockConnector(), params: [ "foo": .string("bar"), ] @@ -50,7 +50,7 @@ final class ConnectTests: XCTestCase { XCTAssert(database.currentStatus.connecting == false) try await database.connect( - connector: PowerSyncBackendConnector() + connector: MockConnector() ) try await waitFor(timeout: 10) { @@ -58,59 +58,61 @@ final class ConnectTests: XCTestCase { throw WaitForMatchError.predicateFail(message: "Should be connecting") } } - + try await database.disconnect() - + try await waitFor(timeout: 10) { guard database.currentStatus.connecting == false else { throw WaitForMatchError.predicateFail(message: "Should not be connecting after disconnect") } } } - + func testSyncStatusUpdates() async throws { let expectation = XCTestExpectation( description: "Watch Sync Status" ) - - let watchTask = Task { - for try await _ in database.currentStatus.asFlow() { + + let watchTask = Task { [database] in + for try await _ in database!.currentStatus.asFlow() { expectation.fulfill() } } - + // Do some connecting operations try await database.connect( - connector: PowerSyncBackendConnector() + connector: MockConnector() ) - + // We should get an update await fulfillment(of: [expectation], timeout: 5) watchTask.cancel() } - + func testSyncHTTPLogs() async throws { let expectation = XCTestExpectation( description: "Should log a request to the PowerSync endpoint" ) - + let fakeUrl = "https://fakepowersyncinstance.fakepowersync.local" - - class TestConnector: PowerSyncBackendConnector { + + final class TestConnector: PowerSyncBackendConnectorProtocol { let url: String - + init(url: String) { self.url = url } - - override func fetchCredentials() async throws -> PowerSyncCredentials? { + + func fetchCredentials() async throws -> PowerSyncCredentials? { PowerSyncCredentials( endpoint: url, token: "123" ) } + + func uploadData(database _: PowerSyncDatabaseProtocol) async throws {} } - + try await database.connect( connector: TestConnector(url: fakeUrl), options: ConnectOptions( @@ -126,9 +128,9 @@ final class ConnectTests: XCTestCase { ) ) ) - + await fulfillment(of: [expectation], timeout: 5) - + try await database.disconnectAndClear() } } diff --git a/Tests/PowerSyncTests/test-utils/MockConnector.swift b/Tests/PowerSyncTests/test-utils/MockConnector.swift index 09cab45..dcc8012 100644 --- a/Tests/PowerSyncTests/test-utils/MockConnector.swift +++ b/Tests/PowerSyncTests/test-utils/MockConnector.swift @@ -1,5 +1,9 @@ import PowerSync -class MockConnector: PowerSyncBackendConnector { - +final class MockConnector: PowerSyncBackendConnectorProtocol { + func fetchCredentials() async throws -> PowerSync.PowerSyncCredentials? { + return nil + } + + func uploadData(database _: any PowerSync.PowerSyncDatabaseProtocol) async throws {} }