Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions common/client-core/src/client/base_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ where
event_tx,
);

let mix_tx = mix_traffic_controller.mix_rx();
let mix_tx = mix_traffic_controller.mix_tx();
let client_tx = mix_traffic_controller.client_tx();

shutdown_tracker.try_spawn_named(
Expand Down Expand Up @@ -1004,8 +1004,8 @@ where
// Create a shutdown tracker for this client - either as a child of provided tracker
// or get one from the registry
let shutdown_tracker = match self.shutdown {
Some(parent_tracker) => parent_tracker.child_tracker(),
None => nym_task::get_sdk_shutdown_tracker()?,
Some(parent_tracker) => parent_tracker.clone(),
None => nym_task::create_sdk_shutdown_tracker()?,
};

Self::start_event_control(self.event_tx, event_receiver, &shutdown_tracker);
Expand Down Expand Up @@ -1044,7 +1044,7 @@ where
self.user_agent.clone(),
generate_client_stats_id(*self_address.identity()),
input_sender.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
);

// needs to be started as the first thing to block if required waiting for the gateway
Expand All @@ -1054,7 +1054,7 @@ where
shared_topology_accessor.clone(),
self_address.gateway(),
self.wait_for_gateway,
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
)
.await?;

Expand All @@ -1074,15 +1074,15 @@ where
stats_reporter.clone(),
#[cfg(unix)]
self.connection_fd_callback,
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
)
.await?;
let gateway_ws_fd = gateway_transceiver.ws_fd();

let reply_storage = Self::setup_persistent_reply_storage(
reply_storage_backend,
key_rotation_config,
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
)
.await?;

Expand All @@ -1093,7 +1093,7 @@ where
reply_storage.key_storage(),
reply_controller_sender.clone(),
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
);

// The message_sender is the transmitter for any component generating sphinx packets
Expand All @@ -1103,7 +1103,7 @@ where

let (message_sender, client_request_sender) = Self::start_mix_traffic_controller(
gateway_transceiver,
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
EventSender(event_sender),
);

Expand Down Expand Up @@ -1134,7 +1134,7 @@ where
shared_lane_queue_lengths.clone(),
client_connection_rx,
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
);

if !self
Expand All @@ -1150,7 +1150,7 @@ where
shared_topology_accessor.clone(),
message_sender,
stats_reporter.clone(),
&shutdown_tracker.child_tracker(),
&shutdown_tracker.clone(),
);
}

