Skip to content

Commit

Permalink
Create DispatchInterceptors
Browse files Browse the repository at this point in the history
  • Loading branch information
arontsang committed Apr 17, 2022
1 parent f5d9a94 commit d54794f
Show file tree
Hide file tree
Showing 12 changed files with 131 additions and 25 deletions.
14 changes: 6 additions & 8 deletions lib/agent.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
'use strict'

const { InvalidArgumentError } = require('./core/errors')
const { kClients, kRunning, kClose, kDestroy, kDispatch } = require('./core/symbols')
const { kClients, kRunning, kClose, kDestroy, kDispatch, kInterceptors } = require('./core/symbols')
const DispatcherBase = require('./dispatcher-base')
const Pool = require('./pool')
const Client = require('./client')
const util = require('./core/util')
const RedirectHandler = require('./handler/redirect')
const RedirectInterceptor = require('./interceptor/redirect')
const { WeakRef, FinalizationRegistry } = require('./compat/dispatcher-weakref')()

const kOnConnect = Symbol('onConnect')
Expand Down Expand Up @@ -44,6 +44,10 @@ class Agent extends DispatcherBase {
connect = { ...connect }
}

this[kInterceptors] = options.interceptors && options.interceptors.Agent && Array.isArray(options.interceptors.Agent)
? options.interceptors.Agent
: [RedirectInterceptor({ maxRedirections })]

this[kOptions] = { ...util.deepClone(options), connect }
this[kMaxRedirections] = maxRedirections
this[kFactory] = factory
Expand Down Expand Up @@ -108,12 +112,6 @@ class Agent extends DispatcherBase {
this[kFinalizer].register(dispatcher, key)
}

const { maxRedirections = this[kMaxRedirections] } = opts
if (maxRedirections != null && maxRedirections !== 0) {
opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting.
handler = new RedirectHandler(this, maxRedirections, opts, handler)
}

return dispatcher.dispatch(opts, handler)
}

Expand Down
5 changes: 4 additions & 1 deletion lib/balanced-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
kGetDispatcher
} = require('./pool-base')
const Pool = require('./pool')
const { kUrl } = require('./core/symbols')
const { kUrl, kInterceptors } = require('./core/symbols')
const { parseOrigin } = require('./core/util')
const kFactory = Symbol('factory')

Expand All @@ -37,6 +37,9 @@ class BalancedPool extends PoolBase {
throw new InvalidArgumentError('factory must be a function.')
}

this[kInterceptors] = opts.interceptors && opts.interceptors.BalancedPool && Array.isArray(opts.interceptors.BalancedPool)
? opts.interceptors.BalancedPool
: []
this[kFactory] = factory

