Skip to content

Commit e13a5f6

Browse files
committed
add router stats and spawn with delay
1 parent 59c5eec commit e13a5f6

File tree

5 files changed

+88
-19
lines changed

5 files changed

+88
-19
lines changed

Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,21 @@ members = ["cli", "core", "machine", "macros", "nat", "router", "."]
33

44
[package]
55
name = "netsim-embed"
6-
version = "0.8.0"
6+
version = "0.9.0"
77
authors = ["David Craven <david@craven.ch>"]
88
edition = "2018"
99
description = "Network simulator."
1010
license = "MIT"
1111
repository = "https://github.com/ipfs-rust/netsim-embed"
1212

1313
[features]
14-
ipc = ["dep:libtest-mimic", "dep:anyhow", "dep:ipc-channel", "dep:netsim-embed-macros", "dep:serde"]
14+
ipc = [
15+
"dep:libtest-mimic",
16+
"dep:anyhow",
17+
"dep:ipc-channel",
18+
"dep:netsim-embed-macros",
19+
"dep:serde",
20+
]
1521

1622
[dependencies]
1723
anyhow = { version = "1.0.40", optional = true }

router/src/lib.rs

Lines changed: 52 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,18 @@
1-
use futures::channel::{mpsc, oneshot};
2-
use futures::future::{poll_fn, FutureExt};
3-
use futures::stream::{FuturesUnordered, StreamExt};
1+
use futures::{
2+
channel::{mpsc, oneshot},
3+
future::{poll_fn, FutureExt},
4+
stream::{FuturesUnordered, StreamExt},
5+
};
46
use libpacket::ipv4::Ipv4Packet;
57
use netsim_embed_core::{Ipv4Route, Plug};
6-
use std::net::Ipv4Addr;
7-
use std::task::Poll;
8+
use std::{
9+
net::Ipv4Addr,
10+
sync::{
11+
atomic::{AtomicUsize, Ordering},
12+
Arc,
13+
},
14+
task::Poll,
15+
};
816

917
#[derive(Debug)]
1018
#[allow(clippy::enum_variant_names)]
@@ -20,13 +28,43 @@ pub struct Ipv4Router {
2028
#[allow(unused)]
2129
addr: Ipv4Addr,
2230
ctrl: mpsc::UnboundedSender<RouterCtrl>,
31+
counters: Arc<Counters>,
32+
}
33+
34+
#[derive(Debug, Default)]
35+
struct Counters {
36+
forwarded: AtomicUsize,
37+
invalid: AtomicUsize,
38+
disabled: AtomicUsize,
39+
unroutable: AtomicUsize,
2340
}
2441

