Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/angry-planets-clean.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/common': patch
---

Improved potential race condition when closing HTTP stream connections.
20 changes: 15 additions & 5 deletions packages/common/src/client/sync/stream/AbstractRemote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import PACKAGE from '../../../../package.json' with { type: 'json' };
import { AbortOperation } from '../../../utils/AbortOperation.js';
import { DataStream } from '../../../utils/DataStream.js';
import { PowerSyncCredentials } from '../../connection/PowerSyncCredentials.js';
import { StreamingSyncRequest } from './streaming-sync-types.js';
import { WebsocketClientTransport } from './WebsocketClientTransport.js';
import { StreamingSyncRequest } from './streaming-sync-types.js';


export type BSONImplementation = typeof BSON;

Expand Down Expand Up @@ -557,30 +558,39 @@ export abstract class AbstractRemote {
// Create a new stream splitting the response at line endings while also handling cancellations
// by closing the reader.
const reader = res.body.getReader();
let readerReleased = false;
// This will close the network request and read stream
const closeReader = async () => {
try {
readerReleased = true;
await reader.cancel();
} catch (ex) {
// an error will throw if the reader hasn't been used yet
}
reader.releaseLock();
};


const stream = new DataStream<T, string>({
logger: this.logger,
mapLine: mapLine
});

abortSignal?.addEventListener('abort', () => {
closeReader();
stream.close();
});

const decoder = this.createTextDecoder();
let buffer = '';

const stream = new DataStream<T, string>({
logger: this.logger,
mapLine: mapLine
});


const l = stream.registerListener({
lowWater: async () => {
if (stream.closed || abortSignal?.aborted || readerReleased) {
return
}
try {
let didCompleteLine = false;
while (!didCompleteLine) {
Expand Down
14 changes: 8 additions & 6 deletions packages/node/tests/sync.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { describe, vi, expect, beforeEach } from 'vitest';
import util from 'node:util';
import { beforeEach, describe, expect, vi } from 'vitest';

import { bucket, MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils';
import {
AbstractPowerSyncDatabase,
BucketChecksum,
Expand All @@ -14,6 +13,7 @@ import {
SyncStreamConnectionMethod
} from '@powersync/common';
import Logger from 'js-logger';
import { bucket, MockSyncService, mockSyncServiceTest, TestConnector, waitForSyncStatus } from './utils';

describe('Sync', () => {
describe('js client', () => {
Expand Down Expand Up @@ -484,6 +484,7 @@ function defineSyncTests(impl: SyncClientImplementation) {

// Re-open database
await database.close();

await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(0));
database = await syncService.createDatabase();
database.connect(new TestConnector(), options);
Expand Down Expand Up @@ -821,10 +822,11 @@ function defineSyncTests(impl: SyncClientImplementation) {
const powersync = await syncService.createDatabase({ schema: customSchema, logger });
powersync.connect(new TestConnector(), options);

await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
expect(logMessages).toEqual(
expect.arrayContaining([expect.stringContaining('Raw tables require the Rust-based sync client')])
);
await vi.waitFor(() => {
expect(logMessages).toEqual(
expect.arrayContaining([expect.stringContaining('Raw tables require the Rust-based sync client')])
);
});
});

mockSyncServiceTest(`does not warn about raw tables if they're not used`, async ({ syncService }) => {
Expand Down
14 changes: 10 additions & 4 deletions packages/node/tests/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,16 @@ import { onTestFinished, test } from 'vitest';
import {
AbstractPowerSyncDatabase,
BucketChecksum,
column,
NodePowerSyncDatabaseOptions,
PowerSyncBackendConnector,
PowerSyncCredentials,
PowerSyncDatabase,
PowerSyncDatabaseOptions,
Schema,
StreamingSyncCheckpoint,
StreamingSyncLine,
SyncStatus,
Table
Table,
column
} from '../lib';

export async function createTempDir() {
Expand Down Expand Up @@ -67,7 +66,7 @@ export async function createDatabase(
const database = new PowerSyncDatabase({
schema: AppSchema,
...options,
logger: defaultLogger,
logger: options.logger ?? defaultLogger,
database: {
dbFilename: 'test.db',
dbLocation: tmpdir,
Expand Down Expand Up @@ -103,6 +102,9 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
stream: ReadableStreamDefaultController<StreamingSyncLine>;
}

// Uses a unique database name per mockSyncServiceTest to avoid conflicts with other tests.
const databaseName = `test-${crypto.randomUUID()}.db`;

const listeners: Listener[] = [];

const inMemoryFetch: typeof fetch = async (info, init?) => {
Expand Down Expand Up @@ -149,6 +151,10 @@ export const mockSyncServiceTest = tempDirectoryTest.extend<{
const newConnection = async (options?: Partial<NodePowerSyncDatabaseOptions>) => {
const db = await createDatabase(tmpdir, {
...options,
database: {
dbFilename: databaseName,
...options?.database
},
remoteOptions: {
fetchImplementation: inMemoryFetch
}
Expand Down
18 changes: 17 additions & 1 deletion packages/web/tests/watch.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,11 @@ describe('Watch Tests', { sequential: true }, () => {
parameters: ['test']
})
.watch({
reportFetching: false
reportFetching: false,
// Comparisons require a comparator to be set
comparator: new ArrayComparator({
compareBy: (item) => JSON.stringify(item)
})
});

expect(watch.state.isFetching).false;
Expand All @@ -671,9 +675,21 @@ describe('Watch Tests', { sequential: true }, () => {
});
onTestFinished(dispose);

// Wait for the initial load to complete
await vi.waitFor(() => {
expect(notificationCount).equals(1);
});

notificationCount = 0; // We want to count the number of state changes after the initial load

// Should only a state change trigger for this operation
await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['test', uuid()]);

// We should get an update for the change above
await vi.waitFor(() => {
expect(notificationCount).equals(1);
});

// Should not trigger any state change for these operations
await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['make1', uuid()]);
await powersync.execute('INSERT INTO assets(id, make, customer_id) VALUES (uuid(), ?, ?)', ['make2', uuid()]);
Expand Down