for (const upstream of upstreams) {
Expand Down
14 changes: 7 additions & 7 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ const net = require('net')
const util = require('./core/util')
const Request = require('./core/request')
const DispatcherBase = require('./dispatcher-base')
const RedirectHandler = require('./handler/redirect')
const {
RequestContentLengthMismatchError,
ResponseContentLengthMismatchError,
Expand Down Expand Up @@ -61,7 +60,8 @@ const {
kCounter,
kClose,
kDestroy,
kDispatch
kDispatch,
kInterceptors
} = require('./core/symbols')

const kClosedResolve = Symbol('kClosedResolve')
Expand All @@ -83,6 +83,7 @@ try {

class Client extends DispatcherBase {
constructor (url, {
interceptors,
maxHeaderSize,
headersTimeout,
socketTimeout,
Expand Down Expand Up @@ -180,6 +181,9 @@ class Client extends DispatcherBase {
})
}

this[kInterceptors] = interceptors && interceptors.Client && Array.isArray(interceptors.Client)
? interceptors.Client
: [RedirectInterceptor({ maxRedirections })]
this[kUrl] = util.parseOrigin(url)
this[kConnector] = connect
this[kSocket] = null
Expand Down Expand Up @@ -255,11 +259,6 @@ class Client extends DispatcherBase {
}

[kDispatch] (opts, handler) {
const { maxRedirections = this[kMaxRedirections] } = opts
if (maxRedirections) {
handler = new RedirectHandler(this, maxRedirections, opts, handler)
}

const origin = opts.origin || this[kUrl].origin

const request = new Request(origin, opts, handler)
Expand Down Expand Up @@ -320,6 +319,7 @@ class Client extends DispatcherBase {
}

const constants = require('./llhttp/constants')
const RedirectInterceptor = require('./interceptor/redirect')
const EMPTY_BUF = Buffer.alloc(0)

async function lazyllhttp () {
Expand Down
3 changes: 2 additions & 1 deletion lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,6 @@ module.exports = {
kMaxRedirections: Symbol('maxRedirections'),
kMaxRequests: Symbol('maxRequestsPerClient'),
kProxy: Symbol('proxy agent options'),
kCounter: Symbol('socket request counter')
kCounter: Symbol('socket request counter'),
kInterceptors: Symbol('dispatch interceptors')
}
19 changes: 17 additions & 2 deletions lib/dispatcher-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ const {
ClientClosedError,
InvalidArgumentError
} = require('./core/errors')
const { kDestroy, kClose, kDispatch } = require('./core/symbols')
const { kDestroy, kClose, kDispatch, kInterceptors } = require('./core/symbols')

const kDestroyed = Symbol('destroyed')
const kClosed = Symbol('closed')
const kOnDestroyed = Symbol('onDestroyed')
const kOnClosed = Symbol('onClosed')
const kBuildInterceptedDispatch = Symbol('Build Dispatch')

class DispatcherBase extends Dispatcher {
constructor () {
Expand All @@ -31,6 +32,10 @@ class DispatcherBase extends Dispatcher {
return this[kClosed]
}

get interceptors () {
return this[kInterceptors]
}

close (callback) {
if (callback === undefined) {
return new Promise((resolve, reject) => {
Expand Down Expand Up @@ -125,6 +130,15 @@ class DispatcherBase extends Dispatcher {
})
}

[kBuildInterceptedDispatch] () {
let dispatch = this[kDispatch].bind(this)
for (let i = this[kInterceptors].length - 1; i >= 0; i--) {
dispatch = this[kInterceptors][i](dispatch)
}

return dispatch
}

dispatch (opts, handler) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler must be an object')
Expand All @@ -143,7 +157,8 @@ class DispatcherBase extends Dispatcher {
throw new ClientClosedError()
}

return this[kDispatch](opts, handler)
const dispatch = this[kBuildInterceptedDispatch]()
return dispatch(opts, handler)
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
Expand Down
6 changes: 3 additions & 3 deletions lib/handler/redirect.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ class BodyAsyncIterable {
}

class RedirectHandler {
constructor (dispatcher, maxRedirections, opts, handler) {
constructor (dispatch, maxRedirections, opts, handler) {
if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

util.validateHandler(handler, opts.method, opts.upgrade)

this.dispatcher = dispatcher
this.dispatch = dispatch
this.location = null
this.abort = null
this.opts = { ...opts, maxRedirections: 0 } // opts must be a copy
Expand Down Expand Up @@ -156,7 +156,7 @@ class RedirectHandler {
this.location = null
this.abort = null

this.dispatcher.dispatch(this.opts, this)
this.dispatch(this.opts, this)
} else {
this.handler.onComplete(trailers)
}
Expand Down
20 changes: 20 additions & 0 deletions lib/interceptor/redirect.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@

const RedirectHandler = require('../handler/redirect')

function RedirectInterceptor ({ maxRedirections: defaultMaxRedirections }) {
return (dispatch) => {
return function Intercept (opts, handler) {
const { maxRedirections = defaultMaxRedirections } = opts

if (!maxRedirections) {
return dispatch(opts, handler)
}

const redirectHandler = new RedirectHandler(dispatch, maxRedirections, opts, handler)
opts = { ...opts, maxRedirections: 0 } // Stop sub dispatcher from also redirecting.
return dispatch(opts, redirectHandler)
}
}
}

module.exports = RedirectInterceptor
5 changes: 4 additions & 1 deletion lib/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const {
InvalidArgumentError
} = require('./core/errors')
const util = require('./core/util')
const { kUrl } = require('./core/symbols')
const { kUrl, kInterceptors } = require('./core/symbols')
const buildConnector = require('./core/connect')

const kOptions = Symbol('options')
Expand Down Expand Up @@ -58,6 +58,9 @@ class Pool extends PoolBase {
})
}

this[kInterceptors] = options.interceptors && options.interceptors.Pool && Array.isArray(options.interceptors.Pool)
? options.interceptors.Pool
: []
this[kConnections] = connections || null
this[kUrl] = util.parseOrigin(origin)
this[kOptions] = { ...util.deepClone(options), connect }
Expand Down
3 changes: 3 additions & 0 deletions types/agent.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { URL } from 'url'
import Dispatcher = require('./dispatcher')
import Pool = require('./pool')
import {DispatchInterceptor} from "./dispatcher";

export = Agent

Expand All @@ -20,6 +21,8 @@ declare namespace Agent {
factory?(origin: URL, opts: Object): Dispatcher;
/** Integer. Default: `0` */
maxRedirections?: number;

interceptors?: { Agent?: DispatchInterceptor[] } & Pool.Options["interceptors"]
}

export interface DispatchOptions extends Dispatcher.DispatchOptions {
Expand Down
4 changes: 3 additions & 1 deletion types/client.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { URL } from 'url'
import { TlsOptions } from 'tls'
import Dispatcher = require('./dispatcher')
import { DispatchOptions, RequestOptions } from './dispatcher'
import {DispatchInterceptor, DispatchOptions, RequestOptions} from './dispatcher'
import buildConnector = require('./connector')

export = Client
Expand Down Expand Up @@ -41,6 +41,8 @@ declare namespace Client {
tls?: TlsOptions | null;
/** */
maxRequestsPerClient?: number;

interceptors?: {Client: DispatchInterceptor[] | undefined}
}

export interface SocketInfo {
Expand Down
60 changes: 59 additions & 1 deletion types/dispatcher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ import { Duplex, Readable, Writable } from 'stream'
import { EventEmitter } from 'events'
import { IncomingHttpHeaders } from 'http'
import { Blob } from 'buffer'
import BodyReadable from './readable'
import type BodyReadable from './readable'
import { FormData } from './formdata'
import { UndiciError } from './errors'

type AbortSignal = unknown;

Expand Down Expand Up @@ -36,6 +37,59 @@ declare class Dispatcher extends EventEmitter {
destroy(err: Error | null): Promise<void>;
destroy(callback: () => void): void;
destroy(err: Error | null, callback: () => void): void;

on(eventName: 'connect', callback: (origin: URL, targets: Array<Dispatcher>) => void): this;
on(eventName: 'disconnect', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
on(eventName: 'connectionError', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
on(eventName: 'drain', callback: (origin: URL) => void): this;


once(eventName: 'connect', callback: (origin: URL, targets: Array<Dispatcher>) => void): this;
once(eventName: 'disconnect', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
once(eventName: 'connectionError', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
once(eventName: 'drain', callback: (origin: URL) => void): this;


off(eventName: 'connect', callback: (origin: URL, targets: Array<Dispatcher>) => void): this;
off(eventName: 'disconnect', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
off(eventName: 'connectionError', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
off(eventName: 'drain', callback: (origin: URL) => void): this;


addListener(eventName: 'connect', callback: (origin: URL, targets: Array<Dispatcher>) => void): this;
addListener(eventName: 'disconnect', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
addListener(eventName: 'connectionError', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
addListener(eventName: 'drain', callback: (origin: URL) => void): this;

removeListener(eventName: 'connect', callback: (origin: URL, targets: Array<Dispatcher>) => void): this;
removeListener(eventName: 'disconnect', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
removeListener(eventName: 'connectionError', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
removeListener(eventName: 'drain', callback: (origin: URL) => void): this;

prependListener(eventName: 'connect', callback: (origin: URL, targets: Array<Dispatcher>) => void): this;
prependListener(eventName: 'disconnect', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
prependListener(eventName: 'connectionError', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
prependListener(eventName: 'drain', callback: (origin: URL) => void): this;

prependOnceListener(eventName: 'connect', callback: (origin: URL, targets: Array<Dispatcher>) => void): this;
prependOnceListener(eventName: 'disconnect', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
prependOnceListener(eventName: 'connectionError', callback: (origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void): this;
prependOnceListener(eventName: 'drain', callback: (origin: URL) => void): this;

listeners(eventName: 'connect'): ((origin: URL, targets: Array<Dispatcher>) => void)[]
listeners(eventName: 'disconnect'): ((origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void)[];
listeners(eventName: 'connectionError'): ((origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void)[];
listeners(eventName: 'drain'): ((origin: URL) => void)[];

rawListeners(eventName: 'connect'): ((origin: URL, targets: Array<Dispatcher>) => void)[]
rawListeners(eventName: 'disconnect'): ((origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void)[];
rawListeners(eventName: 'connectionError'): ((origin: URL, targets: Array<Dispatcher>, error: UndiciError) => void)[];
rawListeners(eventName: 'drain'): ((origin: URL) => void)[];

emit(eventName: 'connect', origin: URL, targets: Array<Dispatcher>): boolean;
emit(eventName: 'disconnect', origin: URL, targets: Array<Dispatcher>, error: UndiciError): boolean;
emit(eventName: 'connectionError', origin: URL, targets: Array<Dispatcher>, error: UndiciError): boolean;
emit(eventName: 'drain', origin: URL): boolean;
}

declare namespace Dispatcher {
Expand Down Expand Up @@ -168,4 +222,8 @@ declare namespace Dispatcher {
json(): Promise<any>;
text(): Promise<string>;
}

export interface DispatchInterceptor {
(dispatch: Dispatcher['dispatch']): Dispatcher['dispatch']
}
}
3 changes: 3 additions & 0 deletions types/pool.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import Client = require('./client')
import Dispatcher = require('./dispatcher')
import { URL } from 'url'
import {DispatchInterceptor} from "./dispatcher";

export = Pool

Expand All @@ -18,5 +19,7 @@ declare namespace Pool {
factory?(origin: URL, opts: object): Dispatcher;
/** The max number of clients to create. `null` if no limit. Default `null`. */
connections?: number | null;

interceptors?: { Pool?: DispatchInterceptor[] } & Client.Options["interceptors"]
}
}

0 comments on commit d54794f

Please sign in to comment.