Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: avoid http2 dynamic dispatch in socket handlers #2839

Merged
merged 3 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ module.exports = {
kHTTP2CopyHeaders: Symbol('http2 copy headers'),
kHTTPConnVersion: Symbol('http connection version'),
kRetryHandlerDefaultRetry: Symbol('retry agent default retry'),
kConstruct: Symbol('constructable')
kConstruct: Symbol('constructable'),
kListeners: Symbol('listeners')
}
238 changes: 135 additions & 103 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const {
kLocalAddress,
kMaxResponseSize,
kHTTPConnVersion,
kListeners,
// HTTP2
kHost,
kHTTP2Session,
Expand Down Expand Up @@ -111,6 +112,20 @@ const FastBuffer = Buffer[Symbol.species]

const kClosedResolve = Symbol('kClosedResolve')

function addListener (obj, name, listener) {
const listeners = (obj[kListeners] ??= [])
listeners.push([name, listener])
obj.on(name, listener)
return obj
}

function removeAllListeners (obj) {
for (const [name, listener] of obj[kListeners] ?? []) {
obj.removeListener(name, listener)
}
obj[kListeners] = null
}

/**
* @type {import('../../types/client.js').default}
*/
Expand Down Expand Up @@ -276,7 +291,7 @@ class Client extends DispatcherBase {
this[kMaxRequests] = maxRequestsPerClient
this[kClosedResolve] = null
this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
this[kHTTPConnVersion] = 'h1'
this[kHTTPConnVersion] = null

// HTTP/2
this[kHTTP2Session] = null
Expand Down Expand Up @@ -803,11 +818,8 @@ class Parser {

socket[kClient] = null
socket[kError] = null
socket
.removeListener('error', onSocketError)
.removeListener('readable', onSocketReadable)
.removeListener('end', onSocketEnd)
.removeListener('close', onSocketClose)

removeAllListeners(socket)

client[kSocket] = null
client[kHTTP2Session] = null
Expand Down Expand Up @@ -1050,33 +1062,6 @@ function onParserTimeout (parser) {
}
}

function onSocketReadable () {
const { [kParser]: parser } = this
if (parser) {
parser.readMore()
}
}

function onSocketError (err) {
const { [kClient]: client, [kParser]: parser } = this

assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

if (client[kHTTPConnVersion] !== 'h2') {
// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
return
}
}

this[kError] = err

onError(this[kClient], err)
}

function onError (client, err) {
if (
client[kRunning] === 0 &&
Expand All @@ -1097,32 +1082,8 @@ function onError (client, err) {
}
}

function onSocketEnd () {
const { [kParser]: parser, [kClient]: client } = this

if (client[kHTTPConnVersion] !== 'h2') {
if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
return
}
}

util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
}

function onSocketClose () {
const { [kClient]: client, [kParser]: parser } = this

if (client[kHTTPConnVersion] === 'h1' && parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}
const { [kClient]: client } = this

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

Expand Down Expand Up @@ -1215,56 +1176,19 @@ async function connect (client) {

assert(socket)

const isH2 = socket.alpnProtocol === 'h2'
if (isH2) {
if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
process.emitWarning('H2 support is experimental, expect them to change at any time.', {
code: 'UNDICI-H2'
})
}

const session = http2.connect(client[kUrl], {
createConnection: () => socket,
peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
})

client[kHTTPConnVersion] = 'h2'
session[kClient] = client
session[kSocket] = socket
session.on('error', onHttp2SessionError)
session.on('frameError', onHttp2FrameError)
session.on('end', onHttp2SessionEnd)
session.on('goaway', onHTTP2GoAway)
session.on('close', onSocketClose)
session.unref()

client[kHTTP2Session] = session
socket[kHTTP2Session] = session
if (socket.alpnProtocol === 'h2') {
await connectH2(client, socket)
} else {
if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
}

socket[kNoRef] = false
socket[kWriting] = false
socket[kReset] = false
socket[kBlocking] = false
socket[kParser] = new Parser(client, socket, llhttpInstance)
await connectH1(client, socket)
}

addListener(socket, 'close', onSocketClose)

socket[kCounter] = 0
socket[kMaxRequests] = client[kMaxRequests]
socket[kClient] = client
socket[kError] = null

socket
.on('error', onSocketError)
.on('readable', onSocketReadable)
.on('end', onSocketEnd)
.on('close', onSocketClose)

client[kSocket] = socket

if (channels.connected.hasSubscribers) {
Expand Down Expand Up @@ -1475,10 +1399,15 @@ function shouldSendContentLength (method) {

function write (client, request) {
if (client[kHTTPConnVersion] === 'h2') {
writeH2(client, client[kHTTP2Session], request)
return
// TODO (fix): Why does this not return the value
// from writeH2.
writeH2(client, request)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@metcoder95 This is confusing to me. Seems to have been broken for a while.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it seems its kind of flaky there; once wrap with the interceptors I'll jump right away into H2

} else {
return writeH1(client, request)
}
}

function writeH1 (client, request) {
const { method, path, host, upgrade, blocking, reset } = request

let { body, headers, contentLength } = request
Expand Down Expand Up @@ -1656,7 +1585,8 @@ function write (client, request) {
return true
}

function writeH2 (client, session, request) {
function writeH2 (client, request) {
const session = client[kHTTP2Session]
const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

let headers
Expand Down Expand Up @@ -2341,4 +2271,106 @@ function errorRequest (client, request, err) {
}
}

async function connectH1 (client, socket) {
client[kHTTPConnVersion] = 'h1'

if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
}

socket[kNoRef] = false
socket[kWriting] = false
socket[kReset] = false
socket[kBlocking] = false
socket[kParser] = new Parser(client, socket, llhttpInstance)

addListener(socket, 'error', function (err) {
const { [kParser]: parser } = this

assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
return
}

this[kError] = err

onError(this[kClient], err)
})
addListener(socket, 'readable', function () {
const { [kParser]: parser } = this
if (parser) {
parser.readMore()
}
})
addListener(socket, 'end', function () {
const { [kParser]: parser } = this

if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
return
}

util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})
addListener(socket, 'close', function () {
const { [kParser]: parser } = this

if (parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}
})
}

async function connectH2 (client, socket) {
client[kHTTPConnVersion] = 'h2'

if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
process.emitWarning('H2 support is experimental, expect them to change at any time.', {
code: 'UNDICI-H2'
})
}

const session = http2.connect(client[kUrl], {
createConnection: () => socket,
peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
})

session[kClient] = client
session[kSocket] = socket
session.on('error', onHttp2SessionError)
session.on('frameError', onHttp2FrameError)
session.on('end', onHttp2SessionEnd)
session.on('goaway', onHTTP2GoAway)
session.on('close', onSocketClose)
session.unref()

client[kHTTP2Session] = session
socket[kHTTP2Session] = session

addListener(socket, 'error', function (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kError] = err

onError(this[kClient], err)
})
addListener(socket, 'end', function () {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})
}

module.exports = Client