Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changeset/angry-ducks-sneeze.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
'@powersync/react-native': minor
'@powersync/common': minor
'@powersync/web': minor
'@powersync/node': minor
---

Add alpha support for sync streams, allowing different sets of data to be synced dynamically.
5 changes: 5 additions & 0 deletions .changeset/mighty-colts-rule.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/react': minor
---

Add hooks for sync streams
4 changes: 3 additions & 1 deletion .github/workflows/test-simulators.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ jobs:
test-ios:
name: Test iOS
needs: check-changes
if: ${{ needs.check-changes.outputs.should_run == 'true' }}
# TODO: Re-enable iOS tests. They have been disabled because they are failing extremely frequently without
# any apparent cause. In particular, it seems like even starting the simulator times out most of the time.
if: ${{ false && needs.check-changes.outputs.should_run == 'true' }}
runs-on: macOS-15

steps:
Expand Down
18 changes: 13 additions & 5 deletions packages/common/src/client/AbstractPowerSyncDatabase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
return this.waitForStatus(statusMatches, signal);
}

private async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
/**
* Waits for the first sync status for which the `status` callback returns a truthy value.
*/
async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise<void> {
if (predicate(this.currentStatus)) {
return;
}
Expand All @@ -364,16 +367,21 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
const dispose = this.registerListener({
statusChanged: (status) => {
if (predicate(status)) {
dispose();
resolve();
abort();
}
}
});

signal?.addEventListener('abort', () => {
function abort() {
dispose();
resolve();
});
}

if (signal?.aborted) {
abort();
} else {
signal?.addEventListener('abort', abort);
}
});
}

Expand Down
9 changes: 5 additions & 4 deletions packages/common/src/client/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -324,14 +324,15 @@ export class ConnectionManager extends BaseObserver<ConnectionManagerListener> {
};
}

private get activeStreams() {
/**
* @internal exposed for testing
*/
get activeStreams() {
return [...this.locallyActiveSubscriptions.values()].map((a) => ({ name: a.name, params: a.parameters }));
}

private subscriptionsMayHaveChanged() {
if (this.syncStreamImplementation) {
this.syncStreamImplementation.updateSubscriptions(this.activeStreams);
}
this.syncStreamImplementation?.updateSubscriptions(this.activeStreams);
}
}

Expand Down
4 changes: 3 additions & 1 deletion packages/node/vitest.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { defineConfig } from 'vitest/config';
// We need to define an empty config to be part of the vitest works
export default defineConfig({
test: {
silent: false
silent: false,
// This doesn't make the tests considerably slower. It may improve reliability for GH actions.
fileParallelism: false
}
});
131 changes: 131 additions & 0 deletions packages/react/src/hooks/streams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
import { useEffect, useMemo, useState } from 'react';
import { usePowerSync } from './PowerSyncContext.js';
import {
AbstractPowerSyncDatabase,
SyncStatus,
SyncStreamStatus,
SyncStreamSubscribeOptions,
SyncStreamSubscription
} from '@powersync/common';
import { useStatus } from './useStatus.js';
import { QuerySyncStreamOptions } from './watched/watch-types.js';

/**
* A sync stream to subscribe to in {@link useSyncStream}.
*
* For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams).
*/
export interface UseSyncStreamOptions extends SyncStreamSubscribeOptions {
/**
* The name of the stream to subscribe to.
*/
name: string;
/**
* Parameters for the stream subscription. A single stream can have multiple subscriptions with different parameter
* sets.
*/
parameters?: Record<string, any>;
}

/**
* Creates a PowerSync stream subscription. The subscription is kept alive as long as the React component calling this
* function. When it unmounts, {@link SyncStreamSubscription.unsubscribe} is called
*
* For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams).
*
* @returns The status for that stream, or `null` if the stream is currently being resolved.
*/
export function useSyncStream(options: UseSyncStreamOptions): SyncStreamStatus | null {
const { name, parameters } = options;
const db = usePowerSync();
const status = useStatus();
const [subscription, setSubscription] = useState<SyncStreamSubscription | null>(null);

useEffect(() => {
let active = true;
let subscription: SyncStreamSubscription | null = null;

db.syncStream(name, parameters)
.subscribe(options)
.then((sub) => {
if (active) {
subscription = sub;
setSubscription(sub);
} else {
// The cleanup function already ran, unsubscribe immediately.
sub.unsubscribe();
}
});

return () => {
active = false;
// If we don't have a subscription yet, it'll still get cleaned up once the promise resolves because we've set
// active to false.
subscription?.unsubscribe();
};
}, [name, parameters]);

return subscription && status.forStream(subscription);
}

