Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ All notable changes to this project will be documented in this file.
- Serviceability: DeleteUser instruction supports atomic deallocate+closeaccount when OnchainAllocation feature is enabled
- Serviceability: CreateLink instruction supports atomic create+allocate+activate when OnchainAllocation feature is enabled
- Serviceability: DeleteLink instruction supports atomic deallocate+closeaccount when OnchainAllocation feature is enabled
- Serviceability: CreateSubscribeUser instruction supports atomic create+allocate+activate when OnchainAllocation feature is enabled

## [v0.9.0](https://github.com/malbeclabs/doublezero/compare/client/v0.8.11...client/v0.9.0) - 2026-02-27

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,7 @@ mod tests {
publisher: false,
subscriber: true,
tunnel_endpoint: Ipv4Addr::UNSPECIFIED,
dz_prefix_count: 0,
}),
"CreateSubscribeUser",
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use solana_program::{
account_info::{next_account_info, AccountInfo},
entrypoint::ProgramResult,
msg,
program_error::ProgramError,
pubkey::Pubkey,
};
use std::{fmt, net::Ipv4Addr};
Expand All @@ -35,6 +36,90 @@ impl fmt::Debug for MulticastGroupSubscribeArgs {
}
}

pub struct SubscribeUserResult {
pub mgroup: MulticastGroup,
/// True if the publisher list transitioned between empty and non-empty
/// (gained first publisher or lost last publisher). Callers that need to
/// trigger activator reprocessing should check this flag.
pub publisher_list_transitioned: bool,
}

/// Toggle a user's multicast group subscription.
///
/// Handles both create-time subscription (user lists start empty, only adds)
/// and post-activation subscription changes (add/remove toggle). The caller is
/// responsible for setting `user.status = Updating` when
/// `publisher_list_transitioned` is true and the user is already activated.
pub fn subscribe_user_to_multicastgroup(
mgroup_account: &AccountInfo,
accesspass: &AccessPass,
user: &mut User,
publisher: bool,
subscriber: bool,
) -> Result<SubscribeUserResult, ProgramError> {
let mut mgroup = MulticastGroup::try_from(mgroup_account)?;
if mgroup.status != MulticastGroupStatus::Activated {
msg!("MulticastGroupStatus: {:?}", mgroup.status);
return Err(DoubleZeroError::InvalidStatus.into());
}

// Check allowlists for additions
if publisher && !accesspass.mgroup_pub_allowlist.contains(mgroup_account.key) {
msg!("{:?}", accesspass);
return Err(DoubleZeroError::NotAllowed.into());
}
if subscriber && !accesspass.mgroup_sub_allowlist.contains(mgroup_account.key) {
msg!("{:?}", accesspass);
return Err(DoubleZeroError::NotAllowed.into());
}

let mut publisher_list_transitioned = false;

// Manage the publisher list
match publisher {
true => {
if !user.publishers.contains(mgroup_account.key) {
let was_empty = user.publishers.is_empty();
mgroup.publisher_count = mgroup.publisher_count.saturating_add(1);
user.publishers.push(*mgroup_account.key);
if was_empty {
publisher_list_transitioned = true;
}
}
}
false => {
if user.publishers.contains(mgroup_account.key) {
mgroup.publisher_count = mgroup.publisher_count.saturating_sub(1);
user.publishers.retain(|&x| x != *mgroup_account.key);
if user.publishers.is_empty() {
publisher_list_transitioned = true;
}
}
}
}

// Manage the subscriber list
match subscriber {
true => {
if !user.subscribers.contains(mgroup_account.key) {
mgroup.subscriber_count = mgroup.subscriber_count.saturating_add(1);
user.subscribers.push(*mgroup_account.key);
}
}
false => {
if user.subscribers.contains(mgroup_account.key) {
mgroup.subscriber_count = mgroup.subscriber_count.saturating_sub(1);
user.subscribers.retain(|&x| x != *mgroup_account.key);
}
}
}

Ok(SubscribeUserResult {
mgroup,
publisher_list_transitioned,
})
}

