Skip to content

Commit

Permalink
refactor: Client.clients & Agent is a Client.
Browse files Browse the repository at this point in the history
Refs: #616
  • Loading branch information
ronag committed Mar 27, 2021
1 parent e91a79f commit e66d8b2
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 91 deletions.
40 changes: 27 additions & 13 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,44 +2,58 @@

const Client = require('./lib/core/client')
const errors = require('./lib/core/errors')
const Pool = require('./lib/client-pool')
const { Agent, getGlobalAgent, setGlobalAgent } = require('./lib/agent')
const Pool = require('./lib/pool')
const Agent = require('./lib/agent')
const util = require('./lib/core/util')
const { InvalidArgumentError } = require('./lib/core/errors')
const api = require('./lib/api')

Object.assign(Client.prototype, api)
Object.assign(Pool.prototype, api)
Object.assign(Agent.prototype, api)

function undici (url, opts) {
return new Pool(url, opts)
}

module.exports = undici

module.exports.Pool = Pool
module.exports.Client = Client
module.exports.errors = errors

module.exports.Agent = Agent
module.exports.setGlobalAgent = setGlobalAgent
module.exports.getGlobalAgent = getGlobalAgent

function dispatchFromAgent (fn) {
return (url, { agent = getGlobalAgent(), method = 'GET', ...opts } = {}, ...additionalArgs) => {
let globalClient = new Agent()

function setGlobalClient (agent) {
if (!agent || typeof agent.dispatch !== 'function') {
throw new InvalidArgumentError('Argument agent must implement Agent')
}
globalClient = agent
}

function getGlobalClient () {
return globalClient
}

function makeDispatch (fn) {
return (url, { client = getGlobalClient(), method = 'GET', ...opts } = {}, ...additionalArgs) => {
if (opts.path != null) {
throw new InvalidArgumentError('unsupported opts.path')
}

const { origin, pathname, search } = util.parseURL(url)
const path = `${pathname}${search || ''}`

return fn.call(agent, { ...opts, origin, method, path }, ...additionalArgs)
return fn.call(client, { ...opts, origin, method, path }, ...additionalArgs)
}
}

module.exports.request = dispatchFromAgent(api.request)
module.exports.stream = dispatchFromAgent(api.stream)
module.exports.pipeline = dispatchFromAgent(api.pipeline)
module.exports.connect = dispatchFromAgent(api.connect)
module.exports.upgrade = dispatchFromAgent(api.upgrade)
module.exports.setGlobalClient = setGlobalClient
module.exports.getGlobalClient = getGlobalClient

module.exports.request = makeDispatch(api.request)
module.exports.stream = makeDispatch(api.stream)
module.exports.pipeline = makeDispatch(api.pipeline)
module.exports.connect = makeDispatch(api.connect)
module.exports.upgrade = makeDispatch(api.upgrade)
85 changes: 51 additions & 34 deletions lib/agent/index.js → lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ const Client = require('../core/client')
const EventEmitter = require('events')
const util = require('../core/util')
const assert = require('assert')
const RedirectHandler = require('./redirect')
const RedirectHandler = require('./handler/redirect')

const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kClients = Symbol('clients')
const kMaxRedirections = Symbol('maxRedirections')
const kOnDrain = Symbol('onDrain')
const kFactory = Symbol('factory')

function defaultFactory (origin, opts) {
Expand All @@ -38,36 +39,54 @@ class Agent extends EventEmitter {

const agent = this

this[kOnConnect] = function onConnect (client) {
agent.emit('connect', client)
this[kOnConnect] = function onConnect (origin, clients) {
agent.emit('connect', origin, [...clients, agent])
}

this[kOnDisconnect] = function onDestroy (client, err) {
this[kOnDrain] = function onDrain (origin) {
agent.emit('drain', origin)
}

this[kOnDisconnect] = function onDestroy (origin, clients, err) {
if (this.connected === 0 && this.size === 0) {
this.off('disconnect', agent[kOnDisconnect])
agent[kClients].delete(this.origin)
}

agent.emit('disconnect', client, err)
agent.emit('disconnect', origin, [...clients, agent], err)
}
}

get (origin) {
if (typeof origin !== 'string' || origin === '') {
throw new InvalidArgumentError('Origin must be a non-empty string.')
get connected () {
let ret = 0
for (const { connected } of this[kClients].values()) {
ret += connected
}
return ret
}

let pool = this[kClients].get(origin)

if (!pool) {
pool = this[kFactory](origin)
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
get size () {
let ret = 0
for (const { size } of this[kClients].values()) {
ret += size
}
return ret
}

this[kClients].set(origin, pool)
get pending () {
let ret = 0
for (const { pending } of this[kClients].values()) {
ret += pending
}
return ret
}

return pool
get running () {
let ret = 0
for (const { running } of this[kClients].values()) {
ret += running
}
return ret
}

dispatch (opts, handler) {
Expand All @@ -76,6 +95,21 @@ class Agent extends EventEmitter {
throw new InvalidArgumentError('opts must be a object.')
}

if (typeof opts.origin !== 'string' || opts.origin === '') {
throw new InvalidArgumentError('Origin must be a non-empty string.')
}

let pool = this[kClients].get(opts.origin)

if (!pool) {
pool = this[kFactory](opts.origin)
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('drain', this[kOnDrain])

this[kClients].set(opts.origin, pool)
}

const { maxRedirections = this[kMaxRedirections] } = opts

if (!Number.isInteger(maxRedirections) || maxRedirections < 0) {
Expand Down Expand Up @@ -130,21 +164,4 @@ class Agent extends EventEmitter {
}
}

let globalAgent = new Agent({ connections: null })

function setGlobalAgent (agent) {
if (!agent || typeof agent.get !== 'function') {
throw new InvalidArgumentError('Argument agent must implement Agent')
}
globalAgent = agent
}

function getGlobalAgent () {
return globalAgent
}

module.exports = {
setGlobalAgent,
getGlobalAgent,
Agent
}
module.exports = Agent
27 changes: 13 additions & 14 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,6 @@ class Client extends EventEmitter {
return this[kQueue].length - this[kRunningIdx]
}

get busy () {
const socket = this[kSocket]
return (
(socket && (socket[kReset] || socket[kWriting])) ||
(this.size >= (this[kPipelining] || 1)) ||
this.pending > 0
)
}

get destroyed () {
return this[kDestroyed]
}
Expand Down Expand Up @@ -280,6 +271,8 @@ class Client extends EventEmitter {

handler.onError(err)
}

return this[kNeedDrain] > 0
}

close (callback) {
Expand Down Expand Up @@ -469,7 +462,7 @@ class Parser extends HTTPParser {
detachSocket(socket)
client[kSocket] = null
client[kQueue][client[kRunningIdx]++] = null
client.emit('disconnect', client, new InformationalError('upgrade'))
client.emit('disconnect', [client], new InformationalError('upgrade'))

try {
request.onUpgrade(statusCode, headers, socket)
Expand Down Expand Up @@ -752,7 +745,7 @@ function onSocketConnect () {
clearTimeout(this[kConnectTimeout])
this[kConnectTimeout] = null

client.emit('connect', client)
client.emit('connect', client[kUrl], [client])
resume(client)
}

Expand Down Expand Up @@ -840,7 +833,7 @@ function onSocketClose () {
// Retry remaining requests.
client[kPendingIdx] = client[kRunningIdx]

client.emit('disconnect', client, err)
client.emit('disconnect', client[kUrl], [client], err)
}

resume(client)
Expand Down Expand Up @@ -915,7 +908,7 @@ function connect (client) {

function emitDrain (client) {
client[kNeedDrain] = 0
client.emit('drain')
client.emit('drain', this[kUrl].origin)
}

function resume (client, sync) {
Expand Down Expand Up @@ -970,7 +963,13 @@ function _resume (client, sync) {
}
}

if (client.busy) {
const busy = (
(socket && (socket[kReset] || socket[kWriting])) ||
(this.size >= (this[kPipelining] || 1)) ||
this.pending > 0
)

if (busy) {
client[kNeedDrain] = 2
} else if (client[kNeedDrain] === 2) {
if (sync) {
Expand Down
File renamed without changes.
50 changes: 20 additions & 30 deletions lib/client-pool.js → lib/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
const FixedQueue = require('./node/fixed-queue')
const util = require('./core/util')
const { kTLSSession } = require('./core/symbols')
const assert = require('assert')

const kClients = Symbol('clients')
const kNeedDrain = Symbol('needDrain')
Expand Down Expand Up @@ -58,21 +59,25 @@ class Pool extends EventEmitter {

const pool = this

this[kOnDrain] = function onDrain () {
this[kOnDrain] = function onDrain (origin) {
assert(this[kUrl].origin === origin)

const queue = pool[kQueue]

while (!this.busy) {
this[kNeedDrain] = false

while (!this[kNeedDrain]) {
const item = queue.shift()
if (!item) {
break
}
pool[kPending]--
this.dispatch(item.opts, item.handler)
this[kNeedDrain] = this.dispatch(item.opts, item.handler)
}

if (pool[kNeedDrain] && !this.busy) {
if (pool[kNeedDrain] && !this[kNeedDrain]) {
pool[kNeedDrain] = false
pool.emit('drain')
pool.emit('drain', origin)
}

if (pool[kClosedResolve] && queue.isEmpty()) {
Expand All @@ -82,14 +87,14 @@ class Pool extends EventEmitter {
}
}

this[kOnConnect] = function onConnect (client) {
this[kOnConnect] = function onConnect (origin, clients) {
pool[kConnected]++
pool.emit('connect', client)
pool.emit('connect', origin, [...clients, pool])
}

this[kOnDisconnect] = function onDisconnect (client, err) {
this[kOnDisconnect] = function onDisconnect (origin, clients, err) {
pool[kConnected]--
pool.emit('disconnect', client, err)
pool.emit('disconnect', origin, [...clients, pool], err)
}

this[kOnTLSSession] = function cacheClientTLSSession (session) {
Expand All @@ -107,23 +112,6 @@ class Pool extends EventEmitter {
return this[kConnected]
}

get busy () {
if (this[kPending] > 0) {
return true
}

if (this[kConnections] && this[kClients].length === this[kConnections]) {
for (const { busy } of this[kClients]) {
if (!busy) {
return false
}
}
return true
}

return false
}

get pending () {
let ret = this[kPending]

Expand Down Expand Up @@ -164,7 +152,7 @@ class Pool extends EventEmitter {

dispatch (opts, handler) {
try {
if (opts.origin && opts.origin !== this.url.origin) {
if (opts.origin && opts.origin !== this[kUrl].origin) {
throw new InvalidArgumentError('origin')
}

Expand All @@ -176,7 +164,7 @@ class Pool extends EventEmitter {
throw new ClientClosedError()
}

let client = this[kClients].find(client => !client.busy)
let client = this[kClients].find(client => !client[kNeedDrain])

if (!client) {
if (!this[kConnections] || this[kClients].length < this[kConnections]) {
Expand All @@ -201,8 +189,8 @@ class Pool extends EventEmitter {
this[kQueue].push({ opts, handler })
this[kPending]++
} else {
client.dispatch(opts, handler)
if (client.busy && this.busy) {
client[kNeedDrain] = client.dispatch(opts, handler)
if (client[kNeedDrain]) {
this[kNeedDrain] = true
}
}
Expand All @@ -213,6 +201,8 @@ class Pool extends EventEmitter {

handler.onError(err)
}

return this[kNeedDrain]
}

close (cb) {
Expand Down

0 comments on commit e66d8b2

Please sign in to comment.