Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Drop oldest udp message when buffer is full #128

Merged
merged 1 commit into from
Jul 17, 2023

Conversation

tthebst
Copy link
Contributor

@tthebst tthebst commented Jul 16, 2023

Resolved following todo, which is also mentioned in issue #105 :

TODO: ideally we should drop the oldest packets instead of new ones, but this would require a different channel implementation.

How:

Replaces mpsc queue with broadcast queue which has the behaviour we want. According to tokio docs

If a value is sent when the channel is at capacity, the oldest value currently held by the channel is released. This frees up space for the new value. Any receiver that has not yet seen the released value will return RecvError::Lagged the next time recv is called.

To use this channel there is a one public API change required. Datagram needs to implement Clone.

@tthebst tthebst changed the title Tim/replace oldest Drop oldest udp message when buffer is full Jul 16, 2023
@mcches mcches requested a review from camshaft July 17, 2023 15:26
@camshaft
Copy link
Collaborator

camshaft commented Jul 17, 2023

So I wanted to make sure that this behavior correctly matches existing network stacks so I wrote a test:

use s2n_quic_core::interval_set::IntervalSet;
use socket2::{Domain, Socket, Type};
use std::{net::SocketAddr, time::Duration};
use tokio::{net::UdpSocket, time::timeout};

#[tokio::test]
async fn queue_behavior() -> std::io::Result<()> {
    let packet_count = 1_000u32;

    let rx_socket = {
        let socket = Socket::new(Domain::IPV4, Type::DGRAM, None)?;

        socket.set_recv_buffer_size(u16::MAX as _)?;
        socket.set_nonblocking(true)?;

        let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
        socket.bind(&addr.into())?;
        UdpSocket::from_std(socket.into())?
    };
    let tx_socket = UdpSocket::bind("0.0.0.0:0").await?;

    let rx_addr = rx_socket.local_addr()?;

    dbg!(rx_addr);

    let mut payload = vec![0u8; 1200];

    eprintln!("sending {packet_count} packets");

    for i in 0..packet_count {
        payload[..4].copy_from_slice(&i.to_be_bytes());
        let _ = tx_socket.send_to(&payload, rx_addr).await;
    }

    eprintln!("receiving packets");

    let mut packets = IntervalSet::new();

    while let Ok(Ok((payload_len, _remote))) = timeout(
        Duration::from_millis(1000),
        rx_socket.recv_from(&mut payload),
    )
    .await
    {
        if payload_len < 4 {
            continue;
        }
        let i = u32::from_be_bytes((payload[..4]).try_into().unwrap());
        packets.insert_value(i).unwrap();
    }

    dbg!(packets);

    Ok(())
}
[package]
name = "udp-queue-test"
version = "0.1.0"
edition = "2021"
publish = false

[dependencies]
tokio = { version = "1", features = ["full"] }
s2n-quic-core = "0.24"
socket2 = "0.5"

Running on my Linux machine it turns out that my assumptions (and the TODO I left for that) don't match reality:

$ cargo test -- --nocapture
running 1 test
[src/main.rs:28] rx_addr = 127.0.0.1:60960
sending 1000 packets
receiving packets
[src/main.rs:56] packets = {
    0..=56,
}
test queue_behavior ... ok

If the behavior was preferring the most recent packets I would expect that packets is equal to something like

{
  943..=999
}

@camshaft
Copy link
Collaborator

Same behavior on macOS:

running 1 test
[src/main.rs:24] rx_addr = 127.0.0.1:58675
sending 1000 packets
receiving packets
[src/main.rs:52] packets = {
    0..=52,
}

@tthebst
Copy link
Contributor Author

tthebst commented Jul 17, 2023

In that case we can close this MR. Thanks for actually verifying the behaviour!

@camshaft
Copy link
Collaborator

Sorry for not checking before I put the TODO 😄!

@camshaft
Copy link
Collaborator

Actually, if you could update your PR to just remove the TODO that would be helpful. Otherwise, I can later.

@mcches mcches merged commit af1bbf3 into tokio-rs:main Jul 17, 2023
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants