Skip to content

Commit

Permalink
refactor: Client.clients & Agent is a Client.
Browse files Browse the repository at this point in the history
Refs: #616
  • Loading branch information
ronag committed Mar 25, 2021
1 parent b3dd69f commit 142af1b
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 35 deletions.
25 changes: 10 additions & 15 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@

const Client = require('./lib/core/client')
const errors = require('./lib/core/errors')
const Pool = require('./lib/client-pool')
const Pool = require('./lib/pool')
const { Agent, getGlobalAgent, setGlobalAgent } = require('./lib/agent')
const util = require('./lib/core/util')
const { InvalidArgumentError, InvalidReturnValueError } = require('./lib/core/errors')
const { InvalidArgumentError } = require('./lib/core/errors')
const api = require('./lib/api')

Object.assign(Client.prototype, api)
Object.assign(Pool.prototype, api)
Object.assign(Agent.prototype, api)

function undici (url, opts) {
return new Pool(url, opts)
Expand All @@ -25,7 +26,7 @@ module.exports.Agent = Agent
module.exports.setGlobalAgent = setGlobalAgent
module.exports.getGlobalAgent = getGlobalAgent

function dispatchFromAgent (requestType) {
function makeDispatch (fn) {
return (url, { agent = getGlobalAgent(), method = 'GET', ...opts } = {}, ...additionalArgs) => {
if (opts.path != null) {
throw new InvalidArgumentError('unsupported opts.path')
Expand All @@ -34,18 +35,12 @@ function dispatchFromAgent (requestType) {
const { origin, pathname, search } = util.parseURL(url)
const path = `${pathname || '/'}${search || ''}`

const client = agent.get(origin)

if (client && typeof client[requestType] !== 'function') {
throw new InvalidReturnValueError(`Client returned from Agent.get() does not implement method ${requestType}`)
}

return client[requestType]({ ...opts, method, path }, ...additionalArgs)
return fn.call(agent, { ...opts, origin, method, path }, ...additionalArgs)
}
}

module.exports.request = dispatchFromAgent('request')
module.exports.stream = dispatchFromAgent('stream')
module.exports.pipeline = dispatchFromAgent('pipeline')
module.exports.connect = dispatchFromAgent('connect')
module.exports.upgrade = dispatchFromAgent('upgrade')
module.exports.request = makeDispatch(api.request)
module.exports.stream = makeDispatch(api.stream)
module.exports.pipeline = makeDispatch(api.pipeline)
module.exports.connect = makeDispatch(api.connect)
module.exports.upgrade = makeDispatch(api.upgrade)
40 changes: 29 additions & 11 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const EventEmitter = require('events')

const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kOnDrain = Symbol('onDrain')
const kCache = Symbol('cache')
const kFactory = Symbol('factory')

Expand All @@ -28,36 +29,53 @@ class Agent extends EventEmitter {

const agent = this

this[kOnConnect] = function onConnect (client) {
agent.emit('connect', client)
this[kOnConnect] = function onConnect (...clients) {
agent.emit('connect', [...clients, agent])
}

this[kOnDisconnect] = function onDestroy (client, err) {
this[kOnDrain] = function onDrain (origin) {
agent.emit('drain', origin)
}

this[kOnDisconnect] = function onDestroy (clients, err) {
if (this.connected === 0 && this.size === 0) {
this.off('disconnect', agent[kOnDisconnect])
agent[kCache].delete(this.origin)
}

agent.emit('disconnect', client, err)
agent.emit('disconnect', [...clients, agent], err)
}
}

get (origin) {
if (typeof origin !== 'string' || origin === '') {
// TODO: Missing props:
// - connected
// - size
// - pending
// - running
// - closed
// - destroyed

dispatch (opts, handler) {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('opts must be an object.')
}

if (typeof opts.origin !== 'string' || opts.origin === '') {
throw new InvalidArgumentError('Origin must be a non-empty string.')
}

let pool = this[kCache].get(origin)
let pool = this[kCache].get(opts.origin)

if (!pool) {
pool = this[kFactory](origin)
pool = this[kFactory](opts.origin)
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('drain', this[kOnDrain])

this[kCache].set(origin, pool)
this[kCache].set(opts.origin, pool)
}

return pool
return pool.dispatch(opts, handler)
}

close () {
Expand All @@ -80,7 +98,7 @@ class Agent extends EventEmitter {
let globalAgent = new Agent({ connections: null })

function setGlobalAgent (agent) {
if (!agent || typeof agent.get !== 'function') {
if (!agent || typeof agent.dispatch !== 'function') {
throw new InvalidArgumentError('Argument agent must implement Agent')
}
globalAgent = agent
Expand Down
13 changes: 10 additions & 3 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class Client extends EventEmitter {
return this[kQueue].length - this[kRunningIdx]
}

// TODO (fix): Make private?
get busy () {
const socket = this[kSocket]
return (
Expand Down Expand Up @@ -242,6 +243,10 @@ class Client extends EventEmitter {
}

dispatch (opts, handler) {
if (opts.origin && opts.origin !== this.url.origin) {
throw new InvalidArgumentError('origin')
}

try {
const request = new Request(opts, handler)
if (this[kDestroyed]) {
Expand Down Expand Up @@ -269,6 +274,8 @@ class Client extends EventEmitter {

handler.onError(err)
}

return this.busy
}

close (callback) {
Expand Down Expand Up @@ -484,7 +491,7 @@ class Parser extends HTTPParser {
detachSocket(socket)
client[kSocket] = null
client[kQueue][client[kRunningIdx]++] = null
client.emit('disconnect', client, new InformationalError('upgrade'))
client.emit('disconnect', [client], new InformationalError('upgrade'))

try {
request.onUpgrade(statusCode, headers, socket)
Expand Down Expand Up @@ -765,7 +772,7 @@ function onHeadersTimeout (self) {
function onSocketConnect () {
const { [kClient]: client } = this

client.emit('connect', client)
client.emit('connect', [client])
resume(client)
}

Expand Down Expand Up @@ -931,7 +938,7 @@ function socketResume (socket) {

function emitDrain (client) {
client[kNeedDrain] = 0
client.emit('drain')
client.emit('drain', this[kUrl].origin)
}

function resume (client, sync) {
Expand Down
23 changes: 17 additions & 6 deletions lib/client-pool.js → lib/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
const FixedQueue = require('./node/fixed-queue')
const util = require('./core/util')
const { kTLSSession } = require('./core/symbols')
const assert = require('assert')

const kClients = Symbol('clients')
const kNeedDrain = Symbol('needDrain')
Expand Down Expand Up @@ -48,7 +49,9 @@ class Pool extends EventEmitter {

const pool = this

this[kOnDrain] = function onDrain () {
this[kOnDrain] = function onDrain (origin) {
assert(this[kUrl].origin === origin)

const queue = pool[kQueue]

while (!this.busy) {
Expand All @@ -62,7 +65,7 @@ class Pool extends EventEmitter {

if (pool[kNeedDrain] && !this.busy) {
pool[kNeedDrain] = false
pool.emit('drain')
pool.emit('drain', this[kUrl].origin)
}

if (pool[kClosedResolve] && queue.isEmpty()) {
Expand All @@ -72,14 +75,14 @@ class Pool extends EventEmitter {
}
}

this[kOnConnect] = function onConnect (client) {
this[kOnConnect] = function onConnect (clients) {
pool[kConnected]++
pool.emit('connect', client)
pool.emit('connect', [...clients, pool])
}

this[kOnDisconnect] = function onDisconnect (client, err) {
this[kOnDisconnect] = function onDisconnect (clients, err) {
pool[kConnected]--
pool.emit('disconnect', client, err)
pool.emit('disconnect', [...clients, pool])
}

this[kOnTLSSession] = function cacheClientTLSSession (session) {
Expand All @@ -97,6 +100,7 @@ class Pool extends EventEmitter {
return this[kConnected]
}

// TODO (fix): Make private?
get busy () {
if (this[kPending] > 0) {
return true
Expand Down Expand Up @@ -154,6 +158,10 @@ class Pool extends EventEmitter {

dispatch (opts, handler) {
try {
if (opts.origin && opts.origin !== this.url.origin) {
throw new InvalidArgumentError('origin')
}

if (this[kDestroyed]) {
throw new ClientDestroyedError()
}
Expand Down Expand Up @@ -187,6 +195,7 @@ class Pool extends EventEmitter {
this[kQueue].push({ opts, handler })
this[kPending]++
} else {
// TODO (fix): Must use return value of dispatch instead of .busy.
client.dispatch(opts, handler)
if (client.busy && this.busy) {
this[kNeedDrain] = true
Expand All @@ -199,6 +208,8 @@ class Pool extends EventEmitter {

handler.onError(err)
}

return this.busy
}

close (cb) {
Expand Down

0 comments on commit 142af1b

Please sign in to comment.