pub fn process_subscribe_multicastgroup(
program_id: &Pubkey,
accounts: &[AccountInfo],
Expand Down Expand Up @@ -78,15 +163,7 @@ pub fn process_subscribe_multicastgroup(
);
assert!(user_account.is_writable, "user account is not writable");

// Parse accounts
let mut mgroup: MulticastGroup = MulticastGroup::try_from(mgroup_account)?;
if mgroup.status != MulticastGroupStatus::Activated {
#[cfg(test)]
msg!("MulticastGroupStatus: {:?}", mgroup.status);

return Err(DoubleZeroError::InvalidStatus.into());
}

// Parse and validate user
let mut user: User = User::try_from(user_account)?;
if user.status != UserStatus::Activated && user.status != UserStatus::Updating {
msg!("UserStatus: {:?}", user.status);
Expand Down Expand Up @@ -116,76 +193,22 @@ pub fn process_subscribe_multicastgroup(
return Err(DoubleZeroError::Unauthorized.into());
}

// Check if the user is in the allowlist
if value.publisher && !accesspass.mgroup_pub_allowlist.contains(mgroup_account.key) {
msg!("{:?}", accesspass);
return Err(DoubleZeroError::NotAllowed.into());
}
if value.subscriber && !accesspass.mgroup_sub_allowlist.contains(mgroup_account.key) {
msg!("{:?}", accesspass);
return Err(DoubleZeroError::NotAllowed.into());
}

// Manage the publisher lists
match value.publisher {
true => {
if !user.publishers.contains(mgroup_account.key) {
let was_empty = user.publishers.is_empty();
// Increment publisher count
mgroup.publisher_count = mgroup.publisher_count.saturating_add(1);
// Add multicast group to user's publisher list
user.publishers.push(*mgroup_account.key);
// Only trigger activator reprocessing when gaining first publisher
// (activator needs to allocate dz_ip)
if was_empty {
user.status = UserStatus::Updating;
}
}
}
false => {
if user.publishers.contains(mgroup_account.key) {
// Decrement publisher count
mgroup.publisher_count = mgroup.publisher_count.saturating_sub(1);
// Remove multicast group from user's publisher list
user.publishers.retain(|&x| x != *mgroup_account.key);
// Trigger activator reprocessing when losing last publisher
// (dz_ip no longer needed)
if user.publishers.is_empty() {
user.status = UserStatus::Updating;
}
}
}
}

// Manage the subscriber lists
match value.subscriber {
true => {
if !user.subscribers.contains(mgroup_account.key) {
// Increment subscriber count
mgroup.subscriber_count = mgroup.subscriber_count.saturating_add(1);
// Add multicast group to user's subscriber list
user.subscribers.push(*mgroup_account.key);
// No activator reprocessing needed for subscriber changes
// (subscriber groups don't affect tunnel or dz_ip config)
}
}
false => {
if user.subscribers.contains(mgroup_account.key) {
// Decrement subscriber count
mgroup.subscriber_count = mgroup.subscriber_count.saturating_sub(1);
// Remove multicast group from user's subscriber list
user.subscribers.retain(|&x| x != *mgroup_account.key);
}
}
let result = subscribe_user_to_multicastgroup(
mgroup_account,
&accesspass,
&mut user,
value.publisher,
value.subscriber,
)?;

// Trigger activator reprocessing when publisher list transitions
// (gaining first publisher requires dz_ip allocation, losing last means it's no longer needed)
if result.publisher_list_transitioned {
user.status = UserStatus::Updating;
}

try_acc_write(&mgroup, mgroup_account, payer_account, accounts)?;
try_acc_write(&result.mgroup, mgroup_account, payer_account, accounts)?;
try_acc_write(&user, user_account, payer_account, accounts)?;

#[cfg(test)]
msg!("Updated: {:?}", mgroup);
#[cfg(test)]
msg!("Updated: {:?}", user_account);

Ok(())
}
Loading
Loading