Expand Down
2 changes: 1 addition & 1 deletion common/client-core/src/client/cover_traffic_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl LoopCoverTrafficStream<OsRng> {
TrySendError::Full(_) => {
// This isn't a problem, if the channel is full means we're already sending the
// max amount of messages downstream can handle.
tracing::debug!("Failed to send cover message - channel full");
tracing::trace!("Failed to send cover message - channel full");
}
TrySendError::Closed(_) => {
tracing::warn!("Failed to send cover message - channel closed");
Expand Down
12 changes: 10 additions & 2 deletions common/client-core/src/client/mix_traffic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ pub mod transceiver;

// We remind ourselves that 32 x 32kb = 1024kb, a reasonable size for a network buffer.
pub const MIX_MESSAGE_RECEIVER_BUFFER_SIZE: usize = 32;
const MAX_FAILURE_COUNT: usize = 100;

/// Reduced from 100 to 20 to fail fast (~1-2 seconds instead of ~6 seconds).
/// If we can't send 20 packets in a row, the gateway is unreachable.
const MAX_FAILURE_COUNT: usize = 20;

// that's also disgusting.
pub struct Empty;
Expand Down Expand Up @@ -84,7 +87,7 @@ impl MixTrafficController {
self.client_tx.clone()
}

pub fn mix_rx(&self) -> BatchMixMessageSender {
pub fn mix_tx(&self) -> BatchMixMessageSender {
self.mix_tx.clone()
}

Expand Down Expand Up @@ -156,6 +159,11 @@ impl MixTrafficController {
// Do we need to handle the embedded mixnet client case
// separately?
self.event_tx.send(MixnetClientEvent::Traffic(MixTrafficEvent::FailedSendingSphinx));
// IMO it shouldn't be signalled from there but it is how it is
// TODO : report the failure upwards and shutdown from upwards
// Gateway is dead, we have to shut down currently
error!("Signalling shutdown from the MixTrafficController");
self.shutdown_token.cancel();
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ where
"failed to send mixnet packet due to closed channel (outside of shutdown!)"
);
}
// Early return to avoid further processing when channel is closed
return;
}
Ok(_) => {
let event = if fragment_id.is_some() {
Expand Down
4 changes: 2 additions & 2 deletions common/task/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,6 @@ pub use crate::runtime_registry::RegistryAccessError;

/// Get or create a ShutdownTracker for SDK use.
/// This provides automatic task management without requiring manual setup.
pub fn get_sdk_shutdown_tracker() -> Result<ShutdownTracker, RegistryAccessError> {
Ok(runtime_registry::RuntimeRegistry::get_or_create_sdk()?.shutdown_tracker_owned())
pub fn create_sdk_shutdown_tracker() -> Result<ShutdownTracker, RegistryAccessError> {
Ok(runtime_registry::RuntimeRegistry::create_sdk()?.shutdown_tracker_owned())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why did you change the existing behaviour to overwrite a pre-existing shutdown manager? this could be potentially dangereous, especially if it had already registered some signals, tasks, etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having a global shutdown means that if you cancel the underlying token, you can never spin up a new mixnet client again, because the underlying token will be cancelled to start with.

The SDK manager is only used if no other shutdown manager is provided

}
50 changes: 34 additions & 16 deletions common/task/src/runtime_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,47 @@ pub(crate) struct RuntimeRegistry {
pub enum RegistryAccessError {
#[error("the runtime registry is poisoned")]
Poisoned,

#[error("The SDK ShutdownManager already exists")]
ExistingShutdownManager,

#[error("No existing SDK ShutdownManager")]
MissingShutdownManager,
}

impl RuntimeRegistry {
/// Get or create a ShutdownManager for SDK use.
/// Create a ShutdownManager for SDK use.
/// This manager doesn't listen to OS signals, making it suitable for library use.
pub(crate) fn get_or_create_sdk() -> Result<Arc<ShutdownManager>, RegistryAccessError> {
let guard = REGISTRY
.sdk_manager
.read()
.map_err(|_| RegistryAccessError::Poisoned)?;
if let Some(manager) = guard.as_ref() {
return Ok(manager.clone());
}
drop(guard);

/// This function overwrite any existing manager!
pub(crate) fn create_sdk() -> Result<Arc<ShutdownManager>, RegistryAccessError> {
let mut guard = REGISTRY
.sdk_manager
.write()
.map_err(|_| RegistryAccessError::Poisoned)?;

Ok(guard
.get_or_insert_with(|| {
Arc::new(ShutdownManager::new_without_signals().with_cancel_on_panic())
})
.insert(Arc::new(
ShutdownManager::new_without_signals().with_cancel_on_panic(),
))
.clone())
}

/// Get the ShutdownManager for SDK use.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. if it's not yet used, just remove it
  2. don't leak SDK needs into the common task library (it's like exposing VPN-specific methods in the monorepo)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I merely modified the existing SDK shutdown manager. It's more of an internal shutdown manager rather than a specific one imo. Either you give your client a custom one, or it creates one for internal use. We could have an entire discussion on the shutdown process which I still think could use a proper look into

/// This manager doesn't listen to OS signals, making it suitable for library use.
/// Not yet used, but maybe in the future
#[allow(dead_code)]
pub(crate) fn get_sdk() -> Result<Arc<ShutdownManager>, RegistryAccessError> {
let guard = REGISTRY
.sdk_manager
.read()
.map_err(|_| RegistryAccessError::Poisoned)?;
if let Some(manager) = guard.as_ref() {
Ok(manager.clone())
} else {
Err(RegistryAccessError::MissingShutdownManager)
}
}

/// Check if an SDK manager has been created.
/// Useful for testing and debugging.
#[allow(dead_code)]
Expand Down Expand Up @@ -85,10 +100,13 @@ mod tests {

assert!(!RuntimeRegistry::has_sdk_manager().unwrap());

let manager1 = RuntimeRegistry::get_or_create_sdk().unwrap();
// Error if nothing was created
assert!(RuntimeRegistry::get_sdk().is_err());

let manager1 = RuntimeRegistry::create_sdk().unwrap();
assert!(RuntimeRegistry::has_sdk_manager().unwrap());

let manager2 = RuntimeRegistry::get_or_create_sdk().unwrap();
let manager2 = RuntimeRegistry::get_sdk().unwrap();
// Should return the same instance
assert!(Arc::ptr_eq(&manager1, &manager2));

Expand Down
16 changes: 6 additions & 10 deletions sdk/rust/nym-sdk/src/mixnet/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,15 +736,11 @@ where
base_builder = base_builder.with_topology_provider(topology_provider);
}

// Use custom shutdown if provided, otherwise get from registry
let shutdown_tracker = match self.custom_shutdown {
Some(custom) => custom,
None => {
// Auto-create from registry for SDK use
nym_task::get_sdk_shutdown_tracker()?
}
};
base_builder = base_builder.with_shutdown(shutdown_tracker);
// Use custom shutdown if provided, otherwise the sdk one will be used later down the line
if let Some(shutdown_tracker) = self.custom_shutdown {
base_builder = base_builder.with_shutdown(shutdown_tracker);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so why do we no longer get the default static one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We still need it, but it was set a first time here, and then later in the base client startup too. There is no point in setting it in two places

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reference :

let shutdown_tracker = match self.shutdown {

I'd be fine with setting it up there and remove it here mentioned. Then BaseClientBuilder.shutdown should no longer be optional


if let Some(event_tx) = self.event_tx {
base_builder = base_builder.with_event_tx(event_tx);
}
Expand Down Expand Up @@ -809,7 +805,7 @@ where
client_output,
client_state.clone(),
nym_address,
started_client.shutdown_handle.child_tracker(),
started_client.shutdown_handle.clone(),
packet_type,
);

Expand Down
Loading