Skip to content

Commit

Permalink
fix(NODE-3176): handle errors from MessageStream (#2780)
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Apr 19, 2021
1 parent 2e3335b commit 76b110e
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 32 deletions.
64 changes: 32 additions & 32 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,38 +194,9 @@ export class Connection extends EventEmitter {
/* ignore errors, listen to `close` instead */
});

stream.on('close', () => {
if (this.closed) {
return;
}

this.closed = true;
this[kQueue].forEach(op =>
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`))
);
this[kQueue].clear();

this.emit('close');
});

stream.on('timeout', () => {
if (this.closed) {
return;
}

stream.destroy();
this.closed = true;
this[kQueue].forEach(op =>
op.cb(
new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
beforeHandshake: this[kIsMaster] == null
})
)
);

this[kQueue].clear();
this.emit('close');
});
this[kMessageStream].on('error', error => this.handleIssue({ destroy: error }));
stream.on('close', () => this.handleIssue({ isClose: true }));
stream.on('timeout', () => this.handleIssue({ isTimeout: true, destroy: true }));

// hook the message stream up to the passed in stream
stream.pipe(this[kMessageStream]);
Expand Down Expand Up @@ -269,6 +240,35 @@ export class Connection extends EventEmitter {
this[kLastUseTime] = now();
}

handleIssue(issue: { isTimeout?: boolean; isClose?: boolean; destroy?: boolean | Error }): void {
if (this.closed) {
return;
}

if (issue.destroy) {
this[kStream].destroy(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
}

this.closed = true;

for (const [, op] of this[kQueue]) {
if (issue.isTimeout) {
op.cb(
new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
beforeHandshake: !!this[kIsMaster]
})
);
} else if (issue.isClose) {
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} closed`));
} else {
op.cb(typeof issue.destroy === 'boolean' ? undefined : issue.destroy);
}
}

this[kQueue].clear();
this.emit('close');
}

destroy(): void;
destroy(callback: Callback): void;
destroy(options: DestroyOptions): void;
Expand Down
28 changes: 28 additions & 0 deletions test/unit/sdam/topology.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -286,5 +286,33 @@ describe('Topology (unit)', function () {
});
});
});

it('should encounter a server selection timeout on garbled server responses', function () {
const net = require('net');
const server = net.createServer();
const p = Promise.resolve();
server.listen(0, 'localhost', 2, () => {
server.on('connection', c => c.on('data', () => c.write('garbage_data')));
const { address, port } = server.address();
const client = this.configuration.newClient(`mongodb://${address}:${port}`, {
serverSelectionTimeoutMS: 1000
});
p.then(() =>
client
.connect()
.then(() => {
expect.fail('Should throw a server selection error!');
})
.catch(error => {
expect(error).to.exist;
})
.finally(() => {
server.close();
return client.close();
})
);
});
return p;
});
});
});

0 comments on commit 76b110e

Please sign in to comment.