diff --git a/src/change_stream.ts b/src/change_stream.ts index aab273b06c0..1e80cf7e58e 100644 --- a/src/change_stream.ts +++ b/src/change_stream.ts @@ -1,5 +1,12 @@ import Denque = require('denque'); -import { MongoError, AnyError, isResumableError, MongoDriverError } from './error'; +import { + MongoError, + AnyError, + isResumableError, + MongoDriverError, + MongoAPIError, + MongoChangeStreamError +} from './error'; import { AggregateOperation, AggregateOptions } from './operations/aggregate'; import { maxWireVersion, @@ -259,9 +266,8 @@ export class ChangeStream extends TypedEven } else if (parent instanceof MongoClient) { this.type = CHANGE_DOMAIN_TYPES.CLUSTER; } else { - // TODO(NODE-3404): Replace this with MongoChangeStreamError - throw new MongoDriverError( - 'Parent provided to ChangeStream constructor must an instance of Collection, Db, or MongoClient' + throw new MongoChangeStreamError( + 'Parent provided to ChangeStream constructor must be an instance of Collection, Db, or MongoClient' ); } @@ -365,8 +371,7 @@ export class ChangeStream extends TypedEven */ stream(options?: CursorStreamOptions): Readable { this.streamOptions = options; - // TODO(NODE-3404): Replace this with MongoChangeStreamError - if (!this.cursor) throw new MongoDriverError(NO_CURSOR_ERROR); + if (!this.cursor) throw new MongoChangeStreamError(NO_CURSOR_ERROR); return this.cursor.stream(options); } @@ -543,8 +548,9 @@ const CHANGE_STREAM_EVENTS = [ function setIsEmitter(changeStream: ChangeStream): void { if (changeStream[kMode] === 'iterator') { - throw new MongoDriverError( - 'Cannot use ChangeStream as an EventEmitter after using as an iterator' + // TODO(NODE-3485): Replace with MongoChangeStreamModeError + throw new MongoAPIError( + 'ChangeStream cannot be used as an EventEmitter after being used as an iterator' ); } changeStream[kMode] = 'emitter'; @@ -552,8 +558,8 @@ function setIsEmitter(changeStream: ChangeStream): void { function setIsIterator(changeStream: ChangeStream): 
void { if (changeStream[kMode] === 'emitter') { - throw new MongoDriverError( - 'Cannot use ChangeStream as iterator after using as an EventEmitter' + throw new MongoAPIError( + 'ChangeStream cannot be used as an iterator after being used as an EventEmitter' ); } changeStream[kMode] = 'iterator'; @@ -630,6 +636,7 @@ function waitForTopologyConnected( } if (calculateDurationInMs(start) > timeout) { + // TODO(NODE-3497): Replace with MongoNetworkTimeoutError return callback(new MongoDriverError('Timed out waiting for connection')); } @@ -676,17 +683,23 @@ function processNewChange( callback?: Callback> ) { if (changeStream[kClosed]) { + // TODO(NODE-3405): Replace with MongoStreamClosedError if (callback) callback(new MongoDriverError(CHANGESTREAM_CLOSED_ERROR)); return; } // a null change means the cursor has been notified, implicitly closing the change stream if (change == null) { + // TODO(NODE-3405): Replace with MongoStreamClosedError return closeWithError(changeStream, new MongoDriverError(CHANGESTREAM_CLOSED_ERROR), callback); } if (change && !change._id) { - return closeWithError(changeStream, new MongoDriverError(NO_RESUME_TOKEN_ERROR), callback); + return closeWithError( + changeStream, + new MongoChangeStreamError(NO_RESUME_TOKEN_ERROR), + callback + ); } // cache the resume token @@ -710,6 +723,7 @@ function processError( // If the change stream has been closed explicitly, do not process error.
if (changeStream[kClosed]) { + // TODO(NODE-3405): Replace with MongoStreamClosedError if (callback) callback(new MongoDriverError(CHANGESTREAM_CLOSED_ERROR)); return; } @@ -770,6 +784,7 @@ function processError( */ function getCursor(changeStream: ChangeStream, callback: Callback>) { if (changeStream[kClosed]) { + // TODO(NODE-3405): Replace with MongoStreamClosedError callback(new MongoDriverError(CHANGESTREAM_CLOSED_ERROR)); return; } @@ -795,11 +810,12 @@ function processResumeQueue(changeStream: ChangeStream, err?: const request = changeStream[kResumeQueue].pop(); if (!err) { if (changeStream[kClosed]) { + // TODO(NODE-3405): Replace with MongoStreamClosedError request(new MongoDriverError(CHANGESTREAM_CLOSED_ERROR)); return; } if (!changeStream.cursor) { - request(new MongoDriverError(NO_CURSOR_ERROR)); + request(new MongoChangeStreamError(NO_CURSOR_ERROR)); return; } } diff --git a/src/cmap/connect.ts b/src/cmap/connect.ts index 3cb4ea73f90..a87bcf1f7b4 100644 --- a/src/cmap/connect.ts +++ b/src/cmap/connect.ts @@ -5,10 +5,10 @@ import { MongoNetworkError, MongoNetworkTimeoutError, AnyError, - MongoDriverError, MongoCompatibilityError, + MongoInvalidArgumentError, MongoServerError, - MongoInvalidArgumentError + MongoDriverError } from '../error'; import { AUTH_PROVIDERS, AuthMechanism } from './auth/defaultAuthProviders'; import { AuthContext } from './auth/auth_provider'; diff --git a/src/cmap/message_stream.ts b/src/cmap/message_stream.ts index 569c89dabea..8ab3290e3be 100644 --- a/src/cmap/message_stream.ts +++ b/src/cmap/message_stream.ts @@ -1,6 +1,6 @@ import { Duplex, DuplexOptions } from 'stream'; import { Response, Msg, BinMsg, Query, WriteProtocolMessageType, MessageHeader } from './commands'; -import { MongoDriverError, MongoParseError } from '../error'; +import { MongoDecompressionError, MongoParseError } from '../error'; import { OP_COMPRESSED, OP_MSG } from './wire_protocol/constants'; import { compress, @@ -191,9 +191,7 @@ function 
processIncomingData(stream: MessageStream, callback: Callback) if (messageBody.length !== messageHeader.length) { callback( - new MongoDriverError( - 'Decompressing a compressed message from the server failed. The message is corrupt.' - ) + new MongoDecompressionError('Message body and message header must be the same length') ); return; diff --git a/src/cmap/wire_protocol/compression.ts b/src/cmap/wire_protocol/compression.ts index 576d8e7badd..368f944c5e9 100644 --- a/src/cmap/wire_protocol/compression.ts +++ b/src/cmap/wire_protocol/compression.ts @@ -3,7 +3,7 @@ import type { Callback } from '../../utils'; import type { OperationDescription } from '../message_stream'; import { Snappy } from '../../deps'; -import { MongoDriverError } from '../../error'; +import { MongoDecompressionError, MongoInvalidArgumentError } from '../../error'; /** @public */ export const Compressor = Object.freeze({ @@ -53,10 +53,8 @@ export function compress( zlib.deflate(dataToBeCompressed, zlibOptions, callback as zlib.CompressCallback); break; default: - throw new MongoDriverError( - 'Attempt to compress message using unknown compressor "' + - self.options.agreedCompressor + - '".' + throw new MongoInvalidArgumentError( + `Unknown compressor ${self.options.agreedCompressor} failed to compress` ); } } @@ -68,9 +66,8 @@ export function decompress( callback: Callback ): void { if (compressorID < 0 || compressorID > Math.max(2)) { - throw new MongoDriverError( - `Server sent message compressed using an unsupported compressor.` + - ` (Received compressor ID ${compressorID})` + throw new MongoDecompressionError( + `Server sent message compressed using an unsupported compressor. 
(Received compressor ID ${compressorID})` ); } diff --git a/src/connection_string.ts b/src/connection_string.ts index dc661b4fae4..b61eb817d56 100644 --- a/src/connection_string.ts +++ b/src/connection_string.ts @@ -6,7 +6,7 @@ import { AuthMechanism } from './cmap/auth/defaultAuthProviders'; import { ReadPreference, ReadPreferenceMode } from './read_preference'; import { ReadConcern, ReadConcernLevel } from './read_concern'; import { W, WriteConcern } from './write_concern'; -import { MongoParseError } from './error'; +import { MongoAPIError, MongoInvalidArgumentError, MongoParseError } from './error'; import { AnyOptions, Callback, @@ -32,6 +32,7 @@ import type { TagSet } from './sdam/server_description'; import { Logger, LoggerLevel } from './logger'; import { PromiseProvider } from './promise_provider'; import { Encrypter } from './encrypter'; +import { Compressor } from './cmap/wire_protocol/compression'; const VALID_TXT_RECORDS = ['authSource', 'replicaSet', 'loadBalanced']; @@ -64,11 +65,12 @@ function matchesParentDomain(srvAddress: string, parentDomain: string): boolean */ export function resolveSRVRecord(options: MongoOptions, callback: Callback): void { if (typeof options.srvHost !== 'string') { - return callback(new MongoParseError('Cannot resolve empty srv string')); + return callback(new MongoAPIError('Option "srvHost" must not be empty')); } if (options.srvHost.split('.').length < 3) { - return callback(new MongoParseError('URI does not have hostname, domain name and tld')); + // TODO(NODE-3484): Replace with MongoConnectionStringError + return callback(new MongoAPIError('URI must include hostname, domain name, and tld')); } // Resolve the SRV record and use the result as the list of hosts to connect to. 
@@ -77,14 +79,12 @@ export function resolveSRVRecord(options: MongoOptions, callback: Callback { if (error) { - __handleError(stream, error); + stream.emit(GridFSBucketReadStream.ERROR, error); return; } @@ -234,31 +240,38 @@ function doRead(stream: GridFSBucketReadStream): void { const bytesRemaining = stream.s.file.length - stream.s.bytesRead; const expectedN = stream.s.expected++; const expectedLength = Math.min(stream.s.file.chunkSize, bytesRemaining); - let errmsg: string; if (doc.n > expectedN) { - errmsg = `ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}`; - return __handleError(stream, new MongoDriverError(errmsg)); + return stream.emit( + GridFSBucketReadStream.ERROR, + new MongoGridFSChunkError( + `ChunkIsMissing: Got unexpected n: ${doc.n}, expected: ${expectedN}` + ) + ); } if (doc.n < expectedN) { - errmsg = `ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`; - return __handleError(stream, new MongoDriverError(errmsg)); + return stream.emit( + GridFSBucketReadStream.ERROR, + new MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}, expected: ${expectedN}`) + ); } let buf = Buffer.isBuffer(doc.data) ? 
doc.data : doc.data.buffer; if (buf.byteLength !== expectedLength) { if (bytesRemaining <= 0) { - errmsg = `ExtraChunk: Got unexpected n: ${doc.n}`; - return __handleError(stream, new MongoDriverError(errmsg)); + return stream.emit( + GridFSBucketReadStream.ERROR, + new MongoGridFSChunkError(`ExtraChunk: Got unexpected n: ${doc.n}`) + ); } - errmsg = - 'ChunkIsWrongSize: Got unexpected length: ' + - buf.byteLength + - ', expected: ' + - expectedLength; - return __handleError(stream, new MongoDriverError(errmsg)); + return stream.emit( + GridFSBucketReadStream.ERROR, + new MongoGridFSChunkError( + `ChunkIsWrongSize: Got unexpected length: ${buf.byteLength}, expected: ${expectedLength}` + ) + ); } stream.s.bytesRead += buf.byteLength; @@ -305,7 +318,7 @@ function init(stream: GridFSBucketReadStream): void { stream.s.files.findOne(stream.s.filter, findOneOptions, (error, doc) => { if (error) { - return __handleError(stream, error); + return stream.emit(GridFSBucketReadStream.ERROR, error); } if (!doc) { @@ -315,7 +328,7 @@ function init(stream: GridFSBucketReadStream): void { const errmsg = `FileNotFound: file ${identifier} was not found`; const err = new MongoDriverError(errmsg); err.code = 'ENOENT'; // TODO: NODE-3338 set property as part of constructor - return __handleError(stream, err); + return stream.emit(GridFSBucketReadStream.ERROR, err); } // If document is empty, kill the stream immediately and don't @@ -336,7 +349,7 @@ function init(stream: GridFSBucketReadStream): void { try { stream.s.bytesToSkip = handleStartOption(stream, doc, stream.s.options); } catch (error) { - return __handleError(stream, error); + return stream.emit(GridFSBucketReadStream.ERROR, error); } const filter: Document = { files_id: doc._id }; @@ -362,7 +375,7 @@ function init(stream: GridFSBucketReadStream): void { try { stream.s.bytesToTrim = handleEndOption(stream, doc, stream.s.cursor, stream.s.options); } catch (error) { - return __handleError(stream, error); + return 
stream.emit(GridFSBucketReadStream.ERROR, error); } stream.emit(GridFSBucketReadStream.FILE, doc); @@ -438,7 +451,3 @@ function handleEndOption( } throw new MongoInvalidArgumentError('End option must be defined'); } - -function __handleError(stream: GridFSBucketReadStream, error?: AnyError): void { - stream.emit(GridFSBucketReadStream.ERROR, error); -} diff --git a/src/gridfs/upload.ts b/src/gridfs/upload.ts index bcc7cec84be..ff52e526633 100644 --- a/src/gridfs/upload.ts +++ b/src/gridfs/upload.ts @@ -1,14 +1,20 @@ import { Writable } from 'stream'; -import { MongoError, AnyError, MONGODB_ERROR_CODES, MongoDriverError } from '../error'; -import { WriteConcern } from './../write_concern'; -import { PromiseProvider } from '../promise_provider'; +import type { Document } from '../bson'; import { ObjectId } from '../bson'; +import type { Collection } from '../collection'; +import { + AnyError, + MONGODB_ERROR_CODES, + MongoDriverError, + MongoError, + MongoGridFSStreamError +} from '../error'; +import { PromiseProvider } from '../promise_provider'; import type { Callback } from '../utils'; -import type { Document } from '../bson'; -import type { GridFSBucket } from './index'; -import type { GridFSFile } from './download'; import type { WriteConcernOptions } from '../write_concern'; -import type { Collection } from '../collection'; +import { WriteConcern } from './../write_concern'; +import type { GridFSFile } from './download'; +import type { GridFSBucket } from './index'; /** @public */ export interface GridFSChunk { @@ -133,6 +139,7 @@ export class GridFSBucketWriteStream extends Writable { return waitForIndexes(this, () => doWrite(this, chunk, encoding, callback)); } + // TODO(NODE-3405): Refactor this with maybePromise and MongoStreamClosedError /** * Places this write stream into an aborted state (all future writes fail) * and deletes all chunks that have already been written. 
@@ -143,8 +150,9 @@ export class GridFSBucketWriteStream extends Writable { abort(callback: Callback): void; abort(callback?: Callback): Promise | void { const Promise = PromiseProvider.get(); - let error: MongoDriverError; + let error: MongoGridFSStreamError; if (this.state.streamEnd) { + // TODO(NODE-3405): Replace with MongoStreamClosedError error = new MongoDriverError('Cannot abort a stream that has already completed'); if (typeof callback === 'function') { return callback(error); @@ -152,6 +160,7 @@ export class GridFSBucketWriteStream extends Writable { return Promise.reject(error); } if (this.state.aborted) { + // TODO(NODE-3405): Replace with MongoStreamClosedError error = new MongoDriverError('Cannot call abort() on a stream twice'); if (typeof callback === 'function') { return callback(error); @@ -556,6 +565,7 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boo function checkAborted(stream: GridFSBucketWriteStream, callback?: Callback): boolean { if (stream.state.aborted) { if (typeof callback === 'function') { + // TODO(NODE-3405): Replace with MongoStreamClosedError callback(new MongoDriverError('this stream has been aborted')); } return true; diff --git a/src/index.ts b/src/index.ts index 870bb63a117..f58caa191ca 100644 --- a/src/index.ts +++ b/src/index.ts @@ -43,7 +43,16 @@ export { MongoSystemError, MongoServerSelectionError, MongoParseError, - MongoWriteConcernError + MongoWriteConcernError, + MongoRuntimeError, + MongoChangeStreamError, + MongoGridFSStreamError, + MongoGridFSChunkError, + MongoDecompressionError, + MongoBatchReExecutionError, + MongoCursorExhaustedError, + MongoCursorInUseError, + MongoNotConnectedError } from './error'; export { MongoBulkWriteError, BulkWriteOptions, AnyBulkWriteOperation } from './bulk/common'; export { diff --git a/test/functional/change_stream.test.js b/test/functional/change_stream.test.js index accb54c65ce..5508a0fbb18 100644 --- a/test/functional/change_stream.test.js +++ 
b/test/functional/change_stream.test.js @@ -100,7 +100,7 @@ function triggerResumableError(changeStream, delay, onClose) { */ function waitForStarted(changeStream, callback) { const timeout = setTimeout(() => { - throw new Error('Change stream never started'); + expect.fail('Change stream never started'); }, 2000); changeStream.cursor.once('init', () => { @@ -874,7 +874,7 @@ describe('Change Streams', function () { .next() .then(function () { // We should never execute this line because calling changeStream.next() should throw an error - throw new Error( + expect.fail( 'ChangeStream.next() returned a change document but it should have returned a MongoNetworkError' ); }) @@ -2007,7 +2007,7 @@ describe('Change Streams', function () { if (counter === 2) { changeStream.close(close); } else if (counter >= 3) { - close(new Error('should not have received more than 2 events')); + close(new Error('Should not have received more than 2 events')); } }); changeStream.on('error', err => close(err)); diff --git a/test/functional/gridfs_stream.test.js b/test/functional/gridfs_stream.test.js index 4d7b2c7a12b..bfe6a5ec8fc 100644 --- a/test/functional/gridfs_stream.test.js +++ b/test/functional/gridfs_stream.test.js @@ -473,6 +473,7 @@ describe('GridFS Stream', function () { // Fail if user tries to abort an aborted stream uploadStream.abort().then(null, function (error) { expect(error.toString()).to.equal( + // TODO(NODE-3405): Replace with MongoStreamClosedError 'MongoDriverError: Cannot call abort() on a stream twice' ); client.close(done); @@ -526,6 +527,7 @@ describe('GridFS Stream', function () { // Fail if user tries to abort an aborted stream uploadStream.abort().then(null, function (error) { expect(error.toString()).to.equal( + // TODO(NODE-3405): Replace with MongoStreamClosedError 'MongoDriverError: Cannot call abort() on a stream twice' ); client.close(done); @@ -1184,6 +1186,7 @@ describe('GridFS Stream', function () { // Fail if user tries to abort an aborted stream 
uploadStream.abort().then(null, function (error) { expect(error.toString()).to.equal( + // TODO(NODE-3405): Replace with MongoStreamClosedError 'MongoDriverError: Cannot call abort() on a stream twice' ); client.close(done); diff --git a/test/unit/core/connection_string.test.js b/test/unit/core/connection_string.test.js index c1f51322889..41f0f227ba0 100644 --- a/test/unit/core/connection_string.test.js +++ b/test/unit/core/connection_string.test.js @@ -1,6 +1,10 @@ 'use strict'; -const { MongoParseError, MongoDriverError } = require('../../../src/error'); +const { + MongoParseError, + MongoDriverError, + MongoInvalidArgumentError +} = require('../../../src/error'); const { loadSpecTests } = require('../../spec'); const chai = require('chai'); const { parseOptions } = require('../../../src/connection_string'); @@ -110,7 +114,7 @@ describe('Connection String', function () { describe('validation', function () { it('should validate compressors options', function () { expect(() => parseOptions('mongodb://localhost/?compressors=bunnies')).to.throw( - MongoParseError, + MongoInvalidArgumentError, 'bunnies is not a valid compression mechanism' ); });