From de614176b25a92a3ef6c603fc92208a89c27d7fe Mon Sep 17 00:00:00 2001 From: Manzoor Wani Date: Sat, 23 May 2026 00:38:54 +0900 Subject: [PATCH] fix: dispatch pending requests when a connection fails MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Requests queued behind the agent's maxSockets limit hang forever when their connections fail. agent-base parks a fake placeholder socket while connecting; on a connect() rejection it removes only that placeholder, so no real socket ever enters Node's pool. Node's removeSocket() — which drains agent.requests[name] — only runs off a real socket's lifecycle events, so the queue is never drained. Override createSocket() and, on a connection failure, dispatch the next queued request via #drainPendingRequests() so the queue keeps moving. This is a defensive fix that can be removed once agent-base fixes its createSocket failure path. Ref: npm/cli#9386 Ref: TooTallNate/proxy-agents#427 --- lib/agents.js | 52 +++++++++++ test/drain-on-connect-failure.js | 156 +++++++++++++++++++++++++++++++ 2 files changed, 208 insertions(+) create mode 100644 test/drain-on-connect-failure.js diff --git a/lib/agents.js b/lib/agents.js index c541b93..e9624df 100644 --- a/lib/agents.js +++ b/lib/agents.js @@ -203,4 +203,56 @@ module.exports = class Agent extends AgentBase { return super.addRequest(request, options) } + + // When connect() rejects, agent-base removes only its placeholder socket, so Node never drains this.requests[name] and requests queued past maxSockets hang forever. + // On a failure we dispatch the next queued request ourselves. + // See npm/cli#9386 and TooTallNate/proxy-agents#427. + createSocket (req, options, cb) { + super.createSocket(req, options, (err, socket) => { + if (err) { + this.#drainPendingRequests(req, options) + } + cb(err, socket) + }) + } + + // Dispatch the next request queued behind maxSockets, reusing the slot the failed connection freed. + #drainPendingRequests (failedReq, options) { + const name = this.getName(options) + const queue = this.requests[name] + if (!queue || queue.length === 0) { + return + } + + // Node's removeSocket() picks a queued request without shifting it off, so drop the failed one to avoid dispatching it twice. + const failedIndex = queue.indexOf(failedReq) + if (failedIndex !== -1) { + queue.splice(failedIndex, 1) + } + if (queue.length === 0) { + delete this.requests[name] + return + } + + // Safety belt: only dispatch if a socket slot is genuinely free. + const socketCount = this.sockets[name] ? this.sockets[name].length : 0 + if (socketCount >= this.maxSockets || this.totalSocketCount >= this.maxTotalSockets) { + return + } + + const nextReq = queue.shift() + if (queue.length === 0) { + delete this.requests[name] + } + + // All queued requests share this origin, so the failed request's options suit the next one. + // createSocket() recurses here if this connection also fails, draining the whole queue. + this.createSocket(nextReq, options, (err, socket) => { + if (err) { + nextReq.onSocket(null, err) + } else { + nextReq.onSocket(socket) + } + }) + } } diff --git a/test/drain-on-connect-failure.js b/test/drain-on-connect-failure.js new file mode 100644 index 0000000..46ff54c --- /dev/null +++ b/test/drain-on-connect-failure.js @@ -0,0 +1,156 @@ +'use strict' + +const t = require('tap') +const http = require('http') +const { Agent } = require('../lib/index.js') + +// Start an http server on a random port and return it once it is listening. +const startServer = async (t) => { + const server = http.createServer((req, res) => res.end('OK!')) + await new Promise((resolve) => server.listen(0, '127.0.0.1', resolve)) + t.teardown(() => new Promise((resolve) => server.close(resolve))) + return server.address().port +} + +// Find a port that nothing is listening on, so every connect() rejects with ECONNREFUSED. +const closedPort = async () => { + return new Promise((resolve) => { + const probe = http.createServer() + probe.listen(0, '127.0.0.1', () => { + const { port } = probe.address() + probe.close(() => resolve(port)) + }) + }) +} + +t.test('queued requests are dispatched when connections fail (npm/cli#9386)', async (t) => { + const port = await closedPort() + + const agent = new Agent({ maxSockets: 2 }) + const TOTAL = 6 + const errors = [] + + await new Promise((resolve, reject) => { + const timer = setTimeout( + () => reject(new Error('requests queued past maxSockets were never dispatched (hang)')), + 10000 + ) + for (let i = 0; i < TOTAL; i++) { + const req = http.request({ host: '127.0.0.1', port, agent }) + req.on('error', (err) => { + errors.push(err) + if (errors.length === TOTAL) { + // brief grace period to catch any double-dispatch (a 7th error) + setTimeout(() => { + clearTimeout(timer) + resolve() + }, 200) + } + }) + req.end() + } + }) + + t.equal(errors.length, TOTAL, 'every queued request settled exactly once') +}) + +t.test('a queued request whose connection succeeds is assigned a socket', async (t) => { + const port = await startServer(t) + + const agent = new Agent({ maxSockets: 1 }) + t.teardown(() => agent.destroy()) + + // Fail only the first connection: A fails, the queued B is drained and succeeds. + const realConnect = agent.connect.bind(agent) + let connectCalls = 0 + agent.connect = function (request, options) { + connectCalls += 1 + if (connectCalls === 1) { + return Promise.reject(Object.assign(new Error('refused'), { code: 'ECONNREFUSED' })) + } + return realConnect(request, options) + } + + const request = (label) => new Promise((resolve) => { + const req = http.request({ host: '127.0.0.1', port, agent }) + req.on('error', (err) => resolve({ label, error: err })) + req.on('response', (res) => { + res.resume() + res.on('end', () => resolve({ label, status: res.statusCode })) + }) + req.end() + }) + + const [a, b] = await Promise.all([request('A'), request('B')]) + t.equal(a.error?.code, 'ECONNREFUSED', 'the request whose connection failed errors') + t.equal(b.status, 200, 'the queued request is dispatched and completes') +}) + +t.test('a failed request picked by removeSocket is not dispatched twice', async (t) => { + const port = await startServer(t) + + // keepAlive is off so A's socket closes after its response, triggering removeSocket() to pick queued request B. + const agent = new Agent({ maxSockets: 1, keepAlive: false }) + t.teardown(() => agent.destroy()) + + // Request A succeeds; the connection removeSocket() makes for B then fails. + const realConnect = agent.connect.bind(agent) + let connectCalls = 0 + agent.connect = function (request, options) { + connectCalls += 1 + if (connectCalls === 1) { + return realConnect(request, options) + } + return Promise.reject(Object.assign(new Error('refused'), { code: 'ECONNREFUSED' })) + } + + let bErrors = 0 + const aDone = new Promise((resolve) => { + const req = http.request({ host: '127.0.0.1', port, agent }) + req.on('error', resolve) + req.on('response', (res) => { + res.resume() + res.on('end', resolve) + }) + req.end() + }) + const bDone = new Promise((resolve) => { + const req = http.request({ host: '127.0.0.1', port, agent }) + req.on('error', () => { + bErrors += 1 + // brief grace period to catch a double-dispatch erroring B a second time + setTimeout(resolve, 300) + }) + req.end() + }) + + await Promise.all([aDone, bDone]) + t.equal(bErrors, 1, 'the queued request that fails errors exactly once') +}) + +t.test('a connection failure does not dispatch a request when no slot is free', async (t) => { + const port = await closedPort() + + const agent = new Agent({ maxSockets: 1 }) + const options = { host: '127.0.0.1', port } + const name = agent.getName(options) + + // The single maxSockets slot is already taken, with a request queued behind it. + // A connection failure must not dispatch that request, since no slot is free. + let dispatched = false + const queuedRequest = { + onSocket: () => { + dispatched = true + }, + } + agent.requests[name] = [queuedRequest] + agent.sockets[name] = [{}] + + const err = await new Promise((resolve) => { + agent.createSocket({}, options, (e) => resolve(e)) + }) + + t.equal(err?.code, 'ECONNREFUSED', 'the connection failed') + t.equal(dispatched, false, 'the queued request was not dispatched') + t.equal(agent.requests[name]?.length, 1, 'the queued request is left in the queue') +})