Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make socket buffer capacity configurable #121

Merged
merged 2 commits into from
Jun 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ impl Builder {
self
}

pub fn tcp_capacity(&mut self, value: usize) -> &mut Self {
self.config.tcp_capacity = value;
self
}

pub fn udp_capacity(&mut self, value: usize) -> &mut Self {
self.config.udp_capacity = value;
self
}

pub fn build<'a>(&self) -> Sim<'a> {
self.build_with_rng(Box::new(rand::rngs::SmallRng::from_entropy()))
}
Expand Down
8 changes: 8 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ pub(crate) struct Config {

/// When the simulation starts
pub(crate) epoch: SystemTime,

/// Max size of the tcp receive buffer
pub(crate) tcp_capacity: usize,

/// Max size of the udp receive buffer
pub(crate) udp_capacity: usize,
}

/// Configures link behavior.
Expand Down Expand Up @@ -52,6 +58,8 @@ impl Default for Config {
duration: Duration::from_secs(10),
tick: Duration::from_millis(1),
epoch: SystemTime::now(),
tcp_capacity: 64,
udp_capacity: 64,
}
}
}
Expand Down
26 changes: 12 additions & 14 deletions src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ pub(crate) struct Host {
}

impl Host {
pub(crate) fn new(addr: IpAddr) -> Host {
pub(crate) fn new(addr: IpAddr, tcp_capacity: usize, udp_capacity: usize) -> Host {
Host {
addr,
udp: Udp::new(),
tcp: Tcp::new(),
udp: Udp::new(udp_capacity),
tcp: Tcp::new(tcp_capacity),
next_ephemeral_port: 49152,
elapsed: Duration::ZERO,
now: None,
Expand Down Expand Up @@ -138,11 +138,10 @@ struct UdpBind {
}

impl Udp {
fn new() -> Self {
fn new(capacity: usize) -> Self {
Self {
binds: IndexMap::new(),
// TODO: Make capacity configurable
capacity: 64,
capacity,
}
}

Expand Down Expand Up @@ -276,7 +275,7 @@ impl StreamSocket {
// Buffer and re-order received segments by `seq` as the network may deliver
// them out of order.
fn buffer(&mut self, seq: u64, segment: SequencedSegment) -> Result<(), Protocol> {
use mpsc::error::TrySendError::Closed;
use mpsc::error::TrySendError::*;

let exists = self.buf.insert(seq, segment);

Expand All @@ -288,7 +287,7 @@ impl StreamSocket {
let segment = self.buf.remove(&self.recv_seq).unwrap();
self.sender.try_send(segment).map_err(|e| match e {
Closed(_) => Protocol::Tcp(Segment::Rst),
_ => todo!("{} socket buffer full", self.local_addr),
Full(_) => panic!("{} socket buffer full", self.local_addr),
})?;
}

Expand All @@ -297,13 +296,12 @@ impl StreamSocket {
}

impl Tcp {
fn new() -> Self {
fn new(capacity: usize) -> Self {
Self {
binds: IndexMap::new(),
sockets: IndexMap::new(),
// TODO: Make capacity configurable
server_socket_capacity: 64,
socket_capacity: 64,
server_socket_capacity: capacity,
socket_capacity: capacity,
}
}

Expand Down Expand Up @@ -364,7 +362,7 @@ impl Tcp {
// connection refused on the client.
if let Some(b) = self.binds.get_mut(&dst.port()) {
if b.deque.len() == self.server_socket_capacity {
todo!("{} server socket buffer full", dst);
panic!("{} server socket buffer full", dst);
}

if matches(b.bind_addr, dst) {
Expand Down Expand Up @@ -427,7 +425,7 @@ mod test {

#[test]
fn recycle_ports() -> Result {
let mut host = Host::new(std::net::Ipv4Addr::UNSPECIFIED.into());
let mut host = Host::new(std::net::Ipv4Addr::UNSPECIFIED.into(), 1, 1);

host.udp.bind((host.addr, 65534).into())?;
host.udp.bind((host.addr, 65535).into())?;
Expand Down
4 changes: 2 additions & 2 deletions src/sim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<'a> Sim<'a> {
let world = RefCell::get_mut(&mut self.world);

// Register host state with the world
world.register(addr);
world.register(addr, &self.config);
}

let rt = World::enter(&self.world, || Rt::client(client));
Expand All @@ -95,7 +95,7 @@ impl<'a> Sim<'a> {
let world = RefCell::get_mut(&mut self.world);

// Register host state with the world
world.register(addr);
world.register(addr, &self.config);
}

let rt = World::enter(&self.world, || Rt::host(host));
Expand Down
8 changes: 6 additions & 2 deletions src/world.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::config::Config;
use crate::envelope::Protocol;
use crate::ip::IpVersionAddrIter;
use crate::{config, Dns, Host, ToIpAddr, ToIpAddrs, Topology, TRACING_TARGET};
Expand Down Expand Up @@ -98,7 +99,7 @@ impl World {
}

/// Register a new host with the simulation.
pub(crate) fn register(&mut self, addr: IpAddr) {
pub(crate) fn register(&mut self, addr: IpAddr, config: &Config) {
assert!(
!self.hosts.contains_key(&addr),
"already registered host for the given ip address"
Expand All @@ -112,7 +113,10 @@ impl World {
}

// Initialize host state
self.hosts.insert(addr, Host::new(addr));
self.hosts.insert(
addr,
Host::new(addr, config.tcp_capacity, config.udp_capacity),
);
}

/// Send `message` from `src` to `dst`. Delivery is asynchronous and not
Expand Down
30 changes: 30 additions & 0 deletions tests/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -910,3 +910,33 @@ fn remote_dropped() -> Result {

sim.run()
}

#[test]
#[should_panic(expected = "192.168.0.1:80 server socket buffer full")]
fn socket_capacity() {
let mut sim = Builder::new()
.min_message_latency(Duration::from_millis(1))
.max_message_latency(Duration::from_millis(1))
.tcp_capacity(1)
.build();

sim.host("server", || async {
let l = TcpListener::bind(("0.0.0.0", 80)).await?;

loop {
_ = l.accept().await?;
}
});

sim.client("client1", async move {
_ = TcpStream::connect("server:80").await?;
Ok(())
});

sim.client("client2", async move {
_ = TcpStream::connect("server:80").await?;
Ok(())
});

_ = sim.run();
}
35 changes: 35 additions & 0 deletions tests/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,3 +559,38 @@ fn localhost_ping_pong() -> Result {
});
sim.run()
}

#[test]
fn socket_capacity() -> Result {
let mut sim = Builder::new()
.min_message_latency(Duration::from_millis(1))
.max_message_latency(Duration::from_millis(1))
.udp_capacity(1)
.build();

let (tx, rx) = oneshot::channel();

sim.client("server", async move {
let s = bind().await?;

_ = rx.await;
recv_ping(&s).await?;
assert!(timeout(Duration::from_secs(1), recv_ping(&s))
.await
.is_err());

Ok(())
});

sim.client("client", async move {
let s = bind().await?;

send_ping(&s).await?;
send_ping(&s).await?; // dropped
mcches marked this conversation as resolved.
Show resolved Hide resolved
_ = tx.send(());

Ok(())
});

sim.run()
}