[MongoDB] Direct BSON Buffer -> JSON conversion#599
Conversation
simolus3 left a comment:
This also looks good to me, the only potential issue I see are UTF-16 surrogate pairs. It might make sense to add tests for those.
I didn't check the test cases and the benchmark in detail, but the implementation makes sense to me.
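A test along the lines suggested would check that astral-plane characters (surrogate pairs in JS's UTF-16 strings, 4-byte sequences in BSON's UTF-8) survive the conversion intact. A hypothetical sketch, not from this PR:

```typescript
// Hypothetical test sketch for the surrogate-pair concern (not the PR's tests):
// BSON stores strings as UTF-8, while JS strings are UTF-16, so characters
// above U+FFFF become surrogate pairs that must not be split or mangled.
const samples = ['\u{1F600}', 'a\u{1F600}b', '\u{10FFFF}'];
for (const s of samples) {
  // UTF-8 bytes as they would appear inside a BSON string element.
  const utf8 = Buffer.from(s, 'utf8');
  const decoded = utf8.toString('utf8');
  if (decoded !== s) throw new Error(`surrogate pair mangled: ${s}`);
  // The JSON output must also round-trip the pair intact.
  if (JSON.parse(JSON.stringify(decoded)) !== s) throw new Error(`JSON round-trip failed: ${s}`);
}
console.log('surrogate pairs ok');
```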
Force-pushed 6d7e475 to b68bea1
Force-pushed d02a866 to 57e73c3
Merges upstream main which includes PR #591 (raw change streams) and PR #599 (direct BSON Buffer -> JSON conversion). Auth fix conflicts (types.ts, config.test.ts) resolved — both sides had the same fix, upstream also added database name decoding. ChangeStream.ts has 11 unresolved conflicts — PR #591 replaced the MongoDB driver ChangeStream with a custom RawChangeStream using raw aggregate + getMore. Our Cosmos DB changes need to be re-applied to the new code structure. Resolved in the next commit.
resolve: ChangeStream.ts merge conflicts for raw change streams

Re-applied all Cosmos DB changes to the new raw change stream code structure from PR #591. The raw aggregate approach is better for Cosmos DB: no lazy ChangeStream init, explicit cursor management, and the $changeStream stage is built directly in the pipeline.

Changes applied to the new structure:
- detectCosmosDb() calls in getSnapshotLsn, initReplication, streamChangesInternal
- getEventTimestamp() adapted to the ProjectedChangeStreamDocument type
- Sentinel checkpoint with BSON.deserialize for fullDocument (raw Buffer)
- Pipeline guards: skip $changeStreamSplitLargeEvent and showExpandedEvents
- Cluster-level aggregate (admin db + allChangesForCluster) when isCosmosDb
- startAtOperationTime fix (startAfter != null)
- Keepalive guard for Cosmos DB resume tokens
- .lte() dedup guard skipped on Cosmos DB
- wallTime tracking for replication lag
- Added a changeset for @powersync/service-module-mongodb (minor)

Verified: 59/59 standard MongoDB tests pass. The Cosmos DB cluster is currently down, so those tests are blocked by a TLS timeout. A code audit of RawChangeStream.ts found no compatibility issues: the cursor ID type is handled by BigInt; postBatchResumeToken needs empirical verification when the cluster is back.
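For illustration, the pipeline guards and start options described in this commit message could be assembled roughly as in the sketch below. The names (`RawStreamOptions`, `buildChangeStreamPipeline`) are mine; this is not the actual RawChangeStream.ts code.

```typescript
// Illustrative sketch only, not the actual RawChangeStream.ts implementation.
// Shows how an explicit $changeStream stage might be built for the raw
// aggregate approach, with the Cosmos DB guards mentioned above.
type Document = Record<string, unknown>;

interface RawStreamOptions {
  isCosmosDb: boolean;
  startAfter?: Document; // resume token when resuming an existing stream
  startAtOperationTime?: unknown; // BSON timestamp
}

function buildChangeStreamPipeline(opts: RawStreamOptions): Document[] {
  const stage: Document = { fullDocument: 'updateLookup' };
  if (opts.startAfter != null) {
    // startAtOperationTime must not be combined with startAfter.
    stage.startAfter = opts.startAfter;
  } else if (opts.startAtOperationTime != null) {
    stage.startAtOperationTime = opts.startAtOperationTime;
  }
  if (opts.isCosmosDb) {
    // Cluster-level stream: aggregate on the admin database with
    // allChangesForCluster; Cosmos DB rejects showExpandedEvents.
    stage.allChangesForCluster = true;
  } else {
    stage.showExpandedEvents = true;
  }
  const pipeline: Document[] = [{ $changeStream: stage }];
  if (!opts.isCosmosDb) {
    // Cosmos DB does not support splitting large events.
    pipeline.push({ $changeStreamSplitLargeEvent: {} });
  }
  return pipeline;
}
```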
Builds on #598.
This provides an alternative implementation for `rawToSqliteRow`. For nested documents and arrays, this converts from BSON Buffer -> JSON Buffer -> string, without an intermediate `bson.deserialize` or `JSON(Big).stringify`. This can significantly reduce allocations and improve throughput for large nested documents or arrays.

Initial micro-benchmarks comparing the new approach to the old one:
End-to-end benchmarks compared to main (excluding #598 and #591). This was tested using documents of 100KB+ for the initial snapshot, and small updates to documents of 2MB+ for the change stream test. It uses a local bucket storage database on an NVMe disk, which largely removes the typical bucket storage overhead, focusing instead on CPU and memory overhead.
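As an illustration of the kind of timing comparison involved, a minimal harness might look like the following. This is a sketch of mine, not the benchmark code from this PR.

```typescript
// Hypothetical micro-benchmark harness sketch; not the PR's benchmark.
// Times a function over repeated iterations and reports per-op cost.
function bench(name: string, fn: () => void, iterations = 10_000): number {
  // Warm up so the JIT has optimized the hot path before measuring.
  for (let i = 0; i < 100; i++) fn();
  const start = process.hrtime.bigint();
  for (let i = 0; i < iterations; i++) fn();
  const elapsedMs = Number(process.hrtime.bigint() - start) / 1e6;
  console.log(`${name}: ${((elapsedMs / iterations) * 1000).toFixed(3)}us/op`);
  return elapsedMs;
}

// Example: time JSON.stringify of a nested object, a stand-in for the old
// deserialize-then-stringify path.
const doc = { nested: { values: Array.from({ length: 1000 }, (_, i) => i) } };
bench('JSON.stringify', () => { JSON.stringify(doc); });
```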
Implementation
The implementation uses a custom BSON parser. For each top-level value converted to JSON, we write the results into a Buffer, then convert that buffer to a string. In my early benchmarks, this was faster than using direct string concatenation.
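To make the approach concrete, here is a heavily simplified sketch of mine, not the PR's parser: it handles only int32, double, and string elements, uses `JSON.stringify` for string escaping, and collects string fragments instead of writing into the output Buffer described above.

```typescript
// Heavily simplified direct BSON -> JSON sketch (illustration only; the real
// parser covers all BSON types and writes bytes into a Buffer).
function bsonToJson(buf: Buffer): string {
  const out: string[] = [];
  out.push('{');
  let pos = 4; // skip the leading int32 document length
  let first = true;
  while (buf[pos] !== 0x00) { // 0x00 terminates the document
    const type = buf[pos++];
    // The element name is a NUL-terminated cstring.
    const nameEnd = buf.indexOf(0x00, pos);
    const name = buf.toString('utf8', pos, nameEnd);
    pos = nameEnd + 1;
    if (!first) out.push(',');
    first = false;
    out.push(JSON.stringify(name), ':');
    switch (type) {
      case 0x01: // double: 8 bytes little-endian
        out.push(String(buf.readDoubleLE(pos)));
        pos += 8;
        break;
      case 0x02: { // string: int32 byte length (incl. trailing NUL) + bytes
        const len = buf.readInt32LE(pos);
        pos += 4;
        out.push(JSON.stringify(buf.toString('utf8', pos, pos + len - 1)));
        pos += len;
        break;
      }
      case 0x10: // int32
        out.push(String(buf.readInt32LE(pos)));
        pos += 4;
        break;
      default:
        throw new Error(`type 0x${type.toString(16)} not handled in this sketch`);
    }
  }
  out.push('}');
  return out.join('');
}

// Demo: hand-serialize the BSON document {"a": 1, "b": "hi"}.
const i32 = (n: number) => { const b = Buffer.alloc(4); b.writeInt32LE(n); return b; };
const elements = Buffer.concat([
  Buffer.from([0x10]), Buffer.from('a\0'), i32(1),
  Buffer.from([0x02]), Buffer.from('b\0'), i32(3), Buffer.from('hi\0')
]);
const doc = Buffer.concat([i32(elements.length + 5), elements, Buffer.from([0x00])]);
console.log(bsonToJson(doc)); // prints {"a":1,"b":"hi"}
```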
I attempted to optimize the common cases as much as possible. The more esoteric types, like regular expressions, DBPointer, etc., are supported, but not specifically optimized for performance.
The implementation was largely done using AI-assisted development (Codex), but with lots of manual effort to direct, review, and test it.
Since there are many edge cases, this relies on an extensive test suite to check for correctness, including matching the old implementation's output for the most part.
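One class of edge case such a suite needs to cover is 64-bit integer precision, which is presumably why `JSON(Big).stringify` was used on the old path. This is my example, not necessarily one of the PR's tests:

```typescript
// Hypothetical edge-case check (not from the PR's suite): BSON int64 values
// beyond Number.MAX_SAFE_INTEGER must not be rounded when written to JSON.
const raw = Buffer.alloc(8);
raw.writeBigInt64LE(9007199254740993n); // 2^53 + 1: not representable as a double
const viaNumber = String(Number(raw.readBigInt64LE(0))); // lossy float path
const viaBigInt = raw.readBigInt64LE(0).toString();      // exact path
console.log(viaNumber); // '9007199254740992' (rounded!)
console.log(viaBigInt); // '9007199254740993'
```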
The old implementation is still kept around for:
Copying from the jsdocs:
This attempts to match the behavior of `bson.deserialize -> constructAfterRecord -> applyRowContext` for the most part, with some intentional differences:

General principles followed:
Future optimizations
With these changes, CPU should be much less of a bottleneck for replicating from MongoDB. If we do need to optimize it further, there are some options: