diff --git a/packages/compass-import-export/src/utils/collection-stream.js b/packages/compass-import-export/src/utils/collection-stream.js index 8a21cff9508..3e463c5df09 100644 --- a/packages/compass-import-export/src/utils/collection-stream.js +++ b/packages/compass-import-export/src/utils/collection-stream.js @@ -1,6 +1,7 @@ /* eslint-disable no-console */ import { Writable } from 'stream'; import { createDebug } from './logger'; + const debug = createDebug('collection-stream'); function mongodbServerErrorToJSError({ index, code, errmsg, op, errInfo }) { @@ -48,7 +49,7 @@ class WritableCollectionStream extends Writable { _write(document, _encoding, next) { this.batch.push(document); - if (this.batch.length === this.BATCH_SIZE) { + if (this.batch.length >= this.BATCH_SIZE) { return this._executeBatch(next); } @@ -69,57 +70,80 @@ class WritableCollectionStream extends Writable { this._executeBatch(callback); } - _executeBatch(callback) { + async _executeBatch(callback) { const documents = this.batch; this.batch = []; - this._collection().bulkWrite( - documents.map((doc) => ({ insertOne: doc })), - { - ordered: this.stopOnErrors, - retryWrites: false, - checkKeys: false, - }, - (err, res) => { + let result; + let error; + + try { + result = await this._collection().bulkWrite( + documents.map((doc) => ({ insertOne: doc })), + { + ordered: this.stopOnErrors, + retryWrites: false, + checkKeys: false, + } + ); + } catch (bulkWriteError) { + // Currently, the server does not support batched inserts for FLE2: + // https://jira.mongodb.org/browse/SERVER-66315 + // We check for this specific error and re-try inserting documents one by one. + if (bulkWriteError.code === 6371202) { + this.BATCH_SIZE = 1; + + let nInserted = 0; + + for (const doc of documents) { + try { + await this._collection().insertOne(doc, {}); + nInserted += 1; + } catch (insertOneByOneError) { + this._errors.push(insertOneByOneError); + + if (this.stopOnErrors) { + break; + } + } + } + + result = { ok: 1, nInserted }; + } else { // If we are writing with `ordered: false`, bulkWrite will throw and // will not return any result, but server might write some docs and bulk // result can still be accessed on the error instance - const result = (err && err.result && err.result.result) || res; - - // Driver seems to return null instead of undefined in some rare cases - // when the operation ends in error, instead of relying on - // `_mergeBulkOpResult` default argument substitution, we need to keep - // this OR expression here - this._mergeBulkOpResult(result || {}); - - this.docsProcessed += documents.length; - - this.docsWritten = this._stats.nInserted; + result = bulkWriteError.result && bulkWriteError.result.result; + this._errors.push(bulkWriteError); + } + } - this._batchCounter++; + // Driver seems to return null instead of undefined in some rare cases + // when the operation ends in error, instead of relying on + // `_mergeBulkOpResult` default argument substitution, we need to keep + // this OR expression here + this._mergeBulkOpResult(result || {}); - if (err) { - this._errors.push(err); - } + this.docsWritten = this._stats.nInserted; + this.docsProcessed += documents.length; + this._batchCounter++; - this.printJobStats(); + this.printJobStats(); - this.emit('progress', { - docsWritten: this.docsWritten, - docsProcessed: this.docsProcessed, - errors: this._errors - .concat(this._stats.writeErrors) - .concat(this._stats.writeConcernErrors), - }); + this.emit('progress', { + docsWritten: this.docsWritten, + docsProcessed: this.docsProcessed, + errors: this._errors + .concat(this._stats.writeErrors) + .concat(this._stats.writeConcernErrors), + }); - if (this.stopOnErrors) { - return callback(err); - } + if (this.stopOnErrors) { + return callback(error); + } - return callback(); - } - ); + return callback(); } _mergeBulkOpResult(result = {}) { diff --git a/packages/compass-import-export/src/utils/collection-stream.spec.js b/packages/compass-import-export/src/utils/collection-stream.spec.js new file mode 100644 index 00000000000..99d8e78e250 --- /dev/null +++ b/packages/compass-import-export/src/utils/collection-stream.spec.js @@ -0,0 +1,149 @@ +import { createCollectionWriteStream } from './collection-stream'; +const { expect } = require('chai'); + +const TEST_COLLECTION_NAME = 'db.testimport'; + +const runImport = (dataService, stopOnErrors) => { + return new Promise((resolve, reject) => { + try { + const dest = createCollectionWriteStream( + dataService, + TEST_COLLECTION_NAME, + stopOnErrors + ); + + dest.once('progress', (stats) => { + resolve(stats); + }); + + dest.write({ phoneNumber: '+12874627836' }); + dest.write({ phoneNumber: '+12874627811' }); + dest.end({ phoneNumber: '+12874627222' }); + } catch (err) { + reject(err); + } + }); +}; + +describe('collection-stream', function () { + describe('_executeBatch', function () { + it('insert documents as bulk to regular collection', async function () { + const dataService = { + _collection: function () { + return { + bulkWrite: () => + Promise.resolve({ + nInserted: 3, + nMatched: 0, + nModified: 0, + nRemoved: 0, + nUpserted: 0, + ok: 1, + }), + }; + }, + }; + + const stats = await runImport(dataService, false); + + expect(stats.docsProcessed).to.be.equal(3); + expect(stats.docsWritten).to.be.equal(3); + expect(stats.errors.length).to.be.equal(0); + }); + + it('insert documents one by one to FLE2 collection', async function () { + const dataService = { + _collection: function () { + return { + bulkWrite: () => + new Promise((resolve, reject) => { + const error = new Error( + 'Only single insert batches are supported in FLE2' + ); + error.code = 6371202; + + reject(error); + }), + insertOne: () => Promise.resolve({ acknowledged: true }), + }; + }, + }; + + const stats = await runImport(dataService, false); + + expect(stats.docsProcessed).to.be.equal(3); + expect(stats.docsWritten).to.be.equal(3); + expect(stats.errors.length).to.be.equal(0); + }); + + it('insert documents to FLE2 collection even one is failed', async function () { + let i = 0; + const dataService = { + _collection: function () { + return { + bulkWrite: () => + new Promise((resolve, reject) => { + const error = new Error( + 'Only single insert batches are supported in FLE2' + ); + error.code = 6371202; + + reject(error); + }), + insertOne: () => + new Promise((resolve, reject) => { + i += 1; + + if (i === 2) { + return reject(new Error('foo')); + } else { + return resolve({ acknowledged: true }); + } + }), + }; + }, + }; + + const stats = await runImport(dataService, false); + + expect(stats.docsProcessed).to.be.equal(3); + expect(stats.docsWritten).to.be.equal(2); + expect(stats.errors.length).to.be.equal(1); + }); + + it('stops on error when inserting to FLE2 collection', async function () { + let i = 0; + const dataService = { + _collection: function () { + return { + bulkWrite: () => + new Promise((resolve, reject) => { + const error = new Error( + 'Only single insert batches are supported in FLE2' + ); + error.code = 6371202; + + reject(error); + }), + insertOne: () => + new Promise((resolve, reject) => { + i += 1; + + if (i === 2) { + return reject(new Error('foo')); + } else { + return resolve({ acknowledged: true }); + } + }), + }; + }, + }; + + const stats = await runImport(dataService, true); + + expect(stats.docsProcessed).to.be.equal(3); + expect(stats.docsWritten).to.be.equal(1); + expect(stats.errors.length).to.be.equal(1); + }); + }); +});