2542
impl Ipv4Router {
2643
pub fn new(addr: Ipv4Addr) -> Self {
2744
let (tx, rx) = mpsc::unbounded();
28-
router(addr, rx);
29-
Self { addr, ctrl: tx }
45+
let counters = Arc::new(Counters::default());
46+
router(addr, Arc::clone(&counters), rx);
47+
Self {
48+
addr,
49+
ctrl: tx,
50+
counters,
51+
}
52+
}
53+
54+
pub fn forwarded(&self) -> usize {
55+
self.counters.forwarded.load(Ordering::Relaxed)
56+
}
57+
58+
pub fn invalid(&self) -> usize {
59+
self.counters.invalid.load(Ordering::Relaxed)
60+
}
61+
62+
pub fn disabled(&self) -> usize {
63+
self.counters.disabled.load(Ordering::Relaxed)
64+
}
65+
66+
pub fn unroutable(&self) -> usize {
67+
self.counters.unroutable.load(Ordering::Relaxed)
3068
}
3169

3270
pub fn add_connection(&self, id: usize, plug: Plug, routes: Vec<Ipv4Route>) {
@@ -56,7 +94,7 @@ impl Ipv4Router {
5694
}
5795
}
5896

59-
fn router(addr: Ipv4Addr, mut ctrl: mpsc::UnboundedReceiver<RouterCtrl>) {
97+
fn router(addr: Ipv4Addr, counters: Arc<Counters>, mut ctrl: mpsc::UnboundedReceiver<RouterCtrl>) {
6098
async_global_executor::spawn(async move {
6199
let mut conns = vec![];
62100
loop {
@@ -87,7 +125,7 @@ fn router(addr: Ipv4Addr, mut ctrl: mpsc::UnboundedReceiver<RouterCtrl>) {
87125
None => break,
88126
},
89127
incoming = incoming(&mut conns).fuse() => match incoming {
90-
(_, Some(packet)) => forward_packet(addr, &mut conns, packet),
128+
(_, Some(packet)) => forward_packet(addr, &counters, &mut conns, packet),
91129
(i, None) => { conns.swap_remove(i); }
92130
}
93131
}
@@ -111,12 +149,14 @@ async fn incoming(conns: &mut [(usize, Plug, Vec<Ipv4Route>, bool)]) -> (usize,
111149

112150
fn forward_packet(
113151
addr: Ipv4Addr,
152+
counters: &Counters,
114153
conns: &mut [(usize, Plug, Vec<Ipv4Route>, bool)],
115154
bytes: Vec<u8>,
116155
) {
117156
let packet = if let Some(packet) = Ipv4Packet::new(&bytes) {
118157
packet
119158
} else {
159+
counters.invalid.fetch_add(1, Ordering::Relaxed);
120160
log::info!("router {}: dropping invalid ipv4 packet", addr);
121161
return;
122162
};
@@ -130,8 +170,10 @@ fn forward_packet(
130170
for route in routes {
131171
if route.dest().contains(dest) || dest.is_broadcast() || dest.is_multicast() {
132172
if !*en {
173+
counters.disabled.fetch_add(1, Ordering::Relaxed);
133174
log::trace!("router {}: route {:?} disabled", addr, route);
134175
} else {
176+
counters.forwarded.fetch_add(1, Ordering::Relaxed);
135177
log::trace!("router {}: routing packet on route {:?}", addr, route);
136178
tx.unbounded_send(bytes.clone());
137179
forwarded = true;
@@ -141,6 +183,7 @@ fn forward_packet(
141183
}
142184
if !forwarded {
143185
let src = packet.get_source();
186+
counters.unroutable.fetch_add(1, Ordering::Relaxed);
144187
log::debug!(
145188
"router {}: dropping unroutable packet from {} to {}",
146189
addr,

src/lib.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use async_process::Command;
22
use futures::prelude::*;
3-
pub use libpacket::*;
43
use netsim_embed_core::*;
54
pub use netsim_embed_core::{DelayBuffer, Ipv4Range, Protocol};
65
pub use netsim_embed_machine::{unshare_user, Machine, MachineId, Namespace};
@@ -72,7 +71,12 @@ where
7271
}
7372

7473
#[cfg(feature = "ipc")]
75-
pub async fn spawn<M: MachineFn>(&mut self, _machine: M, arg: M::Arg) -> MachineId {
74+
pub async fn spawn<M: MachineFn>(
75+
&mut self,
76+
_machine: M,
77+
arg: M::Arg,
78+
delay: Option<DelayBuffer>,
79+
) -> MachineId {
7680
use ipc_channel::ipc;
7781
let id = M::id();
7882
let (server, server_name) = ipc::IpcOneShotServer::<ipc::IpcSender<M::Arg>>::new().unwrap();
@@ -82,7 +86,7 @@ where
8286
&format!("{id}"),
8387
&server_name,
8488
]);
85-
let machine = self.spawn_machine(command, None).await;
89+
let machine = self.spawn_machine(command, delay).await;
8690
let (_, ipc) = async_global_executor::spawn_blocking(|| server.accept())
8791
.await
8892
.unwrap();
@@ -235,6 +239,22 @@ impl Network {
235239
self.range
236240
}
237241

242+
pub fn num_forwarded(&self) -> usize {
243+
self.router.forwarded()
244+
}
245+
246+
pub fn num_invalid(&self) -> usize {
247+
self.router.invalid()
248+
}
249+
250+
pub fn num_disabled(&self) -> usize {
251+
self.router.disabled()
252+
}
253+
254+
pub fn num_unroutable(&self) -> usize {
255+
self.router.unroutable()
256+
}
257+
238258
pub fn unique_addr(&mut self) -> Ipv4Addr {
239259
let addr = self.range.address_for(self.device);
240260
self.device += 1;

tests/smoke_test.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ fn add((left, right, sender): (IpcReceiver<usize>, IpcReceiver<usize>, IpcSender
1515
fn can_send_one() {
1616
let mut s = netsim_embed::Netsim::<String, String>::new();
1717
let (sender, receiver) = ipc_channel::ipc::channel().unwrap();
18-
let _ = async_std::task::block_on(s.spawn(send_one, sender));
18+
let _ = async_std::task::block_on(s.spawn(send_one, sender, None));
1919
assert_eq!(1, receiver.recv().unwrap());
2020
}
2121

@@ -24,13 +24,13 @@ fn one_plus_one_makes_two() {
2424
let mut s = netsim_embed::Netsim::<String, String>::new();
2525

2626
let (sender1, receiver1) = ipc_channel::ipc::channel::<usize>().unwrap();
27-
let _ = s.spawn(send_one, sender1).await;
27+
let _ = s.spawn(send_one, sender1, None).await;
2828

2929
let (sender2, receiver2) = ipc_channel::ipc::channel::<usize>().unwrap();
30-
let _ = s.spawn(send_one, sender2).await;
30+
let _ = s.spawn(send_one, sender2, None).await;
3131

3232
let (sender3, receiver3) = ipc_channel::ipc::channel::<usize>().unwrap();
33-
let _ = s.spawn(add, (receiver1, receiver2, sender3)).await;
33+
let _ = s.spawn(add, (receiver1, receiver2, sender3), None).await;
3434

3535
assert_eq!(2, receiver3.recv().unwrap());
3636
})

0 commit comments

Comments
 (0)