Conversation
…h generic Subscription Custom struct duplicated cancel-on-drop, AtomicBool dedup, message_bus storage, tokio::spawn in async Drop, and inline message dispatch — all already provided by the generic Subscription<T>. Replaced with a 30-line StreamDecoder impl (mirrors MarketDepths / Bar / Trade pattern) and a 3-line factory body. Net -271 lines. Async API breaks: next() now returns Option<Result<HistoricalBarUpdate, Error>> and there is no separate error() accessor. Sync API unchanged in this PR; see #487 for the follow-up. Drop tests rewired to drive through the public factory rather than the deleted internal constructor, so they exercise the real cancel-on-drop path end-to-end. Closes #431.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
HistoricalDataStreamingSubscriptionwithSubscription<HistoricalBarUpdate>by adding aStreamDecoder<HistoricalBarUpdate>impl that mirrors theMarketDepths/Bar/Tradepattern insrc/market_data/realtime/.historical/sync.rsandhistorical/async.rscollapse to encode +builder.send::<HistoricalBarUpdate>(). Custom struct,Drop, and inline message dispatch deleted in both files.Closes #431.
Why
The custom subscription duplicated infrastructure that the generic
Subscription<T>already provides:AtomicBoolcancelled guard,message_busstorage, cancel encoding,tokio::spawnin asyncDrop, error storage, timezone-fallback resolution, and inline message-type dispatch. Each lived in two places (sync + async).After the refactor each piece has one job:
HistoricalBarUpdate::decode— message-type dispatch.decode_historical_data{,_update,_end}— payload parsing (already split, reused).HistoricalBarUpdate::cancel_message— cancel encoding.Subscription<T>— lifecycle (cancel, drop dedup, error storage).HistoricalBarUpdatejoins the same plug-in family asBar,Trade,BidAsk,MarketDepths— all consumed by the same genericSubscription<T>.Breaking changes (intentional, v3)
subscription.next().awaitnow returnsOption<Result<HistoricalBarUpdate, Error>>. The separateerror()accessor is gone — errors arrive vianext().Option<HistoricalBarUpdate>+error()). Aligning sync with async is tracked in Align sync Subscription<T>::next() with async: return Option<Result<T, Error>> #487.HistoricalDataStreamingSubscriptionis no longer exported. Callers useSubscription<HistoricalBarUpdate>directly. Per the v3 philosophy in CLAUDE.md, no type alias.Test plan
cargo fmtcargo clippy --all-targets -- -D warningscargo clippy --all-targets --features sync -- -D warningscargo clippy --all-featurescargo test(default/async): 880 lib + 73 doc + integration — all greencargo test --no-default-features --features sync: 884 + 124 — all greencargo test --all-features: 1064 + 148 — all greencargo build --example async_historical_data— cleankeep_up_to_date=true, and clean Ctrl-C cancelBehavioral assertions verified by tests
CancelHistoricalDataproto sent (factory-driven test).cancel().awaitthen drop → exactly one cancel sent (dedup viaSubscription'sAtomicBool).next()yieldsSome(Err(_))(async) /None+error()populated (sync).