diff --git a/src/cmap/connection.ts b/src/cmap/connection.ts index 50f3cd099e..682023a060 100644 --- a/src/cmap/connection.ts +++ b/src/cmap/connection.ts @@ -194,38 +194,9 @@ export class Connection extends EventEmitter { /* ignore errors, listen to `close` instead */ }); - stream.on('close', () => { - if (this.closed) { - return; - } - - this.closed = true; - this[kQueue].forEach(op => - op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`)) - ); - this[kQueue].clear(); - - this.emit('close'); - }); - - stream.on('timeout', () => { - if (this.closed) { - return; - } - - stream.destroy(); - this.closed = true; - this[kQueue].forEach(op => - op.cb( - new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, { - beforeHandshake: this[kIsMaster] == null - }) - ) - ); - - this[kQueue].clear(); - this.emit('close'); - }); + this[kMessageStream].on('error', error => this.handleIssue({ destroy: error })); + stream.on('close', () => this.handleIssue({ isClose: true })); + stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true })); // hook the message stream up to the passed in stream stream.pipe(this[kMessageStream]); @@ -269,6 +240,35 @@ export class Connection extends EventEmitter { this[kLastUseTime] = now(); } + handleIssue(issue: { isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }): void { + if (this.closed) { + return; + } + + if (issue.destroy) { + this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy); + } + + this.closed = true; + + for (const [, op] of this[kQueue]) { + if (issue.isTimeout) { + op.cb( + new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, { + beforeHandshake: !!this[kIsMaster] + }) + ); + } else if (issue.isClose) { + op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`)); + } else { + op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy); + } + } + + this[kQueue].clear(); + this.emit('close'); + } + destroy(): void; destroy(callback: Callback): void; destroy(options: DestroyOptions): void; diff --git a/test/unit/sdam/topology.test.js b/test/unit/sdam/topology.test.js index 71964ed918..4669c72206 100644 --- a/test/unit/sdam/topology.test.js +++ b/test/unit/sdam/topology.test.js @@ -286,5 +286,33 @@ describe('Topology (unit)', function () { }); }); }); + + it('should encounter a server selection timeout on garbled server responses', function () { + const net = require('net'); + const server = net.createServer(); + const p = Promise.resolve(); + server.listen(0, 'localhost', 2, () => { + server.on('connection', c => c.on('data', () => c.write('garbage_data'))); + const { address, port } = server.address(); + const client = this.configuration.newClient(`mongodb://${address}:${port}`, { + serverSelectionTimeoutMS: 1000 + }); + p.then(() => + client + .connect() + .then(() => { + expect.fail('Should throw a server selection error!'); + }) + .catch(error => { + expect(error).to.exist; + }) + .finally(() => { + server.close(); + return client.close(); + }) + ); + }); + return p; + }); }); });