Skip to content

Commit

Permalink
remove the use of the Mutexes in the Routing thread
Browse files Browse the repository at this point in the history
closes #38
  • Loading branch information
NicolasDP committed Apr 16, 2024
1 parent 4edf947 commit 76031d2
Show file tree
Hide file tree
Showing 11 changed files with 294 additions and 222 deletions.
79 changes: 45 additions & 34 deletions netsim-async/examples/flooding_async.rs
@@ -1,27 +1,32 @@
use clap::Parser;
use netsim_async::{HasBytesSize, SimConfiguration, SimId, SimSocket};
use netsim_core::{Bandwidth, Edge, EdgePolicy, Latency, NodePolicy, PacketLoss};
use std::time::Duration;
use netsim_core::{time::Duration, Bandwidth, Edge, EdgePolicy, Latency, NodePolicy, PacketLoss};
use tokio::time::{sleep, Instant};

type SimContext = netsim_async::SimContext<Msg>;

#[derive(Parser)]
struct Command {
#[arg(long, default_value = "60")]
time: u64,
time: Duration,

#[arg(long, default_value = "10")]
every: u64,
every: Duration,

#[arg(long, default_value = "10")]
idle: Duration,
}

const LATENCY: Duration = Duration::from_millis(60);
const LATENCY: std::time::Duration = std::time::Duration::from_millis(60);

