Skip to content

Commit fe3639a

Browse files
jasnelladuh95
authored andcommitted
quic: improve backend quic packet processing
Use a uv_check_t on BindingData to process outbound pending packet send, and use TrySend for actually sending packets when possible. Results in an 8% improvement in req/s and ~24% improvement in p95 latency. Also sets us up better for future improvements in libuv if the changes proposed in libuv/libuv#5116 are accepted. Signed-off-by: James M Snell <jasnell@gmail.com> Assisted-by: Opencode:Opus 4.6 PR-URL: #63267 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
1 parent bc4c457 commit fe3639a

6 files changed

Lines changed: 214 additions & 8 deletions

File tree

src/quic/bindingdata.cc

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ namespace node {
2222
using mem::kReserveSizeAndAlign;
2323
using v8::Function;
2424
using v8::FunctionTemplate;
25+
using v8::HandleScope;
2526
using v8::Local;
2627
using v8::Object;
2728
using v8::String;
@@ -154,6 +155,16 @@ BindingData& BindingData::Get(Environment* env) {
154155

155156
BindingData::~BindingData() {
156157
quic_alloc_state.binding = nullptr;
158+
if (flush_check_initialized_) {
159+
uv_check_stop(&flush_check_);
160+
flush_check_started_ = false;
161+
// The check handle is closed inline here. Because BindingData destruction
162+
// happens during Environment cleanup, the handle will be finalized by
163+
// libuv's close phase.
164+
uv_close(reinterpret_cast<uv_handle_t*>(&flush_check_), nullptr);
165+
flush_check_initialized_ = false;
166+
}
167+
pending_flush_sessions_.clear();
157168
}
158169

159170
ngtcp2_mem* BindingData::ngtcp2_allocator() {
@@ -221,6 +232,11 @@ void BindingData::RegisterExternalReferences(
221232
BindingData::BindingData(Realm* realm, Local<Object> object)
222233
: BaseObject(realm, object) {
223234
MakeWeak();
235+
CHECK_EQ(uv_check_init(env()->event_loop(), &flush_check_), 0);
236+
flush_check_.data = this;
237+
// Unref so the check handle doesn't keep the event loop alive on its own.
238+
uv_unref(reinterpret_cast<uv_handle_t*>(&flush_check_));
239+
flush_check_initialized_ = true;
224240
}
225241

226242
SessionManager& BindingData::session_manager() {
@@ -230,6 +246,45 @@ SessionManager& BindingData::session_manager() {
230246
return *session_manager_;
231247
}
232248

249+
void BindingData::ScheduleSessionFlush(const BaseObjectPtr<Session>& session) {
250+
pending_flush_sessions_.push_back(session);
251+
if (!flush_check_started_) {
252+
uv_check_start(&flush_check_, OnFlushCheck);
253+
flush_check_started_ = true;
254+
}
255+
}
256+
257+
void BindingData::OnFlushCheck(uv_check_t* handle) {
258+
auto* binding = static_cast<BindingData*>(handle->data);
259+
if (binding->pending_flush_sessions_.empty()) {
260+
uv_check_stop(&binding->flush_check_);
261+
binding->flush_check_started_ = false;
262+
return;
263+
}
264+
265+
HandleScope scope(binding->env()->isolate());
266+
267+
// Swap to a local vector before iterating. SendPendingData may trigger
268+
// MakeCallback which runs JS that could cause more packet receives via
269+
// re-entry (e.g., a stream data callback that synchronously writes to
270+
// another session). Any sessions added during the flush remain in
271+
// pending_flush_sessions_ and are picked up on the next check tick.
272+
auto sessions = std::move(binding->pending_flush_sessions_);
273+
for (auto& session : sessions) {
274+
session->pending_flush_ = false;
275+
if (!session->is_destroyed()) {
276+
session->FlushPendingData();
277+
}
278+
}
279+
280+
// If no new sessions were added during the flush, stop the check
281+
// to avoid per-tick callback overhead when idle.
282+
if (binding->pending_flush_sessions_.empty()) {
283+
uv_check_stop(&binding->flush_check_);
284+
binding->flush_check_started_ = false;
285+
}
286+
}
287+
233288
void BindingData::MemoryInfo(MemoryTracker* tracker) const {
234289
#define V(name, _) tracker->TrackField(#name, name##_callback());
235290

src/quic/bindingdata.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,11 @@
1010
#include <ngtcp2/ngtcp2_crypto.h>
1111
#include <node.h>
1212
#include <node_mem.h>
13+
#include <uv.h>
1314
#include <v8.h>
1415
#include <memory>
1516
#include <unordered_map>
17+
#include <vector>
1618
#include "defs.h"
1719

1820
namespace node::quic {
@@ -201,6 +203,13 @@ class BindingData final
201203
// routing so that any endpoint can route packets to any session.
202204
SessionManager& session_manager();
203205

206+
// Schedule a session for deferred SendPendingData. Sessions are accumulated
207+
// during the I/O poll phase (via Endpoint::Receive -> Session::ReadPacket)
208+
// and flushed in a uv_check callback immediately after poll completes.
209+
// This batches multiple received packets before generating responses,
210+
// allowing ngtcp2 to make better ACK coalescing decisions.
211+
void ScheduleSessionFlush(const BaseObjectPtr<Session>& session);
212+
204213
std::unordered_map<Endpoint*, BaseObjectPtr<BaseObject>> listening_endpoints;
205214

206215
size_t current_ngtcp2_memory_ = 0;
@@ -247,6 +256,17 @@ class BindingData final
247256
#undef V
248257

249258
std::unique_ptr<SessionManager> session_manager_;
259+
260+
// Deferred send flush state. The uv_check_t fires immediately after
261+
// the I/O poll phase in the same event loop tick, allowing batched
262+
// receive processing: all packets are read during poll, then
263+
// SendPendingData is called once per dirty session in the check callback.
264+
uv_check_t flush_check_;
265+
std::vector<BaseObjectPtr<Session>> pending_flush_sessions_;
266+
bool flush_check_started_ = false;
267+
bool flush_check_initialized_ = false;
268+
269+
static void OnFlushCheck(uv_check_t* handle);
250270
};
251271

252272
JS_METHOD_IMPL(IllegalConstructor);

src/quic/endpoint.cc

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,14 @@ int Endpoint::UDP::Send(Packet::Ptr packet) {
492492
return err;
493493
}
494494

495+
int Endpoint::UDP::TrySend(Packet* packet) {
496+
DCHECK_NOT_NULL(packet);
497+
if (is_closed_or_closing()) return UV_EBADF;
498+
uv_buf_t buf = *packet;
499+
return uv_udp_try_send(
500+
&impl_->handle_, &buf, 1, packet->destination().data());
501+
}
502+
495503
void Endpoint::UDP::MemoryInfo(MemoryTracker* tracker) const {
496504
if (impl_) tracker->TrackField("impl", impl_);
497505
}
@@ -812,6 +820,45 @@ void Endpoint::Send(Packet::Ptr packet) {
812820
STAT_INCREMENT(Stats, packets_sent);
813821
}
814822

823+
void Endpoint::SendOrTrySend(Packet::Ptr packet) {
824+
#ifdef DEBUG
825+
if (is_diagnostic_packet_loss(options_.tx_loss)) [[unlikely]] {
826+
return;
827+
}
828+
#endif
829+
830+
if (is_closed() || is_closing() || packet->length() == 0) {
831+
return;
832+
}
833+
834+
Debug(this, "TrySend %s", packet->ToString());
835+
size_t packet_length = packet->length();
836+
837+
// Attempt synchronous send. On success (returns number of bytes sent),
838+
// the packet is delivered immediately — no callback overhead, no
839+
// waiting for the next poll cycle.
840+
int err = udp_.TrySend(packet.get());
841+
if (err >= 0) {
842+
// Synchronous send succeeded. Release the packet immediately.
843+
STAT_INCREMENT_N(Stats, bytes_sent, packet_length);
844+
STAT_INCREMENT(Stats, packets_sent);
845+
// Ptr destructor releases back to arena pool.
846+
return;
847+
}
848+
849+
if (err == UV_EAGAIN) {
850+
// Socket not writable or async sends are queued. Fall back to the
851+
// async path — the packet will be queued and flushed on the next
852+
// POLLOUT cycle.
853+
Debug(this, "TrySend got EAGAIN, falling back to async Send");
854+
return Send(std::move(packet));
855+
}
856+
857+
// Other errors are fatal.
858+
Debug(this, "TrySend failed with error %d", err);
859+
Destroy(CloseContext::SEND_FAILURE, err);
860+
}
861+
815862
void Endpoint::SendRetry(const PathDescriptor& options) {
816863
// Generating and sending retry packets does consume some system resources,
817864
// and it is possible for a malicious peer to trigger sending a large number
@@ -1128,10 +1175,22 @@ void Endpoint::Receive(const uv_buf_t& buf,
11281175
DCHECK_NOT_NULL(session);
11291176
if (session->is_destroyed()) return;
11301177
size_t len = store.length();
1131-
if (session->Receive(std::move(store), local_address, remote_address)) {
1178+
// Use ReadPacket (no SendPendingDataScope) so that multiple packets
1179+
// received in the same I/O burst are processed before any responses
1180+
// are generated. The deferred flush via BindingData's uv_check
1181+
// callback calls SendPendingData once per dirty session after all
1182+
// packets in the burst have been read.
1183+
if (session->ReadPacket(std::move(store), local_address, remote_address)) {
11321184
STAT_INCREMENT_N(Stats, bytes_received, len);
11331185
STAT_INCREMENT(Stats, packets_received);
11341186
}
1187+
// Schedule the session for deferred SendPendingData if it hasn't
1188+
// been scheduled already in this burst.
1189+
if (!session->is_destroyed() && !session->pending_flush_) {
1190+
session->pending_flush_ = true;
1191+
BindingData::Get(env()).ScheduleSessionFlush(
1192+
BaseObjectPtr<Session>(session));
1193+
}
11351194
};
11361195

11371196
const auto accept = [&](const Session::Config& config, Store&& store) {

src/quic/endpoint.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,13 @@ class Endpoint final : public AsyncWrap, public Packet::Listener {
208208

209209
void Send(Packet::Ptr packet);
210210

211+
// Attempt synchronous send via uv_udp_try_send. If the socket is
212+
// writable, the packet is sent immediately and the Ptr is released.
213+
// If the socket is not writable (UV_EAGAIN), falls back to the
214+
// async Send path. Used by the deferred flush callback to avoid
215+
// the one-tick latency of async uv_udp_send.
216+
void SendOrTrySend(Packet::Ptr packet);
217+
211218
// Acquire a Packet from the pool. length sets the initial working
212219
// size (must be <= pool capacity). The slot is always allocated at
213220
// full capacity to avoid fragmentation.
@@ -281,6 +288,12 @@ class Endpoint final : public AsyncWrap, public Packet::Listener {
281288
void Close();
282289
int Send(Packet::Ptr packet);
283290

291+
// Synchronous send using uv_udp_try_send. Returns 0 on success,
292+
// UV_EAGAIN if the socket is not writable or the send queue is
293+
// non-empty, or another negative error code on failure.
294+
// On success, the caller is responsible for releasing the packet.
295+
int TrySend(Packet* packet);
296+
284297
// Returns the local UDP socket address to which we are bound,
285298
// or fail with an assert if we are not bound.
286299
SocketAddress local_address() const;

src/quic/session.cc

Lines changed: 38 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2104,13 +2104,21 @@ void Session::SetLastError(QuicError&& error) {
21042104
bool Session::Receive(Store&& store,
21052105
const SocketAddress& local_address,
21062106
const SocketAddress& remote_address) {
2107+
// Convenience wrapper: reads the packet and immediately triggers
2108+
// SendPendingData. Used by paths that need an immediate response
2109+
// (e.g., Endpoint::Connect for client Initial packets).
2110+
// The hot receive path uses ReadPacket() directly with deferred
2111+
// flush via BindingData's uv_check callback.
2112+
SendPendingDataScope send_scope(this);
2113+
return ReadPacket(std::move(store), local_address, remote_address);
2114+
}
2115+
2116+
bool Session::ReadPacket(Store&& store,
2117+
const SocketAddress& local_address,
2118+
const SocketAddress& remote_address) {
21072119
DCHECK(!is_destroyed());
21082120
impl_->remote_address_ = remote_address;
21092121

2110-
// When we are done processing this packet, we arrange to send any
2111-
// pending data for this session.
2112-
SendPendingDataScope send_scope(this);
2113-
21142122
ngtcp2_vec vec = store;
21152123
Path path(local_address, remote_address);
21162124

@@ -2125,14 +2133,16 @@ bool Session::Receive(Store&& store,
21252133
// ensures that any deferred destroy waits until all callbacks for this
21262134
// packet have completed. After calling ngtcp2_conn_read_pkt here, we
21272135
// will need to double check that the session is not destroyed before
2128-
// we try doing anything with it (like updating stats, sending pending
2129-
// data, etc).
2136+
// we try doing anything with it (like updating stats, etc).
21302137
int err;
21312138
{
21322139
NgTcp2CallbackScope callback_scope(this);
2140+
// ECN codepoint (ngtcp2_pkt_info.ecn) is not yet populated because
2141+
// libuv does not currently deliver per-packet ECN metadata. When
2142+
// libuv gains ECN receive reporting, the pkt_info should be
2143+
// populated from the per-packet metadata and passed through here.
21332144
err = ngtcp2_conn_read_pkt(*this,
21342145
&path,
2135-
// TODO(@jasnell): ECN pkt_info blocked on libuv
21362146
nullptr,
21372147
vec.base,
21382148
vec.len,
@@ -2245,6 +2255,17 @@ bool Session::Receive(Store&& store,
22452255
return false;
22462256
}
22472257

2258+
void Session::FlushPendingData() {
2259+
DCHECK(!is_destroyed());
2260+
if (impl_->application_) {
2261+
// Prefer synchronous sends during the deferred flush to avoid the
2262+
// one-tick latency of async uv_udp_send from the uv_check callback.
2263+
prefer_try_send_ = true;
2264+
application().SendPendingData();
2265+
prefer_try_send_ = false;
2266+
}
2267+
}
2268+
22482269
void Session::Send(Packet::Ptr packet) {
22492270
// Sending a Packet is generally best effort. If we're not in a state
22502271
// where we can send a packet, it's ok to drop it on the floor. The
@@ -2261,6 +2282,16 @@ void Session::Send(Packet::Ptr packet) {
22612282
return;
22622283
}
22632284

2285+
// When called from the deferred flush path (uv_check callback),
2286+
// prefer synchronous send to avoid the one-tick latency of async
2287+
// uv_udp_send. SendOrTrySend uses uv_udp_try_send first, falling
2288+
// back to uv_udp_send on EAGAIN.
2289+
if (prefer_try_send_) {
2290+
Debug(this, "Session is sending (try_send) %s", packet->ToString());
2291+
endpoint().SendOrTrySend(std::move(packet));
2292+
return;
2293+
}
2294+
22642295
Debug(this, "Session is sending %s", packet->ToString());
22652296
endpoint().Send(std::move(packet));
22662297
}

src/quic/session.h

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,23 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source {
357357
const SocketAddress& local_address,
358358
const SocketAddress& remote_address);
359359

360+
// ReadPacket processes a single inbound packet through ngtcp2 without
361+
// triggering SendPendingData. This is the building block for batched
362+
// receive processing: the caller (Endpoint::Receive) accumulates
363+
// dirty sessions and a uv_check callback flushes them after all
364+
// packets in the I/O burst have been read.
365+
// Receive() is kept as a convenience wrapper that calls ReadPacket()
366+
// then triggers SendPendingData (for paths like Connect that need
367+
// immediate response).
368+
bool ReadPacket(Store&& store,
369+
const SocketAddress& local_address,
370+
const SocketAddress& remote_address);
371+
372+
// Called by BindingData's flush callback to trigger SendPendingData
373+
// on this session. Encapsulates the application() access so that
374+
// bindingdata.cc doesn't need the full Application type definition.
375+
void FlushPendingData();
376+
360377
void Send(Packet::Ptr packet);
361378
void Send(Packet::Ptr packet, const PathStorage& path);
362379
datagram_id SendDatagram(Store&& data);
@@ -572,11 +589,22 @@ class Session final : public AsyncWrap, private SessionTicket::AppData::Source {
572589
bool in_ngtcp2_callback_scope_ = false;
573590
bool in_nghttp3_callback_scope_ = false;
574591
bool destroy_deferred_ = false;
592+
// Set when this session is in BindingData's pending_flush_sessions_ vector.
593+
// Cleared by the flush callback before calling SendPendingData.
594+
// Provides O(1) dedup so a session receiving multiple packets in one I/O
595+
// burst is only scheduled for flush once.
596+
bool pending_flush_ = false;
597+
// When true, Session::Send prefers synchronous delivery via
598+
// Endpoint::SendOrTrySend (uv_udp_try_send with async fallback).
599+
// Set during FlushPendingData to avoid the one-tick latency of
600+
// async-only sends from the uv_check callback.
601+
bool prefer_try_send_ = false;
575602
QuicConnectionPointer connection_;
576603
std::unique_ptr<TLSSession> tls_session_;
577604
friend struct NgTcp2CallbackScope;
578605
friend struct NgHttp3CallbackScope;
579606
friend class Application;
607+
friend class BindingData;
580608
friend class DefaultApplication;
581609
friend class Http3ApplicationImpl;
582610
friend class Endpoint;

0 commit comments

Comments
 (0)