Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move out more h2 from core client #2860

Merged
merged 4 commits into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,5 @@ undici-fetch.js

# .npmrc has platform specific value for windows
.npmrc

.tap
5 changes: 3 additions & 2 deletions lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ module.exports = {
kHTTP2BuildRequest: Symbol('http2 build request'),
kHTTP1BuildRequest: Symbol('http1 build request'),
kHTTP2CopyHeaders: Symbol('http2 copy headers'),
kHTTPConnVersion: Symbol('http connection version'),
kRetryHandlerDefaultRetry: Symbol('retry agent default retry'),
kConstruct: Symbol('constructable'),
kListeners: Symbol('listeners')
kListeners: Symbol('listeners'),
kHTTPContext: Symbol('http context'),
kMaxConcurrentStreams: Symbol('max concurrent streams')
}
21 changes: 13 additions & 8 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const {
kMaxRequests,
kCounter,
kMaxResponseSize,
kHTTPConnVersion,
kListeners,
kOnError,
kResume
Expand Down Expand Up @@ -644,8 +643,6 @@ function onParserTimeout (parser) {
}

async function connectH1 (client, socket) {
client[kHTTPConnVersion] = 'h1'

if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
Expand Down Expand Up @@ -735,6 +732,18 @@ async function connectH1 (client, socket) {

client[kResume]()
})

return {
version: 'h1',
write (...args) {
return writeH1(client, ...args)
},
resume () {
resumeH1(client)
},
destroy () {
}
}
}

function resumeH1 (client) {
Expand Down Expand Up @@ -1274,8 +1283,4 @@ class AsyncWriter {
}
}

module.exports = {
connectH1,
writeH1,
resumeH1
}
module.exports = connectH1
61 changes: 36 additions & 25 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ const {
kError,
kSocket,
kStrictContentLength,
kHTTPConnVersion,
kOnError,
// HTTP2
kHost,
kMaxConcurrentStreams,
kHTTP2Session,
kHTTP2SessionState,
kHTTP2CopyHeaders,
kResume
} = require('../core/symbols.js')

const kOpenStreams = Symbol('open streams')

// Experimental
let h2ExperimentalWarned = false

Expand All @@ -57,8 +57,6 @@ const {
} = http2

async function connectH2 (client, socket) {
client[kHTTPConnVersion] = 'h2'

if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
process.emitWarning('H2 support is experimental, expect them to change at any time.', {
Expand All @@ -68,9 +66,10 @@ async function connectH2 (client, socket) {

const session = http2.connect(client[kUrl], {
createConnection: () => socket,
peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
peerMaxConcurrentStreams: client[kMaxConcurrentStreams]
})

session[kOpenStreams] = 0
session[kClient] = client
session[kSocket] = socket
session.on('error', onHttp2SessionError)
Expand Down Expand Up @@ -124,6 +123,20 @@ async function connectH2 (client, socket) {
socket.on('end', function () {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})

return {
version: 'h2',
write (...args) {
// TODO (fix): return
writeH2(client, ...args)
},
resume () {

},
destroy (err) {
session.destroy(err)
}
}
}

function onHttp2SessionError (err) {
Expand Down Expand Up @@ -217,9 +230,10 @@ function writeH2 (client, request) {

/** @type {import('node:http2').ClientHttp2Stream} */
let stream
const h2State = client[kHTTP2SessionState]

headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost]
const { hostname, port } = client[kUrl]

headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
headers[HTTP2_HEADER_METHOD] = method

try {
Expand All @@ -235,8 +249,8 @@ function writeH2 (client, request) {
if (stream != null) {
util.destroy(stream, err)

h2State.openStreams -= 1
if (h2State.openStreams === 0) {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}
}
Expand All @@ -257,18 +271,18 @@ function writeH2 (client, request) {

if (stream.id && !stream.pending) {
request.onUpgrade(null, null, stream)
++h2State.openStreams
++session[kOpenStreams]
} else {
stream.once('ready', () => {
request.onUpgrade(null, null, stream)
++h2State.openStreams
++session[kOpenStreams]
})
}

stream.once('close', () => {
h2State.openStreams -= 1
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (h2State.openStreams === 0) session.unref()
if (session[kOpenStreams] === 0) session.unref()
})

return true
Expand Down Expand Up @@ -348,7 +362,7 @@ function writeH2 (client, request) {
}

// Increment counter as we have new several streams open
++h2State.openStreams
++session[kOpenStreams]

stream.once('response', headers => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
Expand All @@ -371,8 +385,8 @@ function writeH2 (client, request) {
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
h2State.openStreams -= 1
if (h2State.openStreams === 0) {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}

Expand All @@ -388,16 +402,16 @@ function writeH2 (client, request) {
})

stream.once('close', () => {
h2State.openStreams -= 1
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (h2State.openStreams === 0) {
if (session[kOpenStreams] === 0) {
session.unref()
}
})

stream.once('error', function (err) {
if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
h2State.streams -= 1
session[kOpenStreams] -= 1
util.destroy(stream, err)
}
})
Expand All @@ -407,7 +421,7 @@ function writeH2 (client, request) {
errorRequest(client, request, err)

if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
h2State.streams -= 1
session[kOpenStreams] -= 1
util.destroy(stream, err)
}
})
Expand Down Expand Up @@ -599,7 +613,4 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
}
}

module.exports = {
connectH2,
writeH2
}
module.exports = connectH2