diff --git a/.changeset/angry-ducks-sneeze.md b/.changeset/angry-ducks-sneeze.md index 5ed7d7714..b7dffe7be 100644 --- a/.changeset/angry-ducks-sneeze.md +++ b/.changeset/angry-ducks-sneeze.md @@ -2,6 +2,7 @@ '@powersync/react-native': minor '@powersync/common': minor '@powersync/web': minor +'@powersync/node': minor --- Add alpha support for sync streams, allowing different sets of data to be synced dynamically. diff --git a/.changeset/mighty-colts-rule.md b/.changeset/mighty-colts-rule.md new file mode 100644 index 000000000..da7bb221a --- /dev/null +++ b/.changeset/mighty-colts-rule.md @@ -0,0 +1,5 @@ +--- +'@powersync/react': minor +--- + +Add hooks for sync streams diff --git a/.github/workflows/test-simulators.yaml b/.github/workflows/test-simulators.yaml index a3077a756..79126f849 100644 --- a/.github/workflows/test-simulators.yaml +++ b/.github/workflows/test-simulators.yaml @@ -130,7 +130,9 @@ jobs: test-ios: name: Test iOS needs: check-changes - if: ${{ needs.check-changes.outputs.should_run == 'true' }} + # TODO: Re-enable iOS tests. They have been disabled because they are failing extremely frequently without + # any apparent cause. In particular, it seems like even starting the simulator times out most of the time. + if: ${{ false && needs.check-changes.outputs.should_run == 'true' }} runs-on: macOS-15 steps: diff --git a/packages/common/src/client/AbstractPowerSyncDatabase.ts b/packages/common/src/client/AbstractPowerSyncDatabase.ts index 5b7fa050c..23dba5abd 100644 --- a/packages/common/src/client/AbstractPowerSyncDatabase.ts +++ b/packages/common/src/client/AbstractPowerSyncDatabase.ts @@ -355,7 +355,10 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver any, signal?: AbortSignal): Promise { + /** + * Waits for the first sync status for which the `status` callback returns a truthy value. + */ + async waitForStatus(predicate: (status: SyncStatus) => any, signal?: AbortSignal): Promise { if (predicate(this.currentStatus)) { return; } @@ -364,16 +367,21 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver { if (predicate(status)) { - dispose(); - resolve(); + abort(); } } }); - signal?.addEventListener('abort', () => { + function abort() { dispose(); resolve(); - }); + } + + if (signal?.aborted) { + abort(); + } else { + signal?.addEventListener('abort', abort); + } }); } diff --git a/packages/common/src/client/ConnectionManager.ts b/packages/common/src/client/ConnectionManager.ts index 7521c31af..7d3d3ac8c 100644 --- a/packages/common/src/client/ConnectionManager.ts +++ b/packages/common/src/client/ConnectionManager.ts @@ -324,14 +324,15 @@ export class ConnectionManager extends BaseObserver { }; } - private get activeStreams() { + /** + * @internal exposed for testing + */ + get activeStreams() { return [...this.locallyActiveSubscriptions.values()].map((a) => ({ name: a.name, params: a.parameters })); } private subscriptionsMayHaveChanged() { - if (this.syncStreamImplementation) { - this.syncStreamImplementation.updateSubscriptions(this.activeStreams); - } + this.syncStreamImplementation?.updateSubscriptions(this.activeStreams); } } diff --git a/packages/node/vitest.config.ts b/packages/node/vitest.config.ts index def711060..fcfb9639a 100644 --- a/packages/node/vitest.config.ts +++ b/packages/node/vitest.config.ts @@ -3,6 +3,8 @@ import { defineConfig } from 'vitest/config'; // We need to define an empty config to be part of the vitest works export default defineConfig({ test: { - silent: false + silent: false, + // This doesn't make the tests considerably slower. It may improve reliability for GH actions. + fileParallelism: false } }); diff --git a/packages/react/src/hooks/streams.ts b/packages/react/src/hooks/streams.ts new file mode 100644 index 000000000..8616cd76a --- /dev/null +++ b/packages/react/src/hooks/streams.ts @@ -0,0 +1,131 @@ +import { useEffect, useMemo, useState } from 'react'; +import { usePowerSync } from './PowerSyncContext.js'; +import { + AbstractPowerSyncDatabase, + SyncStatus, + SyncStreamStatus, + SyncStreamSubscribeOptions, + SyncStreamSubscription +} from '@powersync/common'; +import { useStatus } from './useStatus.js'; +import { QuerySyncStreamOptions } from './watched/watch-types.js'; + +/** + * A sync stream to subscribe to in {@link useSyncStream}. + * + * For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams). + */ +export interface UseSyncStreamOptions extends SyncStreamSubscribeOptions { + /** + * The name of the stream to subscribe to. + */ + name: string; + /** + * Parameters for the stream subscription. A single stream can have multiple subscriptions with different parameter + * sets. + */ + parameters?: Record; +} + +/** + * Creates a PowerSync stream subscription. The subscription is kept alive as long as the React component calling this + * function. When it unmounts, {@link SyncStreamSubscription.unsubscribe} is called + * + * For more details on sync streams, see the [documentation](https://docs.powersync.com/usage/sync-streams). + * + * @returns The status for that stream, or `null` if the stream is currently being resolved. + */ +export function useSyncStream(options: UseSyncStreamOptions): SyncStreamStatus | null { + const { name, parameters } = options; + const db = usePowerSync(); + const status = useStatus(); + const [subscription, setSubscription] = useState(null); + + useEffect(() => { + let active = true; + let subscription: SyncStreamSubscription | null = null; + + db.syncStream(name, parameters) + .subscribe(options) + .then((sub) => { + if (active) { + subscription = sub; + setSubscription(sub); + } else { + // The cleanup function already ran, unsubscribe immediately. + sub.unsubscribe(); + } + }); + + return () => { + active = false; + // If we don't have a subscription yet, it'll still get cleaned up once the promise resolves because we've set + // active to false. + subscription?.unsubscribe(); + }; + }, [name, parameters]); + + return subscription && status.forStream(subscription); +} + +/** + * @internal + */ +export function useAllSyncStreamsHaveSynced( + db: AbstractPowerSyncDatabase, + streams: QuerySyncStreamOptions[] | undefined +): boolean { + // Since streams are a user-supplied array, they will likely be different each time this function is called. We don't + // want to update underlying subscriptions each time, though. + const hash = useMemo(() => streams && JSON.stringify(streams), [streams]); + const [synced, setHasSynced] = useState(streams == null || streams.every((e) => e.waitForStream != true)); + + useEffect(() => { + if (streams) { + setHasSynced(false); + + const promises: Promise[] = []; + const abort = new AbortController(); + for (const stream of streams) { + promises.push(db.syncStream(stream.name, stream.parameters).subscribe(stream)); + } + + // First, wait for all subscribe() calls to make all subscriptions active. + Promise.all(promises).then(async (resolvedStreams) => { + function allHaveSynced(status: SyncStatus) { + return resolvedStreams.every((s, i) => { + const request = streams[i]; + return !request.waitForStream || status.forStream(s)?.subscription?.hasSynced; + }); + } + + // Wait for the effect to be cancelled or all streams having synced. + await db.waitForStatus(allHaveSynced, abort.signal); + if (abort.signal.aborted) { + // Was cancelled + } else { + // Has synced, update public state. + setHasSynced(true); + + // Wait for cancellation before clearing subscriptions. + await new Promise((resolve) => { + abort.signal.addEventListener('abort', () => resolve()); + }); + } + + // Effect was definitely cancelled at this point, so drop the subscriptions. + for (const stream of resolvedStreams) { + stream.unsubscribe(); + } + }); + + return () => abort.abort(); + } else { + // There are no streams, so all of them have synced. + setHasSynced(true); + return undefined; + } + }, [hash]); + + return synced; +} diff --git a/packages/react/src/hooks/watched/useQuery.ts b/packages/react/src/hooks/watched/useQuery.ts index 53b7c6003..771391178 100644 --- a/packages/react/src/hooks/watched/useQuery.ts +++ b/packages/react/src/hooks/watched/useQuery.ts @@ -4,6 +4,7 @@ import { useSingleQuery } from './useSingleQuery.js'; import { useWatchedQuery } from './useWatchedQuery.js'; import { AdditionalOptions, DifferentialHookOptions, QueryResult, ReadonlyQueryResult } from './watch-types.js'; import { constructCompatibleQuery } from './watch-utils.js'; +import { useAllSyncStreamsHaveSynced } from '../streams.js'; /** * A hook to access the results of a watched query. @@ -58,15 +59,20 @@ export function useQuery( ) { const powerSync = usePowerSync(); if (!powerSync) { - return { isLoading: false, isFetching: false, data: [], error: new Error('PowerSync not configured.') }; + return { + ..._loadingState, + isLoading: false, + error: new Error('PowerSync not configured.') + }; } const { parsedQuery, queryChanged } = constructCompatibleQuery(query, parameters, options); + const streamsHaveSynced = useAllSyncStreamsHaveSynced(powerSync, options?.streams); const runOnce = options?.runQueryOnce == true; const single = useSingleQuery({ query: parsedQuery, powerSync, queryChanged, - active: runOnce + active: runOnce && streamsHaveSynced }); const watched = useWatchedQuery({ query: parsedQuery, @@ -79,8 +85,14 @@ export function useQuery( // We emit new data for each table change by default. rowComparator: options.rowComparator }, - active: !runOnce + active: !runOnce && streamsHaveSynced }); - return runOnce ? single : watched; + if (!streamsHaveSynced) { + return { ..._loadingState }; + } + + return (runOnce ? single : watched) ?? _loadingState; } + +const _loadingState = { isLoading: true, isFetching: false, data: [], error: undefined }; diff --git a/packages/react/src/hooks/watched/watch-types.ts b/packages/react/src/hooks/watched/watch-types.ts index d77e904a4..ec7499f3d 100644 --- a/packages/react/src/hooks/watched/watch-types.ts +++ b/packages/react/src/hooks/watched/watch-types.ts @@ -1,9 +1,35 @@ -import { DifferentialWatchedQueryComparator, SQLOnChangeOptions } from '@powersync/common'; +import { DifferentialWatchedQueryComparator, SQLOnChangeOptions, SyncSubscriptionDescription } from '@powersync/common'; +import { UseSyncStreamOptions } from '../streams.js'; export interface HookWatchOptions extends Omit { + /** + * An optional array of sync streams (with names and parameters) backing the query. + * + * When set, `useQuery` will subscribe to those streams (and automatically handle unsubscribing from them, too). + * + * If {@link QuerySyncStreamOptions} is set on a stream, `useQuery` will remain in a loading state until that stream + * is marked as {@link SyncSubscriptionDescription.hasSynced}. This ensures the query is not missing rows that haven't + * been downloaded. + * Note however that after an initial sync, the query will not block itself while new rows are downloading. Instead, + * consistent sync snapshots will be made available as they've been processed by PowerSync. + * + * @experimental Sync streams are currently in alpha. + */ + streams?: QuerySyncStreamOptions[]; reportFetching?: boolean; } +/** + * Additional options to control how `useQuery` behaves when subscribing to a stream. + */ +export interface QuerySyncStreamOptions extends UseSyncStreamOptions { + /** + * When set to `true`, a `useQuery` hook will remain in a loading state as long as the stream is resolving or + * downloading for the first time (in other words, until {@link SyncSubscriptionDescription.hasSynced} is true). + */ + waitForStream?: boolean; +} + export interface AdditionalOptions extends HookWatchOptions { runQueryOnce?: boolean; } diff --git a/packages/react/src/index.ts b/packages/react/src/index.ts index 00697c9c3..a05820e66 100644 --- a/packages/react/src/index.ts +++ b/packages/react/src/index.ts @@ -5,6 +5,7 @@ export * from './hooks/PowerSyncContext.js'; export { SuspenseQueryResult } from './hooks/suspense/SuspenseQueryResult.js'; export { useSuspenseQuery } from './hooks/suspense/useSuspenseQuery.js'; export { useWatchedQuerySuspenseSubscription } from './hooks/suspense/useWatchedQuerySuspenseSubscription.js'; +export { useSyncStream, UseSyncStreamOptions } from './hooks/streams.js'; export { useStatus } from './hooks/useStatus.js'; export { useQuery } from './hooks/watched/useQuery.js'; export { useWatchedQuerySubscription } from './hooks/watched/useWatchedQuerySubscription.js'; diff --git a/packages/react/tests/QueryStore.test.tsx b/packages/react/tests/QueryStore.test.tsx index b02e65e7e..dab993776 100644 --- a/packages/react/tests/QueryStore.test.tsx +++ b/packages/react/tests/QueryStore.test.tsx @@ -1,7 +1,7 @@ import { AbstractPowerSyncDatabase, SQLWatchOptions } from '@powersync/common'; import { beforeEach, describe, expect, it } from 'vitest'; import { generateQueryKey, getQueryStore, QueryStore } from '../src/QueryStore'; -import { openPowerSync } from './useQuery.test'; +import { openPowerSync } from './utils'; describe('QueryStore', () => { describe('generateQueryKey', () => { diff --git a/packages/react/tests/streams.test.tsx b/packages/react/tests/streams.test.tsx new file mode 100644 index 000000000..cd70871bb --- /dev/null +++ b/packages/react/tests/streams.test.tsx @@ -0,0 +1,158 @@ +import { cleanup, renderHook, waitFor } from '@testing-library/react'; +import { describe, expect, vi } from 'vitest'; +import React, { act, useSyncExternalStore } from 'react'; +import { AbstractPowerSyncDatabase, ConnectionManager, SyncStatus } from '@powersync/common'; +import { openPowerSync } from './utils'; +import { PowerSyncContext } from '../src/hooks/PowerSyncContext'; +import { useSyncStream, UseSyncStreamOptions } from '../src/hooks/streams'; +import { useQuery } from '../src/hooks/watched/useQuery'; +import { QuerySyncStreamOptions } from '../src/hooks/watched/watch-types'; + +describe('stream hooks', () => { + let db: AbstractPowerSyncDatabase; + + beforeEach(() => { + db = openPowerSync(); + vi.clearAllMocks(); + cleanup(); // Cleanup the DOM after each test + }); + + function currentStreams() { + const connections = (db as any).connectionManager as ConnectionManager; + return connections.activeStreams; + } + + const baseWrapper = ({ children }) => {children}; + + const testCases = [ + { + mode: 'normal', + wrapper: baseWrapper + }, + { + mode: 'StrictMode', + wrapper: ({ children }) => {baseWrapper({ children, db })} + } + ]; + + testCases.forEach(({ mode, wrapper: testWrapper }) => { + describe(`in ${mode}`, () => { + it('useSyncStream', async () => { + expect(currentStreams()).toStrictEqual([]); + + const { result, unmount } = renderHook(() => useSyncStream({ name: 'a' }), { + wrapper: testWrapper + }); + expect(result.current).toBeNull(); + await waitFor(() => expect(result.current).not.toBeNull(), { timeout: 1000, interval: 100 }); + expect(currentStreams()).toStrictEqual([{ name: 'a', params: null }]); + + // Should drop subscription on unmount + unmount(); + expect(currentStreams()).toStrictEqual([]); + }); + + it('useQuery waiting on stream', async () => { + const { result } = renderHook( + () => useQuery('SELECT 1', [], { streams: [{ name: 'a', waitForStream: true }] }), + { + wrapper: testWrapper + } + ); + expect(result.current).toMatchObject({ isLoading: true }); + // Including the stream should subscribe. + await waitFor(() => expect(currentStreams()).toHaveLength(1), { timeout: 1000, interval: 100 }); + expect(result.current).toMatchObject({ isLoading: true }); + + // Set last_synced_at for the subscription + db.currentStatus = _testStatus; + db.iterateListeners((l) => l.statusChanged?.(_testStatus)); + + // Which should eventually run the query. + await waitFor(() => expect(result.current.data).toHaveLength(1), { timeout: 1000, interval: 100 }); + }); + + it('useQuery not waiting on stream', async () => { + // By default, it should still run the query immediately instead of waiting for the stream to resolve. + const { result } = renderHook(() => useQuery('SELECT 1', [], { streams: [{ name: 'a' }] }), { + wrapper: testWrapper + }); + + // Not resolving the subscription. + await waitFor(() => expect(result.current.data).toHaveLength(1), { timeout: 1000, interval: 100 }); + }); + + it('unsubscribes on unmount', async () => { + const { unmount } = renderHook(() => useQuery('SELECT 1', [], { streams: [{ name: 'a' }, { name: 'b' }] }), { + wrapper: testWrapper + }); + await waitFor(() => expect(currentStreams()).toHaveLength(2), { timeout: 1000, interval: 100 }); + + unmount(); + await waitFor(() => expect(currentStreams()).toHaveLength(0), { timeout: 1000, interval: 100 }); + }); + + it('handles stream parameter changes', async () => { + // Start without streams + let streams: QuerySyncStreamOptions[] = []; + let streamUpdateListeners: (() => void)[] = []; + + const { result } = renderHook( + () => { + const updatingStreams = useSyncExternalStore( + (cb) => { + streamUpdateListeners.push(cb); + return () => { + const index = streamUpdateListeners.indexOf(cb); + if (index != -1) { + streamUpdateListeners.splice(index, 1); + } + }; + }, + () => streams + ); + + return useQuery('SELECT 1', [], { streams: updatingStreams }); + }, + { + wrapper: testWrapper + } + ); + + await waitFor(() => expect(result.current.data).toHaveLength(1), { timeout: 1000, interval: 100 }); + + // Adopt streams - this should reset back to loading + streams = [{ name: 'a', waitForStream: true }]; + act(() => streamUpdateListeners.forEach((cb) => cb())); + expect(result.current).toMatchObject({ isLoading: true }); + + // ... and subscribe + await waitFor(() => expect(currentStreams()).toHaveLength(1), { timeout: 1000, interval: 100 }); + expect(result.current).toMatchObject({ isLoading: true }); + + // update back to no streams + streams = []; + act(() => streamUpdateListeners.forEach((cb) => cb())); + await waitFor(() => expect(currentStreams()).toHaveLength(0), { timeout: 1000, interval: 100 }); + }); + }); + }); +}); + +const _testStatus = new SyncStatus({ + dataFlow: { + internalStreamSubscriptions: [ + { + name: 'a', + parameters: null, + progress: { total: 0, downloaded: 0 }, + active: true, + is_default: false, + has_explicit_subscription: true, + expires_at: null, + last_synced_at: 1234, + priority: 1 + } + ] + } +}); diff --git a/packages/react/tests/useQuery.test.tsx b/packages/react/tests/useQuery.test.tsx index 2ced417df..273b5730e 100644 --- a/packages/react/tests/useQuery.test.tsx +++ b/packages/react/tests/useQuery.test.tsx @@ -1,34 +1,16 @@ import * as commonSdk from '@powersync/common'; import { toCompilableQuery, wrapPowerSyncWithDrizzle } from '@powersync/drizzle-driver'; -import { PowerSyncDatabase } from '@powersync/web'; import { act, cleanup, renderHook, waitFor } from '@testing-library/react'; import { eq } from 'drizzle-orm'; import { sqliteTable, text } from 'drizzle-orm/sqlite-core'; import pDefer from 'p-defer'; import React, { useEffect } from 'react'; -import { beforeEach, describe, expect, it, onTestFinished, vi } from 'vitest'; +import { beforeEach, describe, expect, it, vi } from 'vitest'; import { PowerSyncContext } from '../src/hooks/PowerSyncContext'; import { useQuery } from '../src/hooks/watched/useQuery'; import { useWatchedQuerySubscription } from '../src/hooks/watched/useWatchedQuerySubscription'; import { QueryResult } from '../src/hooks/watched/watch-types'; - -export const openPowerSync = () => { - const db = new PowerSyncDatabase({ - database: { dbFilename: 'test.db' }, - schema: new commonSdk.Schema({ - lists: new commonSdk.Table({ - name: commonSdk.column.text - }) - }) - }); - - onTestFinished(async () => { - await db.disconnectAndClear(); - await db.close(); - }); - - return db; -}; +import { openPowerSync } from './utils'; describe('useQuery', () => { beforeEach(() => { diff --git a/packages/react/tests/useSuspenseQuery.test.tsx b/packages/react/tests/useSuspenseQuery.test.tsx index 45c0b0269..c553bd0d6 100644 --- a/packages/react/tests/useSuspenseQuery.test.tsx +++ b/packages/react/tests/useSuspenseQuery.test.tsx @@ -6,7 +6,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'; import { PowerSyncContext } from '../src/hooks/PowerSyncContext'; import { useSuspenseQuery } from '../src/hooks/suspense/useSuspenseQuery'; import { useWatchedQuerySuspenseSubscription } from '../src/hooks/suspense/useWatchedQuerySuspenseSubscription'; -import { openPowerSync } from './useQuery.test'; +import { openPowerSync } from './utils'; describe('useSuspenseQuery', () => { const loadingFallback = 'Loading'; diff --git a/packages/react/tests/utils.ts b/packages/react/tests/utils.ts new file mode 100644 index 000000000..787f93234 --- /dev/null +++ b/packages/react/tests/utils.ts @@ -0,0 +1,25 @@ +import * as commonSdk from '@powersync/common'; + +import { PowerSyncDatabase } from '@powersync/web'; +import { onTestFinished } from 'vitest'; + +export const openPowerSync = () => { + const db = new PowerSyncDatabase({ + database: { dbFilename: 'test.db' }, + schema: new commonSdk.Schema({ + lists: new commonSdk.Table({ + name: commonSdk.column.text + }) + }) + }); + + onTestFinished(async () => { + // backport for https://github.com/powersync-ja/powersync-sqlite-core, disconnectAndClear is supposed to do this. + await db.execute('DELETE FROM ps_stream_subscriptions'); + + await db.disconnectAndClear(); + await db.close(); + }); + + return db; +};