#[tokio::main]
async fn main() {
let cmd = Command::parse();

let configuration = SimConfiguration::default();
let configuration = SimConfiguration {
idle_duration: cmd.idle.into_duration(),
..SimConfiguration::default()
};

let mut context: SimContext = SimContext::with_config(configuration);

Expand All @@ -31,38 +36,44 @@ async fn main() {
let tap = Tap {
socket: context.open().unwrap(),
sink_id: sink.socket.id(),
every: Duration::from_millis(cmd.every),
every: cmd.every,
};

context.set_node_policy(
sink.socket.id(),
NodePolicy {
bandwidth_down: Bandwidth::bits_per(u64::MAX, Duration::from_secs(1)),
bandwidth_up: Bandwidth::bits_per(u64::MAX, Duration::from_secs(1)),
location: None,
},
);
context.set_node_policy(
tap.socket.id(),
NodePolicy {
bandwidth_down: Bandwidth::bits_per(u64::MAX, Duration::from_secs(1)),
bandwidth_up: Bandwidth::bits_per(u64::MAX, Duration::from_secs(1)),
location: None,
},
);
context.set_edge_policy(
Edge::new((tap.socket.id(), sink.socket.id())),
EdgePolicy {
latency: Latency::new(LATENCY),
packet_loss: PacketLoss::NONE,
..Default::default()
},
);
context
.set_node_policy(
sink.socket.id(),
NodePolicy {
bandwidth_down: Bandwidth::MAX,
bandwidth_up: Bandwidth::MAX,
location: None,
},
)
.unwrap();
context
.set_node_policy(
tap.socket.id(),
NodePolicy {
bandwidth_down: Bandwidth::MAX,
bandwidth_up: Bandwidth::MAX,
location: None,
},
)
.unwrap();
context
.set_edge_policy(
Edge::new((tap.socket.id(), sink.socket.id())),
EdgePolicy {
latency: Latency::new(LATENCY),
packet_loss: PacketLoss::NONE,
..Default::default()
},
)
.unwrap();

let sink = tokio::spawn(sink.work());
let tap = tokio::spawn(tap.work());

sleep(Duration::from_secs(cmd.time)).await;
sleep(cmd.time.into_duration()).await;

context.shutdown().unwrap();
sink.await.unwrap();
Expand Down Expand Up @@ -90,7 +101,7 @@ impl Sink {
}

let len = delays.len();
let total = delays.iter().copied().sum::<Duration>();
let total = delays.iter().copied().sum::<std::time::Duration>();
let avg = total / delays.len() as u32;

println!("sent {len} messages over. Msg received with an average of {avg:?} delays to the expected LATENCY");
Expand All @@ -117,7 +128,7 @@ impl Tap {
async fn work(mut self) {
while self.send_msg() {
let now = Instant::now();
sleep(self.every).await;
sleep(self.every.into_duration()).await;
let elapsed = now.elapsed();

println!("{elapsed:?}");
Expand Down
16 changes: 9 additions & 7 deletions netsim-async/examples/simple_async.rs
Expand Up @@ -12,13 +12,15 @@ async fn main() {
let net1 = context.open().unwrap();
let mut net2 = context.open().unwrap();

context.set_edge_policy(
Edge::new((net1.id(), net2.id())),
EdgePolicy {
latency: Latency::new(Duration::from_secs(1)),
..Default::default()
},
);
context
.set_edge_policy(
Edge::new((net1.id(), net2.id())),
EdgePolicy {
latency: Latency::new(Duration::from_secs(1)),
..Default::default()
},
)
.unwrap();

net1.send_to(net2.id(), MSG).unwrap();

Expand Down
28 changes: 22 additions & 6 deletions netsim-async/src/lib.rs
Expand Up @@ -11,7 +11,10 @@ pub use netsim_core::{
SimId,
};

pub struct SimSocket<T> {
pub struct SimSocket<T>
where
T: HasBytesSize,
{
reader: SimSocketReadHalf<T>,
writer: SimSocketWriteHalf<T>,
}
Expand All @@ -21,13 +24,23 @@ pub struct SimSocketReadHalf<T> {
down: SimDownLink<T>,
}

pub struct SimSocketWriteHalf<T> {
pub struct SimSocketWriteHalf<T>
where
T: HasBytesSize,
{
id: SimId,
up: BusSender<T>,
up: BusSender<SimUpLink<T>>,
}

impl<T> SimSocket<T> {
pub(crate) fn new(id: SimId, to_bus: BusSender<T>, receiver: SimDownLink<T>) -> Self {
impl<T> SimSocket<T>
where
T: HasBytesSize,
{
pub(crate) fn new(
id: SimId,
to_bus: BusSender<SimUpLink<T>>,
receiver: SimDownLink<T>,
) -> Self {
let reader = SimSocketReadHalf { id, down: receiver };
let writer = SimSocketWriteHalf { id, up: to_bus };

Expand Down Expand Up @@ -57,7 +70,10 @@ where
}
}

impl<T> SimSocketWriteHalf<T> {
impl<T> SimSocketWriteHalf<T>
where
T: HasBytesSize,
{
pub fn id(&self) -> SimId {
self.id
}
Expand Down
8 changes: 4 additions & 4 deletions netsim-async/src/sim_context.rs
Expand Up @@ -42,19 +42,19 @@ where
self.core.shutdown()
}

pub fn set_node_policy(&mut self, node: SimId, policy: NodePolicy) {
pub fn set_node_policy(&mut self, node: SimId, policy: NodePolicy) -> Result<()> {
self.core.set_node_policy(node, policy)
}

pub fn set_edge_policy(&mut self, edge: Edge, policy: EdgePolicy) {
pub fn set_edge_policy(&mut self, edge: Edge, policy: EdgePolicy) -> Result<()> {
self.core.set_edge_policy(edge, policy)
}

pub fn reset_node_policy(&mut self, node: SimId) {
pub fn reset_node_policy(&mut self, node: SimId) -> Result<()> {
self.core.reset_node_policy(node)
}

pub fn reset_edge_policy(&mut self, edge: Edge) {
pub fn reset_edge_policy(&mut self, edge: Edge) -> Result<()> {
self.core.reset_edge_policy(edge)
}
}
Expand Down
75 changes: 56 additions & 19 deletions netsim-core/src/bus.rs
@@ -1,50 +1,87 @@
use crate::Msg;
use crate::{sim_context::Link, Edge, EdgePolicy, Msg, NodePolicy, SimId};
use anyhow::{anyhow, Result};
use std::sync::mpsc;

pub enum BusMessage<T> {
Message(Msg<T>),
pub enum BusMessage<UpLink: Link> {
Message(Msg<UpLink::Msg>),
NodeAdd(UpLink, mpsc::SyncSender<SimId>),
NodePolicyDefault(NodePolicy),
NodePolicySet(SimId, NodePolicy),
NodePolicyReset(SimId),
EdgePolicyDefault(EdgePolicy),
EdgePolicySet(Edge, EdgePolicy),
EdgePolicyReset(Edge),
Shutdown,
Disconnected,
}

pub struct BusSender<T> {
sender: mpsc::Sender<BusMessage<T>>,
pub struct BusSender<UpLink: Link> {
sender: mpsc::Sender<BusMessage<UpLink>>,
}

pub(crate) struct BusReceiver<T> {
receiver: mpsc::Receiver<BusMessage<T>>,
pub(crate) struct BusReceiver<UpLink: Link> {
receiver: mpsc::Receiver<BusMessage<UpLink>>,
}

pub(crate) fn open_bus<T>() -> (BusSender<T>, BusReceiver<T>) {
pub(crate) fn open_bus<UpLink: Link>() -> (BusSender<UpLink>, BusReceiver<UpLink>) {
let (sender, receiver) = mpsc::channel();
(BusSender::new(sender), BusReceiver::new(receiver))
}

impl<T> BusSender<T> {
fn new(sender: mpsc::Sender<BusMessage<T>>) -> Self {
impl<UpLink: Link> BusSender<UpLink> {
fn new(sender: mpsc::Sender<BusMessage<UpLink>>) -> Self {
Self { sender }
}

pub fn send_msg(&self, msg: Msg<T>) -> Result<()> {
fn send(&self, msg: BusMessage<UpLink>) -> Result<()> {
self.sender
.send(BusMessage::Message(msg))
.send(msg)
.map_err(|error| anyhow!("failed to send message: {error}"))
}

pub fn send_msg(&self, msg: Msg<UpLink::Msg>) -> Result<()> {
self.send(BusMessage::Message(msg))
}

pub fn send_node_add(&self, link: UpLink, reply: mpsc::SyncSender<SimId>) -> Result<()> {
self.send(BusMessage::NodeAdd(link, reply))
}

pub fn send_node_policy_default(&self, policy: NodePolicy) -> Result<()> {
self.send(BusMessage::NodePolicyDefault(policy))
}

pub fn send_node_policy_set(&self, id: SimId, policy: NodePolicy) -> Result<()> {
self.send(BusMessage::NodePolicySet(id, policy))
}

pub fn send_node_policy_reset(&self, id: SimId) -> Result<()> {
self.send(BusMessage::NodePolicyReset(id))
}

pub fn send_edge_policy_default(&self, policy: EdgePolicy) -> Result<()> {
self.send(BusMessage::EdgePolicyDefault(policy))
}

pub fn send_edge_policy_set(&self, id: Edge, policy: EdgePolicy) -> Result<()> {
self.send(BusMessage::EdgePolicySet(id, policy))
}

pub fn send_edge_policy_reset(&self, id: Edge) -> Result<()> {
self.send(BusMessage::EdgePolicyReset(id))
}

pub(crate) fn send_shutdown(&self) -> Result<()> {
self.sender
.send(BusMessage::Shutdown)
.map_err(|error| anyhow!("failed to send message: {error}"))
self.send(BusMessage::Shutdown)
}
}

impl<T> BusReceiver<T> {
fn new(receiver: mpsc::Receiver<BusMessage<T>>) -> Self {
impl<UpLink: Link> BusReceiver<UpLink> {
fn new(receiver: mpsc::Receiver<BusMessage<UpLink>>) -> Self {
Self { receiver }
}

pub(crate) fn try_receive(&mut self) -> Option<BusMessage<T>> {
pub(crate) fn try_receive(&mut self) -> Option<BusMessage<UpLink>> {
match self.receiver.try_recv() {
Ok(bus_msg) => Some(bus_msg),
Err(mpsc::TryRecvError::Empty) => None,
Expand All @@ -53,7 +90,7 @@ impl<T> BusReceiver<T> {
}
}

impl<T> Clone for BusSender<T> {
impl<UpLink: Link> Clone for BusSender<UpLink> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
Expand Down
2 changes: 2 additions & 0 deletions netsim-core/src/policy.rs
Expand Up @@ -79,6 +79,8 @@ pub struct Policy {
}

impl Bandwidth {
pub const MAX: Self = Self(u64::MAX);

pub const fn bits_per(bits: u64, duration: Duration) -> Self {
Self(bits * duration.as_millis() as u64)
}
Expand Down

0 comments on commit 76031d2

Please sign in to comment.