Skip to content

Commit

Permalink
feat: port H2 work with latest main
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 committed Apr 19, 2023
1 parent 9041e9f commit 37d589a
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 30 deletions.
248 changes: 219 additions & 29 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

const assert = require('assert')
const net = require('net')
const http2 = require('http2')
const util = require('./core/util')
const timers = require('./timers')
const Request = require('./core/request')
Expand Down Expand Up @@ -67,7 +68,10 @@ const {
kDispatch,
kInterceptors,
kLocalAddress,
kMaxResponseSize
kMaxResponseSize,
// HTTP2
kHost,
kHTTP2Session,
} = require('./core/symbols')
const FastBuffer = Buffer[Symbol.species]

Expand Down Expand Up @@ -241,6 +245,10 @@ class Client extends DispatcherBase {
this[kClosedResolve] = null
this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1

// HTTP/2
this[kHTTP2Session] = null
this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}`

// kQueue is built up of 3 sections separated by
// the kRunningIdx and kPendingIdx indices.
// | complete | running | pending |
Expand Down Expand Up @@ -356,6 +364,15 @@ class Client extends DispatcherBase {
}
}


function onHttp2SessionError (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

this[kError] = err

onError(this[kClient], err)
}

const constants = require('./llhttp/constants')
const createRedirectInterceptor = require('./interceptor/redirectInterceptor')
const EMPTY_BUF = Buffer.alloc(0)
Expand Down Expand Up @@ -995,14 +1012,16 @@ function onSocketEnd () {
function onSocketClose () {
const { [kClient]: client } = this

if (!this[kError] && this[kParser].statusCode && !this[kParser].shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
this[kParser].onMessageComplete()
if (!this[kHTTP2Session]) {
if (!this[kError] && this[kParser].statusCode && !this[kParser].shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
this[kParser].onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}

this[kParser].destroy()
this[kParser] = null

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

client[kSocket] = null
Expand Down Expand Up @@ -1089,29 +1108,43 @@ async function connect (client) {
return
}

if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
}

client[kConnecting] = false

assert(socket)

socket[kNoRef] = false
socket[kWriting] = false
socket[kReset] = false
socket[kBlocking] = false
socket[kError] = null
socket[kParser] = new Parser(client, socket, llhttpInstance)
socket[kClient] = client
socket[kCounter] = 0
socket[kMaxRequests] = client[kMaxRequests]
socket
.on('error', onSocketError)
.on('readable', onSocketReadable)
.on('end', onSocketEnd)
.on('close', onSocketClose)
if (socket.alpnProtocol === 'h2') {
const session = http2.connect(client[kUrl], {
createConnection: () => socket
})

session[kError] = null
session[kClient] = client
session.on('error', onHttp2SessionError)
session.on('close', onSocketClose)
session.unref()

client[kHTTP2Session] = session
} else {
if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
}

socket[kNoRef] = false
socket[kWriting] = false
socket[kReset] = false
socket[kBlocking] = false
socket[kError] = null
socket[kParser] = new Parser(client, socket, llhttpInstance)
socket[kClient] = client
socket[kCounter] = 0
socket[kMaxRequests] = client[kMaxRequests]
socket
.on('error', onSocketError)
.on('readable', onSocketReadable)
.on('end', onSocketEnd)
.on('close', onSocketClose)
}

client[kSocket] = socket

Expand Down Expand Up @@ -1205,7 +1238,7 @@ function _resume (client, sync) {

const socket = client[kSocket]

if (socket && !socket.destroyed) {
if (socket && !socket.destroyed && socket.alpnProtocol !== 'h2') {
if (client[kSize] === 0) {
if (!socket[kNoRef] && socket.unref) {
socket.unref()
Expand Down Expand Up @@ -1270,12 +1303,14 @@ function _resume (client, sync) {
return
}

if (!socket) {
if (!socket && !client[kHTTP2Session]) {
connect(client)
return
}

if (socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) {
if ((socket.destroyed || socket[kWriting] || socket[kReset] || socket[kBlocking]) ||
(client[kHTTP2Session] && client[kHTTP2Session].destroyed)) {
// TODO(HTTP/2): what if exceeds max concurrent streams or can't accept new
return
}

Expand Down Expand Up @@ -1331,6 +1366,12 @@ function _resume (client, sync) {
}

function write (client, request) {
if (client[kHTTP2Session]) {
console.log('http/2')
writeH2(client, client[kHTTP2Session], request)
return
}

const { body, method, path, host, upgrade, headers, blocking, reset } = request

// https://tools.ietf.org/html/rfc7231#section-4.3.1
Expand Down Expand Up @@ -1486,6 +1527,155 @@ function write (client, request) {
return true
}

function writeH2 (client, session, request) {
// TODO(HTTP/2): upgrade is not supported in HTTP/2
const { body, method, path, host, upgrade } = request

// https://tools.ietf.org/html/rfc7231#section-4.3.1
// https://tools.ietf.org/html/rfc7231#section-4.3.2
// https://tools.ietf.org/html/rfc7231#section-4.3.5

// Sending a payload body on a request that does not
// expect it can cause undefined behavior on some
// servers and corrupt connection state. Do not
// re-use the connection for further requests.


const expectsPayload = (
method === 'PUT' ||
method === 'POST' ||
method === 'PATCH'
)

if (body && typeof body.read === 'function') {
// Try to read EOF in order to get length.
body.read(0)
}

let contentLength = util.bodyLength(body)

if (contentLength == null) {
contentLength = request.contentLength
}

if (contentLength === 0 || !expectsPayload) {
// https://tools.ietf.org/html/rfc7230#section-3.3.2
// A user agent SHOULD NOT send a Content-Length header field when
// the request message does not contain a payload body and the method
// semantics do not anticipate such a body.

contentLength = null
}

if (request.contentLength != null && request.contentLength !== contentLength) {
if (client[kStrictContentLength]) {
errorRequest(client, request, new RequestContentLengthMismatchError())
return false
}

process.emitWarning(new RequestContentLengthMismatchError())
}

try {
// TODO(HTTP/2): Should we call onConnect immediately or on stream ready event?
request.onConnect((err) => {
if (request.aborted || request.completed) {
return
}

errorRequest(client, request, err || new RequestAbortedError())
})
} catch (err) {
errorRequest(client, request, err)
}

if (request.aborted) {
return false
}

const headers = { ...request.headers }
headers[':authority'] = host || client[kHost]
headers[':path'] = path

// TODO(HTTP/2): Expect: 100-continue

/* istanbul ignore else: assertion */
if (!body) {
if (contentLength === 0) {
headers['content-length'] = '0'
} else {
assert(contentLength == null, 'no body must not have content length')
}
} else if (util.isBuffer(body)) {
assert(contentLength === body.byteLength, 'buffer body must have content length')

headers['content-length'] = String(contentLength)

process.nextTick(() => {
stream.end(body)
request.onBodySent(body)
})
} else if (util.isBlob(body)) {
process.nextTick(() => {
writeBlob({ client, request, stream, contentLength, expectsPayload })
})
} else if (util.isStream(body)) {
process.nextTick(() => {
writeStream({ client, request, stream, contentLength, expectsPayload })
})
} else if (util.isIterable(body)) {
process.nextTick(() => {
writeIterable({ client, request, stream, contentLength, expectsPayload })
})
} else {
assert(false)
}

console.log('http/2 request')
// TODO(HTTP/2): ref only if current streams count is 0
session.ref()
// TODO(HTTP/2): The handler expects an array but the native http2 module provides us with an object. What should we do?
const stream = session.request(headers)

stream.on('response', headers => {
if (request.onHeaders(Number(headers[':status']), headers, stream.resume.bind(stream), '') === false) {
stream.pause()
}
})

stream.on('data', chunk => {
if (request.onData(chunk) === false) {
stream.pause()
}
})

stream.on('trailers', headers => {
// TODO(HTTP/2): Suppor trailers
})
stream.on('end', () => {
request.onComplete([])
})

stream.on('aborted', () => {
// TODO(HTTP/2): Support aborted
})

stream.on('ready', () => {
// TODO(HTTP/2): Support ready
})

stream.on('timeout', () => {
// TODO(HTTP/2): Support timeout
})

stream.on('close', () => {
// TODO(HTTP/2): unref only if current streams count is 0
session.unref()
})

return true
}

function writeStream ({ body, client, request, socket, contentLength, header, expectsPayload }) {
assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined')

Expand Down
2 changes: 2 additions & 0 deletions lib/core/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ function buildConnector ({ maxCachedSessions, socketPath, timeout, ...opts }) {
servername,
session,
localAddress,
// TODO(HTTP/2): Should we support h2c?
ALPNProtocols: ['h2', 'http/1.1'],
socket: httpSocket, // upgrade socket connection
port: port || 443,
host: hostname
Expand Down
4 changes: 3 additions & 1 deletion lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,7 @@ module.exports = {
kProxy: Symbol('proxy agent options'),
kCounter: Symbol('socket request counter'),
kInterceptors: Symbol('dispatch interceptors'),
kMaxResponseSize: Symbol('max response size')
kMaxResponseSize: Symbol('max response size'),
kHost: Symbol('host'),
kHTTP2Session: Symbol('http2Session')
}

0 comments on commit 37d589a

Please sign in to comment.