Skip to content

Commit 66e2580

Browse files
authored
refactor(collection): move operations in collection to their own classes
Fixes NODE-1897
1 parent f2e3ec2 commit 66e2580

33 files changed

+1836
-130
lines changed

lib/collection.js

Lines changed: 118 additions & 110 deletions
Large diffs are not rendered by default.
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
'use strict';
2+
3+
const OperationBase = require('./operation').OperationBase;
4+
const AggregationCursor = require('../aggregation_cursor');
5+
const applyWriteConcern = require('../utils').applyWriteConcern;
6+
const decorateWithCollation = require('../utils').decorateWithCollation;
7+
const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
8+
const handleCallback = require('../utils').handleCallback;
9+
const MongoError = require('mongodb-core').MongoError;
10+
const resolveReadPreference = require('../utils').resolveReadPreference;
11+
const toError = require('../utils').toError;
12+
13+
const DB_AGGREGATE_COLLECTION = 1;
14+
15+
class AggregateOperation extends OperationBase {
16+
constructor(db, collection, pipeline, options) {
17+
super(options);
18+
19+
this.db = db;
20+
this.collection = collection;
21+
this.pipeline = pipeline;
22+
}
23+
24+
execute(callback) {
25+
const db = this.db;
26+
const coll = this.collection;
27+
let pipeline = this.pipeline;
28+
let options = this.options;
29+
30+
const isDbAggregate = typeof coll === 'string';
31+
const target = isDbAggregate ? db : coll;
32+
const topology = target.s.topology;
33+
let hasOutStage = false;
34+
35+
if (typeof options.out === 'string') {
36+
pipeline = pipeline.concat({ $out: options.out });
37+
hasOutStage = true;
38+
} else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
39+
hasOutStage = true;
40+
}
41+
42+
let command;
43+
let namespace;
44+
let optionSources;
45+
46+
if (isDbAggregate) {
47+
command = { aggregate: DB_AGGREGATE_COLLECTION, pipeline: pipeline };
48+
namespace = `${db.s.databaseName}.${DB_AGGREGATE_COLLECTION}`;
49+
50+
optionSources = { db };
51+
} else {
52+
command = { aggregate: coll.s.name, pipeline: pipeline };
53+
namespace = coll.s.namespace;
54+
55+
optionSources = { db: coll.s.db, collection: coll };
56+
}
57+
58+
const takesWriteConcern = topology.capabilities().commandsTakeWriteConcern;
59+
60+
if (!hasOutStage) {
61+
decorateWithReadConcern(command, target, options);
62+
}
63+
64+
if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out'] && takesWriteConcern) {
65+
applyWriteConcern(command, optionSources, options);
66+
}
67+
68+
try {
69+
decorateWithCollation(command, target, options);
70+
} catch (err) {
71+
if (typeof callback === 'function') return callback(err, null);
72+
throw err;
73+
}
74+
75+
if (options.bypassDocumentValidation === true) {
76+
command.bypassDocumentValidation = options.bypassDocumentValidation;
77+
}
78+
79+
if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
80+
if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;
81+
82+
if (options.hint) command.hint = options.hint;
83+
84+
options = Object.assign({}, options);
85+
86+
// Ensure we have the right read preference inheritance
87+
options.readPreference = resolveReadPreference(options, optionSources);
88+
89+
if (options.explain) {
90+
if (command.readConcern || command.writeConcern) {
91+
throw toError(
92+
'"explain" cannot be used on an aggregate call with readConcern/writeConcern'
93+
);
94+
}
95+
command.explain = options.explain;
96+
}
97+
98+
if (typeof options.comment === 'string') command.comment = options.comment;
99+
100+
// Validate that cursor options is valid
101+
if (options.cursor != null && typeof options.cursor !== 'object') {
102+
throw toError('cursor options must be an object');
103+
}
104+
105+
options.cursor = options.cursor || {};
106+
if (options.batchSize && !hasOutStage) options.cursor.batchSize = options.batchSize;
107+
command.cursor = options.cursor;
108+
109+
// promiseLibrary
110+
options.promiseLibrary = target.s.promiseLibrary;
111+
112+
// Set the AggregationCursor constructor
113+
options.cursorFactory = AggregationCursor;
114+
115+
if (typeof callback !== 'function') {
116+
if (!topology.capabilities()) {
117+
throw new MongoError('cannot connect to server');
118+
}
119+
120+
return topology.cursor(namespace, command, options);
121+
}
122+
123+
return handleCallback(callback, null, topology.cursor(namespace, command, options));
124+
}
125+
}
126+
127+
module.exports = AggregateOperation;

