Skip to content

Commit

Permalink
refactor: Client.clients & Agent is a Client.
Browse files Browse the repository at this point in the history
Refs: #616
  • Loading branch information
ronag committed Mar 27, 2021
1 parent e91a79f commit f915b49
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 97 deletions.
42 changes: 28 additions & 14 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,45 +1,59 @@
'use strict'

const Client = require('./lib/core/client')
const Client = require('./lib/client')
const errors = require('./lib/core/errors')
const Pool = require('./lib/client-pool')
const { Agent, getGlobalAgent, setGlobalAgent } = require('./lib/agent')
const Pool = require('./lib/pool')
const Agent = require('./lib/agent')
const util = require('./lib/core/util')
const { InvalidArgumentError } = require('./lib/core/errors')
const api = require('./lib/api')

Object.assign(Client.prototype, api)
Object.assign(Pool.prototype, api)
Object.assign(Agent.prototype, api)

function undici (url, opts) {
return new Pool(url, opts)
}

module.exports = undici

module.exports.Pool = Pool
module.exports.Client = Client
module.exports.errors = errors

module.exports.Agent = Agent
module.exports.setGlobalAgent = setGlobalAgent
module.exports.getGlobalAgent = getGlobalAgent

function dispatchFromAgent (fn) {
return (url, { agent = getGlobalAgent(), method = 'GET', ...opts } = {}, ...additionalArgs) => {
let globalClient = new Agent()

function setGlobalClient (agent) {
if (!agent || typeof agent.dispatch !== 'function') {
throw new InvalidArgumentError('Argument agent must implement Agent')
}
globalClient = agent
}

function getGlobalClient () {
return globalClient
}

function makeDispatch (fn) {
return (url, { client = getGlobalClient(), method = 'GET', ...opts } = {}, ...additionalArgs) => {
if (opts.path != null) {
throw new InvalidArgumentError('unsupported opts.path')
}

const { origin, pathname, search } = util.parseURL(url)
const path = `${pathname}${search || ''}`

return fn.call(agent, { ...opts, origin, method, path }, ...additionalArgs)
return fn.call(client, { ...opts, origin, method, path }, ...additionalArgs)
}
}

module.exports.request = dispatchFromAgent(api.request)
module.exports.stream = dispatchFromAgent(api.stream)
module.exports.pipeline = dispatchFromAgent(api.pipeline)
module.exports.connect = dispatchFromAgent(api.connect)
module.exports.upgrade = dispatchFromAgent(api.upgrade)
module.exports.setGlobalClient = setGlobalClient
module.exports.getGlobalClient = getGlobalClient

module.exports.request = makeDispatch(api.request)
module.exports.stream = makeDispatch(api.stream)
module.exports.pipeline = makeDispatch(api.pipeline)
module.exports.connect = makeDispatch(api.connect)
module.exports.upgrade = makeDispatch(api.upgrade)
93 changes: 55 additions & 38 deletions lib/agent/index.js → lib/agent.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
'use strict'

const { InvalidArgumentError } = require('../core/errors')
const Pool = require('../client-pool')
const Client = require('../core/client')
const { InvalidArgumentError } = require('./core/errors')
const Pool = require('./pool')
const Client = require('./client')
const EventEmitter = require('events')
const util = require('../core/util')
const util = require('./core/util')
const assert = require('assert')
const RedirectHandler = require('./redirect')
const RedirectHandler = require('./handler/redirect')

const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kClients = Symbol('clients')
const kMaxRedirections = Symbol('maxRedirections')
const kOnDrain = Symbol('onDrain')
const kFactory = Symbol('factory')

function defaultFactory (origin, opts) {
Expand All @@ -38,36 +39,54 @@ class Agent extends EventEmitter {

const agent = this

this[kOnConnect] = function onConnect (client) {
agent.emit('connect', client)
this[kOnConnect] = function onConnect (origin, clients) {
agent.emit('connect', origin, [...clients, agent])
}

this[kOnDisconnect] = function onDestroy (client, err) {
this[kOnDrain] = function onDrain (origin) {
agent.emit('drain', origin)
}

this[kOnDisconnect] = function onDestroy (origin, clients, err) {
if (this.connected === 0 && this.size === 0) {
this.off('disconnect', agent[kOnDisconnect])
agent[kClients].delete(this.origin)
}

agent.emit('disconnect', client, err)
agent.emit('disconnect', origin, [...clients, agent], err)
}
}

get (origin) {
if (typeof origin !== 'string' || origin === '') {
throw new InvalidArgumentError('Origin must be a non-empty string.')
get connected () {
let ret = 0
for (const { connected } of this[kClients].values()) {
ret += connected
}
return ret
}

let pool = this[kClients].get(origin)

if (!pool) {
pool = this[kFactory](origin)
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
get size () {
let ret = 0
for (const { size } of this[kClients].values()) {
ret += size
}
return ret
}

this[kClients].set(origin, pool)
get pending () {
let ret = 0
for (const { pending } of this[kClients].values()) {
ret += pending
}
return ret
}

return pool
get running () {
let ret = 0
for (const { running } of this[kClients].values()) {
ret += running
}
return ret
}

dispatch (opts, handler) {
Expand All @@ -76,6 +95,21 @@ class Agent extends EventEmitter {
throw new InvalidArgumentError('opts must be a object.')
}

if (typeof opts.origin !== 'string' || opts.origin === '') {
throw new InvalidArgumentError('Origin must be a non-empty string.')
}

let pool = this[kClients].get(opts.origin)

if (!pool) {
pool = this[kFactory](opts.origin)
.on('connect', this[kOnConnect])
.on('disconnect', this[kOnDisconnect])
.on('drain', this[kOnDrain])

this[kClients].set(opts.origin, pool)
}

const { maxRedirections = this[kMaxRedirections] } = opts

if (!Number.isInteger(maxRedirections) || maxRedirections < 0) {
Expand Down Expand Up @@ -130,21 +164,4 @@ class Agent extends EventEmitter {
}
}

let globalAgent = new Agent({ connections: null })

function setGlobalAgent (agent) {
if (!agent || typeof agent.get !== 'function') {
throw new InvalidArgumentError('Argument agent must implement Agent')
}
globalAgent = agent
}

function getGlobalAgent () {
return globalAgent
}

module.exports = {
setGlobalAgent,
getGlobalAgent,
Agent
}
module.exports = Agent
2 changes: 2 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
module.exports = require('./core/client')

27 changes: 13 additions & 14 deletions lib/core/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -220,15 +220,6 @@ class Client extends EventEmitter {
return this[kQueue].length - this[kRunningIdx]
}

get busy () {
const socket = this[kSocket]
return (
(socket && (socket[kReset] || socket[kWriting])) ||
(this.size >= (this[kPipelining] || 1)) ||
this.pending > 0
)
}

get destroyed () {
return this[kDestroyed]
}
Expand Down Expand Up @@ -280,6 +271,8 @@ class Client extends EventEmitter {

handler.onError(err)
}

return this[kNeedDrain] > 0
}

close (callback) {
Expand Down Expand Up @@ -469,7 +462,7 @@ class Parser extends HTTPParser {
detachSocket(socket)
client[kSocket] = null
client[kQueue][client[kRunningIdx]++] = null
client.emit('disconnect', client, new InformationalError('upgrade'))
client.emit('disconnect', [client], new InformationalError('upgrade'))

try {
request.onUpgrade(statusCode, headers, socket)
Expand Down Expand Up @@ -752,7 +745,7 @@ function onSocketConnect () {
clearTimeout(this[kConnectTimeout])
this[kConnectTimeout] = null

client.emit('connect', client)
client.emit('connect', client[kUrl], [client])
resume(client)
}

Expand Down Expand Up @@ -840,7 +833,7 @@ function onSocketClose () {
// Retry remaining requests.
client[kPendingIdx] = client[kRunningIdx]

client.emit('disconnect', client, err)
client.emit('disconnect', client[kUrl], [client], err)
}

resume(client)
Expand Down Expand Up @@ -915,7 +908,7 @@ function connect (client) {

function emitDrain (client) {
client[kNeedDrain] = 0
client.emit('drain')
client.emit('drain', this[kUrl].origin)
}

function resume (client, sync) {
Expand Down Expand Up @@ -970,7 +963,13 @@ function _resume (client, sync) {
}
}

if (client.busy) {
const busy = (
(socket && (socket[kReset] || socket[kWriting])) ||
(this.size >= (this[kPipelining] || 1)) ||
this.pending > 0
)

if (busy) {
client[kNeedDrain] = 2
} else if (client[kNeedDrain] === 2) {
if (sync) {
Expand Down
File renamed without changes.
Loading

0 comments on commit f915b49

Please sign in to comment.