38 changes: 32 additions & 6 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -517,14 +517,13 @@ export class ChangeStream {
const startAfter = mongoLsnToTimestamp(lastLsn) ?? undefined;
logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);

// TODO: Use changeStreamSplitLargeEvent

const filters = this.getSourceNamespaceFilters();

const pipeline: mongo.Document[] = [
{
$match: filters.$match
}
},
{ $changeStreamSplitLargeEvent: {} }
];

let fullDocument: 'required' | 'updateLookup';
@@ -568,22 +567,49 @@ export class ChangeStream {
// no data to replicate.
let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);

let splitDocument: mongo.ChangeStreamDocument | null = null;

while (true) {
if (this.abort_signal.aborted) {
break;
}

const changeDocument = await stream.tryNext();
const originalChangeDocument = await stream.tryNext();

if (changeDocument == null || this.abort_signal.aborted) {
if (originalChangeDocument == null || this.abort_signal.aborted) {
continue;
}
await touch();

if (startAfter != null && changeDocument.clusterTime?.lte(startAfter)) {
if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) {
continue;
}

let changeDocument = originalChangeDocument;
if (originalChangeDocument?.splitEvent != null) {
// Handle split events from $changeStreamSplitLargeEvent.
// This is only relevant for very large update operations.
const splitEvent = originalChangeDocument?.splitEvent;

if (splitDocument == null) {
splitDocument = originalChangeDocument;
} else {
splitDocument = Object.assign(splitDocument, originalChangeDocument);
}

if (splitEvent.fragment == splitEvent.of) {
// Got all fragments
changeDocument = splitDocument;
splitDocument = null;
} else {
// Wait for more fragments
continue;
}
} else if (splitDocument != null) {
// We were waiting for fragments, but got a different event
throw new Error(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`);
}

// console.log('event', changeDocument);

if (
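For readers unfamiliar with `$changeStreamSplitLargeEvent`, the sketch below shows the same fragment-reassembly pattern in isolation, using only the plain `mongodb` Node driver. The connection URI, database, and collection names are placeholders, and `handle()` is a hypothetical callback; this is a minimal illustration of the technique, not the module's actual wiring.

```ts
import { MongoClient, type ChangeStreamDocument } from 'mongodb';

// Minimal sketch: open a change stream with $changeStreamSplitLargeEvent and
// merge split fragments back into a single event before processing it.
async function watchWithSplitEvents(uri: string): Promise<void> {
  const client = await MongoClient.connect(uri);
  const collection = client.db('test').collection('test_data');

  const stream = collection.watch([{ $changeStreamSplitLargeEvent: {} }], {
    fullDocument: 'updateLookup'
  });

  // Accumulates fragments of a single oversized event.
  let partial: ChangeStreamDocument | null = null;

  for await (const event of stream) {
    if (event.splitEvent != null) {
      // Later fragments only carry the fields that did not fit earlier,
      // so merging them with Object.assign rebuilds the full event.
      partial = partial == null ? event : Object.assign(partial, event);
      if (event.splitEvent.fragment !== event.splitEvent.of) {
        continue; // more fragments still to come
      }
      const complete = partial;
      partial = null;
      handle(complete);
    } else {
      handle(event);
    }
  }
}

// Hypothetical downstream handler.
function handle(event: ChangeStreamDocument): void {
  console.log('change event', event.operationType);
}
```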
40 changes: 29 additions & 11 deletions modules/module-mongodb/test/src/change_stream.test.ts
@@ -352,39 +352,57 @@ bucket_definitions:
expect(data).toMatchObject([putOp('test_data', { id: test_id, description: 'test1' })]);
});

// Not correctly implemented yet
test.skip('large record', async () => {
test('large record', async () => {
// Test a large update.

// Without $changeStreamSplitLargeEvent, we get this error:
// MongoServerError: PlanExecutor error during aggregation :: caused by :: BSONObj size: 33554925 (0x20001ED) is invalid.
// Size must be between 0 and 16793600(16MB)

await using context = await ChangeStreamTestContext.open(factory);
await context.updateSyncRules(`bucket_definitions:
global:
data:
- SELECT _id as id, description, other FROM "test_data"`);
- SELECT _id as id, name, other FROM "test_data"`);
const { db } = context;

await context.replicateSnapshot();

// 16MB
const largeDescription = crypto.randomBytes(8_000_000 - 100).toString('hex');

const collection = db.collection('test_data');
const result = await collection.insertOne({ description: largeDescription });
const result = await collection.insertOne({ name: 't1' });
const test_id = result.insertedId;

await collection.updateOne({ _id: test_id }, { $set: { name: 't2' } });
// 12MB field.
// The field appears twice in the ChangeStream event, so the total size
// is > 16MB.

// We deliberately don't include this description field in the sync rules,
// since syncing it would cause other issues not relevant to this specific test.
const largeDescription = crypto.randomBytes(12000000 / 2).toString('hex');

await collection.updateOne({ _id: test_id }, { $set: { description: largeDescription } });
context.startStreaming();

const data = await context.getBucketData('global[]');
expect(data.length).toEqual(2);
const row = JSON.parse(data[0].data as string);
delete row.description;
expect(row).toEqual({ id: test_id.toHexString() });
const row1 = JSON.parse(data[0].data as string);
expect(row1).toEqual({ id: test_id.toHexString(), name: 't1' });
delete data[0].data;
expect(data[0]).toMatchObject({
object_id: test_id.toHexString(),
object_type: 'test_data',
op: 'PUT',
op_id: '1'
});
const row2 = JSON.parse(data[1].data as string);
expect(row2).toEqual({ id: test_id.toHexString(), name: 't1' });
delete data[1].data;
expect(data[1]).toMatchObject({
object_id: test_id.toHexString(),
object_type: 'test_data',
op: 'PUT',
op_id: '2'
});
});
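As a rough back-of-the-envelope check of why this update needs splitting (assuming the usual event shape, where the updated field shows up in both `updateDescription.updatedFields` and `fullDocument`):

```ts
// 12_000_000 / 2 random bytes            -> 6,000,000 bytes
// .toString('hex') doubles the length    -> 12,000,000 characters (~12MB string)
// The field appears twice in the change event, so the un-split event is
// roughly 24MB, well over the 16MB BSON document limit.
const fieldBytes = 12_000_000 / 2;
const hexLength = fieldBytes * 2; // 12,000,000
const approxEventSize = hexLength * 2; // ~24,000,000
console.log(approxEventSize > 16 * 1024 * 1024); // true
```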

test('collection not in sync rules', async () => {