Skip to content

Commit

Permalink
quic: cleanup QuicSocketFlags, used shared state struct
Browse files Browse the repository at this point in the history
Some of the flags were no longer being used.

Switched to use an AliasedStruct for shared state to avoid
extraneous expensive JS=>C++ calls.

Removed unused QuicSocket option

PR-URL: #34247
Reviewed-By: Anna Henningsen <anna@addaleax.net>
  • Loading branch information
jasnell committed Jul 9, 2020
1 parent d08e99d commit fe11f6b
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 159 deletions.
10 changes: 5 additions & 5 deletions doc/api/quic.md
Original file line number Diff line number Diff line change
Expand Up @@ -1327,17 +1327,17 @@ added: REPLACEME
-->

Emitted when the server busy state has been toggled using
`quicSocket.serverBusy = true | false`. The callback is invoked with a
single boolean argument indicating `true` if busy status is enabled,
`false` otherwise. This event is strictly informational.
`quicSocket.serverBusy = true | false`. The callback is invoked with no
arguments. Use the `quicsocket.serverBusy` property to determine the
current status. This event is strictly informational.

```js
const { createQuicSocket } = require('net');

const socket = createQuicSocket();

socket.on('busy', (busy) => {
if (busy)
socket.on('busy', () => {
if (socket.serverBusy)
console.log('Server is busy');
else
console.log('Server is not busy');
Expand Down
86 changes: 39 additions & 47 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ assertCrypto();
const {
Array,
BigInt64Array,
Boolean,
Error,
Map,
Number,
Expand All @@ -38,6 +39,7 @@ const {
validateQuicEndpointOptions,
validateCreateSecureContextOptions,
validateQuicSocketConnectOptions,
QuicSocketSharedState,
QuicSessionSharedState,
QLogStream,
} = require('internal/quic/util');
Expand Down Expand Up @@ -269,8 +271,8 @@ function onSocketClose(err) {

// Called by the C++ internals when the server busy state of
// the QuicSocket has been changed.
function onSocketServerBusy(on) {
this[owner_symbol][kServerBusy](on);
function onSocketServerBusy() {
this[owner_symbol][kServerBusy]();
}

// Called by the C++ internals when a new server QuicSession has been created.
Expand Down Expand Up @@ -845,31 +847,24 @@ class QuicEndpoint {
class QuicSocket extends EventEmitter {
[kInternalState] = {
alpn: undefined,
autoClose: undefined,
client: undefined,
defaultEncoding: undefined,
endpoints: new Set(),
highWaterMark: undefined,
listenPending: false,
lookup: undefined,
server: undefined,
serverBusy: false,
serverListening: false,
serverSecureContext: undefined,
sessions: new Set(),
state: kSocketUnbound,
statelessResetEnabled: true,
sharedState: undefined,
stats: undefined,
};

constructor(options) {
const {
endpoint,

// True if the QuicSocket should automatically enter a graceful shutdown
// if it is not listening as a server and the last QuicClientSession
// closes
autoClose,

// Default configuration for QuicClientSessions
client,

Expand Down Expand Up @@ -913,7 +908,6 @@ class QuicSocket extends EventEmitter {

const state = this[kInternalState];

state.autoClose = autoClose;
state.client = client;
state.lookup = lookup || (type === AF_INET6 ? lookup6 : lookup4);
state.server = server;
Expand Down Expand Up @@ -976,6 +970,10 @@ class QuicSocket extends EventEmitter {
if (handle !== undefined) {
handle[owner_symbol] = this;
this[async_id_symbol] = handle.getAsyncId();
this[kInternalState].sharedState =
new QuicSocketSharedState(handle.state);
} else {
this[kInternalState].sharedState = undefined;
}
}

Expand Down Expand Up @@ -1081,16 +1079,13 @@ class QuicSocket extends EventEmitter {
}

// Called by the C++ internals to notify when server busy status is toggled.
[kServerBusy](on) {
this[kInternalState].serverBusy = on;
// In a nextTick because the event ends up being
// emitted synchronously when quicSocket.serverBusy
// is called.
[kServerBusy]() {
const busy = this.serverBusy;
process.nextTick(() => {
try {
this.emit('busy', on);
this.emit('busy', busy);
} catch (error) {
this[kRejections](error, 'busy', on);
this[kRejections](error, 'busy', busy);
}
});
}
Expand Down Expand Up @@ -1161,6 +1156,7 @@ class QuicSocket extends EventEmitter {
// server and will emit session events whenever a new QuicServerSession
// is created.
const state = this[kInternalState];
state.listenPending = false;
this[kHandle].listen(
state.serverSecureContext.context,
address,
Expand Down Expand Up @@ -1225,14 +1221,12 @@ class QuicSocket extends EventEmitter {
// function.
listen(options, callback) {
const state = this[kInternalState];
if (state.serverListening)
throw new ERR_QUICSOCKET_LISTENING();

if (state.state === kSocketDestroyed ||
state.state === kSocketClosing) {
throw new ERR_QUICSOCKET_DESTROYED('listen');
}

if (this.listening || state.listenPending)
throw new ERR_QUICSOCKET_LISTENING();
if (typeof options === 'function') {
callback = options;
options = {};
Expand Down Expand Up @@ -1265,8 +1259,8 @@ class QuicSocket extends EventEmitter {

state.highWaterMark = highWaterMark;
state.defaultEncoding = defaultEncoding;
state.serverListening = true;
state.alpn = alpn;
state.listenPending = true;

// If the callback function is provided, it is registered as a
// handler for the on('session') event and will be called whenever
Expand Down Expand Up @@ -1403,10 +1397,8 @@ class QuicSocket extends EventEmitter {
// listening for new QuicServerSession connections.
// New initial connection packets for currently unknown
// DCID's will be ignored.
if (this[kHandle]) {
this[kHandle].stopListening();
}
state.serverListening = false;
if (this[kHandle])
this[kInternalState].sharedState.serverListening = false;

// If there are no sessions, calling maybeDestroy
// will immediately and synchronously destroy the
Expand Down Expand Up @@ -1502,7 +1494,7 @@ class QuicSocket extends EventEmitter {

// True if listen() has been called successfully
get listening() {
return this[kInternalState].serverListening;
return Boolean(this[kInternalState].sharedState?.serverListening);
}

// True if the QuicSocket is currently waiting on at least one
Expand All @@ -1518,12 +1510,27 @@ class QuicSocket extends EventEmitter {
if (state.state === kSocketDestroyed)
throw new ERR_QUICSOCKET_DESTROYED('serverBusy');
validateBoolean(on, 'on');
if (state.serverBusy !== on)
this[kHandle].setServerBusy(on);
if (state.sharedState.serverBusy !== on) {
state.sharedState.serverBusy = on;
this[kServerBusy]();
}
}

get serverBusy() {
return this[kInternalState].serverBusy;
return Boolean(this[kInternalState].sharedState?.serverBusy);
}

set statelessResetDisabled(on) {
const state = this[kInternalState];
if (state.state === kSocketDestroyed)
throw new ERR_QUICSOCKET_DESTROYED('statelessResetDisabled');
validateBoolean(on, 'on');
if (state.sharedState.statelessResetDisabled !== on)
state.sharedState.statelessResetDisabled = on;
}

get statelessResetDisabled() {
return Boolean(this[kInternalState].sharedState?.statelessResetDisabled);
}

get duration() {
Expand Down Expand Up @@ -1613,21 +1620,6 @@ class QuicSocket extends EventEmitter {
}
this[kHandle].setDiagnosticPacketLoss(rx, tx);
}

get statelessResetEnabled() {
return this[kInternalState].statelessResetEnabled;
}

set statelessResetEnabled(on) {
const state = this[kInternalState];
if (state.state === kSocketDestroyed)
throw new ERR_QUICSOCKET_DESTROYED('serverBusy');
validateBoolean(on, 'on');
if (state.statelessResetEnabled !== on) {
this[kHandle].enableStatelessReset(on);
state.statelessResetEnabled = on;
}
}
}

class QuicSession extends EventEmitter {
Expand Down
41 changes: 38 additions & 3 deletions lib/internal/quic/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ const {
IDX_QUICSESSION_STATE_MAX_DATA_LEFT,
IDX_QUICSESSION_STATE_BYTES_IN_FLIGHT,

IDX_QUICSOCKET_STATE_SERVER_LISTENING,
IDX_QUICSOCKET_STATE_SERVER_BUSY,
IDX_QUICSOCKET_STATE_STATELESS_RESET_DISABLED,

IDX_HTTP3_QPACK_MAX_TABLE_CAPACITY,
IDX_HTTP3_QPACK_BLOCKED_STREAMS,
IDX_HTTP3_MAX_HEADER_LIST_SIZE,
Expand Down Expand Up @@ -514,7 +518,6 @@ function validateQuicSocketOptions(options = {}) {
validateObject(options, 'options');

const {
autoClose = false,
client = {},
disableStatelessReset = false,
endpoint = { port: 0, type: 'udp4' },
Expand All @@ -538,7 +541,6 @@ function validateQuicSocketOptions(options = {}) {
validateLookup(lookup);
validateBoolean(validateAddress, 'options.validateAddress');
validateBoolean(validateAddressLRU, 'options.validateAddressLRU');
validateBoolean(autoClose, 'options.autoClose');
validateBoolean(qlog, 'options.qlog');
validateBoolean(disableStatelessReset, 'options.disableStatelessReset');

Expand Down Expand Up @@ -576,7 +578,6 @@ function validateQuicSocketOptions(options = {}) {

return {
endpoint,
autoClose,
client,
lookup,
maxConnections,
Expand Down Expand Up @@ -803,6 +804,39 @@ function toggleListeners(state, event, on) {
}
}

class QuicSocketSharedState {
constructor(state) {
this[kHandle] = Buffer.from(state);
}

get serverListening() {
return Boolean(this[kHandle]
.readUInt8(IDX_QUICSOCKET_STATE_SERVER_LISTENING));
}

set serverListening(on) {
this[kHandle].writeUInt8(on ? 1 : 0, IDX_QUICSOCKET_STATE_SERVER_LISTENING);
}

get serverBusy() {
return Boolean(this[kHandle].readUInt8(IDX_QUICSOCKET_STATE_SERVER_BUSY));
}

set serverBusy(on) {
this[kHandle].writeUInt8(on ? 1 : 0, IDX_QUICSOCKET_STATE_SERVER_BUSY);
}

get statelessResetDisabled() {
return Boolean(this[kHandle]
.readUInt8(IDX_QUICSOCKET_STATE_STATELESS_RESET_DISABLED));
}

set statelessResetDisabled(on) {
this[kHandle].writeUInt8(on ? 1 : 0,
IDX_QUICSOCKET_STATE_STATELESS_RESET_DISABLED);
}
}

// A utility class used to handle reading / modifying shared JS/C++
// state associated with a QuicSession
class QuicSessionSharedState {
Expand Down Expand Up @@ -938,6 +972,7 @@ module.exports = {
validateQuicEndpointOptions,
validateCreateSecureContextOptions,
validateQuicSocketConnectOptions,
QuicSocketSharedState,
QuicSessionSharedState,
QLogStream,
};
5 changes: 5 additions & 0 deletions src/quic/node_quic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,11 @@ void Initialize(Local<Object> target,
QUICSESSION_SHARED_STATE(V)
#undef V

#define V(id, _, __) \
NODE_DEFINE_CONSTANT(constants, IDX_QUICSOCKET_STATE_##id);
QUICSOCKET_SHARED_STATE(V)
#undef V

#define V(name, _, __) \
NODE_DEFINE_CONSTANT(constants, IDX_QUIC_SESSION_STATS_##name);
SESSION_STATS(V)
Expand Down
25 changes: 3 additions & 22 deletions src/quic/node_quic_socket-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,20 +90,6 @@ void QuicSocket::DisassociateStatelessResetToken(
token_map_.erase(token);
}

// StopListening is called when the QuicSocket is no longer
// accepting new server connections. Typically, this is called
// when the QuicSocket enters a graceful closing state where
// existing sessions are allowed to close naturally but new
// sessions are rejected.
void QuicSocket::StopListening() {
if (is_server_listening()) {
Debug(this, "Stop listening");
set_server_listening(false);
// It is important to not call ReceiveStop here as there
// is ongoing traffic being exchanged by the peers.
}
}

void QuicSocket::ReceiveStart() {
for (const auto& endpoint : endpoints_)
CHECK_EQ(endpoint->ReceiveStart(), 0);
Expand Down Expand Up @@ -157,8 +143,8 @@ size_t QuicSocket::GetCurrentStatelessResetCounter(const SocketAddress& addr) {

void QuicSocket::ServerBusy(bool on) {
Debug(this, "Turning Server Busy Response %s", on ? "on" : "off");
set_server_busy(on);
listener_->OnServerBusy(on);
state_->server_busy = on ? 1 : 0;
listener_->OnServerBusy();
}

bool QuicSocket::is_diagnostic_packet_loss(double prob) const {
Expand All @@ -173,11 +159,6 @@ void QuicSocket::set_diagnostic_packet_loss(double rx, double tx) {
tx_loss_ = tx;
}

bool QuicSocket::EnableStatelessReset(bool on) {
set_stateless_reset_disabled(!on);
return !is_stateless_reset_disabled();
}

void QuicSocket::set_validated_address(const SocketAddress& addr) {
if (has_option_validate_address_lru()) {
// Remove the oldest item if we've hit the LRU limit
Expand Down Expand Up @@ -215,7 +196,7 @@ void QuicSocket::AddEndpoint(
if (preferred || endpoints_.empty())
preferred_endpoint_ = endpoint_;
endpoints_.emplace_back(endpoint_);
if (is_server_listening())
if (state_->server_listening)
endpoint_->ReceiveStart();
}

Expand Down
Loading

0 comments on commit fe11f6b

Please sign in to comment.