Skip to content

Commit

Permalink
refactor: Client.clients & Agent is a Client.
Browse files Browse the repository at this point in the history
Refs: #616
  • Loading branch information
ronag committed Mar 27, 2021
1 parent 0c92094 commit ee1ccff
Show file tree
Hide file tree
Showing 13 changed files with 301 additions and 247 deletions.
46 changes: 32 additions & 14 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,63 @@
'use strict'

const Client = require('./lib/core/client')
const Client = require('./lib/client')
const errors = require('./lib/core/errors')
const Pool = require('./lib/client-pool')
const { Agent, getGlobalAgent, setGlobalAgent } = require('./lib/agent')
const Pool = require('./lib/pool')
const Agent = require('./lib/agent')
const util = require('./lib/core/util')
const { InvalidArgumentError } = require('./lib/core/errors')
const api = require('./lib/api')

Object.assign(Client.prototype, api)
Object.assign(Pool.prototype, api)
Object.assign(Agent.prototype, api)

function undici (url, opts) {
return new Pool(url, opts)
}

module.exports = undici

module.exports.Pool = Pool
module.exports.Client = Client
module.exports.errors = errors

module.exports.Agent = Agent
module.exports.setGlobalAgent = setGlobalAgent
module.exports.getGlobalAgent = getGlobalAgent

