Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions crates/core/src/sync/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,24 @@ pub enum SyncEvent<'a> {
/// If pending CRUD entries have previously prevented a sync from completing, this even can be
/// used to try again.
UploadFinished,
ConnectionEstablished,
StreamEnded,
/// Forward a text line (JSON) received from the sync service.
TextLine { data: &'a str },
TextLine {
data: &'a str,
},
/// Forward a binary line (BSON) received from the sync service.
BinaryLine { data: &'a [u8] },
BinaryLine {
data: &'a [u8],
},
/// The active stream subscriptions (as in, `SyncStreamSubscription` instances active right now)
/// have changed.
///
/// The client will compare the new active subscriptions with the current one and will issue a
/// request to restart the sync iteration if necessary.
DidUpdateSubscriptions { active_streams: Rc<Vec<StreamKey>> },
DidUpdateSubscriptions {
active_streams: Rc<Vec<StreamKey>>,
},
}

/// An instruction sent by the core extension to the SDK.
Expand Down Expand Up @@ -244,6 +252,13 @@ pub fn register(db: *mut sqlite::sqlite3, state: Arc<DatabaseState>) -> Result<(
.map_err(PowerSyncError::as_argument_error)?,
})
}
"connection" => SyncControlRequest::SyncEvent(match payload.text() {
"established" => SyncEvent::ConnectionEstablished,
"end" => SyncEvent::StreamEnded,
_ => {
return Err(PowerSyncError::argument_error("unknown connection event"));
}
}),
"subscriptions" => {
let request = serde_json::from_str(payload.text())
.map_err(PowerSyncError::as_argument_error)?;
Expand Down
8 changes: 8 additions & 0 deletions crates/core/src/sync/streaming_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,14 @@ impl StreamingSyncIteration {
continue;
}
}
SyncEvent::ConnectionEstablished => {
self.status
.update(|s| s.mark_connected(), &mut event.instructions);
continue;
}
SyncEvent::StreamEnded => {
break false;
}
SyncEvent::DidRefreshToken => {
// Break so that the client SDK starts another iteration.
break true;
Expand Down
13 changes: 13 additions & 0 deletions dart/test/sync_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,19 @@ void _syncTests<T>({
});
});

test('handles connection events', () {
invokeControl('start', null);
expect(invokeControl('connection', 'established'), [
containsPair('UpdateSyncStatus',
containsPair('status', containsPair('connected', true)))
]);
expect(invokeControl('connection', 'end'), [
{
'CloseSyncStream': {'hide_disconnect': false}
}
]);
});

test('does not publish until reaching checkpoint', () {
invokeControl('start', null);
pushCheckpoint(buckets: priorityBuckets);
Expand Down
10 changes: 9 additions & 1 deletion docs/sync.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The following commands are supported:
- A `schema: { tables: Table[], raw_tables: RawTable[] }` entry specifying the schema of the database to
use. Regular tables are also inferred from the database itself, but raw tables need to be specified.
If no raw tables are used, the `schema` entry can be omitted.
- `active_streams`: An array of `{name: string, params: Record<string, any>}` entries representing streams that
have an active subscription object in the application at the time the stream was opened.
2. `stop`: No payload, requests the current sync iteration (if any) to be shut down.
3. `line_text`: Payload is a serialized JSON object received from the sync service.
4. `line_binary`: Payload is a BSON-encoded object received from the sync service.
Expand All @@ -26,8 +28,14 @@ The following commands are supported:
6. `completed_upload`: Notify the sync implementation that all local changes have been uploaded.
7. `update_subscriptions`: Notify the sync implementation that subscriptions which are currently active in the app
have changed. Depending on the TTL of caches, this may cause it to request a reconnect.
8. `subscriptions`: Store a new sync steam subscription in the database or remove it.
8. `connection`: Notify the sync implementation about the connection being opened (second parameter should be `established`)
or the HTTP stream closing (second parameter should be `end`).
This is used to set `connected` to true in the sync status without waiting for the first sync line.
9. `subscriptions`: Store a new sync steam subscription in the database or remove it.
This command can run outside of a sync iteration and does not affect it.
10. `update_subscriptions`: Second parameter is a JSON-encoded array of `{name: string, params: Record<string, any>}`.
If a new subscription is created, or when a subscription without a TTL has been removed, the client will ask to
restart the connection.

`powersync_control` returns a JSON-encoded array of instructions for the client:

Expand Down
Loading