From 76031d21b47ea2c7a9d763dcf83e4bf3ec1d7955 Mon Sep 17 00:00:00 2001 From: Nicolas DI PRIMA Date: Tue, 16 Apr 2024 13:37:06 +0100 Subject: [PATCH] remove the use of the Mutexes in the Routing thread closes #38 --- netsim-async/examples/flooding_async.rs | 79 ++++++---- netsim-async/examples/simple_async.rs | 16 +- netsim-async/src/lib.rs | 28 +++- netsim-async/src/sim_context.rs | 8 +- netsim-core/src/bus.rs | 75 ++++++--- netsim-core/src/policy.rs | 2 + netsim-core/src/sim_context.rs | 200 +++++++++++------------- netsim/examples/flood.rs | 54 ++++--- netsim/examples/simple.rs | 16 +- netsim/src/sim_context.rs | 8 +- netsim/src/sim_socket.rs | 30 +++- 11 files changed, 294 insertions(+), 222 deletions(-) diff --git a/netsim-async/examples/flooding_async.rs b/netsim-async/examples/flooding_async.rs index a2e0bf2..f1625f6 100644 --- a/netsim-async/examples/flooding_async.rs +++ b/netsim-async/examples/flooding_async.rs @@ -1,7 +1,6 @@ use clap::Parser; use netsim_async::{HasBytesSize, SimConfiguration, SimId, SimSocket}; -use netsim_core::{Bandwidth, Edge, EdgePolicy, Latency, NodePolicy, PacketLoss}; -use std::time::Duration; +use netsim_core::{time::Duration, Bandwidth, Edge, EdgePolicy, Latency, NodePolicy, PacketLoss}; use tokio::time::{sleep, Instant}; type SimContext = netsim_async::SimContext; @@ -9,19 +8,25 @@ type SimContext = netsim_async::SimContext; #[derive(Parser)] struct Command { #[arg(long, default_value = "60")] - time: u64, + time: Duration, #[arg(long, default_value = "10")] - every: u64, + every: Duration, + + #[arg(long, default_value = "10")] + idle: Duration, } -const LATENCY: Duration = Duration::from_millis(60); +const LATENCY: std::time::Duration = std::time::Duration::from_millis(60); #[tokio::main] async fn main() { let cmd = Command::parse(); - let configuration = SimConfiguration::default(); + let configuration = SimConfiguration { + idle_duration: cmd.idle.into_duration(), + ..SimConfiguration::default() + }; let mut context: SimContext = SimContext::with_config(configuration); @@ -31,38 +36,44 @@ async fn main() { let tap = Tap { socket: context.open().unwrap(), sink_id: sink.socket.id(), - every: Duration::from_millis(cmd.every), + every: cmd.every, }; - context.set_node_policy( - sink.socket.id(), - NodePolicy { - bandwidth_down: Bandwidth::bits_per(u64::MAX, Duration::from_secs(1)), - bandwidth_up: Bandwidth::bits_per(u64::MAX, Duration::from_secs(1)), - location: None, - }, - ); - context.set_node_policy( - tap.socket.id(), - NodePolicy { - bandwidth_down: Bandwidth::bits_per(u64::MAX, Duration::from_secs(1)), - bandwidth_up: Bandwidth::bits_per(u64::MAX, Duration::from_secs(1)), - location: None, - }, - ); - context.set_edge_policy( - Edge::new((tap.socket.id(), sink.socket.id())), - EdgePolicy { - latency: Latency::new(LATENCY), - packet_loss: PacketLoss::NONE, - ..Default::default() - }, - ); + context + .set_node_policy( + sink.socket.id(), + NodePolicy { + bandwidth_down: Bandwidth::MAX, + bandwidth_up: Bandwidth::MAX, + location: None, + }, + ) + .unwrap(); + context + .set_node_policy( + tap.socket.id(), + NodePolicy { + bandwidth_down: Bandwidth::MAX, + bandwidth_up: Bandwidth::MAX, + location: None, + }, + ) + .unwrap(); + context + .set_edge_policy( + Edge::new((tap.socket.id(), sink.socket.id())), + EdgePolicy { + latency: Latency::new(LATENCY), + packet_loss: PacketLoss::NONE, + ..Default::default() + }, + ) + .unwrap(); let sink = tokio::spawn(sink.work()); let tap = tokio::spawn(tap.work()); - sleep(Duration::from_secs(cmd.time)).await; + sleep(cmd.time.into_duration()).await; context.shutdown().unwrap(); sink.await.unwrap(); @@ -90,7 +101,7 @@ impl Sink { } let len = delays.len(); - let total = delays.iter().copied().sum::(); + let total = delays.iter().copied().sum::(); let avg = total / delays.len() as u32; println!("sent {len} messages over. Msg received with an average of {avg:?} delays to the expected LATENCY"); @@ -117,7 +128,7 @@ impl Tap { async fn work(mut self) { while self.send_msg() { let now = Instant::now(); - sleep(self.every).await; + sleep(self.every.into_duration()).await; let elapsed = now.elapsed(); println!("{elapsed:?}"); diff --git a/netsim-async/examples/simple_async.rs b/netsim-async/examples/simple_async.rs index 1795224..9cc870e 100644 --- a/netsim-async/examples/simple_async.rs +++ b/netsim-async/examples/simple_async.rs @@ -12,13 +12,15 @@ async fn main() { let net1 = context.open().unwrap(); let mut net2 = context.open().unwrap(); - context.set_edge_policy( - Edge::new((net1.id(), net2.id())), - EdgePolicy { - latency: Latency::new(Duration::from_secs(1)), - ..Default::default() - }, - ); + context + .set_edge_policy( + Edge::new((net1.id(), net2.id())), + EdgePolicy { + latency: Latency::new(Duration::from_secs(1)), + ..Default::default() + }, + ) + .unwrap(); net1.send_to(net2.id(), MSG).unwrap(); diff --git a/netsim-async/src/lib.rs b/netsim-async/src/lib.rs index b67bea2..bb4acc4 100644 --- a/netsim-async/src/lib.rs +++ b/netsim-async/src/lib.rs @@ -11,7 +11,10 @@ pub use netsim_core::{ SimId, }; -pub struct SimSocket { +pub struct SimSocket +where + T: HasBytesSize, +{ reader: SimSocketReadHalf, writer: SimSocketWriteHalf, } @@ -21,13 +24,23 @@ pub struct SimSocketReadHalf { down: SimDownLink, } -pub struct SimSocketWriteHalf { +pub struct SimSocketWriteHalf +where + T: HasBytesSize, +{ id: SimId, - up: BusSender, + up: BusSender>, } -impl SimSocket { - pub(crate) fn new(id: SimId, to_bus: BusSender, receiver: SimDownLink) -> Self { +impl SimSocket +where + T: HasBytesSize, +{ + pub(crate) fn new( + id: SimId, + to_bus: BusSender>, + receiver: SimDownLink, + ) -> Self { let reader = SimSocketReadHalf { id, down: receiver }; let writer = SimSocketWriteHalf { id, up: to_bus }; @@ -57,7 +70,10 @@ where } } -impl SimSocketWriteHalf { +impl SimSocketWriteHalf +where + T: HasBytesSize, +{ pub fn id(&self) -> SimId { self.id } diff --git a/netsim-async/src/sim_context.rs b/netsim-async/src/sim_context.rs index 0dc4687..f8f947f 100644 --- a/netsim-async/src/sim_context.rs +++ b/netsim-async/src/sim_context.rs @@ -42,19 +42,19 @@ where self.core.shutdown() } - pub fn set_node_policy(&mut self, node: SimId, policy: NodePolicy) { + pub fn set_node_policy(&mut self, node: SimId, policy: NodePolicy) -> Result<()> { self.core.set_node_policy(node, policy) } - pub fn set_edge_policy(&mut self, edge: Edge, policy: EdgePolicy) { + pub fn set_edge_policy(&mut self, edge: Edge, policy: EdgePolicy) -> Result<()> { self.core.set_edge_policy(edge, policy) } - pub fn reset_node_policy(&mut self, node: SimId) { + pub fn reset_node_policy(&mut self, node: SimId) -> Result<()> { self.core.reset_node_policy(node) } - pub fn reset_edge_policy(&mut self, edge: Edge) { + pub fn reset_edge_policy(&mut self, edge: Edge) -> Result<()> { self.core.reset_edge_policy(edge) } } diff --git a/netsim-core/src/bus.rs b/netsim-core/src/bus.rs index b37f827..5151e4e 100644 --- a/netsim-core/src/bus.rs +++ b/netsim-core/src/bus.rs @@ -1,50 +1,87 @@ -use crate::Msg; +use crate::{sim_context::Link, Edge, EdgePolicy, Msg, NodePolicy, SimId}; use anyhow::{anyhow, Result}; use std::sync::mpsc; -pub enum BusMessage { - Message(Msg), +pub enum BusMessage { + Message(Msg), + NodeAdd(UpLink, mpsc::SyncSender), + NodePolicyDefault(NodePolicy), + NodePolicySet(SimId, NodePolicy), + NodePolicyReset(SimId), + EdgePolicyDefault(EdgePolicy), + EdgePolicySet(Edge, EdgePolicy), + EdgePolicyReset(Edge), Shutdown, Disconnected, } -pub struct BusSender { - sender: mpsc::Sender>, +pub struct BusSender { + sender: mpsc::Sender>, } -pub(crate) struct BusReceiver { - receiver: mpsc::Receiver>, +pub(crate) struct BusReceiver { + receiver: mpsc::Receiver>, } -pub(crate) fn open_bus() -> (BusSender, BusReceiver) { +pub(crate) fn open_bus() -> (BusSender, BusReceiver) { let (sender, receiver) = mpsc::channel(); (BusSender::new(sender), BusReceiver::new(receiver)) } -impl BusSender { - fn new(sender: mpsc::Sender>) -> Self { +impl BusSender { + fn new(sender: mpsc::Sender>) -> Self { Self { sender } } - pub fn send_msg(&self, msg: Msg) -> Result<()> { + fn send(&self, msg: BusMessage) -> Result<()> { self.sender - .send(BusMessage::Message(msg)) + .send(msg) .map_err(|error| anyhow!("failed to send message: {error}")) } + pub fn send_msg(&self, msg: Msg) -> Result<()> { + self.send(BusMessage::Message(msg)) + } + + pub fn send_node_add(&self, link: UpLink, reply: mpsc::SyncSender) -> Result<()> { + self.send(BusMessage::NodeAdd(link, reply)) + } + + pub fn send_node_policy_default(&self, policy: NodePolicy) -> Result<()> { + self.send(BusMessage::NodePolicyDefault(policy)) + } + + pub fn send_node_policy_set(&self, id: SimId, policy: NodePolicy) -> Result<()> { + self.send(BusMessage::NodePolicySet(id, policy)) + } + + pub fn send_node_policy_reset(&self, id: SimId) -> Result<()> { + self.send(BusMessage::NodePolicyReset(id)) + } + + pub fn send_edge_policy_default(&self, policy: EdgePolicy) -> Result<()> { + self.send(BusMessage::EdgePolicyDefault(policy)) + } + + pub fn send_edge_policy_set(&self, id: Edge, policy: EdgePolicy) -> Result<()> { + self.send(BusMessage::EdgePolicySet(id, policy)) + } + + pub fn send_edge_policy_reset(&self, id: Edge) -> Result<()> { + self.send(BusMessage::EdgePolicyReset(id)) + } + pub(crate) fn send_shutdown(&self) -> Result<()> { - self.sender - .send(BusMessage::Shutdown) - .map_err(|error| anyhow!("failed to send message: {error}")) + self.send(BusMessage::Shutdown) } } -impl BusReceiver { - fn new(receiver: mpsc::Receiver>) -> Self { +impl BusReceiver { + fn new(receiver: mpsc::Receiver>) -> Self { Self { receiver } } - pub(crate) fn try_receive(&mut self) -> Option> { + pub(crate) fn try_receive(&mut self) -> Option> { match self.receiver.try_recv() { Ok(bus_msg) => Some(bus_msg), Err(mpsc::TryRecvError::Empty) => None, @@ -53,7 +90,7 @@ impl BusReceiver { } } -impl Clone for BusSender { +impl Clone for BusSender { fn clone(&self) -> Self { Self { sender: self.sender.clone(), diff --git a/netsim-core/src/policy.rs b/netsim-core/src/policy.rs index 0a78a01..ab37cc5 100644 --- a/netsim-core/src/policy.rs +++ b/netsim-core/src/policy.rs @@ -79,6 +79,8 @@ pub struct Policy { } impl Bandwidth { + pub const MAX: Self = Self(u64::MAX); + pub const fn bits_per(bits: u64, duration: Duration) -> Self { Self(bits * duration.as_millis() as u64) } diff --git a/netsim-core/src/sim_context.rs b/netsim-core/src/sim_context.rs index 745b7f0..598000b 100644 --- a/netsim-core/src/sim_context.rs +++ b/netsim-core/src/sim_context.rs @@ -2,14 +2,10 @@ use crate::{ bus::{open_bus, BusMessage, BusReceiver, BusSender}, congestion_queue::CongestionQueue, policy::PolicyOutcome, - Edge, EdgePolicy, HasBytesSize, Msg, NodePolicy, OnDrop, Policy, SimConfiguration, SimId, -}; -use anyhow::{anyhow, bail, Context, Result}; -use std::{ - sync::{Arc, Mutex, RwLock}, - thread, - time::{Duration, Instant}, + Edge, EdgePolicy, HasBytesSize, Msg, NodePolicy, Policy, SimConfiguration, SimId, }; +use anyhow::{bail, Context, Result}; +use std::{sync::mpsc, thread, time::Instant}; /// the collections of up links to other sockets /// @@ -34,27 +30,19 @@ pub(crate) struct SimLink { /// will not be able to send messages to nodes of different context. /// pub struct SimContextCore { - policy: Arc>, - - next_sim_id: SimId, - - bus: BusSender, - - links: Arc>>, + bus: BusSender, mux_handler: thread::JoinHandle>, } pub struct SimMuxCore { - policy: Arc>, - - on_drop: Option>, + next_sim_id: SimId, - idle_duration: Duration, + configuration: SimConfiguration, - bus: BusReceiver, + bus: BusReceiver, - links: Arc>>, + links: SimLinks, msgs: CongestionQueue, } @@ -95,33 +83,20 @@ where /// Note that this function starts a _multiplexer_ in a physical thread. /// pub fn with_config(configuration: SimConfiguration) -> Self { - let policy = Arc::new(RwLock::new(configuration.policy)); - let links = Arc::new(Mutex::new(Vec::new())); - let next_sim_id = SimId::ZERO; // Starts at 0 - let (sender, receiver) = open_bus(); - let mux = SimMuxCore::new( - Arc::clone(&policy), - configuration.on_drop, - configuration.idle_duration, - receiver, - Arc::clone(&links), - ); + let mux = SimMuxCore::::new(configuration, receiver); let mux_handler = thread::spawn(|| run_mux(mux)); Self { - policy, - next_sim_id, bus: sender, - links, mux_handler, } } - pub fn configuration(&self) -> &Arc> { - &self.policy + pub fn configuration(&self) -> Policy { + todo!() } /// set a specific policy between the two `Node` that compose the [`Edge`]. @@ -130,15 +105,17 @@ where /// To reset, use [`SimContext::reset_edge_policy`], and the default /// policy will be used again. /// - pub fn set_edge_policy(&mut self, edge: Edge, policy: EdgePolicy) { - self.policy.write().unwrap().set_edge_policy(edge, policy) + #[inline] + pub fn set_edge_policy(&mut self, edge: Edge, policy: EdgePolicy) -> Result<()> { + self.bus().send_edge_policy_set(edge, policy) } /// Reset the [`EdgePolicy`] between two nodes of an [`Edge`]. The default /// EdgePolicy for this SimContext will be used. /// - pub fn reset_edge_policy(&mut self, edge: Edge) { - self.policy.write().unwrap().reset_edge_policy(edge) + #[inline] + pub fn reset_edge_policy(&mut self, edge: Edge) -> Result<()> { + self.bus().send_edge_policy_reset(edge) } /// Set a specific [`NodePolicy`] for a given node ([SimSocket]). @@ -149,58 +126,32 @@ where /// Call [`SimContext::reset_node_policy`] to reset the [`NodePolicy`] /// so that the default policy will be used onward. /// - pub fn set_node_policy(&mut self, node: SimId, policy: NodePolicy) { - let _not_found = self - .links - .lock() - .unwrap() - .get_mut(node.into_index()) - .map(|node| node.policy = Some(policy)); - - debug_assert!( - _not_found.is_some(), - "we aren't expecting users to create SimId themselves and it should always be valid" - ) + #[inline] + pub fn set_node_policy(&mut self, node: SimId, policy: NodePolicy) -> Result<()> { + self.bus().send_node_policy_set(node, policy) } /// Reset the specific [`NodePolicy`] associated to the given node /// ([SimSocket]) so that the default policy will be used again going /// forward. - pub fn reset_node_policy(&mut self, node: SimId) { - let _not_found = self - .links - .lock() - .unwrap() - .get_mut(node.into_index()) - .map(|node| node.policy = None); - - debug_assert!( - _not_found.is_some(), - "we aren't expecting users to create SimId themselves and it should always be valid" - ) + #[inline] + pub fn reset_node_policy(&mut self, node: SimId) -> Result<()> { + self.bus().send_node_policy_reset(node) } - pub fn bus(&self) -> BusSender { + #[inline] + pub fn bus(&self) -> BusSender { self.bus.clone() } + #[inline] pub fn new_link(&mut self, link: UpLink) -> Result { - let id = self.next_sim_id; - - self.links - .lock() - .map_err(|error| anyhow!("Failed to lock on the links: {error}"))? - .push(SimLink::new(link)); + let (send_reply, reply) = mpsc::sync_channel(1); + self.bus().send_node_add(link, send_reply)?; - self.next_sim_id = id.next(); - - debug_assert_eq!( - self.links.lock().unwrap().len(), - self.next_sim_id.into_index(), - "The next available SimId is the lenght of the vec" - ); - - Ok(id) + reply + .recv() + .context("Failed to receive reply from the Routing thread") } /// Shutdown the context. All remaining opened [SimSocket] will become @@ -227,38 +178,27 @@ impl SimMuxCore where UpLink: Link, { - fn new( - policy: Arc>, - on_drop: Option>, - idle_duration: Duration, - bus: BusReceiver, - links: Arc>>, - ) -> Self { + fn new(configuration: SimConfiguration, bus: BusReceiver) -> Self { let msgs = CongestionQueue::new(); + let next_sim_id = SimId::ZERO; // Starts at 0 + let links = Vec::new(); Self { - policy, - on_drop, - idle_duration, + configuration, + next_sim_id, links, bus, msgs, } } - pub fn configuration(&self) -> &Arc> { - &self.policy - } - /// process an inbound message /// /// The message propagation speed will be computed based on /// the upload, download and general link speed between pub fn inbound_message(&mut self, time: Instant, msg: Msg) -> Result<()> { - let mut configuration = self.policy.write().expect("Never poisonned"); - - match configuration.process(&msg) { + match self.configuration.policy.process(&msg) { PolicyOutcome::Drop => { - if let Some(on_drop) = self.on_drop.as_ref() { + if let Some(on_drop) = self.configuration.on_drop.as_ref() { on_drop.handle(msg.into_content()) } } @@ -274,11 +214,9 @@ where /// This function may returns an empty `Vec` and this /// simply means there are no messages to be forwarded pub fn outbound_messages(&mut self, time: Instant) -> Result>> { - Ok(self.msgs.pop_many( - time, - &self.links.lock().unwrap(), - &self.policy.read().unwrap(), - )) + Ok(self + .msgs + .pop_many(time, &self.links, &self.configuration.policy)) } /// get the earliest time to the next message @@ -300,12 +238,8 @@ where fn propagate_msg(&mut self, msg: Msg) -> Result<()> { let dst = msg.to(); - let mut addresses = self - .links - .lock() - .map_err(|error| anyhow!("Failed to acquire address, mutex poisonned {error}"))?; - if let Some(sim_link) = addresses.get_mut(dst.into_index()) { + if let Some(sim_link) = self.links.get_mut(dst.into_index()) { let _error = sim_link.link.send(msg); Ok(()) } else { @@ -320,6 +254,52 @@ where return Ok(MuxOutcome::Shutdown); } BusMessage::Message(msg) => self.inbound_message(time, msg)?, + + BusMessage::NodeAdd(link, reply) => { + let id = self.next_sim_id; + + self.links.push(SimLink::new(link)); + self.next_sim_id = self.next_sim_id.next(); + + debug_assert_eq!( + self.links.len(), + self.next_sim_id.into_index(), + "The next available SimId is the lenght of the vec" + ); + + if let Err(error) = reply.send(id) { + bail!("Failed to reply to a new node creation request: {error:?}") + } + } + + BusMessage::NodePolicyDefault(policy) => { + self.configuration.policy.set_default_node_policy(policy) + } + BusMessage::NodePolicySet(id, policy) => { + let _policy_set = self + .links + .get_mut(id.into_index()) + .map(|node| node.policy = Some(policy)) + .is_some(); + + debug_assert!(_policy_set, "We should always have a node for any given ID") + } + BusMessage::NodePolicyReset(id) => { + let _policy_set = self + .links + .get_mut(id.into_index()) + .map(|node| node.policy = None) + .is_some(); + + debug_assert!(_policy_set, "We should always have a node for any given ID") + } + BusMessage::EdgePolicyDefault(policy) => { + self.configuration.policy.set_default_edge_policy(policy) + } + BusMessage::EdgePolicySet(id, policy) => { + self.configuration.policy.set_edge_policy(id, policy) + } + BusMessage::EdgePolicyReset(id) => self.configuration.policy.reset_edge_policy(id), } } @@ -330,10 +310,10 @@ where pub(crate) fn sleep_time(&mut self, current_time: Instant) -> Instant { let Some(time) = self.earliest_outbound_time() else { - return current_time + self.idle_duration; + return current_time + self.configuration.idle_duration; }; - std::cmp::min(time, current_time + self.idle_duration) + std::cmp::min(time, current_time + self.configuration.idle_duration) } } diff --git a/netsim/examples/flood.rs b/netsim/examples/flood.rs index c0ccaf4..b4f42bf 100644 --- a/netsim/examples/flood.rs +++ b/netsim/examples/flood.rs @@ -47,14 +47,16 @@ fn main() { socket: context.open().unwrap(), latency: cmd.latency, }; - context.set_node_policy( - sink.socket.id(), - NodePolicy { - bandwidth_down: cmd.bandwidth_down, - bandwidth_up: cmd.bandwidth_up, - location: None, - }, - ); + context + .set_node_policy( + sink.socket.id(), + NodePolicy { + bandwidth_down: cmd.bandwidth_down, + bandwidth_up: cmd.bandwidth_up, + location: None, + }, + ) + .unwrap(); let mut taps = Vec::with_capacity(cmd.num_tap); for _ in 0..cmd.num_tap { @@ -64,22 +66,26 @@ fn main() { every: cmd.every, }; - context.set_node_policy( - tap.socket.id(), - NodePolicy { - bandwidth_down: cmd.bandwidth_down, - bandwidth_up: cmd.bandwidth_up, - location: None, - }, - ); - context.set_edge_policy( - Edge::new((tap.socket.id(), sink.socket.id())), - EdgePolicy { - latency: Latency::new(cmd.latency.into_duration()), - packet_loss: PacketLoss::NONE, - ..Default::default() - }, - ); + context + .set_node_policy( + tap.socket.id(), + NodePolicy { + bandwidth_down: cmd.bandwidth_down, + bandwidth_up: cmd.bandwidth_up, + location: None, + }, + ) + .unwrap(); + context + .set_edge_policy( + Edge::new((tap.socket.id(), sink.socket.id())), + EdgePolicy { + latency: Latency::new(cmd.latency.into_duration()), + packet_loss: PacketLoss::NONE, + ..Default::default() + }, + ) + .unwrap(); taps.push(tap); } diff --git a/netsim/examples/simple.rs b/netsim/examples/simple.rs index 07c75ef..803549a 100644 --- a/netsim/examples/simple.rs +++ b/netsim/examples/simple.rs @@ -10,13 +10,15 @@ fn main() { let net1 = context.open().unwrap(); let mut net2 = context.open().unwrap(); - context.set_edge_policy( - Edge::new((net1.id(), net2.id())), - EdgePolicy { - latency: Latency::new(Duration::from_secs(1)), - ..Default::default() - }, - ); + context + .set_edge_policy( + Edge::new((net1.id(), net2.id())), + EdgePolicy { + latency: Latency::new(Duration::from_secs(1)), + ..Default::default() + }, + ) + .unwrap(); net1.send_to(net2.id(), MSG).unwrap(); diff --git a/netsim/src/sim_context.rs b/netsim/src/sim_context.rs index 6bfdf12..609c8c9 100644 --- a/netsim/src/sim_context.rs +++ b/netsim/src/sim_context.rs @@ -44,19 +44,19 @@ where self.core.shutdown() } - pub fn set_node_policy(&mut self, node: SimId, policy: NodePolicy) { + pub fn set_node_policy(&mut self, node: SimId, policy: NodePolicy) -> Result<()> { self.core.set_node_policy(node, policy) } - pub fn set_edge_policy(&mut self, edge: Edge, policy: EdgePolicy) { + pub fn set_edge_policy(&mut self, edge: Edge, policy: EdgePolicy) -> Result<()> { self.core.set_edge_policy(edge, policy) } - pub fn reset_node_policy(&mut self, node: SimId) { + pub fn reset_node_policy(&mut self, node: SimId) -> Result<()> { self.core.reset_node_policy(node) } - pub fn reset_edge_policy(&mut self, edge: Edge) { + pub fn reset_edge_policy(&mut self, edge: Edge) -> Result<()> { self.core.reset_edge_policy(edge) } } diff --git a/netsim/src/sim_socket.rs b/netsim/src/sim_socket.rs index d40e58f..77f14e3 100644 --- a/netsim/src/sim_socket.rs +++ b/netsim/src/sim_socket.rs @@ -1,9 +1,15 @@ -use crate::{sim_link::SimDownLink, HasBytesSize, SimId}; +use crate::{ + sim_link::{SimDownLink, SimUpLink}, + HasBytesSize, SimId, +}; use anyhow::Result; use netsim_core::{BusSender, Msg}; use std::sync::mpsc; -pub struct SimSocket { +pub struct SimSocket +where + T: HasBytesSize, +{ reader: SimSocketReadHalf, writer: SimSocketWriteHalf, } @@ -13,9 +19,12 @@ pub struct SimSocketReadHalf { down: SimDownLink, } -pub struct SimSocketWriteHalf { +pub struct SimSocketWriteHalf +where + T: HasBytesSize, +{ id: SimId, - up: BusSender, + up: BusSender>, } /// Result from [`SimSocket::try_recv`] or [`SimSocketReadHalf::try_recv`] @@ -31,8 +40,15 @@ pub enum TryRecv { Disconnected, } -impl SimSocket { - pub(crate) fn new(id: SimId, to_bus: BusSender, receiver: SimDownLink) -> Self { +impl SimSocket +where + T: HasBytesSize, +{ + pub(crate) fn new( + id: SimId, + to_bus: BusSender>, + receiver: SimDownLink, + ) -> Self { Self { reader: SimSocketReadHalf { id, down: receiver }, writer: SimSocketWriteHalf { id, up: to_bus }, @@ -73,7 +89,7 @@ where } } -impl SimSocketWriteHalf { +impl SimSocketWriteHalf { #[inline] pub fn id(&self) -> SimId { self.id