From 9ba304a381bc5b7a618fc6f7349cc02744928615 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Fri, 24 Apr 2026 13:20:41 -0400 Subject: [PATCH 1/8] sentinel: set tunnel endpoint on multicast publisher create Mirror the CLI's behavior at client/doublezero/src/command/connect.rs:310: pick a tunnel endpoint on the user's device (public_ip or a user_tunnel_endpoint interface IP) that is not already in use by another user at the same client_ip, instead of sending UNSPECIFIED and letting the activator choose. --- .../doublezero-admin/src/cli/sentinel.rs | 18 ++ crates/sentinel/src/dz_ledger_reader.rs | 19 ++ crates/sentinel/src/dz_ledger_writer.rs | 9 +- crates/sentinel/src/lib.rs | 1 + crates/sentinel/src/multicast_create.rs | 2 + crates/sentinel/src/multicast_publisher.rs | 186 ++++++++++++++-- crates/sentinel/src/nearest_device.rs | 2 + crates/sentinel/src/tunnel_endpoint.rs | 204 ++++++++++++++++++ 8 files changed, 425 insertions(+), 16 deletions(-) create mode 100644 crates/sentinel/src/tunnel_endpoint.rs diff --git a/controlplane/doublezero-admin/src/cli/sentinel.rs b/controlplane/doublezero-admin/src/cli/sentinel.rs index cbe8eaefc4..9ffb82eae9 100644 --- a/controlplane/doublezero-admin/src/cli/sentinel.rs +++ b/controlplane/doublezero-admin/src/cli/sentinel.rs @@ -9,6 +9,7 @@ use doublezero_sentinel::{ multicast_find::{apply_filters, FindFilters}, nearest_device::{device_proximity_score, find_nearest_device_for_multicast}, output::{print_table, OutputOptions}, + tunnel_endpoint::select_tunnel_endpoint, validator_metadata_reader::{ HttpValidatorMetadataReader, ValidatorMetadataReader, DEFAULT_VALIDATOR_METADATA_URL, }, @@ -729,14 +730,31 @@ impl CreateValidatorMulticastPublishersCommand { tenant_pk: Pubkey::default(), user_type: doublezero_sdk::UserType::IBRL, publishers: vec![], + tunnel_endpoint: Ipv4Addr::UNSPECIFIED, }; + let tunnel_endpoint = device_infos + .get(&target_device_pk) + .map(|d| { + let exclude: Vec = all_users + .iter() + .filter(|u| { + u.client_ip == candidate.client_ip + && u.tunnel_endpoint != Ipv4Addr::UNSPECIFIED + }) + .map(|u| u.tunnel_endpoint) + .collect(); + select_tunnel_endpoint(d.public_ip, &d.user_tunnel_endpoints, &exclude) + }) + .unwrap_or(Ipv4Addr::UNSPECIFIED); + let ixs = match build_create_multicast_publisher_instructions( &program_id, &payer_pk, &candidate.owner, &multicast_group_pk, &dz_user, + tunnel_endpoint, ) { Ok(ixs) => ixs, Err(e) => { diff --git a/crates/sentinel/src/dz_ledger_reader.rs b/crates/sentinel/src/dz_ledger_reader.rs index 57049e0675..f1fbfee6d2 100644 --- a/crates/sentinel/src/dz_ledger_reader.rs +++ b/crates/sentinel/src/dz_ledger_reader.rs @@ -28,6 +28,7 @@ pub struct DzUser { pub tenant_pk: Pubkey, pub user_type: UserType, pub publishers: Vec, + pub tunnel_endpoint: Ipv4Addr, } /// Maps device pubkey → device code. @@ -47,6 +48,9 @@ pub struct DzDeviceInfo { pub reserved_seats: u16, pub multicast_publishers_count: u16, pub max_multicast_publishers: u16, + pub public_ip: Ipv4Addr, + /// IPs from device interfaces where `user_tunnel_endpoint == true`. + pub user_tunnel_endpoints: Vec, } // --------------------------------------------------------------------------- @@ -120,6 +124,7 @@ impl DzLedgerReader for RpcDzLedgerReader { tenant_pk: user.tenant_pk, user_type: user.user_type, publishers: user.publishers.clone(), + tunnel_endpoint: user.tunnel_endpoint, }); } @@ -276,6 +281,18 @@ pub fn fetch_device_infos( .get(&device.location_pk) .copied() .unwrap_or((0.0, 0.0)); + let user_tunnel_endpoints = device + .interfaces + .iter() + .filter_map(|iface| { + let iface = iface.into_current_version(); + if iface.user_tunnel_endpoint && iface.ip_net != Default::default() { + Some(iface.ip_net.ip()) + } else { + None + } + }) + .collect(); infos.insert( pk, DzDeviceInfo { @@ -288,6 +305,8 @@ pub fn fetch_device_infos( reserved_seats: device.reserved_seats, multicast_publishers_count: device.multicast_publishers_count, max_multicast_publishers: device.max_multicast_publishers, + public_ip: device.public_ip, + user_tunnel_endpoints, }, ); } diff --git a/crates/sentinel/src/dz_ledger_writer.rs b/crates/sentinel/src/dz_ledger_writer.rs index 0b09b8fc3f..a96da79263 100644 --- a/crates/sentinel/src/dz_ledger_writer.rs +++ b/crates/sentinel/src/dz_ledger_writer.rs @@ -1,3 +1,5 @@ +use std::net::Ipv4Addr; + use anyhow::{Context, Result}; use doublezero_serviceability::{ instructions::DoubleZeroInstruction, @@ -36,6 +38,7 @@ pub fn build_create_multicast_publisher_instructions( owner: &Pubkey, multicast_group_pk: &Pubkey, user: &DzUser, + tunnel_endpoint: Ipv4Addr, ) -> Result { let (accesspass_pda, _) = get_accesspass_pda(program_id, &user.client_ip, owner); let (globalstate_pda, _) = get_globalstate_pda(program_id); @@ -84,7 +87,7 @@ pub fn build_create_multicast_publisher_instructions( client_ip: user.client_ip, publisher: true, subscriber: false, - tunnel_endpoint: std::net::Ipv4Addr::UNSPECIFIED, + tunnel_endpoint, dz_prefix_count: 0, owner: *owner, }), @@ -139,6 +142,7 @@ mod tests { tenant_pk: Pubkey::default(), user_type: UserType::IBRL, publishers: vec![], + tunnel_endpoint: Ipv4Addr::UNSPECIFIED, }; let owner = Pubkey::new_unique(); @@ -149,6 +153,7 @@ mod tests { &owner, &multicast_group, &user, + Ipv4Addr::UNSPECIFIED, ) .unwrap(); @@ -182,6 +187,7 @@ mod tests { tenant_pk: Pubkey::default(), user_type: UserType::IBRL, publishers: vec![], + tunnel_endpoint: Ipv4Addr::UNSPECIFIED, }; let owner = Pubkey::new_unique(); @@ -192,6 +198,7 @@ mod tests { &owner, &multicast_group, &user, + Ipv4Addr::UNSPECIFIED, ) .unwrap(); diff --git a/crates/sentinel/src/lib.rs b/crates/sentinel/src/lib.rs index 5785bbf3d0..9cc741fae4 100644 --- a/crates/sentinel/src/lib.rs +++ b/crates/sentinel/src/lib.rs @@ -7,4 +7,5 @@ pub mod multicast_publisher; pub mod nearest_device; pub mod output; pub mod settings; +pub mod tunnel_endpoint; pub mod validator_metadata_reader; diff --git a/crates/sentinel/src/multicast_create.rs b/crates/sentinel/src/multicast_create.rs index 622f2ee03a..6e45de3f85 100644 --- a/crates/sentinel/src/multicast_create.rs +++ b/crates/sentinel/src/multicast_create.rs @@ -138,6 +138,7 @@ mod tests { tenant_pk: Pubkey::default(), user_type: UserType::IBRL, publishers: vec![], + tunnel_endpoint: Ipv4Addr::UNSPECIFIED, } } @@ -149,6 +150,7 @@ mod tests { tenant_pk: Pubkey::default(), user_type: UserType::Multicast, publishers: groups, + tunnel_endpoint: Ipv4Addr::UNSPECIFIED, } } diff --git a/crates/sentinel/src/multicast_publisher.rs b/crates/sentinel/src/multicast_publisher.rs index afc389d222..95e6c81657 100644 --- a/crates/sentinel/src/multicast_publisher.rs +++ b/crates/sentinel/src/multicast_publisher.rs @@ -25,6 +25,7 @@ use crate::{ dz_ledger_reader::DzUser, dz_ledger_writer::build_create_multicast_publisher_instructions, error::{rpc_with_retry, Result, SentinelError}, + tunnel_endpoint::{select_tunnel_endpoint_for_user, DeviceEndpoints}, }; // --------------------------------------------------------------------------- @@ -36,7 +37,13 @@ use crate::{ #[async_trait] pub trait MulticastDzLedgerClient: Send + Sync { async fn fetch_all_dz_users(&self) -> Result>; - async fn create_multicast_publisher(&self, mgroup_pk: &Pubkey, user: &DzUser) -> Result<()>; + async fn fetch_all_device_endpoints(&self) -> Result>; + async fn create_multicast_publisher( + &self, + mgroup_pk: &Pubkey, + user: &DzUser, + tunnel_endpoint: Ipv4Addr, + ) -> Result<()>; } /// Provides the list of active validators by IP for the sentinel. @@ -156,6 +163,7 @@ impl MulticastPublisherSenti } let all_users = self.dz_client.fetch_all_dz_users().await?; + let device_endpoints = self.dz_client.fetch_all_device_endpoints().await?; let ibrl_users: Vec<_> = all_users .iter() @@ -227,9 +235,15 @@ impl MulticastPublisherSenti .set(candidates.len() as f64); for user in candidates { + let tunnel_endpoint = select_tunnel_endpoint_for_user( + &user.device_pk, + user.client_ip, + &all_users, + &device_endpoints, + ); if let Err(err) = self .dz_client - .create_multicast_publisher(mgroup_pk, user) + .create_multicast_publisher(mgroup_pk, user, tunnel_endpoint) .await { error!( @@ -237,6 +251,7 @@ impl MulticastPublisherSenti ip = %user.client_ip, device = %user.device_pk, group = %mgroup_pk, + %tunnel_endpoint, "failed to create multicast publisher" ); metrics::counter!( @@ -249,6 +264,7 @@ impl MulticastPublisherSenti ip = %user.client_ip, device = %user.device_pk, group = %mgroup_pk, + %tunnel_endpoint, "created multicast publisher" ); metrics::counter!( @@ -383,6 +399,7 @@ impl MulticastDzLedgerClient for RpcMulticastDzLedgerClient { tenant_pk: user.tenant_pk, user_type: user.user_type, publishers: user.publishers.clone(), + tunnel_endpoint: user.tunnel_endpoint, }); } @@ -399,7 +416,74 @@ impl MulticastDzLedgerClient for RpcMulticastDzLedgerClient { Ok(users) } - async fn create_multicast_publisher(&self, mgroup_pk: &Pubkey, user: &DzUser) -> Result<()> { + async fn fetch_all_device_endpoints(&self) -> Result> { + use doublezero_sdk::{AccountData, AccountType, DeviceStatus}; + use solana_account_decoder::UiAccountEncoding; + use solana_client::{ + rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + rpc_filter::{Memcmp, RpcFilterType}, + }; + + let device_type_byte = AccountType::Device as u8; + let accounts = self + .rpc_client + .get_program_accounts_with_config( + &self.serviceability_id, + RpcProgramAccountsConfig { + filters: Some(vec![RpcFilterType::Memcmp(Memcmp::new_raw_bytes( + 0, + vec![device_type_byte], + ))]), + account_config: RpcAccountInfoConfig { + encoding: Some(UiAccountEncoding::Base64), + ..Default::default() + }, + ..Default::default() + }, + ) + .await?; + + let mut map = HashMap::new(); + for (pk, account) in accounts { + let Ok(ad) = AccountData::try_from(account.data.as_slice()) else { + continue; + }; + let Ok(device) = ad.get_device() else { + continue; + }; + if device.status != DeviceStatus::Activated { + continue; + } + let user_tunnel_endpoints = device + .interfaces + .iter() + .filter_map(|iface| { + let iface = iface.into_current_version(); + if iface.user_tunnel_endpoint && iface.ip_net != Default::default() { + Some(iface.ip_net.ip()) + } else { + None + } + }) + .collect(); + map.insert( + pk, + DeviceEndpoints { + public_ip: device.public_ip, + user_tunnel_endpoints, + }, + ); + } + + Ok(map) + } + + async fn create_multicast_publisher( + &self, + mgroup_pk: &Pubkey, + user: &DzUser, + tunnel_endpoint: Ipv4Addr, + ) -> Result<()> { let payer_pk = self.payer.pubkey(); let ixs = build_create_multicast_publisher_instructions( &self.serviceability_id, @@ -407,6 +491,7 @@ impl MulticastDzLedgerClient for RpcMulticastDzLedgerClient { &user.owner, mgroup_pk, user, + tunnel_endpoint, ) .map_err(|e| SentinelError::Deserialize(format!("build multicast publisher ixs: {e}")))?; @@ -558,6 +643,7 @@ mod tests { tenant_pk: Pubkey::default(), user_type: UserType::IBRL, publishers: vec![], + tunnel_endpoint: Ipv4Addr::UNSPECIFIED, } } @@ -569,6 +655,7 @@ mod tests { tenant_pk: Pubkey::default(), user_type: UserType::IBRLWithAllocatedIP, publishers: vec![], + tunnel_endpoint: Ipv4Addr::UNSPECIFIED, } } @@ -580,6 +667,7 @@ mod tests { tenant_pk: Pubkey::default(), user_type: UserType::Multicast, publishers: groups, + tunnel_endpoint: Ipv4Addr::UNSPECIFIED, } } @@ -608,6 +696,7 @@ mod tests { let mut dz = MockMulticastDzLedgerClient::new(); dz.expect_fetch_all_dz_users().never(); + dz.expect_fetch_all_device_endpoints().never(); dz.expect_create_multicast_publisher().never(); let group = Pubkey::new_unique(); @@ -632,6 +721,8 @@ mod tests { let mut dz = MockMulticastDzLedgerClient::new(); dz.expect_fetch_all_dz_users() .returning(|| Ok(vec![make_multicast_publisher([10, 0, 0, 1], vec![])])); + dz.expect_fetch_all_device_endpoints() + .returning(|| Ok(HashMap::new())); dz.expect_create_multicast_publisher().never(); let group = Pubkey::new_unique(); @@ -664,6 +755,8 @@ mod tests { make_multicast_publisher(ip, vec![group_clone]), ]) }); + dz.expect_fetch_all_device_endpoints() + .returning(|| Ok(HashMap::new())); dz.expect_create_multicast_publisher().never(); let sentinel = make_sentinel(dz, api, vec![group]); @@ -709,14 +802,16 @@ mod tests { make_multicast_publisher(ip1, vec![group_clone]), ]) }); + dz.expect_fetch_all_device_endpoints() + .returning(|| Ok(HashMap::new())); // Only ip2 should be created. dz.expect_create_multicast_publisher() - .withf(move |g, u| { + .withf(move |g, u, _| { *g == group && u.client_ip == Ipv4Addr::from(ip2) && u.device_pk == device2 }) .times(1) - .returning(|_, _| Ok(())); + .returning(|_, _, _| Ok(())); let sentinel = make_sentinel(dz, api, vec![group]); sentinel.poll_cycle().await.unwrap(); @@ -754,18 +849,20 @@ mod tests { make_ibrl_user(ip2, Pubkey::new_unique()), ]) }); + dz.expect_fetch_all_device_endpoints() + .returning(|| Ok(HashMap::new())); let mut seq = mockall::Sequence::new(); // First call (ip2, higher stake) fails. dz.expect_create_multicast_publisher() .times(1) .in_sequence(&mut seq) - .returning(|_, _| Err(SentinelError::Deserialize("simulated failure".into()))); + .returning(|_, _, _| Err(SentinelError::Deserialize("simulated failure".into()))); // Second call (ip1) should still happen. dz.expect_create_multicast_publisher() .times(1) .in_sequence(&mut seq) - .returning(|_, _| Ok(())); + .returning(|_, _, _| Ok(())); let sentinel = make_sentinel(dz, api, vec![group]); sentinel.poll_cycle().await.unwrap(); @@ -797,12 +894,14 @@ mod tests { make_multicast_publisher(ip, vec![group_a_clone]), ]) }); + dz.expect_fetch_all_device_endpoints() + .returning(|| Ok(HashMap::new())); // Should only create for group B. dz.expect_create_multicast_publisher() - .withf(move |g, _| *g == group_b) + .withf(move |g, _, _| *g == group_b) .times(1) - .returning(|_, _| Ok(())); + .returning(|_, _, _| Ok(())); let sentinel = make_sentinel(dz, api, vec![group_a, group_b]); sentinel.poll_cycle().await.unwrap(); @@ -881,12 +980,14 @@ mod tests { make_ibrl_user(ip_agave, Pubkey::new_unique()), ]) }); + dz.expect_fetch_all_device_endpoints() + .returning(|| Ok(HashMap::new())); // Only JitoLabs validator should be created. dz.expect_create_multicast_publisher() - .withf(move |_, u| u.client_ip == Ipv4Addr::from(ip_jito)) + .withf(move |_, u, _| u.client_ip == Ipv4Addr::from(ip_jito)) .times(1) - .returning(|_, _| Ok(())); + .returning(|_, _, _| Ok(())); let sentinel = make_sentinel_with_filter(dz, api, vec![group], vec!["JitoLabs".into()]); sentinel.poll_cycle().await.unwrap(); @@ -933,13 +1034,15 @@ mod tests { make_ibrl_user(ip_frank, Pubkey::new_unique()), ]) }); + dz.expect_fetch_all_device_endpoints() + .returning(|| Ok(HashMap::new())); // Both JitoLabs and Frankendancer should be created; Agave skipped. let created_ips = Arc::new(std::sync::Mutex::new(HashSet::new())); let created_ips_clone = created_ips.clone(); dz.expect_create_multicast_publisher() .times(2) - .returning(move |_, u| { + .returning(move |_, u, _| { created_ips_clone.lock().unwrap().insert(u.client_ip); Ok(()) }); @@ -1026,13 +1129,15 @@ mod tests { make_ibrl_user(ip2, Pubkey::new_unique()), ]) }); + dz.expect_fetch_all_device_endpoints() + .returning(|| Ok(HashMap::new())); // Both should be created (no client filter). let created_ips = Arc::new(std::sync::Mutex::new(HashSet::new())); let created_ips_clone = created_ips.clone(); dz.expect_create_multicast_publisher() .times(2) - .returning(move |_, u| { + .returning(move |_, u, _| { created_ips_clone.lock().unwrap().insert(u.client_ip); Ok(()) }); @@ -1045,6 +1150,55 @@ mod tests { assert!(ips.contains(&Ipv4Addr::from(ip2))); } + #[tokio::test] + async fn tunnel_endpoint_is_selected_excluding_ibrl_endpoint() { + // Device has two UTE endpoints; the IBRL user already occupies the first, + // so the multicast publisher should get the second. + let group = Pubkey::new_unique(); + let ip = [10, 0, 0, 1]; + let device = Pubkey::new_unique(); + let ute1 = Ipv4Addr::new(192, 168, 1, 11); + let ute2 = Ipv4Addr::new(192, 168, 1, 12); + + let mut api = MockValidatorListReader::new(); + let mut validators = HashMap::new(); + validators.insert( + Ipv4Addr::from(ip), + ValidatorStake { + activated_stake: 100, + software_client: String::new(), + }, + ); + api.expect_fetch_validators() + .returning(move || Ok(validators.clone())); + + let mut dz = MockMulticastDzLedgerClient::new(); + dz.expect_fetch_all_dz_users().returning(move || { + let mut u = make_ibrl_user(ip, device); + u.tunnel_endpoint = ute1; + Ok(vec![u]) + }); + dz.expect_fetch_all_device_endpoints().returning(move || { + let mut map = HashMap::new(); + map.insert( + device, + DeviceEndpoints { + public_ip: Ipv4Addr::new(1, 1, 1, 1), + user_tunnel_endpoints: vec![ute1, ute2], + }, + ); + Ok(map) + }); + + dz.expect_create_multicast_publisher() + .withf(move |g, u, ep| *g == group && u.client_ip == Ipv4Addr::from(ip) && *ep == ute2) + .times(1) + .returning(|_, _, _| Ok(())); + + let sentinel = make_sentinel(dz, api, vec![group]); + sentinel.poll_cycle().await.unwrap(); + } + #[tokio::test] async fn ibrl_with_allocated_ip_users_are_candidates() { let group = Pubkey::new_unique(); @@ -1067,16 +1221,18 @@ mod tests { let mut dz = MockMulticastDzLedgerClient::new(); dz.expect_fetch_all_dz_users() .returning(move || Ok(vec![make_ibrl_with_allocated_ip_user(ip, device_clone)])); + dz.expect_fetch_all_device_endpoints() + .returning(|| Ok(HashMap::new())); dz.expect_create_multicast_publisher() - .withf(move |g, u| { + .withf(move |g, u, _| { *g == group && u.client_ip == Ipv4Addr::from(ip) && u.device_pk == device && u.user_type == UserType::IBRLWithAllocatedIP }) .times(1) - .returning(|_, _| Ok(())); + .returning(|_, _, _| Ok(())); let sentinel = make_sentinel(dz, api, vec![group]); sentinel.poll_cycle().await.unwrap(); diff --git a/crates/sentinel/src/nearest_device.rs b/crates/sentinel/src/nearest_device.rs index 2c18a29e8e..e3fa31b2bd 100644 --- a/crates/sentinel/src/nearest_device.rs +++ b/crates/sentinel/src/nearest_device.rs @@ -115,6 +115,8 @@ mod tests { reserved_seats: 0, multicast_publishers_count: 0, max_multicast_publishers: 0, + public_ip: std::net::Ipv4Addr::UNSPECIFIED, + user_tunnel_endpoints: vec![], } } diff --git a/crates/sentinel/src/tunnel_endpoint.rs b/crates/sentinel/src/tunnel_endpoint.rs new file mode 100644 index 0000000000..81dd63d52f --- /dev/null +++ b/crates/sentinel/src/tunnel_endpoint.rs @@ -0,0 +1,204 @@ +use std::net::Ipv4Addr; + +use solana_sdk::pubkey::Pubkey; + +/// Candidate tunnel endpoints for a device. +#[derive(Debug, Clone)] +pub struct DeviceEndpoints { + pub public_ip: Ipv4Addr, + /// IPs from device interfaces where `user_tunnel_endpoint == true`. + pub user_tunnel_endpoints: Vec, +} + +impl Default for DeviceEndpoints { + fn default() -> Self { + Self { + public_ip: Ipv4Addr::UNSPECIFIED, + user_tunnel_endpoints: Vec::new(), + } + } +} + +/// Select a tunnel endpoint for a user, preferring a `user_tunnel_endpoint` +/// interface IP and falling back to the device's `public_ip`. Any IP in +/// `exclude_ips` is skipped — these are the endpoints already used by other +/// user accounts at the same `client_ip`. +/// +/// Returns `Ipv4Addr::UNSPECIFIED` when no endpoint is available; the activator +/// will then reject the create. +pub fn select_tunnel_endpoint( + public_ip: Ipv4Addr, + user_tunnel_endpoints: &[Ipv4Addr], + exclude_ips: &[Ipv4Addr], +) -> Ipv4Addr { + for ep in user_tunnel_endpoints { + if !exclude_ips.contains(ep) { + return *ep; + } + } + + if public_ip != Ipv4Addr::UNSPECIFIED && !exclude_ips.contains(&public_ip) { + return public_ip; + } + + Ipv4Addr::UNSPECIFIED +} + +/// Collect the set of tunnel endpoints already in use by other user accounts +/// at the same `client_ip`. +pub fn in_use_tunnel_endpoints<'a, I>(users: I, client_ip: Ipv4Addr) -> Vec +where + I: IntoIterator, +{ + users + .into_iter() + .filter(|u| u.client_ip == client_ip && u.tunnel_endpoint != Ipv4Addr::UNSPECIFIED) + .map(|u| u.tunnel_endpoint) + .collect() +} + +/// Pick a tunnel endpoint for a user on `device_pk`, given all existing users +/// (to derive the exclude list) and the device endpoint map. Returns +/// `Ipv4Addr::UNSPECIFIED` when the device is unknown or has no free endpoint. +pub fn select_tunnel_endpoint_for_user( + device_pk: &Pubkey, + client_ip: Ipv4Addr, + all_users: &[crate::dz_ledger_reader::DzUser], + device_endpoints: &std::collections::HashMap, +) -> Ipv4Addr { + let Some(endpoints) = device_endpoints.get(device_pk) else { + return Ipv4Addr::UNSPECIFIED; + }; + let exclude = in_use_tunnel_endpoints(all_users, client_ip); + select_tunnel_endpoint( + endpoints.public_ip, + &endpoints.user_tunnel_endpoints, + &exclude, + ) +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use doublezero_sdk::UserType; + + use super::*; + use crate::dz_ledger_reader::DzUser; + + #[test] + fn prefers_first_ute_not_excluded() { + let ute1 = Ipv4Addr::new(10, 0, 0, 11); + let ute2 = Ipv4Addr::new(10, 0, 0, 12); + assert_eq!( + select_tunnel_endpoint(Ipv4Addr::new(1, 1, 1, 1), &[ute1, ute2], &[]), + ute1, + ); + } + + #[test] + fn skips_excluded_ute_and_picks_next() { + let ute1 = Ipv4Addr::new(10, 0, 0, 11); + let ute2 = Ipv4Addr::new(10, 0, 0, 12); + assert_eq!( + select_tunnel_endpoint(Ipv4Addr::new(1, 1, 1, 1), &[ute1, ute2], &[ute1]), + ute2, + ); + } + + #[test] + fn falls_back_to_public_ip_when_all_utes_excluded() { + let ute1 = Ipv4Addr::new(10, 0, 0, 11); + let public_ip = Ipv4Addr::new(1, 1, 1, 1); + assert_eq!( + select_tunnel_endpoint(public_ip, &[ute1], &[ute1]), + public_ip, + ); + } + + #[test] + fn uses_public_ip_when_no_utes() { + let public_ip = Ipv4Addr::new(1, 1, 1, 1); + assert_eq!(select_tunnel_endpoint(public_ip, &[], &[]), public_ip); + } + + #[test] + fn returns_unspecified_when_everything_excluded() { + let ute1 = Ipv4Addr::new(10, 0, 0, 11); + let public_ip = Ipv4Addr::new(1, 1, 1, 1); + assert_eq!( + select_tunnel_endpoint(public_ip, &[ute1], &[ute1, public_ip]), + Ipv4Addr::UNSPECIFIED, + ); + } + + #[test] + fn returns_unspecified_when_no_endpoints_at_all() { + assert_eq!( + select_tunnel_endpoint(Ipv4Addr::UNSPECIFIED, &[], &[]), + Ipv4Addr::UNSPECIFIED, + ); + } + + fn make_user(ip: [u8; 4], tunnel_endpoint: Ipv4Addr) -> DzUser { + DzUser { + owner: Pubkey::default(), + client_ip: Ipv4Addr::from(ip), + device_pk: Pubkey::default(), + tenant_pk: Pubkey::default(), + user_type: UserType::IBRL, + publishers: vec![], + tunnel_endpoint, + } + } + + #[test] + fn in_use_tunnel_endpoints_filters_by_client_ip_and_drops_unspecified() { + let ip_a = [10, 0, 0, 1]; + let ip_b = [10, 0, 0, 2]; + let ep1 = Ipv4Addr::new(1, 1, 1, 1); + let ep2 = Ipv4Addr::new(2, 2, 2, 2); + let users = vec![ + make_user(ip_a, ep1), + make_user(ip_a, Ipv4Addr::UNSPECIFIED), + make_user(ip_b, ep2), + ]; + let in_use = in_use_tunnel_endpoints(&users, Ipv4Addr::from(ip_a)); + assert_eq!(in_use, vec![ep1]); + } + + #[test] + fn select_for_user_uses_exclude_list() { + let device_pk = Pubkey::new_unique(); + let client_ip = Ipv4Addr::new(10, 0, 0, 1); + let ute1 = Ipv4Addr::new(192, 168, 1, 11); + let ute2 = Ipv4Addr::new(192, 168, 1, 12); + + let mut device_endpoints = HashMap::new(); + device_endpoints.insert( + device_pk, + DeviceEndpoints { + public_ip: Ipv4Addr::new(1, 1, 1, 1), + user_tunnel_endpoints: vec![ute1, ute2], + }, + ); + + // One existing IBRL user at this client_ip already consumed ute1. + let users = vec![make_user([10, 0, 0, 1], ute1)]; + + let ep = select_tunnel_endpoint_for_user(&device_pk, client_ip, &users, &device_endpoints); + assert_eq!(ep, ute2); + } + + #[test] + fn select_for_user_unknown_device_returns_unspecified() { + let device_pk = Pubkey::new_unique(); + let client_ip = Ipv4Addr::new(10, 0, 0, 1); + let device_endpoints = HashMap::new(); + let users: Vec = vec![]; + assert_eq!( + select_tunnel_endpoint_for_user(&device_pk, client_ip, &users, &device_endpoints), + Ipv4Addr::UNSPECIFIED, + ); + } +} From 249549fa07f3d1f738040f62f64fb62edc500355 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Fri, 24 Apr 2026 13:28:09 -0400 Subject: [PATCH 2/8] sentinel: hoist local use imports to top of file --- crates/sentinel/src/multicast_publisher.rs | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/crates/sentinel/src/multicast_publisher.rs b/crates/sentinel/src/multicast_publisher.rs index 95e6c81657..0fcc250517 100644 --- a/crates/sentinel/src/multicast_publisher.rs +++ b/crates/sentinel/src/multicast_publisher.rs @@ -6,7 +6,13 @@ use std::{ }; use async_trait::async_trait; -use solana_client::nonblocking::rpc_client::RpcClient; +use doublezero_sdk::{AccountData, AccountType, DeviceStatus, UserStatus}; +use solana_account_decoder::UiAccountEncoding; +use solana_client::{ + nonblocking::rpc_client::RpcClient, + rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, + rpc_filter::{Memcmp, RpcFilterType}, +}; use solana_sdk::{ commitment_config::CommitmentConfig, hash::Hash, @@ -349,13 +355,6 @@ impl RpcMulticastDzLedgerClient { #[async_trait] impl MulticastDzLedgerClient for RpcMulticastDzLedgerClient { async fn fetch_all_dz_users(&self) -> Result> { - use doublezero_sdk::{AccountData, AccountType, UserStatus}; - use solana_account_decoder::UiAccountEncoding; - use solana_client::{ - rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, - rpc_filter::{Memcmp, RpcFilterType}, - }; - let user_type_byte = AccountType::User as u8; let accounts = self .rpc_client @@ -417,13 +416,6 @@ impl MulticastDzLedgerClient for RpcMulticastDzLedgerClient { } async fn fetch_all_device_endpoints(&self) -> Result> { - use doublezero_sdk::{AccountData, AccountType, DeviceStatus}; - use solana_account_decoder::UiAccountEncoding; - use solana_client::{ - rpc_config::{RpcAccountInfoConfig, RpcProgramAccountsConfig}, - rpc_filter::{Memcmp, RpcFilterType}, - }; - let device_type_byte = AccountType::Device as u8; let accounts = self .rpc_client From 7f8495acbd73b2a234e300110b908f5f8205acea Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Fri, 24 Apr 2026 13:28:44 -0400 Subject: [PATCH 3/8] sentinel: changelog for tunnel endpoint selection --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d7b2c7186..5283ba8c4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,7 @@ All notable changes to this project will be documented in this file. - SDK now auto-detects the correct AccessPass PDA (static or dynamic) for allowlist operations based on whether an `allow_multiple_ip` pass exists - Sentinel - Make the multicast publisher worker's `--client-filter` flag repeatable so multiple validator client names can be matched in one run (OR semantics), matching the admin CLI behavior + - Set a concrete `tunnel_endpoint` on multicast publisher create, preferring a `user_tunnel_endpoint` interface IP and falling back to the device's `public_ip`, excluding IPs already in use by another user at the same `client_ip` ## [v0.18.0](https://github.com/malbeclabs/doublezero/compare/client/v0.17.0...client/v0.18.0) - 2026-04-17 From 637c987fd7cbdcf62234fcd90636c17c9e3baece Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Fri, 24 Apr 2026 13:46:06 -0400 Subject: [PATCH 4/8] admin/sentinel: show tunnel endpoint in create dry-run plan --- .../doublezero-admin/src/cli/sentinel.rs | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/controlplane/doublezero-admin/src/cli/sentinel.rs b/controlplane/doublezero-admin/src/cli/sentinel.rs index 9ffb82eae9..0e79c83e5f 100644 --- a/controlplane/doublezero-admin/src/cli/sentinel.rs +++ b/controlplane/doublezero-admin/src/cli/sentinel.rs @@ -621,10 +621,13 @@ impl CreateValidatorMulticastPublishersCommand { // Display plan (snapshot — target devices are re-evaluated at execution time). eprintln!( - "\nWill create {} multicast publisher user(s) on group {}:\n", + "\nWill create {} multicast publisher user(s) on group {}:", candidates.len(), self.multicast_group, ); + eprintln!( + " user_type=Multicast cyoa_type=GREOverDIA publisher=true subscriber=false\n" + ); #[derive(Tabled, Serialize)] struct PlanRow { #[tabled(rename = "OWNER")] @@ -635,6 +638,8 @@ impl CreateValidatorMulticastPublishersCommand { device: String, #[tabled(rename = "NEAREST DEVICE")] nearest_device: String, + #[tabled(rename = "TUNNEL ENDPOINT")] + tunnel_endpoint: String, #[tabled(rename = "CLIENT")] client: String, #[tabled(rename = "STAKE (SOL)")] @@ -644,11 +649,12 @@ impl CreateValidatorMulticastPublishersCommand { let plan_rows: Vec = candidates .iter() .map(|c| { - let nearest = match find_nearest_device_for_multicast( + let target_device = find_nearest_device_for_multicast( &c.device_pk, &device_infos, latency_map.as_ref(), - ) { + ); + let nearest = match target_device { None => "none".to_string(), Some(d) if d.pk == c.device_pk => d.code.clone(), Some(d) => { @@ -659,11 +665,29 @@ impl CreateValidatorMulticastPublishersCommand { fmt_nearest_label(&d.code, score, self.nearest_via_geo) } }; + let tunnel_endpoint = target_device + .map(|d| { + let exclude: Vec = all_users + .iter() + .filter(|u| { + u.client_ip == c.client_ip + && u.tunnel_endpoint != Ipv4Addr::UNSPECIFIED + }) + .map(|u| u.tunnel_endpoint) + .collect(); + select_tunnel_endpoint(d.public_ip, &d.user_tunnel_endpoints, &exclude) + }) + .unwrap_or(Ipv4Addr::UNSPECIFIED); PlanRow { owner: c.owner.to_string(), client_ip: c.client_ip.to_string(), device: c.device_label.clone(), nearest_device: nearest, + tunnel_endpoint: if tunnel_endpoint == Ipv4Addr::UNSPECIFIED { + "(activator-assigned)".to_string() + } else { + tunnel_endpoint.to_string() + }, client: c.software_client.clone(), stake_sol: format!("{:.2}", c.stake_sol), } From 6d468c80bdf8093bc75187c9f20d9d77351bde56 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Fri, 24 Apr 2026 13:48:03 -0400 Subject: [PATCH 5/8] admin/sentinel: add device ip column to create dry-run plan --- controlplane/doublezero-admin/src/cli/sentinel.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/controlplane/doublezero-admin/src/cli/sentinel.rs b/controlplane/doublezero-admin/src/cli/sentinel.rs index 0e79c83e5f..8c8a8e244c 100644 --- a/controlplane/doublezero-admin/src/cli/sentinel.rs +++ b/controlplane/doublezero-admin/src/cli/sentinel.rs @@ -638,6 +638,8 @@ impl CreateValidatorMulticastPublishersCommand { device: String, #[tabled(rename = "NEAREST DEVICE")] nearest_device: String, + #[tabled(rename = "DEVICE IP")] + device_ip: String, #[tabled(rename = "TUNNEL ENDPOINT")] tunnel_endpoint: String, #[tabled(rename = "CLIENT")] @@ -665,6 +667,9 @@ impl CreateValidatorMulticastPublishersCommand { fmt_nearest_label(&d.code, score, self.nearest_via_geo) } }; + let device_ip = target_device + .map(|d| d.public_ip.to_string()) + .unwrap_or_else(|| "-".to_string()); let tunnel_endpoint = target_device .map(|d| { let exclude: Vec = all_users @@ -683,6 +688,7 @@ impl CreateValidatorMulticastPublishersCommand { client_ip: c.client_ip.to_string(), device: c.device_label.clone(), nearest_device: nearest, + device_ip, tunnel_endpoint: if tunnel_endpoint == Ipv4Addr::UNSPECIFIED { "(activator-assigned)".to_string() } else { From 18e5242dd182936b83e05b9fc541cdd18ac85b51 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Fri, 24 Apr 2026 13:52:58 -0400 Subject: [PATCH 6/8] sentinel: treat legacy UNSPECIFIED tunnel_endpoint as device public_ip Users created before the tunnel_endpoint field was populated onchain store UNSPECIFIED, but the activator implicitly routes their tunnel through the device's public_ip. The exclude list built for new multicast publisher creates was dropping these users, so the sentinel could select public_ip and collide with an existing legacy tunnel. Resolve UNSPECIFIED users to their device's public_ip when building the exclude list (mirrors client/doublezero/src/command/connect.rs:939). --- .../doublezero-admin/src/cli/sentinel.rs | 47 +++++++---- crates/sentinel/src/multicast_publisher.rs | 52 ++++++++++++ crates/sentinel/src/tunnel_endpoint.rs | 80 ++++++++++++++++--- 3 files changed, 151 insertions(+), 28 deletions(-) diff --git a/controlplane/doublezero-admin/src/cli/sentinel.rs b/controlplane/doublezero-admin/src/cli/sentinel.rs index 8c8a8e244c..cbf8a903b4 100644 --- a/controlplane/doublezero-admin/src/cli/sentinel.rs +++ b/controlplane/doublezero-admin/src/cli/sentinel.rs @@ -672,14 +672,7 @@ impl CreateValidatorMulticastPublishersCommand { .unwrap_or_else(|| "-".to_string()); let tunnel_endpoint = target_device .map(|d| { - let exclude: Vec = all_users - .iter() - .filter(|u| { - u.client_ip == c.client_ip - && u.tunnel_endpoint != Ipv4Addr::UNSPECIFIED - }) - .map(|u| u.tunnel_endpoint) - .collect(); + let exclude = tunnel_exclude_ips(&all_users, c.client_ip, &device_infos); select_tunnel_endpoint(d.public_ip, &d.user_tunnel_endpoints, &exclude) }) .unwrap_or(Ipv4Addr::UNSPECIFIED); @@ -766,14 +759,8 @@ impl CreateValidatorMulticastPublishersCommand { let tunnel_endpoint = device_infos .get(&target_device_pk) .map(|d| { - let exclude: Vec = all_users - .iter() - .filter(|u| { - u.client_ip == candidate.client_ip - && u.tunnel_endpoint != Ipv4Addr::UNSPECIFIED - }) - .map(|u| u.tunnel_endpoint) - .collect(); + let exclude = + tunnel_exclude_ips(&all_users, candidate.client_ip, &device_infos); select_tunnel_endpoint(d.public_ip, &d.user_tunnel_endpoints, &exclude) }) .unwrap_or(Ipv4Addr::UNSPECIFIED); @@ -846,6 +833,34 @@ impl CreateValidatorMulticastPublishersCommand { // Helpers // --------------------------------------------------------------------------- +/// Build the set of tunnel endpoints already in use by users at `client_ip`. +/// +/// For users with an explicit `tunnel_endpoint` onchain, use that value. +/// For legacy users where `tunnel_endpoint == UNSPECIFIED`, the activator +/// implicitly routes their tunnel through the device's `public_ip`, so +/// resolve it from `device_infos` rather than dropping the entry. +fn tunnel_exclude_ips( + users: &[DzUser], + client_ip: Ipv4Addr, + device_infos: &HashMap, +) -> Vec { + users + .iter() + .filter(|u| u.client_ip == client_ip) + .map(|u| { + if u.tunnel_endpoint != Ipv4Addr::UNSPECIFIED { + u.tunnel_endpoint + } else { + device_infos + .get(&u.device_pk) + .map(|d| d.public_ip) + .unwrap_or(Ipv4Addr::UNSPECIFIED) + } + }) + .filter(|ip| *ip != Ipv4Addr::UNSPECIFIED) + .collect() +} + /// Format a nearest-device label with its proximity score. /// Shows `"code (1234 µs)"` in latency mode or `"code (365 km)"` in geo mode. /// Falls back to plain `code` when the score is infinite (no latency data). diff --git a/crates/sentinel/src/multicast_publisher.rs b/crates/sentinel/src/multicast_publisher.rs index 0fcc250517..9829894754 100644 --- a/crates/sentinel/src/multicast_publisher.rs +++ b/crates/sentinel/src/multicast_publisher.rs @@ -1191,6 +1191,58 @@ mod tests { sentinel.poll_cycle().await.unwrap(); } + #[tokio::test] + async fn legacy_unspecified_tunnel_endpoint_excludes_device_public_ip() { + // An IBRL user predating the tunnel_endpoint field stores UNSPECIFIED + // onchain; the activator implicitly put its tunnel on the device's + // public_ip. The multicast publisher must therefore skip the public_ip + // and land on a UTE. + let group = Pubkey::new_unique(); + let ip = [10, 0, 0, 1]; + let device = Pubkey::new_unique(); + let public_ip = Ipv4Addr::new(1, 1, 1, 1); + let ute = Ipv4Addr::new(192, 168, 1, 11); + + let mut api = MockValidatorListReader::new(); + let mut validators = HashMap::new(); + validators.insert( + Ipv4Addr::from(ip), + ValidatorStake { + activated_stake: 100, + software_client: String::new(), + }, + ); + api.expect_fetch_validators() + .returning(move || Ok(validators.clone())); + + let mut dz = MockMulticastDzLedgerClient::new(); + dz.expect_fetch_all_dz_users().returning(move || { + // Legacy IBRL user: tunnel_endpoint is UNSPECIFIED. + Ok(vec![make_ibrl_user(ip, device)]) + }); + dz.expect_fetch_all_device_endpoints().returning(move || { + let mut map = HashMap::new(); + map.insert( + device, + DeviceEndpoints { + public_ip, + user_tunnel_endpoints: vec![ute], + }, + ); + Ok(map) + }); + + dz.expect_create_multicast_publisher() + .withf(move |g, u, ep| { + *g == group && u.client_ip == Ipv4Addr::from(ip) && *ep == ute + }) + .times(1) + .returning(|_, _, _| Ok(())); + + let sentinel = make_sentinel(dz, api, vec![group]); + sentinel.poll_cycle().await.unwrap(); + } + #[tokio::test] async fn ibrl_with_allocated_ip_users_are_candidates() { let group = Pubkey::new_unique(); diff --git a/crates/sentinel/src/tunnel_endpoint.rs b/crates/sentinel/src/tunnel_endpoint.rs index 81dd63d52f..976d249ff2 100644 --- a/crates/sentinel/src/tunnel_endpoint.rs +++ b/crates/sentinel/src/tunnel_endpoint.rs @@ -46,14 +46,33 @@ pub fn select_tunnel_endpoint( /// Collect the set of tunnel endpoints already in use by other user accounts /// at the same `client_ip`. -pub fn in_use_tunnel_endpoints<'a, I>(users: I, client_ip: Ipv4Addr) -> Vec +/// +/// A user with `tunnel_endpoint == UNSPECIFIED` is a legacy account from +/// before the field was populated onchain; the activator implicitly routes +/// its tunnel through the device's `public_ip`, so resolve it from +/// `device_endpoints` rather than dropping the user. +pub fn in_use_tunnel_endpoints<'a, I>( + users: I, + client_ip: Ipv4Addr, + device_endpoints: &std::collections::HashMap, +) -> Vec where I: IntoIterator, { users .into_iter() - .filter(|u| u.client_ip == client_ip && u.tunnel_endpoint != Ipv4Addr::UNSPECIFIED) - .map(|u| u.tunnel_endpoint) + .filter(|u| u.client_ip == client_ip) + .map(|u| { + if u.tunnel_endpoint != Ipv4Addr::UNSPECIFIED { + u.tunnel_endpoint + } else { + device_endpoints + .get(&u.device_pk) + .map(|d| d.public_ip) + .unwrap_or(Ipv4Addr::UNSPECIFIED) + } + }) + .filter(|ip| *ip != Ipv4Addr::UNSPECIFIED) .collect() } @@ -69,7 +88,7 @@ pub fn select_tunnel_endpoint_for_user( let Some(endpoints) = device_endpoints.get(device_pk) else { return Ipv4Addr::UNSPECIFIED; }; - let exclude = in_use_tunnel_endpoints(all_users, client_ip); + let exclude = in_use_tunnel_endpoints(all_users, client_ip, device_endpoints); select_tunnel_endpoint( endpoints.public_ip, &endpoints.user_tunnel_endpoints, @@ -140,11 +159,11 @@ mod tests { ); } - fn make_user(ip: [u8; 4], tunnel_endpoint: Ipv4Addr) -> DzUser { + fn make_user(ip: [u8; 4], device_pk: Pubkey, tunnel_endpoint: Ipv4Addr) -> DzUser { DzUser { owner: Pubkey::default(), client_ip: Ipv4Addr::from(ip), - device_pk: Pubkey::default(), + device_pk, tenant_pk: Pubkey::default(), user_type: UserType::IBRL, publishers: vec![], @@ -153,20 +172,57 @@ mod tests { } #[test] - fn in_use_tunnel_endpoints_filters_by_client_ip_and_drops_unspecified() { + fn in_use_tunnel_endpoints_filters_by_client_ip() { let ip_a = [10, 0, 0, 1]; let ip_b = [10, 0, 0, 2]; let ep1 = Ipv4Addr::new(1, 1, 1, 1); let ep2 = Ipv4Addr::new(2, 2, 2, 2); let users = vec![ - make_user(ip_a, ep1), - make_user(ip_a, Ipv4Addr::UNSPECIFIED), - make_user(ip_b, ep2), + make_user(ip_a, Pubkey::new_unique(), ep1), + make_user(ip_b, Pubkey::new_unique(), ep2), ]; - let in_use = in_use_tunnel_endpoints(&users, Ipv4Addr::from(ip_a)); + let in_use = in_use_tunnel_endpoints(&users, Ipv4Addr::from(ip_a), &HashMap::new()); assert_eq!(in_use, vec![ep1]); } + #[test] + fn in_use_tunnel_endpoints_resolves_legacy_unspecified_via_device_public_ip() { + // A legacy user predating the tunnel_endpoint field stores UNSPECIFIED + // onchain; the activator implicitly routes its tunnel through the + // device's public_ip, so we must add that to the in-use set. + let ip_a = [10, 0, 0, 1]; + let device_a = Pubkey::new_unique(); + let public_ip_a = Ipv4Addr::new(1, 1, 1, 1); + + let mut device_endpoints = HashMap::new(); + device_endpoints.insert( + device_a, + DeviceEndpoints { + public_ip: public_ip_a, + user_tunnel_endpoints: vec![], + }, + ); + + let users = vec![make_user(ip_a, device_a, Ipv4Addr::UNSPECIFIED)]; + let in_use = in_use_tunnel_endpoints(&users, Ipv4Addr::from(ip_a), &device_endpoints); + assert_eq!(in_use, vec![public_ip_a]); + } + + #[test] + fn in_use_tunnel_endpoints_drops_legacy_when_device_unknown() { + // If the user's device isn't in the endpoint map (e.g., deactivated), + // we have no way to resolve the implicit endpoint — drop the entry + // rather than poison the exclude list with UNSPECIFIED. + let ip_a = [10, 0, 0, 1]; + let users = vec![make_user( + ip_a, + Pubkey::new_unique(), + Ipv4Addr::UNSPECIFIED, + )]; + let in_use = in_use_tunnel_endpoints(&users, Ipv4Addr::from(ip_a), &HashMap::new()); + assert!(in_use.is_empty()); + } + #[test] fn select_for_user_uses_exclude_list() { let device_pk = Pubkey::new_unique(); @@ -184,7 +240,7 @@ mod tests { ); // One existing IBRL user at this client_ip already consumed ute1. - let users = vec![make_user([10, 0, 0, 1], ute1)]; + let users = vec![make_user([10, 0, 0, 1], device_pk, ute1)]; let ep = select_tunnel_endpoint_for_user(&device_pk, client_ip, &users, &device_endpoints); assert_eq!(ep, ute2); From 2821d7b899b76173873cd44eb1b45367c17969d4 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Fri, 24 Apr 2026 13:56:20 -0400 Subject: [PATCH 7/8] sentinel: cargo fmt --- crates/sentinel/src/multicast_publisher.rs | 4 +--- crates/sentinel/src/tunnel_endpoint.rs | 6 +----- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/crates/sentinel/src/multicast_publisher.rs b/crates/sentinel/src/multicast_publisher.rs index 9829894754..48a516d4d5 100644 --- a/crates/sentinel/src/multicast_publisher.rs +++ b/crates/sentinel/src/multicast_publisher.rs @@ -1233,9 +1233,7 @@ mod tests { }); dz.expect_create_multicast_publisher() - .withf(move |g, u, ep| { - *g == group && u.client_ip == Ipv4Addr::from(ip) && *ep == ute - }) + .withf(move |g, u, ep| *g == group && u.client_ip == Ipv4Addr::from(ip) && *ep == ute) .times(1) .returning(|_, _, _| Ok(())); diff --git a/crates/sentinel/src/tunnel_endpoint.rs b/crates/sentinel/src/tunnel_endpoint.rs index 976d249ff2..f4d6492f0a 100644 --- a/crates/sentinel/src/tunnel_endpoint.rs +++ b/crates/sentinel/src/tunnel_endpoint.rs @@ -214,11 +214,7 @@ mod tests { // we have no way to resolve the implicit endpoint — drop the entry // rather than poison the exclude list with UNSPECIFIED. let ip_a = [10, 0, 0, 1]; - let users = vec![make_user( - ip_a, - Pubkey::new_unique(), - Ipv4Addr::UNSPECIFIED, - )]; + let users = vec![make_user(ip_a, Pubkey::new_unique(), Ipv4Addr::UNSPECIFIED)]; let in_use = in_use_tunnel_endpoints(&users, Ipv4Addr::from(ip_a), &HashMap::new()); assert!(in_use.is_empty()); } From 87a9f7587dd4f29e28ddec81b450e5d450840e65 Mon Sep 17 00:00:00 2001 From: Steven Normore Date: Fri, 24 Apr 2026 14:07:37 -0400 Subject: [PATCH 8/8] sentinel: skip candidate and log error when no tunnel endpoint is available When all of a device's tunnel endpoints (public_ip + UTEs) are already in use by the candidate's client_ip, selection returns UNSPECIFIED. Previously the sentinel forwarded that to create_multicast_publisher and the activator rejected the user, burning a create tx for a guaranteed rejection. Skip the candidate with an error log and doublezero_sentinel_multicast_pub_no_endpoint metric; next poll cycle retries if state changes. The admin CLI's create command does the same (writes to stderr). --- .../doublezero-admin/src/cli/sentinel.rs | 10 + crates/sentinel/src/multicast_publisher.rs | 198 +++++++++++++----- 2 files changed, 157 insertions(+), 51 deletions(-) diff --git a/controlplane/doublezero-admin/src/cli/sentinel.rs b/controlplane/doublezero-admin/src/cli/sentinel.rs index cbf8a903b4..2853538300 100644 --- a/controlplane/doublezero-admin/src/cli/sentinel.rs +++ b/controlplane/doublezero-admin/src/cli/sentinel.rs @@ -765,6 +765,16 @@ impl CreateValidatorMulticastPublishersCommand { }) .unwrap_or(Ipv4Addr::UNSPECIFIED); + if tunnel_endpoint == Ipv4Addr::UNSPECIFIED { + eprintln!( + " Error: no tunnel endpoint available on device {} for {} — all public_ip \ + and user_tunnel_endpoint IPs are already in use by this client_ip. Skipping.", + target_device_label, candidate.client_ip, + ); + skipped += 1; + continue; + } + let ixs = match build_create_multicast_publisher_instructions( &program_id, &payer_pk, diff --git a/crates/sentinel/src/multicast_publisher.rs b/crates/sentinel/src/multicast_publisher.rs index 48a516d4d5..fa69b4677f 100644 --- a/crates/sentinel/src/multicast_publisher.rs +++ b/crates/sentinel/src/multicast_publisher.rs @@ -247,6 +247,22 @@ impl MulticastPublisherSenti &all_users, &device_endpoints, ); + if tunnel_endpoint == Ipv4Addr::UNSPECIFIED { + error!( + ip = %user.client_ip, + device = %user.device_pk, + group = %mgroup_pk, + "skipping multicast publisher create: no tunnel endpoint available \ + on device (all public_ip and user_tunnel_endpoint IPs are already in \ + use by this client_ip); will retry on next poll" + ); + metrics::counter!( + "doublezero_sentinel_multicast_pub_no_endpoint", + "group" => mgroup_pk.to_string() + ) + .increment(1); + continue; + } if let Err(err) = self .dz_client .create_multicast_publisher(mgroup_pk, user, tunnel_endpoint) @@ -663,6 +679,26 @@ mod tests { } } + /// Build a minimal `DeviceEndpoints` map covering every device seen in `users`. + /// Each device gets a unique synthetic `public_ip` plus one UTE so tunnel-endpoint + /// selection returns a concrete (non-UNSPECIFIED) IP even when the IBRL user's + /// legacy UNSPECIFIED tunnel_endpoint implicitly occupies the device's public_ip. + fn default_endpoints_for(users: &[DzUser]) -> HashMap { + let mut map = HashMap::new(); + let mut next = 1u8; + for u in users { + map.entry(u.device_pk).or_insert_with(|| { + let ep = DeviceEndpoints { + public_ip: Ipv4Addr::new(100, 0, 0, next), + user_tunnel_endpoints: vec![Ipv4Addr::new(192, 168, 0, next)], + }; + next = next.wrapping_add(1); + ep + }); + } + map + } + fn make_sentinel_with_filter( dz: MockMulticastDzLedgerClient, api: MockValidatorListReader, @@ -783,19 +819,19 @@ mod tests { api.expect_fetch_validators() .returning(move || Ok(validators.clone())); - let group_clone = group; - let device2_clone = device2; + let users = vec![ + make_ibrl_user(ip1, Pubkey::new_unique()), + make_ibrl_user(ip2, device2), + make_ibrl_user(ip3, Pubkey::new_unique()), + make_multicast_publisher(ip1, vec![group]), + ]; + let endpoints = default_endpoints_for(&users); let mut dz = MockMulticastDzLedgerClient::new(); - dz.expect_fetch_all_dz_users().returning(move || { - Ok(vec![ - make_ibrl_user(ip1, Pubkey::new_unique()), - make_ibrl_user(ip2, device2_clone), - make_ibrl_user(ip3, Pubkey::new_unique()), - make_multicast_publisher(ip1, vec![group_clone]), - ]) - }); + let users_clone = users.clone(); + dz.expect_fetch_all_dz_users() + .returning(move || Ok(users_clone.clone())); dz.expect_fetch_all_device_endpoints() - .returning(|| Ok(HashMap::new())); + .returning(move || Ok(endpoints.clone())); // Only ip2 should be created. dz.expect_create_multicast_publisher() @@ -834,15 +870,17 @@ mod tests { api.expect_fetch_validators() .returning(move || Ok(validators.clone())); + let users = vec![ + make_ibrl_user(ip1, Pubkey::new_unique()), + make_ibrl_user(ip2, Pubkey::new_unique()), + ]; + let endpoints = default_endpoints_for(&users); let mut dz = MockMulticastDzLedgerClient::new(); - dz.expect_fetch_all_dz_users().returning(move || { - Ok(vec![ - make_ibrl_user(ip1, Pubkey::new_unique()), - make_ibrl_user(ip2, Pubkey::new_unique()), - ]) - }); + let users_clone = users.clone(); + dz.expect_fetch_all_dz_users() + .returning(move || Ok(users_clone.clone())); dz.expect_fetch_all_device_endpoints() - .returning(|| Ok(HashMap::new())); + .returning(move || Ok(endpoints.clone())); let mut seq = mockall::Sequence::new(); // First call (ip2, higher stake) fails. @@ -878,16 +916,17 @@ mod tests { api.expect_fetch_validators() .returning(move || Ok(validators.clone())); - let group_a_clone = group_a; + let users = vec![ + make_ibrl_user(ip, Pubkey::new_unique()), + make_multicast_publisher(ip, vec![group_a]), + ]; + let endpoints = default_endpoints_for(&users); let mut dz = MockMulticastDzLedgerClient::new(); - dz.expect_fetch_all_dz_users().returning(move || { - Ok(vec![ - make_ibrl_user(ip, Pubkey::new_unique()), - make_multicast_publisher(ip, vec![group_a_clone]), - ]) - }); + let users_clone = users.clone(); + dz.expect_fetch_all_dz_users() + .returning(move || Ok(users_clone.clone())); dz.expect_fetch_all_device_endpoints() - .returning(|| Ok(HashMap::new())); + .returning(move || Ok(endpoints.clone())); // Should only create for group B. dz.expect_create_multicast_publisher() @@ -965,15 +1004,17 @@ mod tests { api.expect_fetch_validators() .returning(move || Ok(validators.clone())); + let users = vec![ + make_ibrl_user(ip_jito, Pubkey::new_unique()), + make_ibrl_user(ip_agave, Pubkey::new_unique()), + ]; + let endpoints = default_endpoints_for(&users); let mut dz = MockMulticastDzLedgerClient::new(); - dz.expect_fetch_all_dz_users().returning(move || { - Ok(vec![ - make_ibrl_user(ip_jito, Pubkey::new_unique()), - make_ibrl_user(ip_agave, Pubkey::new_unique()), - ]) - }); + let users_clone = users.clone(); + dz.expect_fetch_all_dz_users() + .returning(move || Ok(users_clone.clone())); dz.expect_fetch_all_device_endpoints() - .returning(|| Ok(HashMap::new())); + .returning(move || Ok(endpoints.clone())); // Only JitoLabs validator should be created. dz.expect_create_multicast_publisher() @@ -1018,16 +1059,18 @@ mod tests { api.expect_fetch_validators() .returning(move || Ok(validators.clone())); + let users = vec![ + make_ibrl_user(ip_jito, Pubkey::new_unique()), + make_ibrl_user(ip_agave, Pubkey::new_unique()), + make_ibrl_user(ip_frank, Pubkey::new_unique()), + ]; + let endpoints = default_endpoints_for(&users); let mut dz = MockMulticastDzLedgerClient::new(); - dz.expect_fetch_all_dz_users().returning(move || { - Ok(vec![ - make_ibrl_user(ip_jito, Pubkey::new_unique()), - make_ibrl_user(ip_agave, Pubkey::new_unique()), - make_ibrl_user(ip_frank, Pubkey::new_unique()), - ]) - }); + let users_clone = users.clone(); + dz.expect_fetch_all_dz_users() + .returning(move || Ok(users_clone.clone())); dz.expect_fetch_all_device_endpoints() - .returning(|| Ok(HashMap::new())); + .returning(move || Ok(endpoints.clone())); // Both JitoLabs and Frankendancer should be created; Agave skipped. let created_ips = Arc::new(std::sync::Mutex::new(HashSet::new())); @@ -1114,15 +1157,17 @@ mod tests { api.expect_fetch_validators() .returning(move || Ok(validators.clone())); + let users = vec![ + make_ibrl_user(ip1, Pubkey::new_unique()), + make_ibrl_user(ip2, Pubkey::new_unique()), + ]; + let endpoints = default_endpoints_for(&users); let mut dz = MockMulticastDzLedgerClient::new(); - dz.expect_fetch_all_dz_users().returning(move || { - Ok(vec![ - make_ibrl_user(ip1, Pubkey::new_unique()), - make_ibrl_user(ip2, Pubkey::new_unique()), - ]) - }); + let users_clone = users.clone(); + dz.expect_fetch_all_dz_users() + .returning(move || Ok(users_clone.clone())); dz.expect_fetch_all_device_endpoints() - .returning(|| Ok(HashMap::new())); + .returning(move || Ok(endpoints.clone())); // Both should be created (no client filter). let created_ips = Arc::new(std::sync::Mutex::new(HashSet::new())); @@ -1191,6 +1236,55 @@ mod tests { sentinel.poll_cycle().await.unwrap(); } + #[tokio::test] + async fn skips_candidate_when_no_tunnel_endpoint_available() { + // Device has one UTE; the IBRL user already occupies it AND the + // device's public_ip is also excluded (via a second user with matching + // client_ip). No endpoint left — sentinel should skip rather than + // send UNSPECIFIED for a guaranteed activator rejection. + let group = Pubkey::new_unique(); + let ip = [10, 0, 0, 1]; + let device = Pubkey::new_unique(); + let public_ip = Ipv4Addr::new(1, 1, 1, 1); + let ute = Ipv4Addr::new(192, 168, 1, 11); + + let mut api = MockValidatorListReader::new(); + let mut validators = HashMap::new(); + validators.insert( + Ipv4Addr::from(ip), + ValidatorStake { + activated_stake: 100, + software_client: String::new(), + }, + ); + api.expect_fetch_validators() + .returning(move || Ok(validators.clone())); + + let mut dz = MockMulticastDzLedgerClient::new(); + dz.expect_fetch_all_dz_users().returning(move || { + let mut u1 = make_ibrl_user(ip, device); + u1.tunnel_endpoint = ute; + let mut u2 = make_ibrl_user(ip, device); + u2.tunnel_endpoint = public_ip; + Ok(vec![u1, u2]) + }); + dz.expect_fetch_all_device_endpoints().returning(move || { + let mut map = HashMap::new(); + map.insert( + device, + DeviceEndpoints { + public_ip, + user_tunnel_endpoints: vec![ute], + }, + ); + Ok(map) + }); + dz.expect_create_multicast_publisher().never(); + + let sentinel = make_sentinel(dz, api, vec![group]); + sentinel.poll_cycle().await.unwrap(); + } + #[tokio::test] async fn legacy_unspecified_tunnel_endpoint_excludes_device_public_ip() { // An IBRL user predating the tunnel_endpoint field stores UNSPECIFIED @@ -1259,12 +1353,14 @@ mod tests { api.expect_fetch_validators() .returning(move || Ok(validators.clone())); - let device_clone = device; + let users = vec![make_ibrl_with_allocated_ip_user(ip, device)]; + let endpoints = default_endpoints_for(&users); let mut dz = MockMulticastDzLedgerClient::new(); + let users_clone = users.clone(); dz.expect_fetch_all_dz_users() - .returning(move || Ok(vec![make_ibrl_with_allocated_ip_user(ip, device_clone)])); + .returning(move || Ok(users_clone.clone())); dz.expect_fetch_all_device_endpoints() - .returning(|| Ok(HashMap::new())); + .returning(move || Ok(endpoints.clone())); dz.expect_create_multicast_publisher() .withf(move |g, u, _| {