function dispatchFromAgent (fn) {
return (url, { agent = getGlobalAgent(), method = 'GET', ...opts } = {}, ...additionalArgs) => {
let globalClient = new Agent()

function setGlobalClient (agent) {
if (!agent || typeof agent.dispatch !== 'function') {
throw new InvalidArgumentError('Argument agent must implement Agent')
}
globalClient = agent
}

function getGlobalClient () {
return globalClient
}

function makeDispatch (fn) {
return (url, { agent, client = getGlobalClient(), method = 'GET', ...opts } = {}, ...additionalArgs) => {
if (opts.path != null) {
throw new InvalidArgumentError('unsupported opts.path')
}

if (agent) {
throw new InvalidArgumentError('unsupported opts.agent. Did you mean opts.client?')
}

const { origin, pathname, search } = util.parseURL(url)
const path = search ? `${pathname}${search}` : pathname

return fn.call(agent, { ...opts, origin, method, path }, ...additionalArgs)
return fn.call(client, { ...opts, origin, method, path }, ...additionalArgs)
}
}

module.exports.request = dispatchFromAgent(api.request)
module.exports.stream = dispatchFromAgent(api.stream)
module.exports.pipeline = dispatchFromAgent(api.pipeline)
module.exports.connect = dispatchFromAgent(api.connect)
module.exports.upgrade = dispatchFromAgent(api.upgrade)
module.exports.setGlobalClient = setGlobalClient
module.exports.getGlobalClient = getGlobalClient

module.exports.request = makeDispatch(api.request)
module.exports.stream = makeDispatch(api.stream)
module.exports.pipeline = makeDispatch(api.pipeline)
module.exports.connect = makeDispatch(api.connect)
module.exports.upgrade = makeDispatch(api.upgrade)
98 changes: 56 additions & 42 deletions lib/agent/index.js → lib/agent.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
'use strict'

const { InvalidArgumentError } = require('../core/errors')
const Pool = require('../client-pool')
const Client = require('../core/client')
const { InvalidArgumentError } = require('./core/errors')
const Pool = require('./pool')
const Client = require('./client')
const EventEmitter = require('events')
const util = require('../core/util')
const util = require('./core/util')
const assert = require('assert')
const RedirectHandler = require('./redirect')
const RedirectHandler = require('./handler/redirect')

const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kClients = Symbol('clients')
const kMaxRedirections = Symbol('maxRedirections')
const kOnDrain = Symbol('onDrain')
const kFactory = Symbol('factory')

function defaultFactory (origin, opts) {
Expand All @@ -38,36 +39,54 @@ class Agent extends EventEmitter {

const agent = this

this[kOnConnect] = function onConnect (client) {
agent.emit('connect', client)
this[kOnConnect] = function onConnect (origin, clients) {
agent.emit('connect', origin, [...clients, agent])
}

this[kOnDisconnect] = function onDestroy (client, err) {
this[kOnDrain] = function onDrain (origin) {
agent.emit('drain', origin)
}

this[kOnDisconnect] = function onDestroy (origin, clients, err) {
if (this.connected === 0 && this.size === 0) {
this.off('disconnect', agent[kOnDisconnect])
agent[kClients].delete(this.origin)
}

agent.emit('disconnect', client, err)
agent.emit('disconnect', origin, [...clients, agent], err)
}
}

get (origin) {
if (typeof origin !== 'string' || origin === '') {
throw new InvalidArgumentError('Origin must be a non-empty string.')
get connected () {
let ret = 0
for (const { connected } of this[kClients].values()) {
ret += connected
}
return ret
}

let pool = this[kClients].get(origin)

if (!pool) {
pool = this[kFactory](origin)
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
get size () {
let ret = 0
for (const { size } of this[kClients].values()) {
ret += size
}
return ret
}

this[kClients].set(origin, pool)
get pending () {
let ret = 0
for (const { pending } of this[kClients].values()) {
ret += pending
}
return ret
}

return pool
get running () {
let ret = 0
for (const { running } of this[kClients].values()) {
ret += running
}
return ret
}

dispatch (opts, handler) {
Expand All @@ -80,14 +99,26 @@ class Agent extends EventEmitter {
throw new InvalidArgumentError('opts must be a object.')
}

if (typeof opts.origin !== 'string' || opts.origin === '') {
throw new InvalidArgumentError('Origin must be a non-empty string.')
}

let client = this[kClients].get(opts.origin)

if (!client) {
client = this[kFactory](opts.origin)
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('drain', this[kOnDrain])

this[kClients].set(opts.origin, client)
}

const { maxRedirections = this[kMaxRedirections] } = opts

if (!Number.isInteger(maxRedirections) || maxRedirections < 0) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

const client = this.get(opts.origin)

if (!maxRedirections) {
return client.dispatch(opts, handler)
}
Expand All @@ -107,7 +138,7 @@ class Agent extends EventEmitter {
})
}

client.dispatch(opts, new RedirectHandler(this, opts, handler))
return client.dispatch(opts, new RedirectHandler(this, opts, handler))
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
Expand All @@ -134,21 +165,4 @@ class Agent extends EventEmitter {
}
}

let globalAgent = new Agent({ connections: null })

function setGlobalAgent (agent) {
if (!agent || typeof agent.get !== 'function') {
throw new InvalidArgumentError('Argument agent must implement Agent')
}
globalAgent = agent
}

function getGlobalAgent () {
return globalAgent
}

module.exports = {
setGlobalAgent,
getGlobalAgent,
Agent
}
module.exports = Agent
1 change: 1 addition & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
module.exports = require('./core/client')
36 changes: 25 additions & 11 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -183,14 +183,17 @@ class Client extends EventEmitter {
this[kPendingIdx] = 0
}

// TODO: Make private?
get url () {
return this[kUrl]
}

// TODO: Make private?
get pipelining () {
return this[kPipelining]
}

// TODO: Make private?
set pipelining (value) {
this[kPipelining] = value
resume(this, true)
Expand Down Expand Up @@ -220,13 +223,9 @@ class Client extends EventEmitter {
return this[kQueue].length - this[kRunningIdx]
}

// TODO: Make private?
get busy () {
const socket = this[kSocket]
return (
(socket && (socket[kReset] || socket[kWriting])) ||
(this.size >= (this[kPipelining] || 1)) ||
this.pending > 0
)
return this[kNeedDrain] > 1
}

get destroyed () {
Expand Down Expand Up @@ -277,13 +276,19 @@ class Client extends EventEmitter {
} else {
resume(this, true)
}

if (this[kResuming] && this[kNeedDrain] !== 2 && isBusy(this)) {
this[kNeedDrain] = 2
}
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
}

handler.onError(err)
}

return this[kNeedDrain] < 2
}

close (callback) {
Expand Down Expand Up @@ -473,7 +478,7 @@ class Parser extends HTTPParser {
detachSocket(socket)
client[kSocket] = null
client[kQueue][client[kRunningIdx]++] = null
client.emit('disconnect', client, new InformationalError('upgrade'))
client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade'))

try {
request.onUpgrade(statusCode, headers, socket)
Expand Down Expand Up @@ -756,7 +761,7 @@ function onSocketConnect () {
clearTimeout(this[kConnectTimeout])
this[kConnectTimeout] = null

client.emit('connect', client)
client.emit('connect', client[kUrl], [client])
resume(client)
}

Expand Down Expand Up @@ -844,7 +849,7 @@ function onSocketClose () {
// Retry remaining requests.
client[kPendingIdx] = client[kRunningIdx]

client.emit('disconnect', client, err)
client.emit('disconnect', client[kUrl], [client], err)
}

resume(client)
Expand Down Expand Up @@ -919,7 +924,16 @@ function connect (client) {

function emitDrain (client) {
client[kNeedDrain] = 0
client.emit('drain')
client.emit('drain', client[kUrl].origin)
}

function isBusy (client) {
const socket = client[kSocket]
return (
(socket && (socket[kReset] || socket[kWriting])) ||
(client.size >= (client[kPipelining] || 1)) ||
client.pending > 0
)
}

function resume (client, sync) {
Expand Down Expand Up @@ -974,7 +988,7 @@ function _resume (client, sync) {
}
}

if (client.busy) {
if (isBusy(client)) {
client[kNeedDrain] = 2
} else if (client[kNeedDrain] === 2) {
if (sync) {
Expand Down
File renamed without changes.
Loading

0 comments on commit ee1ccff

Please sign in to comment.