From 06ca0ca258d31a31d7e5f6bc440e1639fe221db8 Mon Sep 17 00:00:00 2001 From: Alena Khineika Date: Tue, 17 May 2022 12:02:01 +0200 Subject: [PATCH 1/4] feat: allow import into fle2 collections COMPASS-5810 --- .../src/utils/collection-stream.js | 106 +++++++++++------ .../src/utils/collection-stream.spec.js | 110 ++++++++++++++++++ 2 files changed, 178 insertions(+), 38 deletions(-) create mode 100644 packages/compass-import-export/src/utils/collection-stream.spec.js diff --git a/packages/compass-import-export/src/utils/collection-stream.js b/packages/compass-import-export/src/utils/collection-stream.js index 8a21cff9508..32b0c4e5b96 100644 --- a/packages/compass-import-export/src/utils/collection-stream.js +++ b/packages/compass-import-export/src/utils/collection-stream.js @@ -1,6 +1,8 @@ /* eslint-disable no-console */ import { Writable } from 'stream'; import { createDebug } from './logger'; +import { promisify } from 'util'; + const debug = createDebug('collection-stream'); function mongodbServerErrorToJSError({ index, code, errmsg, op, errInfo }) { @@ -69,59 +71,87 @@ class WritableCollectionStream extends Writable { this._executeBatch(callback); } - _executeBatch(callback) { + async _executeBatch(callback) { const documents = this.batch; this.batch = []; - this._collection().bulkWrite( - documents.map((doc) => ({ insertOne: doc })), - { - ordered: this.stopOnErrors, - retryWrites: false, - checkKeys: false, - }, - (err, res) => { + const bulkWrite = promisify( + this._collection().bulkWrite.bind(this._collection()) + ); + let result; + let error; + + try { + result = await bulkWrite( + documents.map((doc) => ({ insertOne: doc })), + { + ordered: this.stopOnErrors, + retryWrites: false, + checkKeys: false, + } + ); + } catch (bulkWriteError) { + // Currently, the server does not support batched inserts for FLE2: + // https://jira.mongodb.org/browse/SERVER-66315 + // We check for this specific error and re-try inserting documents one by one. + if (bulkWriteError.code === 6371202) { + const insertOne = promisify( + this._collection().insertOne.bind(this._collection()) + ); + + try { + let nInserted = 0; + await Promise.allSettled( + documents.map(async (doc) => { + try { + await insertOne(doc, {}); + nInserted += 1; + } catch (error) { + this._errors.push(error); + } + }) + ); + result = { ok: 1, nInserted }; + } catch (insertOneByOneError) { + error = insertOneByOneError; + } + } else { // If we are writing with `ordered: false`, bulkWrite will throw and // will not return any result, but server might write some docs and bulk // result can still be accessed on the error instance - const result = (err && err.result && err.result.result) || res; - - // Driver seems to return null instead of undefined in some rare cases - // when the operation ends in error, instead of relying on - // `_mergeBulkOpResult` default argument substitution, we need to keep - // this OR expression here - this._mergeBulkOpResult(result || {}); - - this.docsProcessed += documents.length; - - this.docsWritten = this._stats.nInserted; + result = bulkWriteError.result && bulkWriteError.result.result; + this._errors.push(bulkWriteError); + } + } - this._batchCounter++; + this._mergeBulkOpResult(result); - if (err) { - this._errors.push(err); - } + this.docsWritten = this._stats.nInserted; + this.docsProcessed += documents.length; + this._batchCounter++; - this.printJobStats(); + this.printJobStats(); - this.emit('progress', { - docsWritten: this.docsWritten, - docsProcessed: this.docsProcessed, - errors: this._errors - .concat(this._stats.writeErrors) - .concat(this._stats.writeConcernErrors), - }); + this.emit('progress', { + docsWritten: this.docsWritten, + docsProcessed: this.docsProcessed, + errors: this._errors + .concat(this._stats.writeErrors) + .concat(this._stats.writeConcernErrors), + }); - if (this.stopOnErrors) { - return callback(err); - } + if (this.stopOnErrors) { + return callback(error); + } - return callback(); - } - ); + return callback(); } + // Driver seems to return null instead of undefined in some rare cases + // when the operation ends in error, instead of relying on + // `_mergeBulkOpResult` default argument substitution, we need to keep + // this OR expression here _mergeBulkOpResult(result = {}) { const numKeys = [ 'nInserted', diff --git a/packages/compass-import-export/src/utils/collection-stream.spec.js b/packages/compass-import-export/src/utils/collection-stream.spec.js new file mode 100644 index 00000000000..644d90e9db4 --- /dev/null +++ b/packages/compass-import-export/src/utils/collection-stream.spec.js @@ -0,0 +1,110 @@ +import { createCollectionWriteStream } from './collection-stream'; +const { expect } = require('chai'); + +const TEST_COLLECTION_NAME = 'db.testimport'; + +const runImport = (dataService) => { + return new Promise((resolve, reject) => { + try { + const dest = createCollectionWriteStream( + dataService, + TEST_COLLECTION_NAME, + true + ); + + dest.once('progress', (stats) => { + resolve(stats); + }); + + dest.write({ phoneNumber: '+12874627836' }); + dest.end({ phoneNumber: '+12874627222' }); + } catch (err) { + reject(err); + } + }); +}; + +describe('collection-stream', function () { + describe('_executeBatch', function () { + it('insert documents as bulk to regular collections', async function () { + const dataService = { + _collection: function () { + return { + bulkWrite: (operations, options, callback) => + callback(null, { + nInserted: 2, + nMatched: 0, + nModified: 0, + nRemoved: 0, + nUpserted: 0, + ok: 1, + }), + }; + }, + }; + + const stats = await runImport(dataService); + + expect(stats.docsProcessed).to.be.equal(2); + expect(stats.docsWritten).to.be.equal(2); + expect(stats.errors.length).to.be.equal(0); + }); + + it('insert documents one by one to FLE2 collections', async function () { + const dataService = { + _collection: function () { + return { + bulkWrite: (operations, options, callback) => { + const error = new Error( + 'Only single insert batches are supported in FLE2' + ); + error.code = 6371202; + + callback(error); + }, + insertOne: (document, options, callback) => + callback(null, { acknowledged: true }), + }; + }, + }; + + const stats = await runImport(dataService); + + expect(stats.docsProcessed).to.be.equal(2); + expect(stats.docsWritten).to.be.equal(2); + expect(stats.errors.length).to.be.equal(0); + }); + + it('insert documents to FLE2 collections even one is failed', async function () { + let i = 0; + const dataService = { + _collection: function () { + return { + bulkWrite: (operations, options, callback) => { + const error = new Error( + 'Only single insert batches are supported in FLE2' + ); + error.code = 6371202; + + callback(error); + }, + insertOne: (document, options, callback) => { + if (i === 0) { + i += 1; + return callback(null, { acknowledged: true }); + } else { + return callback(new Error('foo')); + } + }, + }; + }, + }; + + const stats = await runImport(dataService); + + expect(stats.docsProcessed).to.be.equal(2); + expect(stats.docsWritten).to.be.equal(1); + expect(stats.errors.length).to.be.equal(1); + }); + }); +}); From d6502917d9d9bd65d20c086cd74bca68616ff6a7 Mon Sep 17 00:00:00 2001 From: Alena Khineika Date: Tue, 17 May 2022 12:08:33 +0200 Subject: [PATCH 2/4] docs: move comment back to the function call --- .../compass-import-export/src/utils/collection-stream.js | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/compass-import-export/src/utils/collection-stream.js b/packages/compass-import-export/src/utils/collection-stream.js index 32b0c4e5b96..b0206e1263e 100644 --- a/packages/compass-import-export/src/utils/collection-stream.js +++ b/packages/compass-import-export/src/utils/collection-stream.js @@ -125,6 +125,10 @@ class WritableCollectionStream extends Writable { } } + // Driver seems to return null instead of undefined in some rare cases + // when the operation ends in error, instead of relying on + // `_mergeBulkOpResult` default argument substitution, we need to keep + // this OR expression here this._mergeBulkOpResult(result); this.docsWritten = this._stats.nInserted; @@ -148,10 +152,6 @@ class WritableCollectionStream extends Writable { return callback(); } - // Driver seems to return null instead of undefined in some rare cases - // when the operation ends in error, instead of relying on - // `_mergeBulkOpResult` default argument substitution, we need to keep - // this OR expression here _mergeBulkOpResult(result = {}) { const numKeys = [ 'nInserted', From fb36f7813ddad77eaa5924d85dc3c72e95ecb799 Mon Sep 17 00:00:00 2001 From: Alena Khineika Date: Tue, 17 May 2022 16:35:55 +0200 Subject: [PATCH 3/4] feat: stop on error option while inserting one by one --- .../src/utils/collection-stream.js | 44 +++---- .../src/utils/collection-stream.spec.js | 121 ++++++++++++------ 2 files changed, 99 insertions(+), 66 deletions(-) diff --git a/packages/compass-import-export/src/utils/collection-stream.js b/packages/compass-import-export/src/utils/collection-stream.js index b0206e1263e..30e6727ea82 100644 --- a/packages/compass-import-export/src/utils/collection-stream.js +++ b/packages/compass-import-export/src/utils/collection-stream.js @@ -1,7 +1,6 @@ /* eslint-disable no-console */ import { Writable } from 'stream'; import { createDebug } from './logger'; -import { promisify } from 'util'; const debug = createDebug('collection-stream'); @@ -76,14 +75,11 @@ class WritableCollectionStream extends Writable { this.batch = []; - const bulkWrite = promisify( - this._collection().bulkWrite.bind(this._collection()) - ); let result; let error; try { - result = await bulkWrite( + result = await this._collection().bulkWrite( documents.map((doc) => ({ insertOne: doc })), { ordered: this.stopOnErrors, @@ -96,26 +92,24 @@ class WritableCollectionStream extends Writable { // https://jira.mongodb.org/browse/SERVER-66315 // We check for this specific error and re-try inserting documents one by one. if (bulkWriteError.code === 6371202) { - const insertOne = promisify( - this._collection().insertOne.bind(this._collection()) - ); - - try { - let nInserted = 0; - await Promise.allSettled( - documents.map(async (doc) => { - try { - await insertOne(doc, {}); - nInserted += 1; - } catch (error) { - this._errors.push(error); - } - }) - ); - result = { ok: 1, nInserted }; - } catch (insertOneByOneError) { - error = insertOneByOneError; + this.BATCH_SIZE = 1; + + let nInserted = 0; + + for (const doc of documents) { + try { + await this._collection().insertOne(doc, {}); + nInserted += 1; + } catch (insertOneByOneError) { + this._errors.push(insertOneByOneError); + + if (this.stopOnErrors) { + break; + } + } } + + result = { ok: 1, nInserted }; } else { // If we are writing with `ordered: false`, bulkWrite will throw and // will not return any result, but server might write some docs and bulk @@ -129,7 +123,7 @@ class WritableCollectionStream extends Writable { // when the operation ends in error, instead of relying on // `_mergeBulkOpResult` default argument substitution, we need to keep // this OR expression here - this._mergeBulkOpResult(result); + this._mergeBulkOpResult(result || {}); this.docsWritten = this._stats.nInserted; this.docsProcessed += documents.length; diff --git a/packages/compass-import-export/src/utils/collection-stream.spec.js b/packages/compass-import-export/src/utils/collection-stream.spec.js index 644d90e9db4..99d8e78e250 100644 --- a/packages/compass-import-export/src/utils/collection-stream.spec.js +++ b/packages/compass-import-export/src/utils/collection-stream.spec.js @@ -3,13 +3,13 @@ const { expect } = require('chai'); const TEST_COLLECTION_NAME = 'db.testimport'; -const runImport = (dataService) => { +const runImport = (dataService, stopOnErrors) => { return new Promise((resolve, reject) => { try { const dest = createCollectionWriteStream( dataService, TEST_COLLECTION_NAME, - true + stopOnErrors ); dest.once('progress', (stats) => { @@ -17,6 +17,7 @@ const runImport = (dataService) => { }); dest.write({ phoneNumber: '+12874627836' }); + dest.write({ phoneNumber: '+12874627811' }); dest.end({ phoneNumber: '+12874627222' }); } catch (err) { reject(err); @@ -26,13 +27,13 @@ const runImport = (dataService) => { describe('collection-stream', function () { describe('_executeBatch', function () { - it('insert documents as bulk to regular collections', async function () { + it('insert documents as bulk to regular collection', async function () { const dataService = { _collection: function () { return { - bulkWrite: (operations, options, callback) => - callback(null, { - nInserted: 2, + bulkWrite: () => + Promise.resolve({ + nInserted: 3, nMatched: 0, nModified: 0, nRemoved: 0, @@ -43,66 +44,104 @@ describe('collection-stream', function () { }, }; - const stats = await runImport(dataService); + const stats = await runImport(dataService, false); - expect(stats.docsProcessed).to.be.equal(2); - expect(stats.docsWritten).to.be.equal(2); + expect(stats.docsProcessed).to.be.equal(3); + expect(stats.docsWritten).to.be.equal(3); expect(stats.errors.length).to.be.equal(0); }); - it('insert documents one by one to FLE2 collections', async function () { + it('insert documents one by one to FLE2 collection', async function () { const dataService = { _collection: function () { return { - bulkWrite: (operations, options, callback) => { - const error = new Error( - 'Only single insert batches are supported in FLE2' - ); - error.code = 6371202; - - callback(error); - }, - insertOne: (document, options, callback) => - callback(null, { acknowledged: true }), + bulkWrite: () => + new Promise((resolve, reject) => { + const error = new Error( + 'Only single insert batches are supported in FLE2' + ); + error.code = 6371202; + + reject(error); + }), + insertOne: () => Promise.resolve({ acknowledged: true }), }; }, }; - const stats = await runImport(dataService); + const stats = await runImport(dataService, false); - expect(stats.docsProcessed).to.be.equal(2); - expect(stats.docsWritten).to.be.equal(2); + expect(stats.docsProcessed).to.be.equal(3); + expect(stats.docsWritten).to.be.equal(3); expect(stats.errors.length).to.be.equal(0); }); - it('insert documents to FLE2 collections even one is failed', async function () { + it('insert documents to FLE2 collection even one is failed', async function () { + let i = 0; + const dataService = { + _collection: function () { + return { + bulkWrite: () => + new Promise((resolve, reject) => { + const error = new Error( + 'Only single insert batches are supported in FLE2' + ); + error.code = 6371202; + + reject(error); + }), + insertOne: () => + new Promise((resolve, reject) => { + i += 1; + + if (i === 2) { + return reject(new Error('foo')); + } else { + return resolve({ acknowledged: true }); + } + }), + }; + }, + }; + + const stats = await runImport(dataService, false); + + expect(stats.docsProcessed).to.be.equal(3); + expect(stats.docsWritten).to.be.equal(2); + expect(stats.errors.length).to.be.equal(1); + }); + + it('stops on error when inserting to FLE2 collection', async function () { let i = 0; const dataService = { _collection: function () { return { - bulkWrite: (operations, options, callback) => { - const error = new Error( - 'Only single insert batches are supported in FLE2' - ); - error.code = 6371202; - - callback(error); - }, - insertOne: (document, options, callback) => { - if (i === 0) { + bulkWrite: () => + new Promise((resolve, reject) => { + const error = new Error( + 'Only single insert batches are supported in FLE2' + ); + error.code = 6371202; + + reject(error); + }), + insertOne: () => + new Promise((resolve, reject) => { i += 1; - return callback(null, { acknowledged: true }); - } else { - return callback(new Error('foo')); - } - }, + + if (i === 2) { + return reject(new Error('foo')); + } else { + return resolve({ acknowledged: true }); + } + }), }; }, }; - const stats = await runImport(dataService); + const stats = await runImport(dataService, true); - expect(stats.docsProcessed).to.be.equal(2); + expect(stats.docsProcessed).to.be.equal(3); expect(stats.docsWritten).to.be.equal(1); expect(stats.errors.length).to.be.equal(1); }); From be401113b09a6cf2922a0329605f0bac18693742 Mon Sep 17 00:00:00 2001 From: Alena Khineika Date: Tue, 17 May 2022 16:37:31 +0200 Subject: [PATCH 4/4] fix: batch.length ge BATCH_SIZE --- packages/compass-import-export/src/utils/collection-stream.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/compass-import-export/src/utils/collection-stream.js b/packages/compass-import-export/src/utils/collection-stream.js index 30e6727ea82..3e463c5df09 100644 --- a/packages/compass-import-export/src/utils/collection-stream.js +++ b/packages/compass-import-export/src/utils/collection-stream.js @@ -49,7 +49,7 @@ class WritableCollectionStream extends Writable { _write(document, _encoding, next) { this.batch.push(document); - if (this.batch.length === this.BATCH_SIZE) { + if (this.batch.length >= this.BATCH_SIZE) { return this._executeBatch(next); }