6 changes: 6 additions & 0 deletions .changeset/plenty-dryers-look.md
@@ -0,0 +1,6 @@
---
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Log user_id and sync stats for each connection
10 changes: 9 additions & 1 deletion packages/service-core/src/routes/endpoints/socket-route.ts
@@ -7,6 +7,7 @@ import { streamResponse } from '../../sync/sync.js';
import * as util from '../../util/util-index.js';
import { SocketRouteGenerator } from '../router-socket.js';
import { SyncRoutes } from './sync-stream.js';
import { RequestTracker } from '../../sync/RequestTracker.js';

export const syncStreamReactive: SocketRouteGenerator = (router) =>
router.reactiveStream<util.StreamingSyncRequest, any>(SyncRoutes.STREAM, {
@@ -66,6 +67,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
});

Metrics.getInstance().concurrent_connections.add(1);
const tracker = new RequestTracker();
try {
for await (const data of streamResponse({
storage,
@@ -79,6 +81,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
// RSocket handles keepalive events by default
keep_alive: false
},
tracker,
signal: controller.signal
})) {
if (data == null) {
@@ -94,7 +97,7 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
const serialized = serialize(data) as Buffer;
responder.onNext({ data: serialized }, false);
requestedN--;
Metrics.getInstance().data_synced_bytes.add(serialized.length);
tracker.addDataSynced(serialized.length);
}

if (requestedN <= 0) {
@@ -126,6 +129,11 @@ export const syncStreamReactive: SocketRouteGenerator = (router) =>
responder.onComplete();
removeStopHandler();
disposer();
logger.info(`Sync stream complete`, {
user_id: syncParams.user_id,
operations_synced: tracker.operationsSynced,
data_synced_bytes: tracker.dataSyncedBytes
});
Metrics.getInstance().concurrent_connections.add(-1);
}
}
11 changes: 10 additions & 1 deletion packages/service-core/src/routes/endpoints/sync-stream.ts
@@ -8,6 +8,7 @@ import * as util from '../../util/util-index.js';
import { Metrics } from '../../metrics/Metrics.js';
import { authUser } from '../auth.js';
import { routeDefinition } from '../router.js';
import { RequestTracker } from '../../sync/RequestTracker.js';

export enum SyncRoutes {
STREAM = '/sync/stream'
@@ -43,6 +44,7 @@ export const syncStreamed = routeDefinition({
});
}
const controller = new AbortController();
const tracker = new RequestTracker();
try {
Metrics.getInstance().concurrent_connections.add(1);
const stream = Readable.from(
@@ -53,9 +55,11 @@
params,
syncParams,
token: payload.context.token_payload!,
tracker,
signal: controller.signal
})
)
),
tracker
),
{ objectMode: false, highWaterMark: 16 * 1024 }
);
@@ -86,6 +90,11 @@
afterSend: async () => {
controller.abort();
Metrics.getInstance().concurrent_connections.add(-1);
logger.info(`Sync stream complete`, {
user_id: syncParams.user_id,
operations_synced: tracker.operationsSynced,
data_synced_bytes: tracker.dataSyncedBytes
});
}
});
} catch (ex) {
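For context, a hedged sketch of the streaming pipeline this route wires up: sync lines are serialized to NDJSON strings, byte-counted against the per-request tracker, then exposed as a Node byte stream. The generators here are illustrative stand-ins, not the repo's `ndjson` and `transformToBytesTracked` implementations:

```ts
import { Readable } from 'stream';

// Illustrative stand-in for the service's sync-line source.
async function* syncLines(): AsyncGenerator<Record<string, any>> {
  yield { checkpoint: { last_op_id: '10', buckets: [] } };
  yield { checkpoint_complete: { last_op_id: '10' } };
}

// NDJSON serialization, in the spirit of sync/util.ts's ndjson().
async function* toNdjson(lines: AsyncIterable<Record<string, any>>): AsyncGenerator<string> {
  for await (const line of lines) {
    yield JSON.stringify(line) + '\n';
  }
}

// Byte-counting encoder, mirroring transformToBytesTracked() further down.
async function* toTrackedBytes(
  strings: AsyncIterable<string>,
  tracker: { addDataSynced(bytes: number): void }
): AsyncGenerator<Buffer> {
  for await (const s of strings) {
    const encoded = Buffer.from(s, 'utf8');
    tracker.addDataSynced(encoded.length); // per-request data_synced_bytes
    yield encoded;
  }
}

const tracker = { dataSyncedBytes: 0, addDataSynced(n: number) { this.dataSyncedBytes += n; } };
// As in the hunk above: a plain byte stream (objectMode: false) with a
// 16 KiB buffer before backpressure applies.
const stream = Readable.from(toTrackedBytes(toNdjson(syncLines()), tracker), {
  objectMode: false,
  highWaterMark: 16 * 1024
});
```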
21 changes: 21 additions & 0 deletions packages/service-core/src/sync/RequestTracker.ts
@@ -0,0 +1,21 @@
import { Metrics } from '../metrics/Metrics.js';

/**
* Record sync stats per request stream.
*/
export class RequestTracker {
operationsSynced = 0;
dataSyncedBytes = 0;

addOperationsSynced(operations: number) {
this.operationsSynced += operations;

Metrics.getInstance().operations_synced_total.add(operations);
}

addDataSynced(bytes: number) {
this.dataSyncedBytes += bytes;

Metrics.getInstance().data_synced_bytes.add(bytes);
}
}
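The tracker keeps per-request totals while still forwarding every increment to the process-wide counters. A minimal standalone sketch of that pattern; the `globalCounters` object is a hypothetical stand-in for the `Metrics` singleton:

```ts
// Hypothetical stand-in for the process-wide Metrics counters.
const globalCounters = { operations_synced_total: 0, data_synced_bytes: 0 };

class TrackerSketch {
  operationsSynced = 0;
  dataSyncedBytes = 0;

  addOperationsSynced(operations: number) {
    this.operationsSynced += operations;                   // this connection only
    globalCounters.operations_synced_total += operations;  // all connections
  }

  addDataSynced(bytes: number) {
    this.dataSyncedBytes += bytes;
    globalCounters.data_synced_bytes += bytes;
  }
}

// One tracker per sync stream: its totals are what the routes log as
// operations_synced / data_synced_bytes when the stream completes.
const a = new TrackerSketch();
const b = new TrackerSketch();
a.addOperationsSynced(3);
b.addOperationsSynced(5);
console.log(a.operationsSynced, globalCounters.operations_synced_total); // 3 8
```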
51 changes: 41 additions & 10 deletions packages/service-core/src/sync/sync.ts
@@ -11,6 +11,7 @@ import { logger } from '@powersync/lib-services-framework';
import { Metrics } from '../metrics/Metrics.js';
import { mergeAsyncIterables } from './merge.js';
import { TokenStreamOptions, tokenStream } from './util.js';
import { RequestTracker } from './RequestTracker.js';

/**
* Maximum number of connections actively fetching data.
@@ -28,12 +29,14 @@ export interface SyncStreamParameters {
*/
signal?: AbortSignal;
tokenStreamOptions?: Partial<TokenStreamOptions>;

tracker: RequestTracker;
}

export async function* streamResponse(
options: SyncStreamParameters
): AsyncIterable<util.StreamingSyncLine | string | null> {
const { storage, params, syncParams, token, tokenStreamOptions, signal } = options;
const { storage, params, syncParams, token, tokenStreamOptions, tracker, signal } = options;
// We also need to be able to abort, so we create our own controller.
const controller = new AbortController();
if (signal) {
@@ -49,7 +52,7 @@
}
}
const ki = tokenStream(token, controller.signal, tokenStreamOptions);
const stream = streamResponseInner(storage, params, syncParams, controller.signal);
const stream = streamResponseInner(storage, params, syncParams, tracker, controller.signal);
// Merge the two streams, and abort as soon as one of the streams end.
const merged = mergeAsyncIterables([stream, ki], controller.signal);

Expand All @@ -72,6 +75,7 @@ async function* streamResponseInner(
storage: storage.BucketStorageFactory,
params: util.StreamingSyncRequest,
syncParams: RequestParameters,
tracker: RequestTracker,
signal: AbortSignal
): AsyncGenerator<util.StreamingSyncLine | string | null> {
// Bucket state of bucket id -> op_id.
@@ -109,6 +113,11 @@
});

if (allBuckets.length > 1000) {
logger.error(`Too many buckets`, {
checkpoint,
user_id: syncParams.user_id,
buckets: allBuckets.length
});
// TODO: Limit number of buckets even before we get to this point
throw new Error(`Too many buckets: ${allBuckets.length}`);
}
@@ -137,11 +146,18 @@
}
bucketsToFetch = diff.updatedBuckets.map((c) => c.bucket);

let message = `Updated checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `;
let message = `Updated checkpoint: ${checkpoint} | `;
message += `write: ${writeCheckpoint} | `;
message += `buckets: ${allBuckets.length} | `;
message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `;
message += `removed: ${limitedBuckets(diff.removedBuckets, 20)} | `;
logger.info(message);
message += `removed: ${limitedBuckets(diff.removedBuckets, 20)}`;
logger.info(message, {
checkpoint,
user_id: syncParams.user_id,
buckets: allBuckets.length,
updated: diff.updatedBuckets.length,
removed: diff.removedBuckets.length
});

const checksum_line: util.StreamingSyncCheckpointDiff = {
checkpoint_diff: {
@@ -156,7 +172,7 @@
} else {
let message = `New checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `;
message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`;
logger.info(message);
logger.info(message, { checkpoint, user_id: syncParams.user_id, buckets: allBuckets.length });
bucketsToFetch = allBuckets;
const checksum_line: util.StreamingSyncCheckpoint = {
checkpoint: {
@@ -172,7 +188,16 @@

// This incrementally updates dataBuckets with each individual bucket position.
// At the end of this, we can be sure that all buckets have data up to the checkpoint.
yield* bucketDataInBatches({ storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, signal });
yield* bucketDataInBatches({
storage,
checkpoint,
bucketsToFetch,
dataBuckets,
raw_data,
binary_data,
signal,
tracker
});

await new Promise((resolve) => setTimeout(resolve, 10));
}
@@ -186,6 +211,7 @@ interface BucketDataRequest {
dataBuckets: Map<string, string>;
raw_data: boolean | undefined;
binary_data: boolean | undefined;
tracker: RequestTracker;
signal: AbortSignal;
}

@@ -221,11 +247,16 @@ async function* bucketDataInBatches(request: BucketDataRequest) {
}
}

interface BucketDataBatchResult {
done: boolean;
data: any;
}

/**
* Extracted as a separate internal function just to avoid memory leaks.
*/
async function* bucketDataBatch(request: BucketDataRequest) {
const { storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, signal } = request;
async function* bucketDataBatch(request: BucketDataRequest): AsyncGenerator<BucketDataBatchResult, void> {
const { storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, tracker, signal } = request;

const [_, release] = await syncSemaphore.acquire();
try {
@@ -272,7 +303,7 @@
// iterator memory in case large data is sent.
yield { data: null, done: false };
}
Metrics.getInstance().operations_synced_total.add(r.data.length);
tracker.addOperationsSynced(r.data.length);

dataBuckets.set(r.bucket, r.next_after);
}
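Aside: the `syncSemaphore.acquire()` call above, returning a `[value, release]` pair, matches the async-mutex `Semaphore` API. Assuming that library, a minimal sketch of how a cap on concurrently fetching connections works; the constant name and limit here are assumptions, not values from this PR:

```ts
import { Semaphore } from 'async-mutex';

// Assumed value; sync.ts documents a "maximum number of connections
// actively fetching data" but the constant is not shown in this diff.
const MAX_ACTIVE_FETCHES = 10;
const syncSemaphore = new Semaphore(MAX_ACTIVE_FETCHES);

async function* guardedBatch(buckets: string[]): AsyncGenerator<string> {
  // Waits until one of the MAX_ACTIVE_FETCHES slots is free.
  const [, release] = await syncSemaphore.acquire();
  try {
    for (const bucket of buckets) {
      yield `data for ${bucket}`; // placeholder for the real bucket fetch
    }
  } finally {
    // Always release, even if the generator throws or is abandoned early.
    release();
  }
}
```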
8 changes: 6 additions & 2 deletions packages/service-core/src/sync/util.ts
@@ -2,6 +2,7 @@ import * as timers from 'timers/promises';

import * as util from '../util/util-index.js';
import { Metrics } from '../metrics/Metrics.js';
import { RequestTracker } from './RequestTracker.js';

export type TokenStreamOptions = {
/**
@@ -89,10 +90,13 @@ export async function* ndjson(iterator: AsyncIterable<string | null | Record<str
}
}

export async function* transformToBytesTracked(iterator: AsyncIterable<string>): AsyncGenerator<Buffer> {
export async function* transformToBytesTracked(
iterator: AsyncIterable<string>,
tracker: RequestTracker
): AsyncGenerator<Buffer> {
for await (let data of iterator) {
const encoded = Buffer.from(data, 'utf8');
Metrics.getInstance().data_synced_bytes.add(encoded.length);
tracker.addDataSynced(encoded.length);
yield encoded;
}
}
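A quick usage sketch of the transform above, assuming `transformToBytesTracked` from this hunk is in scope and passing a structural stand-in for the `RequestTracker` parameter (hence the cast). Note the counter tracks encoded UTF-8 bytes, not string length:

```ts
const stats = { dataSyncedBytes: 0, addDataSynced(n: number) { this.dataSyncedBytes += n; } };

async function* lines() {
  yield 'abc\n';   // 4 bytes
  yield 'héllo\n'; // 7 bytes: 'é' is one character but two UTF-8 bytes
}

for await (const chunk of transformToBytesTracked(lines(), stats as any)) {
  void chunk; // in the route, these Buffers are written to the response
}
console.log(stats.dataSyncedBytes); // 11
```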
8 changes: 8 additions & 0 deletions packages/service-core/test/src/sync.test.ts
@@ -9,6 +9,7 @@ import { streamResponse } from '../../src/sync/sync.js';
import * as timers from 'timers/promises';
import { lsnMakeComparable } from '@powersync/service-jpgwire';
import { RequestParameters } from '@powersync/service-sync-rules';
import { RequestTracker } from '@/sync/RequestTracker.js';

describe('sync - mongodb', function () {
defineTests(MONGO_STORAGE_FACTORY);
@@ -38,6 +39,8 @@ bucket_definitions:
`;

function defineTests(factory: StorageFactory) {
const tracker = new RequestTracker();

test('sync global data', async () => {
const f = await factory();

Expand Down Expand Up @@ -78,6 +81,7 @@ function defineTests(factory: StorageFactory) {
include_checksum: true,
raw_data: true
},
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: Date.now() / 1000 + 10 } as any
});
Expand Down Expand Up @@ -118,6 +122,7 @@ function defineTests(factory: StorageFactory) {
include_checksum: true,
raw_data: false
},
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: Date.now() / 1000 + 10 } as any
});
Expand Down Expand Up @@ -146,6 +151,7 @@ function defineTests(factory: StorageFactory) {
include_checksum: true,
raw_data: true
},
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: 0 } as any
});
@@ -172,6 +178,7 @@
include_checksum: true,
raw_data: true
},
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: Date.now() / 1000 + 10 } as any
});
@@ -232,6 +239,7 @@
include_checksum: true,
raw_data: true
},
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: exp } as any
});