Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(NODE-5044): Write Concern 0 Must Not Affect Read Operations (#3541) #3575

Merged
merged 4 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,8 @@ export class ChangeStream<
super();

this.pipeline = pipeline;
this.options = options;
this.options = { ...options };
delete this.options.writeConcern;
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved

if (parent instanceof Collection) {
this.type = CHANGE_DOMAIN_TYPES.COLLECTION;
Expand Down
4 changes: 3 additions & 1 deletion src/operations/aggregate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
constructor(ns: MongoDBNamespace, pipeline: Document[], options?: AggregateOptions) {
super(undefined, { ...options, dbName: ns.db });

this.options = options ?? {};
this.options = { ...options };

// Covers when ns.collection is null, undefined or the empty string, use DB_AGGREGATE_COLLECTION
this.target = ns.collection || DB_AGGREGATE_COLLECTION;
Expand All @@ -65,6 +65,8 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {

if (this.hasWriteStage) {
this.trySecondaryWrite = true;
} else {
delete this.options.writeConcern;
}

if (this.explain && this.writeConcern) {
Expand Down
3 changes: 2 additions & 1 deletion src/operations/find.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ export class FindOperation extends CommandOperation<Document> {
) {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
super(collection, options);

this.options = options;
this.options = { ...options };
delete this.options.writeConcern;
this.ns = ns;

if (typeof filter !== 'object' || Array.isArray(filter)) {
Expand Down
3 changes: 2 additions & 1 deletion src/operations/indexes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,8 @@ export class ListIndexesOperation extends CommandOperation<Document> {
constructor(collection: Collection, options?: ListIndexesOptions) {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
super(collection, options);

this.options = options ?? {};
this.options = { ...options };
delete this.options.writeConcern;
this.collectionNamespace = collection.s.namespace;
}

Expand Down
3 changes: 2 additions & 1 deletion src/operations/list_collections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ export class ListCollectionsOperation extends CommandOperation<string[]> {
constructor(db: Db, filter: Document, options?: ListCollectionsOptions) {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
super(db, options);

this.options = options ?? {};
this.options = { ...options };
delete this.options.writeConcern;
this.db = db;
this.filter = filter;
this.nameOnly = !!this.options.nameOnly;
Expand Down
75 changes: 0 additions & 75 deletions test/integration/read-write-concern/write_concern.test.js

This file was deleted.

174 changes: 174 additions & 0 deletions test/integration/read-write-concern/write_concern.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
import { expect } from 'chai';
import { on, once } from 'events';

import { Collection, Db, LEGACY_HELLO_COMMAND, MongoClient } from '../../mongodb';
import * as mock from '../../tools/mongodb-mock/index';

describe('Write Concern', function () {
it('should respect writeConcern from uri', function (done) {
const client = this.configuration.newClient(
`${this.configuration.url()}&w=0&monitorCommands=true`
);
const events: any[] = [];
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
client.on('commandStarted', event => {
if (event.commandName === 'insert') {
events.push(event);
}
});

expect(client.writeConcern).to.eql({ w: 0 });
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
client
.db('test')
.collection('test')
.insertOne({ a: 1 }, (err, result) => {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
expect(err).to.not.exist;
expect(result).to.exist;
expect(events).to.be.an('array').with.lengthOf(1);
expect(events[0]).to.containSubset({
commandName: 'insert',
command: {
writeConcern: { w: 0 }
}
});
client.close(done);
});
});

describe('mock server write concern test', () => {
let server;
before(() => {
return mock.createServer().then(s => {
server = s;
});
});

after(() => mock.cleanup());

// TODO(NODE-3816): the mock server response is looking for writeConcern on all messages, but endSessions doesn't have it
it.skip('should pipe writeConcern from client down to API call', function () {
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
server.setMessageHandler(request => {
if (request.document && request.document[LEGACY_HELLO_COMMAND]) {
return request.reply(mock.HELLO);
}
expect(request.document.writeConcern).to.exist;
expect(request.document.writeConcern.w).to.equal('majority');
return request.reply({ ok: 1 });
});

const uri = `mongodb://${server.uri()}`;
const client = new MongoClient(uri, { writeConcern: { w: 'majority' } });
return client
.connect()
.then(() => {
const db = client.db('wc_test');
const collection = db.collection('wc');

return collection.insertMany([{ a: 2 }]);
})
.then(() => {
return client.close();
});
});
});

context('when performing read operations', function () {
context('when writeConcern = 0', function () {
describe('cursor creating operations with a getMore', function () {
let client: MongoClient;
let db: Db;
let col: Collection;

beforeEach(async function () {
client = this.configuration.newClient({ writeConcern: { w: 0 } });
await client.connect();
db = client.db('writeConcernTest');
col = db.collection('writeConcernTest');

const docs: any[] = [];
for (let i = 0; i < 100; i++) {
docs.push({ a: i, b: i + 1 });
}

await col.insertMany(docs);
});

afterEach(async function () {
await db.dropDatabase();
await client.close();
});

it('succeeds on find', async function () {
const findResult = col.find({}, { batchSize: 2 });
const err = await findResult.toArray().catch(e => e);

expect(err).to.not.be.instanceOf(Error);
});

it('succeeds on listCollections', async function () {
const collections: any[] = [];
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
for (let i = 0; i < 10; i++) {
collections.push(`writeConcernTestCol${i + 1}`);
}

for (const colName of collections) {
await db.createCollection(colName).catch(() => null);
}

const cols = db.listCollections({}, { batchSize: 2 });

const err = await cols.toArray().catch(e => e);

expect(err).to.not.be.instanceOf(Error);
});

it('succeeds on aggregate', async function () {
const aggResult = col.aggregate([{ $match: { a: { $gte: 0 } } }], { batchSize: 2 });
const err = await aggResult.toArray().catch(e => e);

expect(err).to.not.be.instanceOf(Error);
});

it('succeeds on listIndexes', async function () {
await col.createIndex({ a: 1 });
await col.createIndex({ b: -1 });
await col.createIndex({ a: 1, b: -1 });

const listIndexesResult = col.listIndexes({ batchSize: 2 });
const err = await listIndexesResult.toArray().catch(e => e);

expect(err).to.not.be.instanceOf(Error);
});

it('succeeds on changeStream', {
metadata: { requires: { topology: 'replicaset' } },
async test() {
const changeStream = col.watch(undefined, { batchSize: 2 });
const changes = on(changeStream, 'change');
await once(changeStream.cursor, 'init');

await col.insertMany(
[
{ a: 10 },
{ a: 10 },
{ a: 10 },
{ a: 10 },
{ a: 10 },
{ a: 10 },
{ a: 10 },
{ a: 10 },
{ a: 10 },
{ a: 10 },
{ a: 10 },
{ a: 10 }
],
{ writeConcern: { w: 'majority' } }
);

const err = await changes.next().catch(e => e);
expect(err).to.not.be.instanceOf(Error);
}
});
});
});
});
});
6 changes: 3 additions & 3 deletions test/unit/operations/find.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ describe('FindOperation', function () {
const operation = new FindOperation(undefined, namespace, filter, options);

it('sets the namespace', function () {
expect(operation.ns).to.equal(namespace);
expect(operation.ns).to.deep.equal(namespace);
});

it('sets options', function () {
expect(operation.options).to.equal(options);
expect(operation.options).to.deep.equal(options);
});

it('sets filter', function () {
expect(operation.filter).to.equal(filter);
expect(operation.filter).to.deep.equal(filter);
});
});

Expand Down