diff --git a/src/builder.rs b/src/builder.rs index afcdb62..70b64e4 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -91,6 +91,16 @@ impl Builder { self } + pub fn tcp_capacity(&mut self, value: usize) -> &mut Self { + self.config.tcp_capacity = value; + self + } + + pub fn udp_capacity(&mut self, value: usize) -> &mut Self { + self.config.udp_capacity = value; + self + } + pub fn build<'a>(&self) -> Sim<'a> { self.build_with_rng(Box::new(rand::rngs::SmallRng::from_entropy())) } diff --git a/src/config.rs b/src/config.rs index 4573ced..5e3a247 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,6 +11,12 @@ pub(crate) struct Config { /// When the simulation starts pub(crate) epoch: SystemTime, + + /// Max size of the tcp receive buffer + pub(crate) tcp_capacity: usize, + + /// Max size of the udp receive buffer + pub(crate) udp_capacity: usize, } /// Configures link behavior. @@ -52,6 +58,8 @@ impl Default for Config { duration: Duration::from_secs(10), tick: Duration::from_millis(1), epoch: SystemTime::now(), + tcp_capacity: 64, + udp_capacity: 64, } } } diff --git a/src/host.rs b/src/host.rs index ed0b0dc..2418367 100644 --- a/src/host.rs +++ b/src/host.rs @@ -40,11 +40,11 @@ pub(crate) struct Host { } impl Host { - pub(crate) fn new(addr: IpAddr) -> Host { + pub(crate) fn new(addr: IpAddr, tcp_capacity: usize, udp_capacity: usize) -> Host { Host { addr, - udp: Udp::new(), - tcp: Tcp::new(), + udp: Udp::new(udp_capacity), + tcp: Tcp::new(tcp_capacity), next_ephemeral_port: 49152, elapsed: Duration::ZERO, now: None, @@ -138,11 +138,10 @@ struct UdpBind { } impl Udp { - fn new() -> Self { + fn new(capacity: usize) -> Self { Self { binds: IndexMap::new(), - // TODO: Make capacity configurable - capacity: 64, + capacity, } } @@ -276,7 +275,7 @@ impl StreamSocket { // Buffer and re-order received segments by `seq` as the network may deliver // them out of order. fn buffer(&mut self, seq: u64, segment: SequencedSegment) -> Result<(), Protocol> { - use mpsc::error::TrySendError::Closed; + use mpsc::error::TrySendError::*; let exists = self.buf.insert(seq, segment); @@ -288,7 +287,7 @@ impl StreamSocket { let segment = self.buf.remove(&self.recv_seq).unwrap(); self.sender.try_send(segment).map_err(|e| match e { Closed(_) => Protocol::Tcp(Segment::Rst), - _ => todo!("{} socket buffer full", self.local_addr), + Full(_) => panic!("{} socket buffer full", self.local_addr), })?; } @@ -297,13 +296,12 @@ impl StreamSocket { } impl Tcp { - fn new() -> Self { + fn new(capacity: usize) -> Self { Self { binds: IndexMap::new(), sockets: IndexMap::new(), - // TODO: Make capacity configurable - server_socket_capacity: 64, - socket_capacity: 64, + server_socket_capacity: capacity, + socket_capacity: capacity, } } @@ -364,7 +362,7 @@ impl Tcp { // connection refused on the client. if let Some(b) = self.binds.get_mut(&dst.port()) { if b.deque.len() == self.server_socket_capacity { - todo!("{} server socket buffer full", dst); + panic!("{} server socket buffer full", dst); } if matches(b.bind_addr, dst) { @@ -427,7 +425,7 @@ mod test { #[test] fn recycle_ports() -> Result { - let mut host = Host::new(std::net::Ipv4Addr::UNSPECIFIED.into()); + let mut host = Host::new(std::net::Ipv4Addr::UNSPECIFIED.into(), 1, 1); host.udp.bind((host.addr, 65534).into())?; host.udp.bind((host.addr, 65535).into())?; diff --git a/src/sim.rs b/src/sim.rs index 92152a7..f8cae6b 100644 --- a/src/sim.rs +++ b/src/sim.rs @@ -70,7 +70,7 @@ impl<'a> Sim<'a> { let world = RefCell::get_mut(&mut self.world); // Register host state with the world - world.register(addr); + world.register(addr, &self.config); } let rt = World::enter(&self.world, || Rt::client(client)); @@ -95,7 +95,7 @@ impl<'a> Sim<'a> { let world = RefCell::get_mut(&mut self.world); // Register host state with the world - world.register(addr); + world.register(addr, &self.config); } let rt = World::enter(&self.world, || Rt::host(host)); diff --git a/src/world.rs b/src/world.rs index 9c4343f..1fddd7d 100644 --- a/src/world.rs +++ b/src/world.rs @@ -1,3 +1,4 @@ +use crate::config::Config; use crate::envelope::Protocol; use crate::ip::IpVersionAddrIter; use crate::{config, Dns, Host, ToIpAddr, ToIpAddrs, Topology, TRACING_TARGET}; @@ -98,7 +99,7 @@ impl World { } /// Register a new host with the simulation. - pub(crate) fn register(&mut self, addr: IpAddr) { + pub(crate) fn register(&mut self, addr: IpAddr, config: &Config) { assert!( !self.hosts.contains_key(&addr), "already registered host for the given ip address" @@ -112,7 +113,10 @@ impl World { } // Initialize host state - self.hosts.insert(addr, Host::new(addr)); + self.hosts.insert( + addr, + Host::new(addr, config.tcp_capacity, config.udp_capacity), + ); } /// Send `message` from `src` to `dst`. Delivery is asynchronous and not diff --git a/tests/tcp.rs b/tests/tcp.rs index 43b595f..f792ffc 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -910,3 +910,33 @@ fn remote_dropped() -> Result { sim.run() } + +#[test] +#[should_panic(expected = "192.168.0.1:80 server socket buffer full")] +fn socket_capacity() { + let mut sim = Builder::new() + .min_message_latency(Duration::from_millis(1)) + .max_message_latency(Duration::from_millis(1)) + .tcp_capacity(1) + .build(); + + sim.host("server", || async { + let l = TcpListener::bind(("0.0.0.0", 80)).await?; + + loop { + _ = l.accept().await?; + } + }); + + sim.client("client1", async move { + _ = TcpStream::connect("server:80").await?; + Ok(()) + }); + + sim.client("client2", async move { + _ = TcpStream::connect("server:80").await?; + Ok(()) + }); + + _ = sim.run(); +} diff --git a/tests/udp.rs b/tests/udp.rs index 3a9360c..76beadd 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -559,3 +559,38 @@ fn localhost_ping_pong() -> Result { }); sim.run() } + +#[test] +fn socket_capacity() -> Result { + let mut sim = Builder::new() + .min_message_latency(Duration::from_millis(1)) + .max_message_latency(Duration::from_millis(1)) + .udp_capacity(1) + .build(); + + let (tx, rx) = oneshot::channel(); + + sim.client("server", async move { + let s = bind().await?; + + _ = rx.await; + recv_ping(&s).await?; + assert!(timeout(Duration::from_secs(1), recv_ping(&s)) + .await + .is_err()); + + Ok(()) + }); + + sim.client("client", async move { + let s = bind().await?; + + send_ping(&s).await?; + send_ping(&s).await?; // dropped + _ = tx.send(()); + + Ok(()) + }); + + sim.run() +}