From f2d6473d1ad54fdc2a88ef53acfd836965f8737e Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 17 Jul 2024 10:54:58 +0200 Subject: [PATCH 1/3] Log user_id and sync stats per connection. --- .../src/routes/endpoints/socket-route.ts | 10 ++++- .../src/routes/endpoints/sync-stream.ts | 11 +++++- .../service-core/src/sync/RequestTracker.ts | 21 +++++++++++ packages/service-core/src/sync/sync.ts | 37 +++++++++++++++---- packages/service-core/src/sync/util.ts | 8 +++- packages/service-core/test/src/sync.test.ts | 8 ++++ 6 files changed, 83 insertions(+), 12 deletions(-) create mode 100644 packages/service-core/src/sync/RequestTracker.ts diff --git a/packages/service-core/src/routes/endpoints/socket-route.ts b/packages/service-core/src/routes/endpoints/socket-route.ts index 7fd91e75a..799f94400 100644 --- a/packages/service-core/src/routes/endpoints/socket-route.ts +++ b/packages/service-core/src/routes/endpoints/socket-route.ts @@ -7,6 +7,7 @@ import { streamResponse } from '../../sync/sync.js'; import * as util from '../../util/util-index.js'; import { SocketRouteGenerator } from '../router-socket.js'; import { SyncRoutes } from './sync-stream.js'; +import { RequestTracker } from '../../sync/RequestTracker.js'; export const syncStreamReactive: SocketRouteGenerator = (router) => router.reactiveStream(SyncRoutes.STREAM, { @@ -66,6 +67,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => }); Metrics.getInstance().concurrent_connections.add(1); + const tracker = new RequestTracker(); try { for await (const data of streamResponse({ storage, @@ -79,6 +81,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => // RSocket handles keepalive events by default keep_alive: false }, + tracker, signal: controller.signal })) { if (data == null) { @@ -94,7 +97,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => const serialized = serialize(data) as Buffer; responder.onNext({ data: serialized }, false); requestedN--; - Metrics.getInstance().data_synced_bytes.add(serialized.length); + tracker.addDataSynced(serialized.length); } if (requestedN <= 0) { @@ -126,6 +129,11 @@ export const syncStreamReactive: SocketRouteGenerator = (router) => responder.onComplete(); removeStopHandler(); disposer(); + logger.info(`Sync stream complete`, { + user_id: syncParams.user_id, + operations_synced: tracker.operationsSynced, + data_synced_bytes: tracker.dataSyncedBytes + }); Metrics.getInstance().concurrent_connections.add(-1); } } diff --git a/packages/service-core/src/routes/endpoints/sync-stream.ts b/packages/service-core/src/routes/endpoints/sync-stream.ts index f51ef6708..404bdb268 100644 --- a/packages/service-core/src/routes/endpoints/sync-stream.ts +++ b/packages/service-core/src/routes/endpoints/sync-stream.ts @@ -8,6 +8,7 @@ import * as util from '../../util/util-index.js'; import { Metrics } from '../../metrics/Metrics.js'; import { authUser } from '../auth.js'; import { routeDefinition } from '../router.js'; +import { RequestTracker } from '../../sync/RequestTracker.js'; export enum SyncRoutes { STREAM = '/sync/stream' @@ -43,6 +44,7 @@ export const syncStreamed = routeDefinition({ }); } const controller = new AbortController(); + const tracker = new RequestTracker(); try { Metrics.getInstance().concurrent_connections.add(1); const stream = Readable.from( @@ -53,9 +55,11 @@ export const syncStreamed = routeDefinition({ params, syncParams, token: payload.context.token_payload!, + tracker, signal: controller.signal }) - ) + ), + tracker ), { objectMode: 
false, highWaterMark: 16 * 1024 }
       );
 
@@ -86,6 +90,11 @@
       afterSend: async () => {
         controller.abort();
         Metrics.getInstance().concurrent_connections.add(-1);
+        logger.info(`Sync stream complete`, {
+          user_id: syncParams.user_id,
+          operations_synced: tracker.operationsSynced,
+          data_synced_bytes: tracker.dataSyncedBytes
+        });
       }
     });
   } catch (ex) {
diff --git a/packages/service-core/src/sync/RequestTracker.ts b/packages/service-core/src/sync/RequestTracker.ts
new file mode 100644
index 000000000..81d717d26
--- /dev/null
+++ b/packages/service-core/src/sync/RequestTracker.ts
@@ -0,0 +1,21 @@
+import { Metrics } from '../metrics/Metrics.js';
+
+/**
+ * Record sync stats per request stream.
+ */
+export class RequestTracker {
+  operationsSynced = 0;
+  dataSyncedBytes = 0;
+
+  addOperationsSynced(operations: number) {
+    this.operationsSynced += operations;
+
+    Metrics.getInstance().operations_synced_total.add(operations);
+  }
+
+  addDataSynced(bytes: number) {
+    this.dataSyncedBytes += bytes;
+
+    Metrics.getInstance().data_synced_bytes.add(bytes);
+  }
+}
diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts
index a0f7af1af..f7e8ae5a5 100644
--- a/packages/service-core/src/sync/sync.ts
+++ b/packages/service-core/src/sync/sync.ts
@@ -11,6 +11,7 @@ import { logger } from '@powersync/lib-services-framework';
 import { Metrics } from '../metrics/Metrics.js';
 import { mergeAsyncIterables } from './merge.js';
 import { TokenStreamOptions, tokenStream } from './util.js';
+import { RequestTracker } from './RequestTracker.js';
 
 /**
  * Maximum number of connections actively fetching data.
@@ -28,12 +29,14 @@ export interface SyncStreamParameters {
    */
   signal?: AbortSignal;
   tokenStreamOptions?: Partial<TokenStreamOptions>;
+
+  tracker: RequestTracker;
 }
 
 export async function* streamResponse(
   options: SyncStreamParameters
 ): AsyncIterable<util.StreamingSyncLine | string | null> {
-  const { storage, params, syncParams, token, tokenStreamOptions, signal } = options;
+  const { storage, params, syncParams, token, tokenStreamOptions, tracker, signal } = options;
   // We also need to be able to abort, so we create our own controller.
   const controller = new AbortController();
   if (signal) {
@@ -49,7 +52,7 @@
     }
   }
   const ki = tokenStream(token, controller.signal, tokenStreamOptions);
-  const stream = streamResponseInner(storage, params, syncParams, controller.signal);
+  const stream = streamResponseInner(storage, params, syncParams, tracker, controller.signal);
 
   // Merge the two streams, and abort as soon as one of the streams end.
   const merged = mergeAsyncIterables([stream, ki], controller.signal);
@@ -72,6 +75,7 @@ async function* streamResponseInner(
   storage: storage.BucketStorageFactory,
   params: util.StreamingSyncRequest,
   syncParams: RequestParameters,
+  tracker: RequestTracker,
   signal: AbortSignal
 ): AsyncGenerator<util.StreamingSyncLine | string | null> {
   // Bucket state of bucket id -> op_id.
@@ -137,10 +141,12 @@
       }
       bucketsToFetch = diff.updatedBuckets.map((c) => c.bucket);
 
-      let message = `Updated checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `;
+      let message = `Updated checkpoint | user: ${syncParams.user_id} | `;
+      message += `op: ${checkpoint} | `;
+      message += `write: ${writeCheckpoint} | `;
       message += `buckets: ${allBuckets.length} | `;
       message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `;
-      message += `removed: ${limitedBuckets(diff.removedBuckets, 20)} | `;
+      message += `removed: ${limitedBuckets(diff.removedBuckets, 20)}`;
       logger.info(message);
 
       const checksum_line: util.StreamingSyncCheckpointDiff = {
@@ -172,7 +178,16 @@
 
       // This incrementally updates dataBuckets with each individual bucket position.
       // At the end of this, we can be sure that all buckets have data up to the checkpoint.
-      yield* bucketDataInBatches({ storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, signal });
+      yield* bucketDataInBatches({
+        storage,
+        checkpoint,
+        bucketsToFetch,
+        dataBuckets,
+        raw_data,
+        binary_data,
+        signal,
+        tracker
+      });
 
       await new Promise((resolve) => setTimeout(resolve, 10));
     }
@@ -186,6 +201,7 @@ interface BucketDataRequest {
   dataBuckets: Map<string, string>;
   raw_data: boolean | undefined;
   binary_data: boolean | undefined;
+  tracker: RequestTracker;
   signal: AbortSignal;
 }
 
@@ -221,11 +237,16 @@ async function* bucketDataInBatches(request: BucketDataRequest) {
   }
 }
 
+interface BucketDataBatchResult {
+  done: boolean;
+  data: any;
+}
+
 /**
  * Extracted as a separate internal function just to avoid memory leaks.
  */
-async function* bucketDataBatch(request: BucketDataRequest) {
-  const { storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, signal } = request;
+async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<BucketDataBatchResult> {
+  const { storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, tracker, signal } = request;
 
   const [_, release] = await syncSemaphore.acquire();
   try {
@@ -272,7 +293,7 @@
         // iterator memory in case if large data sent.
        yield { data: null, done: false };
       }
-      Metrics.getInstance().operations_synced_total.add(r.data.length);
+      tracker.addOperationsSynced(r.data.length);
 
       dataBuckets.set(r.bucket, r.next_after);
     }
diff --git a/packages/service-core/src/sync/util.ts b/packages/service-core/src/sync/util.ts
index b0f6153cc..362706487 100644
--- a/packages/service-core/src/sync/util.ts
+++ b/packages/service-core/src/sync/util.ts
@@ -2,6 +2,7 @@
 import * as timers from 'timers/promises';
 import * as util from '../util/util-index.js';
 import { Metrics } from '../metrics/Metrics.js';
+import { RequestTracker } from './RequestTracker.js';
 
 export type TokenStreamOptions = {
   /**
@@ -89,10 +90,13 @@ export async function* ndjson(iterator: AsyncIterable<any>): AsyncGenerator<string> {
   }
 }
 
-export async function* transformToBytes(iterator: AsyncIterable<string>): AsyncGenerator<Buffer> {
+export async function* transformToBytesTracked(
+  iterator: AsyncIterable<string>,
+  tracker: RequestTracker
+): AsyncGenerator<Buffer> {
   for await (let data of iterator) {
     const encoded = Buffer.from(data, 'utf8');
-    Metrics.getInstance().data_synced_bytes.add(encoded.length);
+    tracker.addDataSynced(encoded.length);
     yield encoded;
   }
 }
diff --git a/packages/service-core/test/src/sync.test.ts b/packages/service-core/test/src/sync.test.ts
index 778ae8b29..91f242c0c 100644
--- a/packages/service-core/test/src/sync.test.ts
+++ b/packages/service-core/test/src/sync.test.ts
@@ -9,6 +9,7 @@ import { streamResponse } from '../../src/sync/sync.js';
 import * as timers from 'timers/promises';
 import { lsnMakeComparable } from '@powersync/service-jpgwire';
 import { RequestParameters } from '@powersync/service-sync-rules';
+import { RequestTracker } from '@/sync/RequestTracker.js';
 
 describe('sync - mongodb', function () {
   defineTests(MONGO_STORAGE_FACTORY);
@@ -38,6 +39,8 @@
 `;
 
 function defineTests(factory: StorageFactory) {
+  const tracker = new RequestTracker();
+
   test('sync global data', async () => {
     const f = await factory();
 
@@ -78,6 +81,7 @@
       include_checksum: true,
       raw_data: true
     },
+    tracker,
     syncParams: new RequestParameters({ sub: '' }, {}),
     token: { exp: Date.now() / 1000 + 10 } as any
   });
@@ -118,6 +122,7 @@
       include_checksum: true,
       raw_data: false
     },
+    tracker,
     syncParams: new RequestParameters({ sub: '' }, {}),
     token: { exp: Date.now() / 1000 + 10 } as any
   });
@@ -146,6 +151,7 @@
       include_checksum: true,
       raw_data: true
     },
+    tracker,
     syncParams: new RequestParameters({ sub: '' }, {}),
     token: { exp: 0 } as any
   });
@@ -172,6 +178,7 @@
       include_checksum: true,
       raw_data: true
     },
+    tracker,
     syncParams: new RequestParameters({ sub: '' }, {}),
     token: { exp: Date.now() / 1000 + 10 } as any
   });
@@ -232,6 +239,7 @@
       include_checksum: true,
       raw_data: true
     },
+    tracker,
     syncParams: new RequestParameters({ sub: '' }, {}),
     token: { exp: exp } as any
   });

From 6143b06db600cbb89239247972dab40c31e71ce6 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Wed, 17 Jul 2024 11:00:43 +0200
Subject: [PATCH 2/3] Structured logging for checkpoints.
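
Keep the human-readable checkpoint messages, but attach checkpoint, user_id
and bucket counts as structured metadata on the same log calls, so that log
aggregators can filter and group checkpoint activity per user without parsing
the message text. The "Too many buckets" error is now also logged with the
same fields before throwing.

Illustrative only (the exact rendering depends on the configured log
transport, which this patch does not change): with a JSON transport, the
updated-checkpoint line is expected to come out roughly as

  {
    "level": "info",
    "message": "Updated checkpoint: 123 | write: 123 | buckets: 3 | ...",
    "checkpoint": "123",
    "user_id": "user-1",
    "buckets": 3,
    "updated": 2,
    "removed": 0
  }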
--- packages/service-core/src/sync/sync.ts | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index f7e8ae5a5..8659e358f 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -113,6 +113,11 @@ async function* streamResponseInner( }); if (allBuckets.length > 1000) { + logger.error(`Too many buckets`, { + checkpoint, + user_id: syncParams.user_id, + buckets: allBuckets.length + }); // TODO: Limit number of buckets even before we get to this point throw new Error(`Too many buckets: ${allBuckets.length}`); } @@ -141,13 +146,18 @@ async function* streamResponseInner( } bucketsToFetch = diff.updatedBuckets.map((c) => c.bucket); - let message = `Updated checkpoint | user: ${syncParams.user_id} | `; - message += `op: ${checkpoint} | `; + let message = `Updated checkpoint: ${checkpoint} | `; message += `write: ${writeCheckpoint} | `; message += `buckets: ${allBuckets.length} | `; message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `; message += `removed: ${limitedBuckets(diff.removedBuckets, 20)}`; - logger.info(message); + logger.info(message, { + checkpoint, + user_id: syncParams.user_id, + buckets: allBuckets.length, + updated: diff.updatedBuckets.length, + removed: diff.removedBuckets.length + }); const checksum_line: util.StreamingSyncCheckpointDiff = { checkpoint_diff: { @@ -162,7 +172,7 @@ async function* streamResponseInner( } else { let message = `New checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `; message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`; - logger.info(message); + logger.info(message, { checkpoint, user_id: syncParams.user_id, buckets: allBuckets.length }); bucketsToFetch = allBuckets; const checksum_line: util.StreamingSyncCheckpoint = { checkpoint: { From bdbf95cb562f8e6b990b3581325a01d1928dfdd0 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 17 Jul 2024 11:04:17 +0200 Subject: [PATCH 3/3] Add changeset. --- .changeset/plenty-dryers-look.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/plenty-dryers-look.md diff --git a/.changeset/plenty-dryers-look.md b/.changeset/plenty-dryers-look.md new file mode 100644 index 000000000..a2c73083e --- /dev/null +++ b/.changeset/plenty-dryers-look.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +Log user_id and sync stats for each connection
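
A minimal sketch of the per-connection wiring this series adds, mirroring the
socket-route.ts and sync-stream.ts changes above. This is TypeScript for
illustration only: storage, params, syncParams, token and signal come from
the surrounding route handler, which is elided here.

  import { logger } from '@powersync/lib-services-framework';
  import { RequestTracker } from '../../sync/RequestTracker.js';
  import { streamResponse } from '../../sync/sync.js';

  // One tracker per connection, passed down into the sync stream.
  const tracker = new RequestTracker();
  try {
    for await (const data of streamResponse({ storage, params, syncParams, token, tracker, signal })) {
      // Send data over the transport. Byte and operation counts are
      // recorded on the tracker inside the stream pipeline, which also
      // forwards them to the existing global Metrics counters.
    }
  } finally {
    // One summary line per connection, logged when the stream ends.
    logger.info('Sync stream complete', {
      user_id: syncParams.user_id,
      operations_synced: tracker.operationsSynced,
      data_synced_bytes: tracker.dataSyncedBytes
    });
  }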