diff --git a/crates/core/src/sync/interface.rs b/crates/core/src/sync/interface.rs index 20dafba..c5731de 100644 --- a/crates/core/src/sync/interface.rs +++ b/crates/core/src/sync/interface.rs @@ -87,16 +87,24 @@ pub enum SyncEvent<'a> { /// If pending CRUD entries have previously prevented a sync from completing, this even can be /// used to try again. UploadFinished, + ConnectionEstablished, + StreamEnded, /// Forward a text line (JSON) received from the sync service. - TextLine { data: &'a str }, + TextLine { + data: &'a str, + }, /// Forward a binary line (BSON) received from the sync service. - BinaryLine { data: &'a [u8] }, + BinaryLine { + data: &'a [u8], + }, /// The active stream subscriptions (as in, `SyncStreamSubscription` instances active right now) /// have changed. /// /// The client will compare the new active subscriptions with the current one and will issue a /// request to restart the sync iteration if necessary. - DidUpdateSubscriptions { active_streams: Rc> }, + DidUpdateSubscriptions { + active_streams: Rc>, + }, } /// An instruction sent by the core extension to the SDK. @@ -244,6 +252,13 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc) -> Result<( .map_err(PowerSyncError::as_argument_error)?, }) } + "connection" => SyncControlRequest::SyncEvent(match payload.text() { + "established" => SyncEvent::ConnectionEstablished, + "end" => SyncEvent::StreamEnded, + _ => { + return Err(PowerSyncError::argument_error("unknown connection event")); + } + }), "subscriptions" => { let request = serde_json::from_str(payload.text()) .map_err(PowerSyncError::as_argument_error)?; diff --git a/crates/core/src/sync/streaming_sync.rs b/crates/core/src/sync/streaming_sync.rs index 66cd15f..7b7c6e0 100644 --- a/crates/core/src/sync/streaming_sync.rs +++ b/crates/core/src/sync/streaming_sync.rs @@ -556,6 +556,14 @@ impl StreamingSyncIteration { continue; } } + SyncEvent::ConnectionEstablished => { + self.status + .update(|s| s.mark_connected(), &mut event.instructions); + continue; + } + SyncEvent::StreamEnded => { + break false; + } SyncEvent::DidRefreshToken => { // Break so that the client SDK starts another iteration. break true; diff --git a/dart/test/sync_test.dart b/dart/test/sync_test.dart index cf2c624..0cfdfe1 100644 --- a/dart/test/sync_test.dart +++ b/dart/test/sync_test.dart @@ -173,6 +173,19 @@ void _syncTests({ }); }); + test('handles connection events', () { + invokeControl('start', null); + expect(invokeControl('connection', 'established'), [ + containsPair('UpdateSyncStatus', + containsPair('status', containsPair('connected', true))) + ]); + expect(invokeControl('connection', 'end'), [ + { + 'CloseSyncStream': {'hide_disconnect': false} + } + ]); + }); + test('does not publish until reaching checkpoint', () { invokeControl('start', null); pushCheckpoint(buckets: priorityBuckets); diff --git a/docs/sync.md b/docs/sync.md index 06bdb4c..8737cee 100644 --- a/docs/sync.md +++ b/docs/sync.md @@ -16,6 +16,8 @@ The following commands are supported: - A `schema: { tables: Table[], raw_tables: RawTable[] }` entry specifying the schema of the database to use. Regular tables are also inferred from the database itself, but raw tables need to be specified. If no raw tables are used, the `schema` entry can be omitted. + - `active_streams`: An array of `{name: string, params: Record}` entries representing streams that + have an active subscription object in the application at the time the stream was opened. 2. `stop`: No payload, requests the current sync iteration (if any) to be shut down. 3. `line_text`: Payload is a serialized JSON object received from the sync service. 4. `line_binary`: Payload is a BSON-encoded object received from the sync service. @@ -26,8 +28,14 @@ The following commands are supported: 6. `completed_upload`: Notify the sync implementation that all local changes have been uploaded. 7. `update_subscriptions`: Notify the sync implementation that subscriptions which are currently active in the app have changed. Depending on the TTL of caches, this may cause it to request a reconnect. -8. `subscriptions`: Store a new sync steam subscription in the database or remove it. +8. `connection`: Notify the sync implementation about the connection being opened (second parameter should be `established`) + or the HTTP stream closing (second parameter should be `end`). + This is used to set `connected` to true in the sync status without waiting for the first sync line. +9. `subscriptions`: Store a new sync steam subscription in the database or remove it. This command can run outside of a sync iteration and does not affect it. +10. `update_subscriptions`: Second parameter is a JSON-encoded array of `{name: string, params: Record}`. + If a new subscription is created, or when a subscription without a TTL has been removed, the client will ask to + restart the connection. `powersync_control` returns a JSON-encoded array of instructions for the client: