Skip to content

Commit

Permalink
refactor: Client.clients & Agent is a Client.
Browse files Browse the repository at this point in the history
Refs: #616
  • Loading branch information
ronag committed Mar 25, 2021
1 parent b3dd69f commit 4260bdf
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 75 deletions.
25 changes: 10 additions & 15 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

const Client = require('./lib/core/client')
const errors = require('./lib/core/errors')
const Pool = require('./lib/client-pool')
const Pool = require('./lib/pool')
const { Agent, getGlobalAgent, setGlobalAgent } = require('./lib/agent')
const util = require('./lib/core/util')
const { InvalidArgumentError, InvalidReturnValueError } = require('./lib/core/errors')
const { InvalidArgumentError } = require('./lib/core/errors')
const api = require('./lib/api')

Object.assign(Client.prototype, api)
Object.assign(Pool.prototype, api)
Object.assign(Agent.prototype, api)

function undici (url, opts) {
return new Pool(url, opts)
Expand All @@ -25,7 +26,7 @@ module.exports.Agent = Agent
module.exports.setGlobalAgent = setGlobalAgent
module.exports.getGlobalAgent = getGlobalAgent

function dispatchFromAgent (requestType) {
function makeDispatch (fn) {
return (url, { agent = getGlobalAgent(), method = 'GET', ...opts } = {}, ...additionalArgs) => {
if (opts.path != null) {
throw new InvalidArgumentError('unsupported opts.path')
Expand All @@ -34,18 +35,12 @@ function dispatchFromAgent (requestType) {
const { origin, pathname, search } = util.parseURL(url)
const path = `${pathname || '/'}${search || ''}`

const client = agent.get(origin)

if (client && typeof client[requestType] !== 'function') {
throw new InvalidReturnValueError(`Client returned from Agent.get() does not implement method ${requestType}`)
}

return client[requestType]({ ...opts, method, path }, ...additionalArgs)
return fn.call(agent, { ...opts, origin, method, path }, ...additionalArgs)
}
}

module.exports.request = dispatchFromAgent('request')
module.exports.stream = dispatchFromAgent('stream')
module.exports.pipeline = dispatchFromAgent('pipeline')
module.exports.connect = dispatchFromAgent('connect')
module.exports.upgrade = dispatchFromAgent('upgrade')
module.exports.request = makeDispatch(api.request)
module.exports.stream = makeDispatch(api.stream)
module.exports.pipeline = makeDispatch(api.pipeline)
module.exports.connect = makeDispatch(api.connect)
module.exports.upgrade = makeDispatch(api.upgrade)
79 changes: 63 additions & 16 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ const EventEmitter = require('events')

const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kCache = Symbol('cache')
const kOnDrain = Symbol('onDrain')
const kClients = Symbol('cache')
const kFactory = Symbol('factory')

function defaultFactory (origin, opts) {
Expand All @@ -24,53 +25,99 @@ class Agent extends EventEmitter {
}

this[kFactory] = (origin) => factory(origin, opts)
this[kCache] = new Map()
this[kClients] = new Map()

const agent = this

this[kOnConnect] = function onConnect (client) {
agent.emit('connect', client)
this[kOnConnect] = function onConnect (origin, clients) {
agent.emit('connect', origin, [...clients, agent])
}

this[kOnDisconnect] = function onDestroy (client, err) {
this[kOnDrain] = function onDrain (origin) {
agent.emit('drain', origin)
}

this[kOnDisconnect] = function onDestroy (origin, clients, err) {
if (this.connected === 0 && this.size === 0) {
this.off('disconnect', agent[kOnDisconnect])
agent[kCache].delete(this.origin)
agent[kClients].delete(this.origin)
}

agent.emit('disconnect', client, err)
agent.emit('disconnect', origin, [...clients, agent], err)
}
}

get connected () {
let ret = 0
for (const { connected } of this[kClients].values()) {
ret += connected
}
return ret
}

get (origin) {
if (typeof origin !== 'string' || origin === '') {
get size () {
let ret = 0
for (const { size } of this[kClients].values()) {
ret += size
}
return ret
}

get pending () {
let ret = 0
for (const { pending } of this[kClients].values()) {
ret += pending
}
return ret
}

get running () {
let ret = 0
for (const { running } of this[kClients].values()) {
ret += running
}
return ret
}

// TODO: get closed ()
// TODO: get destroyed ()

dispatch (opts, handler) {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('opts must be an object.')
}

if (typeof opts.origin !== 'string' || opts.origin === '') {
throw new InvalidArgumentError('Origin must be a non-empty string.')
}

let pool = this[kCache].get(origin)
let pool = this[kClients].get(opts.origin)

if (!pool) {
pool = this[kFactory](origin)
pool = this[kFactory](opts.origin)
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('drain', this[kOnDrain])

this[kCache].set(origin, pool)
this[kClients].set(opts.origin, pool)
}

return pool
return pool.dispatch(opts, handler)
}

// TODO: close (callback)
close () {
const closePromises = []
for (const pool of this[kCache].values()) {
for (const pool of this[kClients].values()) {
closePromises.push(pool.close())
}
return Promise.all(closePromises)
}

// TODO: destroy (err, callback)
destroy () {
const destroyPromises = []
for (const pool of this[kCache].values()) {
for (const pool of this[kClients].values()) {
destroyPromises.push(pool.destroy())
}
return Promise.all(destroyPromises)
Expand All @@ -80,7 +127,7 @@ class Agent extends EventEmitter {
let globalAgent = new Agent({ connections: null })

function setGlobalAgent (agent) {
if (!agent || typeof agent.get !== 'function') {
if (!agent || typeof agent.dispatch !== 'function') {
throw new InvalidArgumentError('Argument agent must implement Agent')
}
globalAgent = agent
Expand Down
34 changes: 19 additions & 15 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,15 +213,6 @@ class Client extends EventEmitter {
return this[kQueue].length - this[kRunningIdx]
}

get busy () {
const socket = this[kSocket]
return (
(socket && (socket[kReset] || socket[kWriting])) ||
(this.size >= (this[kPipelining] || 1)) ||
this.pending > 0
)
}

get destroyed () {
return this[kDestroyed]
}
Expand All @@ -242,6 +233,10 @@ class Client extends EventEmitter {
}

dispatch (opts, handler) {
if (opts.origin && opts.origin !== this.url.origin) {
throw new InvalidArgumentError('origin')
}

try {
const request = new Request(opts, handler)
if (this[kDestroyed]) {
Expand Down Expand Up @@ -269,6 +264,8 @@ class Client extends EventEmitter {

handler.onError(err)
}

return this[kNeedDrain] > 0
}

close (callback) {
Expand Down Expand Up @@ -484,7 +481,7 @@ class Parser extends HTTPParser {
detachSocket(socket)
client[kSocket] = null
client[kQueue][client[kRunningIdx]++] = null
client.emit('disconnect', client, new InformationalError('upgrade'))
client.emit('disconnect', [client], new InformationalError('upgrade'))

try {
request.onUpgrade(statusCode, headers, socket)
Expand Down Expand Up @@ -765,7 +762,7 @@ function onHeadersTimeout (self) {
function onSocketConnect () {
const { [kClient]: client } = this

client.emit('connect', client)
client.emit('connect', client[kUrl], [client])
resume(client)
}

Expand Down Expand Up @@ -844,7 +841,7 @@ function onSocketClose () {
// Retry remaining requests.
client[kPendingIdx] = client[kRunningIdx]

client.emit('disconnect', client, err)
client.emit('disconnect', client[kUrl], [client], err)
}

resume(client)
Expand Down Expand Up @@ -931,7 +928,7 @@ function socketResume (socket) {

function emitDrain (client) {
client[kNeedDrain] = 0
client.emit('drain')
client.emit('drain', this[kUrl].origin)
}

function resume (client, sync) {
Expand Down Expand Up @@ -962,8 +959,9 @@ function _resume (client, sync) {
continue
}

const socket = this[kSocket]

if (client[kSocket]) {
const socket = client[kSocket]
const timeout = client.running > 0 ? 0 : client[kKeepAliveTimeoutValue]

if (socket[kIdleTimeoutValue] !== timeout) {
Expand All @@ -985,7 +983,13 @@ function _resume (client, sync) {
}
}

if (client.busy) {
const busy = (
(socket && (socket[kReset] || socket[kWriting])) ||
(this.size >= (this[kPipelining] || 1)) ||
this.pending > 0
)

if (busy) {
client[kNeedDrain] = 2
} else if (client[kNeedDrain] === 2) {
if (sync) {
Expand Down
Loading

0 comments on commit 4260bdf

Please sign in to comment.