Skip to content

Commit

Permalink
refactor: move out more h2 from core client
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Feb 27, 2024
1 parent 0070ea1 commit 079c084
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 49 deletions.
4 changes: 3 additions & 1 deletion lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,7 @@ module.exports = {
kHTTPConnVersion: Symbol('http connection version'),
kRetryHandlerDefaultRetry: Symbol('retry agent default retry'),
kConstruct: Symbol('constructable'),
kListeners: Symbol('listeners')
kListeners: Symbol('listeners'),
kHTTPWrite: Symbol('write'),
kMaxConcurrentStreams: Symbol('max concurrent streams')
}
4 changes: 3 additions & 1 deletion lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ const {
kHTTPConnVersion,
kListeners,
kOnError,
kResume
kResume,
kHTTPWrite
} = require('../core/symbols.js')

const constants = require('../llhttp/constants.js')
Expand Down Expand Up @@ -645,6 +646,7 @@ function onParserTimeout (parser) {

async function connectH1 (client, socket) {
client[kHTTPConnVersion] = 'h1'
client[kHTTPWrite] = (...args) => writeH1(client, ...args)

if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
Expand Down
46 changes: 27 additions & 19 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ const {
kHTTPConnVersion,
kOnError,
// HTTP2
kHost,
kMaxConcurrentStreams,
kHTTP2Session,
kHTTP2SessionState,
kHTTP2CopyHeaders,
kResume
kResume,
kHTTPWrite
} = require('../core/symbols.js')

const kOpenStreams = Symbol('open streams')

// Experimental
let h2ExperimentalWarned = false

Expand All @@ -58,6 +60,10 @@ const {

async function connectH2 (client, socket) {
client[kHTTPConnVersion] = 'h2'
client[kHTTPWrite] = (...args) => {
// TODO (fix): return
writeH2(client, ...args)
}

if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
Expand All @@ -68,9 +74,10 @@ async function connectH2 (client, socket) {

const session = http2.connect(client[kUrl], {
createConnection: () => socket,
peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
peerMaxConcurrentStreams: client[kMaxConcurrentStreams]
})

session[kOpenStreams] = 0
session[kClient] = client
session[kSocket] = socket
session.on('error', onHttp2SessionError)
Expand Down Expand Up @@ -217,9 +224,10 @@ function writeH2 (client, request) {

/** @type {import('node:http2').ClientHttp2Stream} */
let stream
const h2State = client[kHTTP2SessionState]

headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost]
const { hostname, port } = client[kUrl]

headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
headers[HTTP2_HEADER_METHOD] = method

try {
Expand All @@ -235,8 +243,8 @@ function writeH2 (client, request) {
if (stream != null) {
util.destroy(stream, err)

h2State.openStreams -= 1
if (h2State.openStreams === 0) {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}
}
Expand All @@ -257,18 +265,18 @@ function writeH2 (client, request) {

if (stream.id && !stream.pending) {
request.onUpgrade(null, null, stream)
++h2State.openStreams
++session[kOpenStreams]
} else {
stream.once('ready', () => {
request.onUpgrade(null, null, stream)
++h2State.openStreams
++session[kOpenStreams]
})
}

stream.once('close', () => {
h2State.openStreams -= 1
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (h2State.openStreams === 0) session.unref()
if (session[kOpenStreams] === 0) session.unref()
})

return true
Expand Down Expand Up @@ -348,7 +356,7 @@ function writeH2 (client, request) {
}

// Increment counter as we have new several streams open
++h2State.openStreams
++session[kOpenStreams]

stream.once('response', headers => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
Expand All @@ -371,8 +379,8 @@ function writeH2 (client, request) {
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
h2State.openStreams -= 1
if (h2State.openStreams === 0) {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}

Expand All @@ -388,16 +396,16 @@ function writeH2 (client, request) {
})

stream.once('close', () => {
h2State.openStreams -= 1
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (h2State.openStreams === 0) {
if (session[kOpenStreams] === 0) {
session.unref()
}
})

stream.once('error', function (err) {
if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
h2State.streams -= 1
session[kOpenStreams] -= 1
util.destroy(stream, err)
}
})
Expand All @@ -407,7 +415,7 @@ function writeH2 (client, request) {
errorRequest(client, request, err)

if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
h2State.streams -= 1
session[kOpenStreams] -= 1
util.destroy(stream, err)
}
})
Expand Down
39 changes: 11 additions & 28 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,16 @@ const {
kMaxResponseSize,
kHTTPConnVersion,
kOnError,
kHTTPWrite,
// HTTP2
kHost,
kMaxConcurrentStreams,
kHTTP2Session,
kHTTP2SessionState,
kHTTP2BuildRequest,
kHTTP1BuildRequest,
kResume
} = require('../core/symbols.js')
const { connectH1, writeH1, resumeH1 } = require('./client-h1.js')
const { connectH2, writeH2 } = require('./client-h2.js')
const { connectH1, resumeH1 } = require('./client-h1.js')
const { connectH2 } = require('./client-h2.js')

const kClosedResolve = Symbol('kClosedResolve')

Expand Down Expand Up @@ -107,8 +107,8 @@ class Client extends DispatcherBase {
autoSelectFamily,
autoSelectFamilyAttemptTimeout,
// h2
allowH2,
maxConcurrentStreams
maxConcurrentStreams,
allowH2
} = {}) {
super()

Expand Down Expand Up @@ -237,17 +237,10 @@ class Client extends DispatcherBase {
this[kClosedResolve] = null
this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
this[kHTTPConnVersion] = null
this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server

// HTTP/2
this[kHTTP2Session] = null
this[kHTTP2SessionState] = !allowH2
? null
: {
// streams: null, // Fixed queue of streams - For future support of `push`
openStreams: 0, // Keep track of them to decide whether or not unref the session
maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server
}
this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}`

// kQueue is built up of 3 sections separated by
// the kRunningIdx and kPendingIdx indices.
Expand Down Expand Up @@ -309,6 +302,7 @@ class Client extends DispatcherBase {
[kDispatch] (opts, handler) {
const origin = opts.origin || this[kUrl].origin

// TODO (fix): Why do these need tp be
const request = this[kHTTPConnVersion] === 'h2'
? Request[kHTTP2BuildRequest](origin, opts, handler)
: Request[kHTTP1BuildRequest](origin, opts, handler)
Expand Down Expand Up @@ -361,13 +355,12 @@ class Client extends DispatcherBase {
}

if (this[kHTTP2Session] != null) {
util.destroy(this[kHTTP2Session], err)
this[kHTTP2Session].destroy(err)
this[kHTTP2Session] = null
this[kHTTP2SessionState] = null
}

if (this[kSocket]) {
util.destroy(this[kSocket].on('close', callback), err)
this[kSocket].destroy(err).on('close', callback)
} else {
queueMicrotask(callback)
}
Expand Down Expand Up @@ -652,24 +645,14 @@ function _resume (client, sync) {
}
}

if (!request.aborted && write(client, request)) {
if (!request.aborted && client[kHTTPWrite](request)) {
client[kPendingIdx]++
} else {
client[kQueue].splice(client[kPendingIdx], 1)
}
}
}

function write (client, request) {
if (client[kHTTPConnVersion] === 'h2') {
// TODO (fix): Why does this not return the value
// from writeH2.
writeH2(client, request)
} else {
return writeH1(client, request)
}
}

function errorRequest (client, request, err) {
try {
request.onError(err)
Expand Down

0 comments on commit 079c084

Please sign in to comment.