Skip to content

Commit

Permalink
fix(H2): handle goaway properly (#3057)
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 authored Apr 7, 2024
1 parent b6aa794 commit bc4b206
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 104 deletions.
6 changes: 6 additions & 0 deletions docs/docs/api/Dispatcher.md
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,12 @@ Parameters:
* **targets** `Array<Dispatcher>`
* **error** `Error`

Emitted when the dispatcher has been disconnected from the origin.

> **Note**: For HTTP/2, this event is also emitted when the dispatcher has received the [GOAWAY Frame](https://webconcepts.info/concepts/http2-frame-type/0x7) with an Error with the message `HTTP/2: "GOAWAY" frame received` and the code `UND_ERR_INFO`.
> Due to nature of the protocol of using binary frames, it is possible that requests gets hanging as a frame can be received between the `HEADER` and `DATA` frames.
> It is recommended to handle this event and close the dispatcher to create a new HTTP/2 session.
### Event: `'connectionError'`

Parameters:
Expand Down
28 changes: 27 additions & 1 deletion lib/core/util.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const assert = require('node:assert')
const { kDestroyed, kBodyUsed } = require('./symbols')
const { kDestroyed, kBodyUsed, kListeners } = require('./symbols')
const { IncomingMessage } = require('node:http')
const stream = require('node:stream')
const net = require('node:net')
Expand Down Expand Up @@ -534,6 +534,29 @@ function parseRangeHeader (range) {
: null
}

function addListener (obj, name, listener) {
const listeners = (obj[kListeners] ??= [])
listeners.push([name, listener])
obj.on(name, listener)
return obj
}

function removeAllListeners (obj) {
for (const [name, listener] of obj[kListeners] ?? []) {
obj.removeListener(name, listener)
}
obj[kListeners] = null
}

function errorRequest (client, request, err) {
try {
request.onError(err)
assert(request.aborted)
} catch (err) {
client.emit('error', err)
}
}

const kEnumerableProperty = Object.create(null)
kEnumerableProperty.enumerable = true

Expand All @@ -556,6 +579,9 @@ module.exports = {
isDestroyed,
headerNameToString,
bufferToLowerCasedHeaderName,
addListener,
removeAllListeners,
errorRequest,
parseRawHeaders,
parseHeaders,
parseKeepAliveTimeout,
Expand Down
36 changes: 7 additions & 29 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const {
kMaxRequests,
kCounter,
kMaxResponseSize,
kListeners,
kOnError,
kResume,
kHTTPContext
Expand All @@ -56,23 +55,11 @@ const {
const constants = require('../llhttp/constants.js')
const EMPTY_BUF = Buffer.alloc(0)
const FastBuffer = Buffer[Symbol.species]
const addListener = util.addListener
const removeAllListeners = util.removeAllListeners

let extractBody

function addListener (obj, name, listener) {
const listeners = (obj[kListeners] ??= [])
listeners.push([name, listener])
obj.on(name, listener)
return obj
}

function removeAllListeners (obj) {
for (const [name, listener] of obj[kListeners] ?? []) {
obj.removeListener(name, listener)
}
obj[kListeners] = null
}

async function lazyllhttp () {
const llhttpWasmData = process.env.JEST_WORKER_ID ? require('../llhttp/llhttp-wasm.js') : undefined

Expand Down Expand Up @@ -719,14 +706,14 @@ async function connectH1 (client, socket) {
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
errorRequest(client, request, err)
util.errorRequest(client, request, err)
}
} else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null

errorRequest(client, request, err)
util.errorRequest(client, request, err)
}

client[kPendingIdx] = client[kRunningIdx]
Expand Down Expand Up @@ -831,15 +818,6 @@ function resumeH1 (client) {
}
}

function errorRequest (client, request, err) {
try {
request.onError(err)
assert(request.aborted)
} catch (err) {
client.emit('error', err)
}
}

// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
function shouldSendContentLength (method) {
return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
Expand Down Expand Up @@ -906,7 +884,7 @@ function writeH1 (client, request) {
// A user agent may send a Content-Length header with 0 value, this should be allowed.
if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength !== null && request.contentLength !== contentLength) {
if (client[kStrictContentLength]) {
errorRequest(client, request, new RequestContentLengthMismatchError())
util.errorRequest(client, request, new RequestContentLengthMismatchError())
return false
}

Expand All @@ -920,7 +898,7 @@ function writeH1 (client, request) {
return
}

errorRequest(client, request, err || new RequestAbortedError())
util.errorRequest(client, request, err || new RequestAbortedError())

util.destroy(body)
util.destroy(socket, new InformationalError('aborted'))
Expand All @@ -929,7 +907,7 @@ function writeH1 (client, request) {
try {
request.onConnect(abort)
} catch (err) {
errorRequest(client, request, err)
util.errorRequest(client, request, err)
}

if (request.aborted) {
Expand Down
103 changes: 41 additions & 62 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ const {
kSocket,
kStrictContentLength,
kOnError,
// HTTP2
kMaxConcurrentStreams,
kHTTP2Session,
kResume
Expand Down Expand Up @@ -92,24 +91,26 @@ async function connectH2 (client, socket) {
session[kOpenStreams] = 0
session[kClient] = client
session[kSocket] = socket
session.on('error', onHttp2SessionError)
session.on('frameError', onHttp2FrameError)
session.on('end', onHttp2SessionEnd)
session.on('goaway', onHTTP2GoAway)
session.on('close', function () {

util.addListener(session, 'error', onHttp2SessionError)
util.addListener(session, 'frameError', onHttp2FrameError)
util.addListener(session, 'end', onHttp2SessionEnd)
util.addListener(session, 'goaway', onHTTP2GoAway)
util.addListener(session, 'close', function () {
const { [kClient]: client } = this

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))
const err = this[kSocket][kError] || new SocketError('closed', util.getSocketInfo(this))

client[kSocket] = null
client[kHTTP2Session] = null

assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
errorRequest(client, request, err)
util.errorRequest(client, request, err)
}

client[kPendingIdx] = client[kRunningIdx]
Expand All @@ -120,19 +121,21 @@ async function connectH2 (client, socket) {

client[kResume]()
})

session.unref()

client[kHTTP2Session] = session
socket[kHTTP2Session] = session

socket.on('error', function (err) {
util.addListener(socket, 'error', function (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kError] = err

this[kClient][kOnError](err)
})
socket.on('end', function () {

util.addListener(socket, 'end', function () {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})

Expand Down Expand Up @@ -172,67 +175,42 @@ function onHttp2SessionError (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kSocket][kError] = err

this[kClient][kOnError](err)
}

function onHttp2FrameError (type, code, id) {
const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)

if (id === 0) {
const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
this[kSocket][kError] = err
this[kClient][kOnError](err)
}
}

function onHttp2SessionEnd () {
this.destroy(new SocketError('other side closed'))
util.destroy(this[kSocket], new SocketError('other side closed'))
const err = new SocketError('other side closed', util.getSocketInfo(this[kSocket]))
this.destroy(err)
util.destroy(this[kSocket], err)
}

/**
* This is the root cause of #3011
* We need to handle GOAWAY frames properly, and trigger the session close
* along with the socket right away
* Find a way to trigger the close cycle from here on.
*/
function onHTTP2GoAway (code) {
const client = this[kClient]
const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`)
client[kSocket] = null
client[kHTTP2Session] = null

if (client.destroyed) {
assert(this[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
errorRequest(this, request, err)
}
} else if (client[kRunning] > 0) {
// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null

errorRequest(client, request, err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect',
client[kUrl],
[client],
err
)

client[kResume]()
}
// We need to trigger the close cycle right away
// We need to destroy the session and the socket
// Requests should be failed with the error after the current one is handled
this[kSocket][kError] = err
this[kClient][kOnError](err)

function errorRequest (client, request, err) {
try {
request.onError(err)
assert(request.aborted)
} catch (err) {
client.emit('error', err)
}
this.unref()
// We send the GOAWAY frame response as no error
this.destroy()
util.destroy(this[kSocket], err)
}

// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2
Expand All @@ -245,7 +223,8 @@ function writeH2 (client, request) {
const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

if (upgrade) {
errorRequest(client, request, new Error('Upgrade not supported for H2'))
util.errorRequest(client, request, new Error('Upgrade not supported for H2'))
return false
}

if (request.aborted) {
Expand Down Expand Up @@ -297,10 +276,10 @@ function writeH2 (client, request) {
}
}

errorRequest(client, request, err)
util.errorRequest(client, request, err)
})
} catch (err) {
errorRequest(client, request, err)
util.errorRequest(client, request, err)
}

if (method === 'CONNECT') {
Expand Down Expand Up @@ -375,7 +354,7 @@ function writeH2 (client, request) {
// A user agent may send a Content-Length header with 0 value, this should be allowed.
if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) {
if (client[kStrictContentLength]) {
errorRequest(client, request, new RequestContentLengthMismatchError())
util.errorRequest(client, request, new RequestContentLengthMismatchError())
return false
}

Expand Down Expand Up @@ -417,7 +396,7 @@ function writeH2 (client, request) {
// as there's no value to keep it open.
if (request.aborted || request.completed) {
const err = new RequestAbortedError()
errorRequest(client, request, err)
util.errorRequest(client, request, err)
util.destroy(stream, err)
return
}
Expand Down Expand Up @@ -451,13 +430,12 @@ function writeH2 (client, request) {
}

const err = new InformationalError('HTTP/2: stream half-closed (remote)')
errorRequest(client, request, err)
util.errorRequest(client, request, err)
util.destroy(stream, err)
})

stream.once('close', () => {
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (session[kOpenStreams] === 0) {
session.unref()
}
Expand All @@ -466,13 +444,14 @@ function writeH2 (client, request) {
stream.once('error', function (err) {
if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
session[kOpenStreams] -= 1
util.errorRequest(client, request, err)
util.destroy(stream, err)
}
})

stream.once('frameError', (type, code) => {
const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`)
errorRequest(client, request, err)
util.errorRequest(client, request, err)

if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
session[kOpenStreams] -= 1
Expand Down
Loading

0 comments on commit bc4b206

Please sign in to comment.