diff --git a/.changeset/happy-seals-change.md b/.changeset/happy-seals-change.md
new file mode 100644
index 000000000..c28c6d48c
--- /dev/null
+++ b/.changeset/happy-seals-change.md
@@ -0,0 +1,5 @@
+---
+'@powersync/service-core': patch
+---
+
+Fix inconsistencies between binary data being requested and actually being sent.
diff --git a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap
index 2780b3a21..1dad61dc7 100644
--- a/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap
+++ b/modules/module-mongodb-storage/test/src/__snapshots__/storage_sync.test.ts.snap
@@ -39,7 +39,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 2`] = `
       "bucket": "mybucket[]",
       "data": [
         {
-          "checksum": -93886621n,
+          "checksum": -93886621,
           "op": "CLEAR",
           "op_id": "2",
         },
@@ -74,7 +74,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 2`] = `
       "bucket": "mybucket[]",
       "data": [
         {
-          "checksum": 1859363232n,
+          "checksum": 1859363232,
           "data": "{"id":"t1","description":"Test 1b"}",
           "object_id": "t1",
           "object_type": "test",
@@ -83,7 +83,7 @@ exports[`sync - mongodb > compacting data - invalidate checkpoint 2`] = `
           "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a",
         },
         {
-          "checksum": 3028503153n,
+          "checksum": 3028503153,
           "data": "{"id":"t2","description":"Test 2b"}",
           "object_id": "t2",
           "object_type": "test",
@@ -203,6 +203,7 @@ exports[`sync - mongodb > sends checkpoint complete line for empty checkpoint 1`
       "next_after": "1",
     },
   },
+  null,
   {
     "checkpoint_complete": {
       "last_op_id": "1",
@@ -274,7 +275,7 @@ exports[`sync - mongodb > sync buckets in order 1`] = `
       "bucket": "b1[]",
       "data": [
         {
-          "checksum": 2912868539n,
+          "checksum": 2912868539,
           "data": "{"id":"earlier","description":"Test 2"}",
           "object_id": "earlier",
           "object_type": "test",
@@ -299,7 +300,7 @@ exports[`sync - mongodb > sync buckets in order 1`] = `
       "bucket": "b0[]",
       "data": [
         {
-          "checksum": 920318466n,
+          "checksum": 920318466,
           "data": "{"id":"t1","description":"Test 1"}",
           "object_id": "t1",
           "object_type": "test",
@@ -354,7 +355,7 @@ exports[`sync - mongodb > sync global data 1`] = `
       "bucket": "mybucket[]",
       "data": [
         {
-          "checksum": 920318466n,
+          "checksum": 920318466,
           "data": "{"id":"t1","description":"Test 1"}",
           "object_id": "t1",
           "object_type": "test",
@@ -363,7 +364,7 @@ exports[`sync - mongodb > sync global data 1`] = `
           "subkey": "e5aa2ddc-1328-58fa-a000-0b5ed31eaf1a",
         },
         {
-          "checksum": 3280762209n,
+          "checksum": 3280762209,
           "data": "{"id":"t2","description":"Test 2"}",
           "object_id": "t2",
           "object_type": "test",
@@ -702,7 +703,7 @@ exports[`sync - mongodb > sync updates to data query only 2`] = `
       "bucket": "by_user["user1"]",
       "data": [
         {
-          "checksum": 1418351250n,
+          "checksum": 1418351250,
           "data": "{"id":"list1","user_id":"user1","name":"User 1"}",
           "object_id": "list1",
           "object_type": "lists",
@@ -787,7 +788,7 @@ exports[`sync - mongodb > sync updates to global data 2`] = `
       "bucket": "mybucket[]",
       "data": [
         {
-          "checksum": 920318466n,
+          "checksum": 920318466,
           "data": "{"id":"t1","description":"Test 1"}",
           "object_id": "t1",
           "object_type": "test",
@@ -836,7 +837,7 @@ exports[`sync - mongodb > sync updates to global data 3`] = `
       "bucket": "mybucket[]",
       "data": [
         {
-          "checksum": 3280762209n,
+          "checksum": 3280762209,
           "data": "{"id":"t2","description":"Test 2"}",
           "object_id": "t2",
           "object_type": "test",
@@ -909,7 +910,7 @@ exports[`sync - mongodb > sync updates to parameter query + data 2`] = `
       "bucket": "by_user["user1"]",
       "data": [
         {
-          "checksum": 1418351250n,
+          "checksum": 1418351250,
           "data": "{"id":"list1","user_id":"user1","name":"User 1"}",
           "object_id": "list1",
           "object_type": "lists",
diff --git a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap
index dbbf0b515..b1631416f 100644
--- a/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap
+++ b/modules/module-postgres-storage/test/src/__snapshots__/storage_sync.test.ts.snap
@@ -39,7 +39,7 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 2`] = `
       "bucket": "mybucket[]",
       "data": [
         {
-          "checksum": -93886621n,
+          "checksum": -93886621,
           "op": "CLEAR",
           "op_id": "2",
         },
@@ -74,7 +74,7 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 2`] = `
       "bucket": "mybucket[]",
       "data": [
         {
-          "checksum": 1859363232n,
+          "checksum": 1859363232,
           "data": "{"id":"t1","description":"Test 1b"}",
           "object_id": "t1",
           "object_type": "test",
@@ -83,7 +83,7 @@ exports[`sync - postgres > compacting data - invalidate checkpoint 2`] = `
           "subkey": "02d285ac-4f96-5124-8fba-c6d1df992dd1",
         },
         {
-          "checksum": 3028503153n,
+          "checksum": 3028503153,
           "data": "{"id":"t2","description":"Test 2b"}",
           "object_id": "t2",
           "object_type": "test",
@@ -203,6 +203,7 @@ exports[`sync - postgres > sends checkpoint complete line for empty checkpoint 1
       "next_after": "1",
     },
   },
+  null,
   {
     "checkpoint_complete": {
      "last_op_id": "1",
@@ -274,7 +275,7 @@ exports[`sync - postgres > sync buckets in order 1`] = `
       "bucket": "b1[]",
       "data": [
         {
-          "checksum": 2912868539n,
+          "checksum": 2912868539,
           "data": "{"id":"earlier","description":"Test 2"}",
           "object_id": "earlier",
           "object_type": "test",
@@ -299,7 +300,7 @@ exports[`sync - postgres > sync buckets in order 1`] = `
       "bucket": "b0[]",
       "data": [
         {
-          "checksum": 920318466n,
+          "checksum": 920318466,
           "data": "{"id":"t1","description":"Test 1"}",
           "object_id": "t1",
           "object_type": "test",
@@ -354,7 +355,7 @@ exports[`sync - postgres > sync global data 1`] = `
       "bucket": "mybucket[]",
       "data": [
         {
-          "checksum": 920318466n,
+          "checksum": 920318466,
           "data": "{"id":"t1","description":"Test 1"}",
           "object_id": "t1",
           "object_type": "test",
@@ -363,7 +364,7 @@ exports[`sync - postgres > sync global data 1`] = `
           "subkey": "02d285ac-4f96-5124-8fba-c6d1df992dd1",
         },
         {
-          "checksum": 3280762209n,
+          "checksum": 3280762209,
           "data": "{"id":"t2","description":"Test 2"}",
           "object_id": "t2",
           "object_type": "test",
@@ -702,7 +703,7 @@ exports[`sync - postgres > sync updates to data query only 2`] = `
       "bucket": "by_user["user1"]",
       "data": [
         {
-          "checksum": 1418351250n,
+          "checksum": 1418351250,
           "data": "{"id":"list1","user_id":"user1","name":"User 1"}",
           "object_id": "list1",
           "object_type": "lists",
@@ -787,7 +788,7 @@ exports[`sync - postgres > sync updates to global data 2`] = `
       "bucket": "mybucket[]",
       "data": [
         {
-          "checksum": 920318466n,
+          "checksum": 920318466,
           "data": "{"id":"t1","description":"Test 1"}",
           "object_id": "t1",
           "object_type": "test",
@@ -836,7 +837,7 @@ exports[`sync - postgres > sync updates to global data 3`] = `
       "bucket": "mybucket[]",
       "data": [
         {
-          "checksum": 3280762209n,
+          "checksum": 3280762209,
           "data": "{"id":"t2","description":"Test 2"}",
           "object_id": "t2",
           "object_type": "test",
@@ -909,7 +910,7 @@ exports[`sync - postgres > sync updates to parameter query + data 2`] = `
       "bucket": "by_user["user1"]",
       "data": [
         {
-          "checksum": 1418351250n,
+          "checksum": 1418351250,
           "data": "{"id":"list1","user_id":"user1","name":"User 1"}",
           "object_id": "list1",
           "object_type": "lists",
diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts
index 4c7224b43..48fba5bff 100644
--- a/packages/service-core-tests/src/tests/register-sync-tests.ts
+++ b/packages/service-core-tests/src/tests/register-sync-tests.ts
@@ -89,7 +89,8 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
         raw_data: true
       },
       tracker,
-      token: { sub: '', exp: Date.now() / 1000 + 10 } as any
+      token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
+      isEncodingAsBson: false
     });
     const lines = await consumeCheckpointLines(stream);
@@ -149,7 +150,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: '', exp: Date.now() / 1000 + 10 } as any
+      token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
+      isEncodingAsBson: false
     });
     const lines = await consumeCheckpointLines(stream);
@@ -211,7 +213,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: '', exp: Date.now() / 1000 + 10 } as any
+      token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
+      isEncodingAsBson: false
     });
     let sentCheckpoints = 0;
@@ -320,7 +323,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: 'user_one', exp: Date.now() / 1000 + 100000 } as any
+      token: { sub: 'user_one', exp: Date.now() / 1000 + 100000 } as any,
+      isEncodingAsBson: false
     });
     let sentCheckpoints = 0;
@@ -460,7 +464,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: '', exp: Date.now() / 1000 + 10 } as any
+      token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
+      isEncodingAsBson: false
     });
     let sentRows = 0;
@@ -575,7 +580,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: '', exp: Date.now() / 1000 + 100000 } as any
+      token: { sub: '', exp: Date.now() / 1000 + 100000 } as any,
+      isEncodingAsBson: false
    });
     const lines: any[] = [];
@@ -640,7 +646,8 @@ bucket_definitions:
         raw_data: false
       },
       tracker,
-      token: { sub: '', exp: Date.now() / 1000 + 10 } as any
+      token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
+      isEncodingAsBson: false
     });
     const lines = await consumeCheckpointLines(stream);
@@ -668,7 +675,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: '', exp: 0 } as any
+      token: { sub: '', exp: 0 } as any,
+      isEncodingAsBson: false
     });
     const lines = await consumeCheckpointLines(stream);
@@ -698,7 +706,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: '', exp: Date.now() / 1000 + 10 } as any
+      token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
+      isEncodingAsBson: false
     });
     const iter = stream[Symbol.asyncIterator]();
     context.onTestFinished(() => {
@@ -771,7 +780,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any
+      token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any,
+      isEncodingAsBson: false
     });
     const iter = stream[Symbol.asyncIterator]();
     context.onTestFinished(() => {
@@ -846,7 +856,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any
+      token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any,
+      isEncodingAsBson: false
     });
     const iter = stream[Symbol.asyncIterator]();
     context.onTestFinished(() => {
@@ -912,7 +923,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any
+      token: { sub: 'user1', exp: Date.now() / 1000 + 100 } as any,
+      isEncodingAsBson: false
     });
     const iter = stream[Symbol.asyncIterator]();
     context.onTestFinished(() => {
@@ -979,7 +991,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: '', exp: exp } as any
+      token: { sub: '', exp: exp } as any,
+      isEncodingAsBson: false
     });
     const iter = stream[Symbol.asyncIterator]();
     context.onTestFinished(() => {
@@ -1041,7 +1054,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: '', exp: Date.now() / 1000 + 10 } as any
+      token: { sub: '', exp: Date.now() / 1000 + 10 } as any,
+      isEncodingAsBson: false
     });
     const iter = stream[Symbol.asyncIterator]();
@@ -1166,7 +1180,8 @@ bucket_definitions:
         raw_data: true
       },
       tracker,
-      token: { sub: 'test', exp: Date.now() / 1000 + 10 } as any
+      token: { sub: 'test', exp: Date.now() / 1000 + 10 } as any,
+      isEncodingAsBson: false
     };
     const stream1 = sync.streamResponse(params);
     const lines1 = await consumeCheckpointLines(stream1);
diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts
index b3badfb0e..1c3f8d8b1 100644
--- a/packages/service-core/src/routes/endpoints/socket-route.ts
+++ b/packages/service-core/src/routes/endpoints/socket-route.ts
@@ -90,8 +90,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
           bucketStorage: bucketStorage,
           syncRules: syncRules,
           params: {
-            ...params,
-            binary_data: true // always true for web sockets
+            ...params
           },
           token: context!.token_payload!,
           tokenStreamOptions: {
@@ -100,7 +99,8 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
           },
           tracker,
           signal,
-          logger
+          logger,
+          isEncodingAsBson: true
         })) {
           if (signal.aborted) {
             break;
diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts
index 8ce2aadf4..15b16bd33 100644
--- a/packages/service-core/src/routes/endpoints/sync-stream.ts
+++ b/packages/service-core/src/routes/endpoints/sync-stream.ts
@@ -32,9 +32,9 @@ export const syncStreamed = routeDefinition({
     const clientId = payload.params.client_id;
     const streamStart = Date.now();
     // This falls back to JSON unless there's preference for the bson-stream in the Accept header.
-    const useBson =
-      payload.request.headers.accept &&
-      new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType;
+    const useBson = payload.request.headers.accept
+      ? new Negotiator(payload.request).mediaType(supportedContentTypes) == concatenatedBsonContentType
+      : false;
     logger.defaultMeta = {
       ...logger.defaultMeta,
@@ -76,7 +76,8 @@
         token: payload.context.token_payload!,
         tracker,
         signal: controller.signal,
-        logger
+        logger,
+        isEncodingAsBson: useBson
       });
       const byteContents = useBson ? sync.bsonLines(syncLines) : sync.ndjson(syncLines);
diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts
index fec216a2e..177036972 100644
--- a/packages/service-core/src/sync/sync.ts
+++ b/packages/service-core/src/sync/sync.ts
@@ -27,6 +27,7 @@ export interface SyncStreamParameters {
   params: util.StreamingSyncRequest;
   token: auth.JwtPayload;
   logger?: Logger;
+  isEncodingAsBson: boolean;
   /**
    * If this signal is aborted, the stream response ends as soon as possible, without error.
   */
@@ -39,7 +40,17 @@ export async function* streamResponse(
   options: SyncStreamParameters
 ): AsyncIterable {
-  const { syncContext, bucketStorage, syncRules, params, token, tokenStreamOptions, tracker, signal } = options;
+  const {
+    syncContext,
+    bucketStorage,
+    syncRules,
+    params,
+    token,
+    tokenStreamOptions,
+    tracker,
+    signal,
+    isEncodingAsBson
+  } = options;
   const logger = options.logger ?? defaultLogger;
   // We also need to be able to abort, so we create our own controller.
@@ -65,7 +76,8 @@ export async function* streamResponse(
       token,
       tracker,
       controller.signal,
-      logger
+      logger,
+      isEncodingAsBson
     );
     // Merge the two streams, and abort as soon as one of the streams end.
     const merged = mergeAsyncIterables([stream, ki], controller.signal);
@@ -93,9 +105,10 @@ async function* streamResponseInner(
   tokenPayload: RequestJwtPayload,
   tracker: RequestTracker,
   signal: AbortSignal,
-  logger: Logger
+  logger: Logger,
+  isEncodingAsBson: boolean
 ): AsyncGenerator {
-  const { raw_data, binary_data } = params;
+  const { raw_data } = params;
   const userId = tokenPayload.sub;
   const checkpointUserId = util.checkpointUserId(userId as string, params.client_id);
@@ -225,8 +238,7 @@ async function* streamResponseInner(
         checkpoint: next.value.value.checkpoint,
         bucketsToFetch: buckets,
         checkpointLine: line,
-        raw_data,
-        binary_data,
+        legacyDataLines: !isEncodingAsBson && params.raw_data != true,
         onRowsSent: markOperationsSent,
         abort_connection: signal,
         abort_batch: abortCheckpointSignal,
@@ -255,8 +267,8 @@ interface BucketDataRequest {
   checkpointLine: CheckpointLine;
   /** Subset of checkpointLine.bucketsToFetch, filtered by priority. */
   bucketsToFetch: BucketDescription[];
-  raw_data: boolean | undefined;
-  binary_data: boolean | undefined;
+  /** Whether data lines should be encoded in a legacy format where {@link util.OplogEntry.data} is a nested object. */
+  legacyDataLines: boolean;
   /** Signals that the connection was aborted and that streaming should stop ASAP. */
   abort_connection: AbortSignal;
   /**
@@ -317,8 +329,7 @@ async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator 50_000) {
-        // IMPORTANT: This does not affect the output stream, but is used to flush
-        // iterator memory in case if large data sent.
-        yield { data: null, done: false };
-      }
+      const line = legacyDataLines
+        ? // We need to preserve the embedded data exactly, so this uses a JsonContainer
+          // and JSONBig to stringify.
+          JSONBig.stringify({
+            data: transformLegacyResponse(r)
+          } satisfies util.StreamingSyncData)
+        : // We can send the object as-is, which will be converted to JSON or BSON by a downstream transformer.
+          ({ data: r } satisfies util.StreamingSyncData);
+
+      yield { data: line, done: false };
+
+      // IMPORTANT: This does not affect the output stream, but is used to flush
+      // iterator memory in case if large data sent.
+      yield { data: null, done: false };
+
       onRowsSent(statsForBatch(r));
       checkpointLine.updateBucketPosition({ bucket: r.bucket, nextAfter: BigInt(r.next_after), hasMore: r.has_more });
diff --git a/packages/service-core/src/util/protocol-types.ts b/packages/service-core/src/util/protocol-types.ts
index 6edafbae4..21e5716c2 100644
--- a/packages/service-core/src/util/protocol-types.ts
+++ b/packages/service-core/src/util/protocol-types.ts
@@ -76,11 +76,6 @@ export const StreamingSyncRequest = t.object({
    */
   raw_data: t.boolean.optional(),
-  /**
-   * Data is received in a serialized BSON Buffer
-   */
-  binary_data: t.boolean.optional(),
-
   /**
    * Client parameters to be passed to the sync rules.
    */