/**
* @internal
*/
export function useAllSyncStreamsHaveSynced(
db: AbstractPowerSyncDatabase,
streams: QuerySyncStreamOptions[] | undefined
): boolean {
// Since streams are a user-supplied array, they will likely be different each time this function is called. We don't
// want to update underlying subscriptions each time, though.
const hash = useMemo(() => streams && JSON.stringify(streams), [streams]);
const [synced, setHasSynced] = useState(streams == null || streams.every((e) => e.waitForStream != true));

useEffect(() => {
if (streams) {
setHasSynced(false);

const promises: Promise<SyncStreamSubscription>[] = [];
const abort = new AbortController();
for (const stream of streams) {
promises.push(db.syncStream(stream.name, stream.parameters).subscribe(stream));
}

// First, wait for all subscribe() calls to make all subscriptions active.
Promise.all(promises).then(async (resolvedStreams) => {
function allHaveSynced(status: SyncStatus) {
return resolvedStreams.every((s, i) => {
const request = streams[i];
return !request.waitForStream || status.forStream(s)?.subscription?.hasSynced;
});
}

// Wait for the effect to be cancelled or all streams having synced.
await db.waitForStatus(allHaveSynced, abort.signal);
if (abort.signal.aborted) {
// Was cancelled
} else {
// Has synced, update public state.
setHasSynced(true);

// Wait for cancellation before clearing subscriptions.
await new Promise<void>((resolve) => {
abort.signal.addEventListener('abort', () => resolve());
});
}

// Effect was definitely cancelled at this point, so drop the subscriptions.
for (const stream of resolvedStreams) {
stream.unsubscribe();
}
});

return () => abort.abort();
} else {
// There are no streams, so all of them have synced.
setHasSynced(true);
return undefined;
}
}, [hash]);

return synced;
}
20 changes: 16 additions & 4 deletions packages/react/src/hooks/watched/useQuery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { useSingleQuery } from './useSingleQuery.js';
import { useWatchedQuery } from './useWatchedQuery.js';
import { AdditionalOptions, DifferentialHookOptions, QueryResult, ReadonlyQueryResult } from './watch-types.js';
import { constructCompatibleQuery } from './watch-utils.js';
import { useAllSyncStreamsHaveSynced } from '../streams.js';

/**
* A hook to access the results of a watched query.
Expand Down Expand Up @@ -58,15 +59,20 @@ export function useQuery<RowType = any>(
) {
const powerSync = usePowerSync();
if (!powerSync) {
return { isLoading: false, isFetching: false, data: [], error: new Error('PowerSync not configured.') };
return {
..._loadingState,
isLoading: false,
error: new Error('PowerSync not configured.')
};
}
const { parsedQuery, queryChanged } = constructCompatibleQuery(query, parameters, options);
const streamsHaveSynced = useAllSyncStreamsHaveSynced(powerSync, options?.streams);
const runOnce = options?.runQueryOnce == true;
const single = useSingleQuery<RowType>({
query: parsedQuery,
powerSync,
queryChanged,
active: runOnce
active: runOnce && streamsHaveSynced
});
const watched = useWatchedQuery<RowType>({
query: parsedQuery,
Expand All @@ -79,8 +85,14 @@ export function useQuery<RowType = any>(
// We emit new data for each table change by default.
rowComparator: options.rowComparator
},
active: !runOnce
active: !runOnce && streamsHaveSynced
});

return runOnce ? single : watched;
if (!streamsHaveSynced) {
return { ..._loadingState };
}

return (runOnce ? single : watched) ?? _loadingState;
}

const _loadingState = { isLoading: true, isFetching: false, data: [], error: undefined };
28 changes: 27 additions & 1 deletion packages/react/src/hooks/watched/watch-types.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,35 @@
import { DifferentialWatchedQueryComparator, SQLOnChangeOptions } from '@powersync/common';
import { DifferentialWatchedQueryComparator, SQLOnChangeOptions, SyncSubscriptionDescription } from '@powersync/common';
import { UseSyncStreamOptions } from '../streams.js';

export interface HookWatchOptions extends Omit<SQLOnChangeOptions, 'signal'> {
/**
* An optional array of sync streams (with names and parameters) backing the query.
*
* When set, `useQuery` will subscribe to those streams (and automatically handle unsubscribing from them, too).
*
* If {@link QuerySyncStreamOptions} is set on a stream, `useQuery` will remain in a loading state until that stream
* is marked as {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't
* been downloaded.
* Note however that after an initial sync, the query will not block itself while new rows are downloading. Instead,
* consistent sync snapshots will be made available as they've been processed by PowerSync.
*
* @experimental Sync streams are currently in alpha.
*/
streams?: QuerySyncStreamOptions[];
reportFetching?: boolean;
}

/**
* Additional options to control how `useQuery` behaves when subscribing to a stream.
*/
export interface QuerySyncStreamOptions extends UseSyncStreamOptions {
/**
* When set to `true`, a `useQuery` hook will remain in a loading state as long as the stream is resolving or
* downloading for the first time (in other words, until {@link SyncSubscriptionDescription.hasSynced} is true).
*/
waitForStream?: boolean;
}

export interface AdditionalOptions extends HookWatchOptions {
runQueryOnce?: boolean;
}
Expand Down
1 change: 1 addition & 0 deletions packages/react/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export * from './hooks/PowerSyncContext.js';
export { SuspenseQueryResult } from './hooks/suspense/SuspenseQueryResult.js';
export { useSuspenseQuery } from './hooks/suspense/useSuspenseQuery.js';
export { useWatchedQuerySuspenseSubscription } from './hooks/suspense/useWatchedQuerySuspenseSubscription.js';
export { useSyncStream, UseSyncStreamOptions } from './hooks/streams.js';
export { useStatus } from './hooks/useStatus.js';
export { useQuery } from './hooks/watched/useQuery.js';
export { useWatchedQuerySubscription } from './hooks/watched/useWatchedQuerySubscription.js';
Expand Down
2 changes: 1 addition & 1 deletion packages/react/tests/QueryStore.test.tsx
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { AbstractPowerSyncDatabase, SQLWatchOptions } from '@powersync/common';
import { beforeEach, describe, expect, it } from 'vitest';
import { generateQueryKey, getQueryStore, QueryStore } from '../src/QueryStore';
import { openPowerSync } from './useQuery.test';
import { openPowerSync } from './utils';

describe('QueryStore', () => {
describe('generateQueryKey', () => {
Expand Down
Loading