lib/operations/bulk_write.js

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
'use strict';
2+
3+
const applyRetryableWrites = require('../utils').applyRetryableWrites;
4+
const applyWriteConcern = require('../utils').applyWriteConcern;
5+
const MongoError = require('mongodb-core').MongoError;
6+
const OperationBase = require('./operation').OperationBase;
7+
8+
class BulkWriteOperation extends OperationBase {
9+
constructor(collection, operations, options) {
10+
super(options);
11+
12+
this.collection = collection;
13+
this.operations = operations;
14+
}
15+
16+
execute(callback) {
17+
const coll = this.collection;
18+
const operations = this.operations;
19+
let options = this.options;
20+
21+
// Add ignoreUndfined
22+
if (coll.s.options.ignoreUndefined) {
23+
options = Object.assign({}, options);
24+
options.ignoreUndefined = coll.s.options.ignoreUndefined;
25+
}
26+
27+
// Create the bulk operation
28+
const bulk =
29+
options.ordered === true || options.ordered == null
30+
? coll.initializeOrderedBulkOp(options)
31+
: coll.initializeUnorderedBulkOp(options);
32+
33+
// Do we have a collation
34+
let collation = false;
35+
36+
// for each op go through and add to the bulk
37+
try {
38+
for (let i = 0; i < operations.length; i++) {
39+
// Get the operation type
40+
const key = Object.keys(operations[i])[0];
41+
// Check if we have a collation
42+
if (operations[i][key].collation) {
43+
collation = true;
44+
}
45+
46+
// Pass to the raw bulk
47+
bulk.raw(operations[i]);
48+
}
49+
} catch (err) {
50+
return callback(err, null);
51+
}
52+
53+
// Final options for retryable writes and write concern
54+
let finalOptions = Object.assign({}, options);
55+
finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
56+
finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options);
57+
58+
const writeCon = finalOptions.writeConcern ? finalOptions.writeConcern : {};
59+
const capabilities = coll.s.topology.capabilities();
60+
61+
// Did the user pass in a collation, check if our write server supports it
62+
if (collation && capabilities && !capabilities.commandsTakeCollation) {
63+
return callback(new MongoError('server/primary/mongos does not support collation'));
64+
}
65+
66+
// Execute the bulk
67+
bulk.execute(writeCon, finalOptions, (err, r) => {
68+
// We have connection level error
69+
if (!r && err) {
70+
return callback(err, null);
71+
}
72+
73+
r.insertedCount = r.nInserted;
74+
r.matchedCount = r.nMatched;
75+
r.modifiedCount = r.nModified || 0;
76+
r.deletedCount = r.nRemoved;
77+
r.upsertedCount = r.getUpsertedIds().length;
78+
r.upsertedIds = {};
79+
r.insertedIds = {};
80+
81+
// Update the n
82+
r.n = r.insertedCount;
83+
84+
// Inserted documents
85+
const inserted = r.getInsertedIds();
86+
// Map inserted ids
87+
for (let i = 0; i < inserted.length; i++) {
88+
r.insertedIds[inserted[i].index] = inserted[i]._id;
89+
}
90+
91+
// Upserted documents
92+
const upserted = r.getUpsertedIds();
93+
// Map upserted ids
94+
for (let i = 0; i < upserted.length; i++) {
95+
r.upsertedIds[upserted[i].index] = upserted[i]._id;
96+
}
97+
98+
// Return the results
99+
callback(null, r);
100+
});
101+
}
102+
}
103+
104+
module.exports = BulkWriteOperation;

0 commit comments

Comments
 (0)