Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Match GSO segment size to the first datagram, not the MTU #1837

Merged
merged 6 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 4 additions & 13 deletions quinn-proto/src/connection/datagrams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use tracing::{debug, trace};
use super::Connection;
use crate::{
frame::{Datagram, FrameStruct},
packet::SpaceId,
TransportError,
};

Expand Down Expand Up @@ -68,19 +67,11 @@ impl<'a> Datagrams<'a> {
///
/// Not necessarily the maximum size of received datagrams.
pub fn max_size(&self) -> Option<usize> {
let key = match self.conn.spaces[SpaceId::Data].crypto.as_ref() {
Some(crypto) => Some(&*crypto.packet.local),
None => self.conn.zero_rtt_crypto.as_ref().map(|x| &*x.packet),
};
// If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
// this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
// but that would needlessly prevent sending datagrams during 0-RTT.
let tag_len = key.map_or(16, |x| x.tag_len());
// We use the conservative overhead bound for any packet number, reducing the budget by at
// most 3 bytes, so that PN size fluctuations don't cause users sending maximum-size
// datagrams to suffer avoidable packet loss.
let max_size = self.conn.path.current_mtu() as usize
- 1 // flags byte
- self.conn.rem_cids.active().len()
- 4 // worst-case packet number size
- tag_len
- self.conn.predict_1rtt_overhead(None)
- Datagram::SIZE_BOUND;
let limit = self
.conn
Expand Down
142 changes: 118 additions & 24 deletions quinn-proto/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use crate::{
crypto::{self, KeyPair, Keys, PacketKey},
frame,
frame::{Close, Datagram, FrameStruct},
packet::{Header, InitialHeader, InitialPacket, LongType, Packet, PartialDecode, SpaceId},
packet::{
Header, InitialHeader, InitialPacket, LongType, Packet, PacketNumber, PartialDecode,
SpaceId,
},
range_set::ArrayRangeSet,
shared::{
ConnectionEvent, ConnectionEventInner, ConnectionId, DatagramConnectionEvent, EcnCodepoint,
Expand Down Expand Up @@ -471,6 +474,10 @@ impl Connection {
};

let mut num_datagrams = 0;
// Position in `buf` of the first byte of the current UDP datagram. When coalescing QUIC
// packets, this can be earlier than the start of the current QUIC packet.
let mut datagram_start = 0;
let mut segment_size = usize::from(self.path.current_mtu());

// Send PATH_CHALLENGE for a previous path if necessary
if let Some(ref mut prev_path) = self.prev_path {
Expand Down Expand Up @@ -556,9 +563,9 @@ impl Connection {
&& self.peer_supports_ack_frequency();
}

// Reserving capacity can provide more capacity than we asked for.
// However we are not allowed to write more than MTU size. Therefore
// the maximum capacity is tracked separately.
// Reserving capacity can provide more capacity than we asked for. However, we are not
// allowed to write more than `segment_size`. Therefore the maximum capacity is tracked
// separately.
let mut buf_capacity = 0;

let mut coalesce = true;
Expand All @@ -574,9 +581,18 @@ impl Connection {
// so we cannot trivially rewrite it to take advantage of `SpaceId::iter()`.
while space_idx < spaces.len() {
let space_id = spaces[space_idx];
// Number of bytes available for frames if this is a 1-RTT packet. We're guaranteed to
// be able to send an individual frame at least this large in the next 1-RTT
// packet. This could be generalized to support every space, but it's only needed to
// handle large fixed-size frames, which only exist in 1-RTT (application datagrams). We
// don't account for coalesced packets potentially occupying space because frames can
// always spill into the next datagram.
let pn = self.packet_number_filter.peek(&self.spaces[SpaceId::Data]);
let frame_space_1rtt =
segment_size.saturating_sub(self.predict_1rtt_overhead(Some(pn)));

// Is there data or a close message to send in this space?
let can_send = self.space_can_send(space_id);
let can_send = self.space_can_send(space_id, frame_space_1rtt);
if can_send.is_empty() && (!close || self.spaces[space_id].crypto.is_none()) {
space_idx += 1;
continue;
Expand All @@ -586,7 +602,7 @@ impl Connection {
|| self.spaces[space_id].ping_pending
|| self.spaces[space_id].immediate_ack_pending;
if space_id == SpaceId::Data {
ack_eliciting |= self.can_send_1rtt();
ack_eliciting |= self.can_send_1rtt(frame_space_1rtt);
}

// Can we append more data into the current buffer?
Expand All @@ -602,7 +618,7 @@ impl Connection {
// We need to send 1 more datagram and extend the buffer for that.

// Is 1 more datagram allowed?
if buf_capacity >= self.path.current_mtu() as usize * max_datagrams {
if buf_capacity >= segment_size * max_datagrams {
// No more datagrams allowed
break;
}
Expand All @@ -615,7 +631,7 @@ impl Connection {
// (see https://github.com/quinn-rs/quinn/issues/1082)
if self
.path
.anti_amplification_blocked(self.path.current_mtu() as u64 * num_datagrams + 1)
.anti_amplification_blocked(segment_size as u64 * num_datagrams + 1)
{
trace!("blocked by anti-amplification");
break;
Expand All @@ -624,15 +640,15 @@ impl Connection {
// Congestion control and pacing checks
// Tail loss probes must not be blocked by congestion, or a deadlock could arise
if ack_eliciting && self.spaces[space_id].loss_probes == 0 {
// Assume the current packet will get padded to fill the full MTU
// Assume the current packet will get padded to fill the segment
let untracked_bytes = if let Some(builder) = &builder_storage {
buf_capacity - builder.partial_encode.start
} else {
0
} as u64;
debug_assert!(untracked_bytes <= self.path.current_mtu() as u64);
debug_assert!(untracked_bytes <= segment_size as u64);

let bytes_to_send = u64::from(self.path.current_mtu()) + untracked_bytes;
let bytes_to_send = segment_size as u64 + untracked_bytes;
if self.path.in_flight.bytes + bytes_to_send >= self.path.congestion.window() {
space_idx += 1;
congestion_blocked = true;
Expand Down Expand Up @@ -662,17 +678,52 @@ impl Connection {

// Finish current packet
if let Some(mut builder) = builder_storage.take() {
// Pad the packet to make it suitable for sending with GSO
// which will always send the maximum PDU.
builder.pad_to(self.path.current_mtu());
if pad_datagram {
builder.pad_to(MIN_INITIAL_SIZE);
}

if num_datagrams > 1 {
// If too many padding bytes would be required to continue the GSO batch
// after this packet, end the GSO batch here. Ensures that fixed-size frames
// with heterogeneous sizes (e.g. application datagrams) won't inadvertently
// waste large amounts of bandwidth. The exact threshold is a bit arbitrary
// and might benefit from further tuning, though there's no universally
// optimal value.
const MAX_PADDING: usize = 16;
let packet_len_unpadded = cmp::max(builder.min_size, buf.len())
- datagram_start
+ builder.tag_len;
if packet_len_unpadded + MAX_PADDING < segment_size {
trace!(
"GSO truncated by demand for {} padding bytes",
segment_size - packet_len_unpadded
);
break;
}

// Pad the current packet to GSO segment size so it can be included in the
// GSO batch.
builder.pad_to(segment_size as u16);
}

builder.finish_and_track(now, self, sent_frames.take(), buf);

debug_assert_eq!(buf.len(), buf_capacity, "Packet must be padded");
if num_datagrams == 1 {
// Set the segment size for this GSO batch to the size of the first UDP
// datagram in the batch. Larger data that cannot be fragmented
// (e.g. application datagrams) will be included in a future batch. When
// sending large enough volumes of data for GSO to be useful, we expect
// packet sizes to usually be consistent, e.g. populated by max-size STREAM
// frames or uniformly sized datagrams.
segment_size = buf.len();
// Clip the unused capacity out of the buffer so future packets don't
// overrun
buf_capacity = buf.len();
}
}

// Allocate space for another datagram
buf_capacity += self.path.current_mtu() as usize;
buf_capacity += segment_size;
if buf.capacity() < buf_capacity {
// We reserve the maximum space for sending `max_datagrams` upfront
// to avoid any reallocations if more datagrams have to be appended later on.
Expand All @@ -682,11 +733,18 @@ impl Connection {
// (e.g. purely containing ACKs), modern memory allocators
// (e.g. mimalloc and jemalloc) will pool certain allocation sizes
// and therefore this is still rather efficient.
buf.reserve(max_datagrams * self.path.current_mtu() as usize);
buf.reserve(max_datagrams * segment_size);
}
num_datagrams += 1;
coalesce = true;
pad_datagram = false;
datagram_start = buf.len();

debug_assert_eq!(
datagram_start % segment_size,
0,
"datagrams in a GSO batch must be aligned to the segment size"
);
} else {
// We can append/coalesce the next packet into the current
// datagram.
Expand Down Expand Up @@ -727,7 +785,7 @@ impl Connection {
space_id,
buf,
buf_capacity,
(num_datagrams as usize - 1) * (self.path.current_mtu() as usize),
datagram_start,
ack_eliciting,
self,
self.version,
Expand Down Expand Up @@ -945,14 +1003,14 @@ impl Connection {
},
segment_size: match num_datagrams {
1 => None,
_ => Some(self.path.current_mtu() as usize),
_ => Some(segment_size),
},
src_ip: self.local_ip,
})
}

/// Indicate what types of frames are ready to send for the given space
fn space_can_send(&self, space_id: SpaceId) -> SendableFrames {
fn space_can_send(&self, space_id: SpaceId, frame_space_1rtt: usize) -> SendableFrames {
if self.spaces[space_id].crypto.is_some() {
let can_send = self.spaces[space_id].can_send(&self.streams);
if !can_send.is_empty() {
Expand All @@ -964,7 +1022,7 @@ impl Connection {
return SendableFrames::empty();
}

if self.spaces[space_id].crypto.is_some() && self.can_send_1rtt() {
if self.spaces[space_id].crypto.is_some() && self.can_send_1rtt(frame_space_1rtt) {
return SendableFrames {
other: true,
acks: false,
Expand All @@ -973,7 +1031,7 @@ impl Connection {

if self.zero_rtt_crypto.is_some() && self.side.is_client() {
let mut can_send = self.spaces[space_id].can_send(&self.streams);
can_send.other |= self.can_send_1rtt();
can_send.other |= self.can_send_1rtt(frame_space_1rtt);
if !can_send.is_empty() {
return can_send;
}
Expand Down Expand Up @@ -3467,15 +3525,19 @@ impl Connection {
/// Whether we have 1-RTT data to send
///
/// See also `self.space(SpaceId::Data).can_send()`
fn can_send_1rtt(&self) -> bool {
fn can_send_1rtt(&self, max_size: usize) -> bool {
self.streams.can_send_stream_data()
|| self.path.challenge_pending
|| self
.prev_path
.as_ref()
.map_or(false, |x| x.challenge_pending)
|| !self.path_responses.is_empty()
|| !self.datagrams.outgoing.is_empty()
|| self
.datagrams
.outgoing
.front()
.map_or(false, |x| x.size(true) <= max_size)
}

/// Update counters to account for a packet becoming acknowledged, lost, or abandoned
Expand All @@ -3502,6 +3564,38 @@ impl Connection {
pub fn current_mtu(&self) -> u16 {
self.path.current_mtu()
}

/// Size of non-frame data for a 1-RTT packet
///
/// Quantifies space consumed by the QUIC header and AEAD tag. All other bytes in a packet are
/// frames. Changes if the length of the remote connection ID changes, which is expected to be
/// rare. If `pn` is specified, may additionally change unpredictably due to variations in
/// latency and packet loss.
fn predict_1rtt_overhead(&self, pn: Option<u64>) -> usize {
    // Without a concrete packet number, assume the 4-byte worst case so callers get a
    // conservative (upper-bound) overhead estimate
    let pn_len = pn.map_or(4, |pn| {
        let largest_acked = self.spaces[SpaceId::Data]
            .largest_acked_packet
            .unwrap_or(0);
        PacketNumber::new(pn, largest_acked).len()
    });

    // flags byte + destination CID + packet number + AEAD tag
    1 + self.rem_cids.active().len() + pn_len + self.tag_len_1rtt()
}

/// AEAD tag length for outgoing 1-RTT (or, failing that, 0-RTT) packets
fn tag_len_1rtt(&self) -> usize {
    // Prefer the 1-RTT packet keys; before the handshake completes, fall back to 0-RTT keys
    let key = if let Some(crypto) = self.spaces[SpaceId::Data].crypto.as_ref() {
        Some(&*crypto.packet.local)
    } else {
        self.zero_rtt_crypto.as_ref().map(|x| &*x.packet)
    };
    // If neither Data nor 0-RTT keys are available, make a reasonable tag length guess. As of
    // this writing, all QUIC cipher suites use 16-byte tags. We could return `None` instead,
    // but that would needlessly prevent sending datagrams during 0-RTT.
    match key {
        Some(key) => key.tag_len(),
        None => 16,
    }
}
}

impl fmt::Debug for Connection {
Expand Down
2 changes: 2 additions & 0 deletions quinn-proto/src/connection/packet_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ impl PacketBuilder {
})
}

/// Append the minimum amount of padding such that, after encryption, the packet will occupy at
/// least `min_size` bytes
pub(super) fn pad_to(&mut self, min_size: u16) {
let prev = self.min_size;
self.min_size = self.datagram_start + (min_size as usize) - self.tag_len;
Expand Down
52 changes: 52 additions & 0 deletions quinn-proto/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3012,3 +3012,55 @@ fn stream_gso() {
let final_ios = pair.client_conn_mut(client_ch).stats().udp_tx.ios;
assert_eq!(final_ios - initial_ios, 2);
}

#[test]
fn datagram_gso() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, _) = pair.connect();

    let initial_ios = pair.client_conn_mut(client_ch).stats().udp_tx.ios;
    let initial_bytes = pair.client_conn_mut(client_ch).stats().udp_tx.bytes;

    // Queue 10 application datagrams, each larger than half the MTU, so each occupies its own
    // UDP segment; 10 segments fit within a single `tests::util::MAX_DATAGRAMS` GSO batch
    info!("sending");
    const DATAGRAM_LEN: usize = 1024;
    const DATAGRAMS: usize = 10;
    let payload = Bytes::from_static(&[0; DATAGRAM_LEN]);
    for _ in 0..DATAGRAMS {
        pair.client_datagrams(client_ch)
            .send(payload.clone(), false)
            .unwrap();
    }
    pair.drive();

    // All datagrams should have gone out in one I/O operation (a single GSO batch)
    let final_ios = pair.client_conn_mut(client_ch).stats().udp_tx.ios;
    assert_eq!(final_ios - initial_ios, 1);
    // Expected per-packet overhead:
    //   flags + CID + PN + tag + frame type + frame length = 1 + 8 + 1 + 16 + 1 + 2 = 29
    let final_bytes = pair.client_conn_mut(client_ch).stats().udp_tx.bytes;
    assert_eq!(
        final_bytes - initial_bytes,
        ((29 + DATAGRAM_LEN) * DATAGRAMS) as u64
    );
}

#[test]
fn gso_truncation() {
    let _guard = subscribe();
    let mut pair = Pair::default();
    let (client_ch, _) = pair.connect();

    let initial_ios = pair.client_conn_mut(client_ch).stats().udp_tx.ios;

    // Send three application datagrams sized so that no two can be coalesced into a single
    // MTU, and so that padding the second datagram out to the first packet's length would
    // waste an unreasonable number of bytes — forcing the GSO batch to be truncated.
    info!("sending");
    for &len in &[1024usize, 768, 768] {
        pair.client_datagrams(client_ch)
            .send(vec![0; len].into(), false)
            .unwrap();
    }
    pair.drive();

    // The batch must be split rather than heavily padded: exactly two I/O operations
    let final_ios = pair.client_conn_mut(client_ch).stats().udp_tx.ios;
    assert_eq!(final_ios - initial_ios, 2);
}