-
Notifications
You must be signed in to change notification settings - Fork 25
feat: add program subscription support #703
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
📝 WalkthroughWalkthroughAdds program-account subscription support across the remote account provider stack: new program_subs state in ChainPubsubActor, SubMuxClient, and config; new ProgramSubscribe and AccountUnsubscribe actor messages; AccountSubscriptionTask::SubscribeProgram variant; a public subscribe_program API path; wiring to initialize and re-establish program subscriptions on startup/reconnect; and updated cleanup/error paths to drain both account and program subscriptions. Possibly related PRs
Suggested reviewers
✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Manual Deploy AvailableYou can trigger a manual deploy of this PR branch to testnet: Alternative: Comment
Comment updated automatically when the PR is synchronized. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
magicblock-chainlink/src/submux/mod.rs (1)
311-344: Good reconnection logic, but consider logging program subscription count.The program resubscription loop correctly:
- Copies the program IDs from the mutex
- Iterates and subscribes each program
- Returns false on any failure (triggering retry)
Consider adding a debug log for visibility into program resubscription similar to account resubscription:
// Resubscribe all program subscriptions let programs: HashSet<Pubkey> = program_subs.lock().unwrap().iter().copied().collect(); + if !programs.is_empty() { + debug!("Resubscribing {} program subscriptions after reconnect", programs.len()); + } for program_id in programs {
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (6)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs(17 hunks)magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs(6 hunks)magicblock-chainlink/src/remote_account_provider/config.rs(5 hunks)magicblock-chainlink/src/remote_account_provider/mod.rs(1 hunks)magicblock-chainlink/src/submux/mod.rs(10 hunks)magicblock-chainlink/src/submux/subscription_task.rs(5 hunks)
🧰 Additional context used
🧠 Learnings (5)
📓 Common learnings
Learnt from: bmuddha
Repo: magicblock-labs/magicblock-validator PR: 578
File: magicblock-aperture/src/requests/websocket/account_subscribe.rs:18-27
Timestamp: 2025-10-21T14:00:54.642Z
Learning: In magicblock-aperture account_subscribe handler (src/requests/websocket/account_subscribe.rs), the RpcAccountInfoConfig fields data_slice, commitment, and min_context_slot are currently ignored—only encoding is applied. This is tracked as technical debt in issue #579: https://github.com/magicblock-labs/magicblock-validator/issues/579
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs:457-495
Timestamp: 2025-11-07T14:20:31.457Z
Learning: In magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs, the unsubscribe closure returned by PubSubConnection::account_subscribe(...) resolves to () (unit), not a Result. Downstream code should not attempt to inspect an unsubscribe result and can optionally wrap it in a timeout to guard against hangs.
📚 Learning: 2025-11-07T14:20:31.457Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs:457-495
Timestamp: 2025-11-07T14:20:31.457Z
Learning: In magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs, the unsubscribe closure returned by PubSubConnection::account_subscribe(...) resolves to () (unit), not a Result. Downstream code should not attempt to inspect an unsubscribe result and can optionally wrap it in a timeout to guard against hangs.
Applied to files:
magicblock-chainlink/src/remote_account_provider/mod.rsmagicblock-chainlink/src/submux/subscription_task.rsmagicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rsmagicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rsmagicblock-chainlink/src/remote_account_provider/config.rsmagicblock-chainlink/src/submux/mod.rs
📚 Learning: 2025-11-19T09:34:37.917Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: test-integration/test-chainlink/tests/ix_remote_account_provider.rs:62-63
Timestamp: 2025-11-19T09:34:37.917Z
Learning: In test-integration/test-chainlink/tests/ix_remote_account_provider.rs and similar test files, the `_fwd_rx` receiver returned by `init_remote_account_provider()` is intentionally kept alive (but unused) to prevent "receiver dropped" errors on the sender side. The pattern `let (remote_account_provider, _fwd_rx) = init_remote_account_provider().await;` should NOT be changed to `let (remote_account_provider, _) = ...` because dropping the receiver would cause send() operations to fail.
Applied to files:
magicblock-chainlink/src/remote_account_provider/mod.rsmagicblock-chainlink/src/submux/subscription_task.rsmagicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rsmagicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rsmagicblock-chainlink/src/remote_account_provider/config.rsmagicblock-chainlink/src/submux/mod.rs
📚 Learning: 2025-10-21T14:00:54.642Z
Learnt from: bmuddha
Repo: magicblock-labs/magicblock-validator PR: 578
File: magicblock-aperture/src/requests/websocket/account_subscribe.rs:18-27
Timestamp: 2025-10-21T14:00:54.642Z
Learning: In magicblock-aperture account_subscribe handler (src/requests/websocket/account_subscribe.rs), the RpcAccountInfoConfig fields data_slice, commitment, and min_context_slot are currently ignored—only encoding is applied. This is tracked as technical debt in issue #579: https://github.com/magicblock-labs/magicblock-validator/issues/579
Applied to files:
magicblock-chainlink/src/remote_account_provider/mod.rsmagicblock-chainlink/src/submux/subscription_task.rsmagicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rsmagicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rsmagicblock-chainlink/src/remote_account_provider/config.rsmagicblock-chainlink/src/submux/mod.rs
📚 Learning: 2025-10-26T16:53:29.820Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 587
File: magicblock-chainlink/src/remote_account_provider/mod.rs:134-0
Timestamp: 2025-10-26T16:53:29.820Z
Learning: In magicblock-chainlink/src/remote_account_provider/mod.rs, the `Endpoint::separate_pubsub_url_and_api_key()` method uses `split_once("?api-key=")` because the api-key parameter is always the only query parameter right after `?`. No additional query parameter parsing is needed for this use case.
Applied to files:
magicblock-chainlink/src/remote_account_provider/mod.rsmagicblock-chainlink/src/remote_account_provider/config.rs
🧬 Code graph analysis (4)
magicblock-chainlink/src/remote_account_provider/mod.rs (1)
magicblock-chainlink/src/remote_account_provider/config.rs (1)
program_subs(74-76)
magicblock-chainlink/src/submux/subscription_task.rs (1)
magicblock-version/src/lib.rs (1)
client(42-44)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (2)
magicblock-chainlink/src/remote_account_provider/config.rs (2)
default(80-88)program_subs(74-76)magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (7)
subscriptions(175-175)subscriptions(303-305)subscriptions(491-494)unsubscribe(158-161)unsubscribe(272-288)unsubscribe(462-470)send(388-397)
magicblock-chainlink/src/submux/mod.rs (2)
magicblock-chainlink/src/remote_account_provider/config.rs (2)
program_subs(74-76)default(80-88)magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (3)
subscribe_program(154-157)subscribe_program(254-270)subscribe_program(455-460)
🔇 Additional comments (19)
magicblock-chainlink/src/submux/subscription_task.rs (3)
14-19: LGTM! Clean extension of the enum.The
SubscribeProgramvariant correctly mirrors theSubscribevariant structure with(Pubkey, usize)for program_id and required confirmations.
44-47: LGTM! Pattern matching correctly unified.Using
Subscribe(_, n) | SubscribeProgram(_, n)cleanly extracts the confirmation count from both variants.
59-68: LGTM! Validation correctly extended.The zero-confirmation check now properly covers both
SubscribeandSubscribeProgramvariants.magicblock-chainlink/src/remote_account_provider/config.rs (5)
1-4: LGTM! Clean import additions.The imports for
HashSetandPubkeyare correctly scoped for the newprogram_subsfield.
20-22: LGTM! Clear documentation.The doc comment clearly explains the purpose of
program_subsas a backup mechanism for direct account subscriptions.
47-52: Verify that..Default::default()initializesprogram_subsas intended.Using struct update syntax here means
program_subswill be initialized tovec![dlp::id()].into_iter().collect()from theDefaultimplementation. This is correct for production but may be unexpected for callers oftry_new_with_metricswho might assume only the explicitly passed fields are set.If explicit control over
program_subsis needed in the future, consider adding a builder pattern or an additional constructor parameter.
73-76: LGTM! Efficient accessor.Returning
&HashSet<Pubkey>avoids cloning and is consistent with other accessors in the struct.
86-86: LGTM! Default includes delegation program.As per PR objectives, the default configuration includes the delegation program (
dlp::id()) for program subscriptions.magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (5)
17-20: LGTM! Appropriate imports for program subscription types.The new imports align with the program subscription functionality being added.
39-42: LGTM! Consistent type alias pattern.
ProgramSubscribeResultfollows the same pattern as the existingSubscribeResulttype alias.
89-110: Verify the safety of the lifetime transmute.The
unsafetransmute extends the stream lifetime from'_to'static. The SAFETY comment mirrors the existingaccount_subscribeimplementation, which is a good sign for consistency.However, verify that:
- The
PubsubClientstored inArcSwapis only replaced during reconnection (when the old stream would be invalid anyway)- No code path can drop the underlying client while a stream is still in use
Based on the existing pattern in
account_subscribe(lines 80-85), this appears to follow the established convention for this codebase.
154-157: LGTM! Clean trait extension.The
subscribe_programmethod follows the same async pattern assubscribe.
254-270: LGTM! Consistent implementation pattern.The
subscribe_programimplementation correctly:
- Creates a oneshot channel for the response
- Sends
ProgramSubscribemessage to the actor- Logs appropriate warnings on receive errors
This mirrors the existing
subscribemethod implementation.magicblock-chainlink/src/submux/mod.rs (2)
140-141: LGTM! Consistent state tracking pattern.Using
Arc<Mutex<HashSet<Pubkey>>>forprogram_subsfollows the same pattern as other shared state in this struct.
205-211: LGTM! Clean initialization and propagation.The
program_subsis properly initialized withDefault::default()(empty set) and cloned for the reconnectors.magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (4)
84-86: LGTM!The new
program_subsfield mirrors the existingsubscriptionsfield structure appropriately, enabling parallel tracking of program-level subscriptions with proper lifecycle management.
113-120: LGTM!The
ProgramSubscribemessage variant is well-documented. The note about only forwarding updates for accounts that are also directly subscribed (insubscriptions) clearly explains the filtering behavior implemented inadd_program_sub.
181-190: LGTM!Chaining both
subscriptionsandprogram_subsdrain iterators ensures all subscriptions are properly cancelled during shutdown.
712-732: Good refactoring.Extracting
drain_subscriptionsas a helper avoids code duplication and ensures consistent cleanup behavior for both account and program subscriptions.
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
Outdated
Show resolved
Hide resolved
* master: fix: don't unbork for same delegation slot as remote slot (#702)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (1)
145-157: subscribe_program API wiring through the actor is straightforward and aligned with subscribe/unsubscribeThe new
ChainPubsubClient::subscribe_programtrait method and itsChainPubsubClientImplimplementation follow the same pattern assubscribe/unsubscribe: send aProgramSubscribemessage, await the oneshot, and log a detailed warning onRecvError. Error propagation (RemoteAccountProviderResult<()>) is preserved end‑to‑end. The only minor readability nit is that the actor message field is namedpubkeyeven though it represents a program ID; renaming it toprogram_idin the enum and call sites would make logs slightly clearer but is not functionally required.Also applies to: 218-270
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (2)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs(17 hunks)magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs(6 hunks)
🧰 Additional context used
🧠 Learnings (7)
📓 Common learnings
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 703
File: magicblock-chainlink/src/submux/mod.rs:652-654
Timestamp: 2025-12-01T16:02:05.344Z
Learning: In magicblock-chainlink/src/submux/mod.rs, the subscribe_program method intentionally adds program_id to program_subs before attempting the subscription. This ensures that even if the initial subscription fails or only partially succeeds across clients, the reconnection logic will retry the subscription. This is a deliberate design pattern for resilience in the multi-client architecture and should not be "fixed" to remove entries on failure.
Learnt from: bmuddha
Repo: magicblock-labs/magicblock-validator PR: 578
File: magicblock-aperture/src/requests/websocket/account_subscribe.rs:18-27
Timestamp: 2025-10-21T14:00:54.642Z
Learning: In magicblock-aperture account_subscribe handler (src/requests/websocket/account_subscribe.rs), the RpcAccountInfoConfig fields data_slice, commitment, and min_context_slot are currently ignored—only encoding is applied. This is tracked as technical debt in issue #579: https://github.com/magicblock-labs/magicblock-validator/issues/579
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs:457-495
Timestamp: 2025-11-07T14:20:31.457Z
Learning: In magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs, the unsubscribe closure returned by PubSubConnection::account_subscribe(...) resolves to () (unit), not a Result. Downstream code should not attempt to inspect an unsubscribe result and can optionally wrap it in a timeout to guard against hangs.
📚 Learning: 2025-11-07T14:20:31.457Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs:457-495
Timestamp: 2025-11-07T14:20:31.457Z
Learning: In magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs, the unsubscribe closure returned by PubSubConnection::account_subscribe(...) resolves to () (unit), not a Result. Downstream code should not attempt to inspect an unsubscribe result and can optionally wrap it in a timeout to guard against hangs.
Applied to files:
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rsmagicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
📚 Learning: 2025-12-01T16:02:05.344Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 703
File: magicblock-chainlink/src/submux/mod.rs:652-654
Timestamp: 2025-12-01T16:02:05.344Z
Learning: In magicblock-chainlink/src/submux/mod.rs, the subscribe_program method intentionally adds program_id to program_subs before attempting the subscription. This ensures that even if the initial subscription fails or only partially succeeds across clients, the reconnection logic will retry the subscription. This is a deliberate design pattern for resilience in the multi-client architecture and should not be "fixed" to remove entries on failure.
Applied to files:
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rsmagicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
📚 Learning: 2025-11-19T09:34:37.917Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 621
File: test-integration/test-chainlink/tests/ix_remote_account_provider.rs:62-63
Timestamp: 2025-11-19T09:34:37.917Z
Learning: In test-integration/test-chainlink/tests/ix_remote_account_provider.rs and similar test files, the `_fwd_rx` receiver returned by `init_remote_account_provider()` is intentionally kept alive (but unused) to prevent "receiver dropped" errors on the sender side. The pattern `let (remote_account_provider, _fwd_rx) = init_remote_account_provider().await;` should NOT be changed to `let (remote_account_provider, _) = ...` because dropping the receiver would cause send() operations to fail.
Applied to files:
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rsmagicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
📚 Learning: 2025-10-21T14:00:54.642Z
Learnt from: bmuddha
Repo: magicblock-labs/magicblock-validator PR: 578
File: magicblock-aperture/src/requests/websocket/account_subscribe.rs:18-27
Timestamp: 2025-10-21T14:00:54.642Z
Learning: In magicblock-aperture account_subscribe handler (src/requests/websocket/account_subscribe.rs), the RpcAccountInfoConfig fields data_slice, commitment, and min_context_slot are currently ignored—only encoding is applied. This is tracked as technical debt in issue #579: https://github.com/magicblock-labs/magicblock-validator/issues/579
Applied to files:
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rsmagicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs
📚 Learning: 2025-10-26T16:53:29.820Z
Learnt from: thlorenz
Repo: magicblock-labs/magicblock-validator PR: 587
File: magicblock-chainlink/src/remote_account_provider/mod.rs:134-0
Timestamp: 2025-10-26T16:53:29.820Z
Learning: In magicblock-chainlink/src/remote_account_provider/mod.rs, the `Endpoint::separate_pubsub_url_and_api_key()` method uses `split_once("?api-key=")` because the api-key parameter is always the only query parameter right after `?`. No additional query parameter parsing is needed for this use case.
Applied to files:
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
📚 Learning: 2025-11-07T13:09:52.253Z
Learnt from: bmuddha
Repo: magicblock-labs/magicblock-validator PR: 589
File: test-kit/src/lib.rs:275-0
Timestamp: 2025-11-07T13:09:52.253Z
Learning: In test-kit, the transaction scheduler in ExecutionTestEnv is not expected to shut down during tests. Therefore, using `.unwrap()` in test helper methods like `schedule_transaction` is acceptable and will not cause issues in the test environment.
Applied to files:
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs
🧬 Code graph analysis (2)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (1)
magicblock-chainlink/src/submux/mod.rs (1)
subscribe_program(641-662)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (1)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_client.rs (7)
subscriptions(175-175)subscriptions(303-305)subscriptions(500-503)unsubscribe(158-161)unsubscribe(272-288)unsubscribe(471-479)send(388-397)
🔇 Additional comments (1)
magicblock-chainlink/src/remote_account_provider/chain_pubsub_actor.rs (1)
396-402: Extending add_sub to participate in program abort logic is consistent and preserves behaviorUpdating
add_subto acceptprogram_subsand thread it intoabort_and_signal_connection_issueensures that when an account subscription detects a connection issue (e.g., RPC error or EOF), both account and program subscriptions are drained and canceled together. The rest of the function—early‑existence check, immediate insertion intosubsto close unsubscribe races, subscription setup, and timeout‑wrappedunsubscribe()that ignores the inner result—remains unchanged and aligns with prior guidance about the unsubscribe closure returning().Based on learnings, this matches the intended unsubscribe pattern for the underlying pubsub client.
Also applies to: 445-457, 491-520
GabrielePicco
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
As discussed, we can abstract a pubsub-agnostic implementation so the same behavior can run over gRPC or Laser.
* master: feat: add program subscription support (#703) feat: add a way to remove confined accounts on start (#726) chore: remove unused dependencies in the workspace (#732) fix: prevent removal of delegated/undelegating accounts on eviction (#724) fix: Fix per-account livelock (#728) fix: properly prune program cache upon corruption (#730)
Summary
Add support for program account subscriptions to the pubsub layer. This enables subscribing to
accounts within specific programs (e.g., the delegation program) as a backup mechanism for
direct account subscriptions. Updates to accounts within subscribed programs that are also
directly subscribed will be forwarded to the account subscription system.
Details
This implementation extends the pubsub infrastructure across multiple layers to support program
subscriptions:
ChainPubsubActor
program_subsfield to track program subscriptions separately from account subscriptionsProgramSubscribemessage variant to handle program subscription requestsadd_program_submethod that establishes a program subscription via the RPC connection and relays matching account updates (only for accounts also directly subscribed)ChainPubsubClient
program_subscribemethod to the trait for subscribing to programsChainPubsubClientImplto forward program subscription requests to the actorprogram_subscribetoPubSubConnectionwith proper stream lifetime handling via unsafe transmuteRemoteAccountProviderConfig
program_subsfield containing a set of program IDs to subscribe to on startupSubMuxClient
program_subsfieldsubscribe_programmethod that adds the program to the tracking set and broadcasts the subscription request to all clientsAccountSubscriptionTask
SubscribeProgramvariant to the task enumSummary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings.