Skip to content

Remove DataStream, support binary sync lines over HTTP#821

Merged
simolus3 merged 14 commits intomainfrom
remove-datastream
Mar 31, 2026
Merged

Remove DataStream, support binary sync lines over HTTP#821
simolus3 merged 14 commits intomainfrom
remove-datastream

Conversation

@simolus3
Copy link
Copy Markdown
Contributor

@simolus3 simolus3 commented Jan 13, 2026

This refactors AbstractRemote and AbstractStreamingSyncImplementation to have clearer control flow and to support streaming BSON documents over HTTP.

In our SDK, we're using the DataStream utility as a polyfill replacing parts of the ReadableStreams web API and async iterables in JavaScript (since RN supports neither). However, that abstraction turned out to hurt us quite a bit:

  1. For some original sync tests, race conditions where causing events to get lost.
  2. For the Rust sync client, we had a number of issues where we were trying to post completed token refreshs or crud upload notifications into a closed stream, causing exceptions
  3. Most recently, there was another race condition causing CI failures: Fix: DataStream Race Condition (and flaky unit tests) #819

Clearly, we're having a hard time using DataStream correctly. By design, DataStream is the equivalent of an asynchronous channel supporting multiple producers and multiple consumers. This makes implementing it quite complicated. However, we don't need most of that in practice!

  1. We're only using DataStream to iterate over sync lines in an async while loop. There is never any concurrency on the consuming side, and there is never more than one consumer.
  2. There are technically multiple producers, but there is one clear "main" producer and another one adding secondary events like crud uploads. So we can scope the lifetime of the stream to the main producer.

This PR replaces DataStream with a variant of AsyncIterator that only has a next method. This encodes the expectation that we'll only ever have one listener owning the stream. I have also deliberately hidden the return method. Instead, the only way to cancel a stream is to pass an AbortSignal to the place creating it and marking that as aborted. This simplifies handling streams in the remote implementation, and makes data flow a bit easier to understand:

  1. When reading streams via fetch, no translation is necessary because the default response reader already implements the format we need.
  2. To translate between the push-based mechanism from RSocket and async iterators, we can use the event-iterator package we already depend on and bundle. We just need to rewrite Symbol.asyncIterator to a polyfill for RN.
  3. Things like parsing lines as BSON or JSON are now a stream transformation instead of being done in AbstractRemote, making control flow easier to reason about.

This also allows us to reuse the networking logic between the legacy and the Rust sync client. Finally, I've added support for receiving binary sync lines over HTTP, a feature we support in our Kotlin and Dart SDKs.

Benchmarks

I ran a very simple benchmark in the node demo app that consists of:

  1. Connecting and waiting for the initial sync without any stream subscription.
  2. Subscribing to a stream with 1M rows and waiting for that to sync for the first time (this is not a great benchmark since the sync-local step takes around 3 seconds here, but if something was truly horrible here we'd see it).
Benchmark Before After
RSocket 9.97s 9.52s
HTTP, text 10.93s 10.77s
HTTP, bson (unsupported) 10.74s

@changeset-bot
Copy link
Copy Markdown

changeset-bot bot commented Jan 13, 2026

🦋 Changeset detected

Latest commit: 68aaaf2

The changes in this PR will be included in the next version bump.

This PR includes changesets to release 8 packages
Name Type
@powersync/react-native Minor
@powersync/common Minor
@powersync/adapter-sql-js Patch
@powersync/node Patch
@powersync/op-sqlite Patch
@powersync/tanstack-react-query Patch
@powersync/web Patch
@powersync/diagnostics-app Patch

Not sure what this means? Click here to learn what changesets are.

Click here if you're a maintainer who wants to add another changeset to this PR

@simolus3 simolus3 marked this pull request as ready for review January 15, 2026 16:26
@simolus3 simolus3 requested a review from rkistner January 15, 2026 16:29
Copy link
Copy Markdown
Contributor

@rkistner rkistner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the change to remove DataStream - I agree plain AsyncIterators are better here.

I do think we need to test this properly to make sure there are no regressions. I also wonder whether we should stick to JSON for HTTP streams to start with, and make it either opt-in, or part of a major release to switch to BSON? Apart from potential bugs, I've seen cases where BSON is actually larger than the equivalent JSON for the same amount of data.

What is the default for HTTP in other SDKs currently?

Comment thread packages/common/src/utils/stream_transform.ts Outdated
@simolus3
Copy link
Copy Markdown
Contributor Author

What is the default for HTTP in other SDKs currently?

Dart, Kotlin and native all unconditionally prefer BSON when the Rust client is used, they'd only use JSON on older service versions.

I've seen cases where BSON is actually larger than the equivalent JSON for the same amount of data.

To be fair, I would expect the (currently JSON strings) oplog data lines to make up the bulk of traffic for PowerSync. So whether the container around that is BSON or JSON is probably insignificant.

and make it either opt-in, or part of a major release to switch to BSON

I'm not a fan of making this an option. If using BSON has benefits, we should aim to use it by default. If it's worse, we shouldn't have it in our SDKs at all.
As far as I can remember, the original motivation for BSON-over-HTTP was to support syncing binary data. Having every SDK use BSON by default was a requirement for that, but there has been no further progress since.

So at this point, I'm actually not sure if adding BSON support over HTTP is something worth doing. It being cheaper to decode in the core extension (even with oplog data being JSON, we don't have to scan for escape characters in those large buffers) is the only current benefit I see.

I can remove BSON support from this PR specifically to get async iterators done earlier, but we should come up with a plan for this. We should have a consistent approach to this in our SDKs, so if we don't want BSON we should remove it from the sync service HTTP endpoint and our other SDKs as well.

@rkistner
Copy link
Copy Markdown
Contributor

My biggest concern is that we shouldn't switch from json to bson by default in a patch release. It can make sense to do the switch for the future plan, but then we need to make sure to test the performance properly, and make it a new minor version at minimum.

@simolus3 simolus3 merged commit 9cfca81 into main Mar 31, 2026
16 checks passed
@simolus3 simolus3 deleted the remove-datastream branch March 31, 2026 11:38
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants