Skip to content

Commit

Permalink
feat: add withReadConcern builder to AbstractCursor (#2645)
Browse files Browse the repository at this point in the history
This patch adds a builder method to add a read concern to a cursor
similar to the existing method to build with a ReadPreference. It
also changes `setReadPreference` to `withReadPreference` so the two
follow a common convention.

NODE-2806
  • Loading branch information
mbroadst committed Dec 1, 2020
1 parent 8d44cc2 commit 0cca729
Show file tree
Hide file tree
Showing 10 changed files with 72 additions and 20 deletions.
3 changes: 2 additions & 1 deletion docs/reference/content/tutorials/crud.md
Original file line number Diff line number Diff line change
Expand Up @@ -699,7 +699,8 @@ collection.find({}).max(10) // Set the cursor
collection.find({}).maxTimeMS(1000) // Set the cursor maxTimeMS
collection.find({}).min(100) // Set the cursor min
collection.find({}).returnKey(10) // Set the cursor returnKey
collection.find({}).setReadPreference(ReadPreference.PRIMARY) // Set the cursor readPreference
collection.find({}).withReadPreference(ReadPreference.PRIMARY) // Set the cursor readPreference
collection.find({}).withReadConcern('majority') // Set the cursor readConcern
collection.find({}).showRecordId(true) // Set the cursor showRecordId
collection.find({}).sort([['a', 1]]) // Sets the sort order of the cursor query
collection.find({}).hint('a_1') // Set the cursor hint
Expand Down
29 changes: 28 additions & 1 deletion src/cursor/abstract_cursor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { Topology } from '../sdam/topology';
import { Readable, Transform } from 'stream';
import { EventEmitter } from 'events';
import type { ExecutionResult } from '../operations/execute_operation';
import { ReadConcern, ReadConcernLike } from '../read_concern';

const kId = Symbol('id');
const kDocuments = Symbol('documents');
Expand Down Expand Up @@ -50,6 +51,7 @@ export type CursorFlag = typeof CURSOR_FLAGS[number];
export interface AbstractCursorOptions extends BSONSerializeOptions {
session?: ClientSession;
readPreference?: ReadPreferenceLike;
readConcern?: ReadConcernLike;
batchSize?: number;
maxTimeMS?: number;
comment?: Document | string;
Expand All @@ -62,6 +64,7 @@ export interface AbstractCursorOptions extends BSONSerializeOptions {
export type InternalAbstractCursorOptions = Omit<AbstractCursorOptions, 'readPreference'> & {
// resolved
readPreference: ReadPreference;
readConcern?: ReadConcern;

// cursor flags, some are deprecated
oplogReplay?: boolean;
Expand Down Expand Up @@ -107,6 +110,11 @@ export abstract class AbstractCursor extends EventEmitter {
...pluckBSONSerializeOptions(options)
};

const readConcern = ReadConcern.fromOptions(options);
if (readConcern) {
this[kOptions].readConcern = readConcern;
}

if (typeof options.batchSize === 'number') {
this[kOptions].batchSize = options.batchSize;
}
Expand Down Expand Up @@ -144,6 +152,10 @@ export abstract class AbstractCursor extends EventEmitter {
return this[kOptions].readPreference;
}

get readConcern(): ReadConcern | undefined {
return this[kOptions].readConcern;
}

get session(): ClientSession | undefined {
return this[kSession];
}
Expand Down Expand Up @@ -434,7 +446,7 @@ export abstract class AbstractCursor extends EventEmitter {
*
* @param readPreference - The new read preference for the cursor.
*/
setReadPreference(readPreference: ReadPreferenceLike): this {
withReadPreference(readPreference: ReadPreferenceLike): this {
assertUninitialized(this);
if (readPreference instanceof ReadPreference) {
this[kOptions].readPreference = readPreference;
Expand All @@ -447,6 +459,21 @@ export abstract class AbstractCursor extends EventEmitter {
return this;
}

/**
* Set the ReadPreference for the cursor.
*
* @param readPreference - The new read preference for the cursor.
*/
withReadConcern(readConcern: ReadConcernLike): this {
assertUninitialized(this);
const resolvedReadConcern = ReadConcern.fromOptions({ readConcern });
if (resolvedReadConcern) {
this[kOptions].readConcern = resolvedReadConcern;
}

return this;
}

/**
* Set a maxTimeMS on the cursor query, allowing for hard timeout limits on queries (Only supported on MongoDB 2.6 or higher)
*
Expand Down
2 changes: 1 addition & 1 deletion src/gridfs-stream/download.ts
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ function init(stream: GridFSBucketReadStream): void {
stream.s.cursor = stream.s.chunks.find(filter).sort({ n: 1 });

if (stream.s.readPreference) {
stream.s.cursor.setReadPreference(stream.s.readPreference);
stream.s.cursor.withReadPreference(stream.s.readPreference);
}

stream.s.expectedEnd = Math.ceil(doc.length / doc.chunkSize);
Expand Down
4 changes: 2 additions & 2 deletions src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ export class FindOperation extends CommandOperation<FindOptions, Document> {
findCommand.maxTimeMS = options.maxTimeMS;
}

if (this.readConcern && (!this.session || !this.session.inTransaction())) {
findCommand.readConcern = this.readConcern;
if (this.readConcern) {
findCommand.readConcern = this.readConcern.toJSON();
}

if (options.max) {
Expand Down
6 changes: 6 additions & 0 deletions src/read_concern.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { Document } from './bson';

/** @public */
export enum ReadConcernLevel {
local = 'local',
Expand Down Expand Up @@ -78,4 +80,8 @@ export class ReadConcern {
static get SNAPSHOT(): string {
return ReadConcernLevel.snapshot;
}

toJSON(): Document {
return { level: this.level };
}
}
4 changes: 2 additions & 2 deletions test/functional/apm.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ describe('APM', function () {
.batchSize(2)
.comment('some comment')
.maxTimeMS(5000)
.setReadPreference(ReadPreference.PRIMARY)
.withReadPreference(ReadPreference.PRIMARY)
.addCursorFlag('noCursorTimeout', true)
.toArray();
})
Expand Down Expand Up @@ -445,7 +445,7 @@ describe('APM', function () {
.batchSize(2)
.comment('some comment')
.maxTimeMS(5000)
.setReadPreference(ReadPreference.PRIMARY)
.withReadPreference(ReadPreference.PRIMARY)
.addCursorFlag('noCursorTimeout', true)
.toArray();
})
Expand Down
4 changes: 2 additions & 2 deletions test/functional/buffering_proxy.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ describe.skip('Buffering Proxy', function () {

db.collection('test')
.find({})
.setReadPreference(new ReadPreference(ReadPreference.SECONDARY))
.withReadPreference(new ReadPreference(ReadPreference.SECONDARY))
.toArray(function (err) {
expect(err).to.not.exist;
results.push('find');
Expand Down Expand Up @@ -439,7 +439,7 @@ describe.skip('Buffering Proxy', function () {

db.collection('test')
.find({})
.setReadPreference(new ReadPreference(ReadPreference.SECONDARY))
.withReadPreference(new ReadPreference(ReadPreference.SECONDARY))
.toArray(function (err) {
expect(err).to.not.exist;
results.push('find');
Expand Down
22 changes: 20 additions & 2 deletions test/functional/cursor.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -2321,19 +2321,37 @@ describe('Cursor', function () {
try {
db.collection('shouldFailToSetReadPreferenceOnCursor')
.find()
.setReadPreference('notsecondary');
.withReadPreference('notsecondary');
test.ok(false);
} catch (err) {} // eslint-disable-line

db.collection('shouldFailToSetReadPreferenceOnCursor')
.find()
.setReadPreference('secondary');
.withReadPreference('secondary');

done();
});
}
});

it('should allow setting the cursors readConcern through a builder', {
metadata: { requires: { mongodb: '>=3.2' } },
test: withMonitoredClient(['find'], function (client, events, done) {
const db = client.db(this.configuration.db);
const cursor = db.collection('foo').find().withReadConcern('local');
expect(cursor).property('readConcern').to.have.property('level').equal('local');

cursor.toArray(err => {
expect(err).to.not.exist;

expect(events).to.have.length(1);
const findCommand = events[0];
expect(findCommand).nested.property('command.readConcern').to.eql({ level: 'local' });
done();
});
})
});

it('shouldNotFailDueToStackOverflowEach', {
// Add a tag that our runner can trigger on
// in this case we are setting that node needs to be higher than 0.10.X to run
Expand Down
2 changes: 1 addition & 1 deletion test/functional/max_staleness.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ describe('Max Staleness', function () {
// Get a db with a new readPreference
db.collection('test')
.find({})
.setReadPreference(readPreference)
.withReadPreference(readPreference)
.toArray(function (err) {
expect(err).to.not.exist;
expect(test.checkCommand).to.eql({
Expand Down
16 changes: 8 additions & 8 deletions test/functional/readpreference.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -478,15 +478,15 @@ describe('ReadPreference', function () {
})
});

it('should set hedge using [.setReadPreference & empty hedge] ', {
it('should set hedge using [.withReadPreference & empty hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function (client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, { hedge: {} });
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.withReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: {} };
Expand All @@ -496,15 +496,15 @@ describe('ReadPreference', function () {
})
});

it('should set hedge using [.setReadPreference & enabled hedge] ', {
it('should set hedge using [.withReadPreference & enabled hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function (client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, { hedge: { enabled: true } });
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.withReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: { enabled: true } };
Expand All @@ -514,7 +514,7 @@ describe('ReadPreference', function () {
})
});

it('should set hedge using [.setReadPreference & disabled hedge] ', {
it('should set hedge using [.withReadPreference & disabled hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function (client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null, {
Expand All @@ -524,7 +524,7 @@ describe('ReadPreference', function () {
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.withReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY, hedge: { enabled: false } };
Expand All @@ -534,15 +534,15 @@ describe('ReadPreference', function () {
})
});

it('should set hedge using [.setReadPreference & undefined hedge] ', {
it('should set hedge using [.withReadPreference & undefined hedge] ', {
metadata: { requires: { mongodb: '>=3.6.0' } },
test: withMonitoredClient(['find'], function (client, events, done) {
const rp = new ReadPreference(ReadPreference.SECONDARY, null);
client
.db(this.configuration.db)
.collection('test')
.find({})
.setReadPreference(rp)
.withReadPreference(rp)
.toArray(err => {
expect(err).to.not.exist;
const expected = { mode: ReadPreference.SECONDARY };
Expand Down

0 comments on commit 0cca729

Please sign in to comment.