Skip to content

Commit

Permalink
feat(NODE-5274): deprecate write concern options (#3752)
Browse files Browse the repository at this point in the history
  • Loading branch information
durran committed Jul 6, 2023
1 parent 0668cd8 commit 1f880ea
Show file tree
Hide file tree
Showing 9 changed files with 215 additions and 56 deletions.
21 changes: 4 additions & 17 deletions src/gridfs/upload.ts
Expand Up @@ -263,9 +263,8 @@ async function checkChunksIndex(stream: GridFSBucketWriteStream): Promise<void>
});

if (!hasChunksIndex) {
const writeConcernOptions = getWriteOptions(stream);
await stream.chunks.createIndex(index, {
...writeConcernOptions,
...stream.writeConcern,
background: true,
unique: true
});
Expand All @@ -292,7 +291,7 @@ function checkDone(stream: GridFSBucketWriteStream, callback?: Callback): boolea
return false;
}

stream.files.insertOne(filesDoc, getWriteOptions(stream)).then(
stream.files.insertOne(filesDoc, { writeConcern: stream.writeConcern }).then(
() => {
stream.emit(GridFSBucketWriteStream.FINISH, filesDoc);
stream.emit(GridFSBucketWriteStream.CLOSE);
Expand Down Expand Up @@ -423,7 +422,7 @@ function doWrite(
return false;
}

stream.chunks.insertOne(doc, getWriteOptions(stream)).then(
stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(
() => {
--stream.state.outstandingRequests;
--outstandingRequests;
Expand Down Expand Up @@ -453,18 +452,6 @@ function doWrite(
return false;
}

function getWriteOptions(stream: GridFSBucketWriteStream): WriteConcernOptions {
const obj: WriteConcernOptions = {};
if (stream.writeConcern) {
obj.writeConcern = {
w: stream.writeConcern.w,
wtimeout: stream.writeConcern.wtimeout,
j: stream.writeConcern.j
};
}
return obj;
}

function waitForIndexes(
stream: GridFSBucketWriteStream,
callback: (res: boolean) => boolean
Expand Down Expand Up @@ -499,7 +486,7 @@ function writeRemnant(stream: GridFSBucketWriteStream, callback?: Callback): boo
return false;
}

stream.chunks.insertOne(doc, getWriteOptions(stream)).then(
stream.chunks.insertOne(doc, { writeConcern: stream.writeConcern }).then(
() => {
--stream.state.outstandingRequests;
checkDone(stream);
Expand Down
3 changes: 2 additions & 1 deletion src/operations/aggregate.ts
Expand Up @@ -3,6 +3,7 @@ import { MongoInvalidArgumentError } from '../error';
import type { Server } from '../sdam/server';
import type { ClientSession } from '../sessions';
import { type Callback, maxWireVersion, type MongoDBNamespace } from '../utils';
import { WriteConcern } from '../write_concern';
import { type CollationOptions, CommandOperation, type CommandOperationOptions } from './command';
import { Aspect, defineAspects, type Hint } from './operation';

Expand Down Expand Up @@ -102,7 +103,7 @@ export class AggregateOperation<T = Document> extends CommandOperation<T> {
}

if (this.hasWriteStage && this.writeConcern) {
Object.assign(command, { writeConcern: this.writeConcern });
WriteConcern.apply(command, this.writeConcern);
}

if (options.bypassDocumentValidation === true) {
Expand Down
2 changes: 1 addition & 1 deletion src/operations/command.ts
Expand Up @@ -135,7 +135,7 @@ export abstract class CommandOperation<T> extends AbstractCallbackOperation<T> {
}

if (this.writeConcern && this.hasAspect(Aspect.WRITE_OPERATION) && !inTransaction) {
Object.assign(cmd, { writeConcern: this.writeConcern });
WriteConcern.apply(cmd, this.writeConcern);
}

if (
Expand Down
5 changes: 3 additions & 2 deletions src/sessions.ts
Expand Up @@ -45,6 +45,7 @@ import {
now,
uuidV4
} from './utils';
import { WriteConcern } from './write_concern';

const minWireVersionForShardedTransactions = 8;

Expand Down Expand Up @@ -703,11 +704,11 @@ function endTransaction(
}

if (txnState === TxnState.TRANSACTION_COMMITTED) {
writeConcern = Object.assign({ wtimeout: 10000 }, writeConcern, { w: 'majority' });
writeConcern = Object.assign({ wtimeoutMS: 10000 }, writeConcern, { w: 'majority' });
}

if (writeConcern) {
Object.assign(command, { writeConcern });
WriteConcern.apply(command, writeConcern);
}

if (commandName === 'commitTransaction' && session.transaction.options.maxTimeMS) {
Expand Down
81 changes: 64 additions & 17 deletions src/write_concern.ts
@@ -1,3 +1,5 @@
import { type Document } from './bson';

/** @public */
export type W = number | 'majority';

Expand All @@ -17,16 +19,35 @@ export interface WriteConcernSettings {
journal?: boolean;

// legacy options
/** The journal write concern */
/**
* The journal write concern.
* @deprecated Will be removed in the next major version. Please use the journal option.
*/
j?: boolean;
/** The write concern timeout */
/**
* The write concern timeout.
* @deprecated Will be removed in the next major version. Please use the wtimeoutMS option.
*/
wtimeout?: number;
/** The file sync write concern */
/**
* The file sync write concern.
* @deprecated Will be removed in the next major version. Please use the journal option.
*/
fsync?: boolean | 1;
}

export const WRITE_CONCERN_KEYS = ['w', 'wtimeout', 'j', 'journal', 'fsync'];

/** The write concern options that decorate the server command. */
interface CommandWriteConcernOptions {
/** The write concern */
w?: W;
/** The journal write concern. */
j?: boolean;
/** The write concern timeout. */
wtimeout?: number;
}

/**
* A MongoDB WriteConcern, which describes the level of acknowledgement
* requested from MongoDB for write operations.
Expand All @@ -35,41 +56,67 @@ export const WRITE_CONCERN_KEYS = ['w', 'wtimeout', 'j', 'journal', 'fsync'];
* @see https://www.mongodb.com/docs/manual/reference/write-concern/
*/
export class WriteConcern {
/** request acknowledgment that the write operation has propagated to a specified number of mongod instances or to mongod instances with specified tags. */
w?: W;
/** specify a time limit to prevent write operations from blocking indefinitely */
/** Request acknowledgment that the write operation has propagated to a specified number of mongod instances or to mongod instances with specified tags. */
readonly w?: W;
/** Request acknowledgment that the write operation has been written to the on-disk journal */
readonly journal?: boolean;
/** Specify a time limit to prevent write operations from blocking indefinitely */
readonly wtimeoutMS?: number;
/**
* Specify a time limit to prevent write operations from blocking indefinitely.
* @deprecated Will be removed in the next major version. Please use wtimeoutMS.
*/
wtimeout?: number;
/** request acknowledgment that the write operation has been written to the on-disk journal */
/**
* Request acknowledgment that the write operation has been written to the on-disk journal.
* @deprecated Will be removed in the next major version. Please use journal.
*/
j?: boolean;
/** equivalent to the j option */
/**
* Equivalent to the j option.
* @deprecated Will be removed in the next major version. Please use journal.
*/
fsync?: boolean | 1;

/**
* Constructs a WriteConcern from the write concern properties.
* @param w - request acknowledgment that the write operation has propagated to a specified number of mongod instances or to mongod instances with specified tags.
* @param wtimeout - specify a time limit to prevent write operations from blocking indefinitely
* @param j - request acknowledgment that the write operation has been written to the on-disk journal
* @param fsync - equivalent to the j option
* @param wtimeoutMS - specify a time limit to prevent write operations from blocking indefinitely
* @param journal - request acknowledgment that the write operation has been written to the on-disk journal
* @param fsync - equivalent to the j option. Is deprecated and will be removed in the next major version.
*/
constructor(w?: W, wtimeout?: number, j?: boolean, fsync?: boolean | 1) {
constructor(w?: W, wtimeoutMS?: number, journal?: boolean, fsync?: boolean | 1) {
if (w != null) {
if (!Number.isNaN(Number(w))) {
this.w = Number(w);
} else {
this.w = w;
}
}
if (wtimeout != null) {
this.wtimeout = wtimeout;
if (wtimeoutMS != null) {
this.wtimeoutMS = this.wtimeout = wtimeoutMS;
}
if (j != null) {
this.j = j;
if (journal != null) {
this.journal = this.j = journal;
}
if (fsync != null) {
this.fsync = fsync;
this.journal = this.j = fsync ? true : false;
}
}

/**
* Apply a write concern to a command document. Will modify and return the command.
*/
static apply(command: Document, writeConcern: WriteConcern): Document {
const wc: CommandWriteConcernOptions = {};
// The write concern document sent to the server has w/wtimeout/j fields.
if (writeConcern.w != null) wc.w = writeConcern.w;
if (writeConcern.wtimeoutMS != null) wc.wtimeout = writeConcern.wtimeoutMS;
if (writeConcern.journal != null) wc.j = writeConcern.j;
command.writeConcern = wc;
return command;
}

/** Construct a WriteConcern given an options object. */
static fromOptions(
options?: WriteConcernOptions | WriteConcern | W,
Expand Down
28 changes: 14 additions & 14 deletions test/integration/crud/find_and_modify.test.ts
Expand Up @@ -119,36 +119,36 @@ describe('Collection (#findOneAnd...)', function () {
});

it('passes through the writeConcern', async function () {
await collection.findOneAndDelete({}, { writeConcern: { fsync: 1 } });
expect(started[0].command.writeConcern).to.deep.equal({ fsync: 1 });
await collection.findOneAndDelete({}, { writeConcern: { j: 1 } });
expect(started[0].command.writeConcern).to.deep.equal({ j: 1 });
});
});

context('when provided at the collection level', function () {
beforeEach(async function () {
collection = client
.db('test')
.collection('findAndModifyTest', { writeConcern: { fsync: 1 } });
.collection('findAndModifyTest', { writeConcern: { j: 1 } });
await collection.insertMany([{ a: 1, b: 1 }], { writeConcern: { w: 1 } });
});

it('passes through the writeConcern', async function () {
await collection.findOneAndDelete({});
expect(started[0].command.writeConcern).to.deep.equal({ fsync: 1 });
expect(started[0].command.writeConcern).to.deep.equal({ j: 1 });
});
});

context('when provided at the db level', function () {
beforeEach(async function () {
collection = client
.db('test', { writeConcern: { fsync: 1 } })
.db('test', { writeConcern: { j: 1 } })
.collection('findAndModifyTest');
await collection.insertMany([{ a: 1, b: 1 }], { writeConcern: { w: 1 } });
});

it('passes through the writeConcern', async function () {
await collection.findOneAndDelete({});
expect(started[0].command.writeConcern).to.deep.equal({ fsync: 1 });
expect(started[0].command.writeConcern).to.deep.equal({ j: 1 });
});
});
});
Expand Down Expand Up @@ -297,36 +297,36 @@ describe('Collection (#findOneAnd...)', function () {
});

it('passes through the writeConcern', async function () {
await collection.findOneAndUpdate({}, { $set: { a: 1 } }, { writeConcern: { fsync: 1 } });
expect(started[0].command.writeConcern).to.deep.equal({ fsync: 1 });
await collection.findOneAndUpdate({}, { $set: { a: 1 } }, { writeConcern: { j: 1 } });
expect(started[0].command.writeConcern).to.deep.equal({ j: 1 });
});
});

context('when provided at the collection level', function () {
beforeEach(async function () {
collection = client
.db('test')
.collection('findAndModifyTest', { writeConcern: { fsync: 1 } });
.collection('findAndModifyTest', { writeConcern: { j: 1 } });
await collection.insertMany([{ a: 1, b: 1 }], { writeConcern: { w: 1 } });
});

it('passes through the writeConcern', async function () {
await collection.findOneAndUpdate({}, { $set: { a: 1 } });
expect(started[0].command.writeConcern).to.deep.equal({ fsync: 1 });
expect(started[0].command.writeConcern).to.deep.equal({ j: 1 });
});
});

context('when provided at the db level', function () {
beforeEach(async function () {
collection = client
.db('test', { writeConcern: { fsync: 1 } })
.db('test', { writeConcern: { j: 1 } })
.collection('findAndModifyTest');
await collection.insertMany([{ a: 1, b: 1 }], { writeConcern: { w: 1 } });
});

it('passes through the writeConcern', async function () {
await collection.findOneAndUpdate({}, { $set: { a: 1 } });
expect(started[0].command.writeConcern).to.deep.equal({ fsync: 1 });
expect(started[0].command.writeConcern).to.deep.equal({ j: 1 });
});
});
});
Expand Down Expand Up @@ -468,8 +468,8 @@ describe('Collection (#findOneAnd...)', function () {
});

it('passes through the writeConcern', async function () {
await collection.findOneAndReplace({}, { b: 1 }, { writeConcern: { fsync: 1 } });
expect(started[0].command.writeConcern).to.deep.equal({ fsync: 1 });
await collection.findOneAndReplace({}, { b: 1 }, { writeConcern: { j: 1 } });
expect(started[0].command.writeConcern).to.deep.equal({ j: 1 });
});
});

Expand Down
5 changes: 2 additions & 3 deletions test/integration/node-specific/mongo_client.test.ts
Expand Up @@ -52,9 +52,8 @@ describe('class MongoClient', function () {

expect(db).to.have.property('writeConcern');
expect(db.writeConcern).to.have.property('w', 1);
expect(db.writeConcern).to.have.property('wtimeout', 1000);
expect(db.writeConcern).to.have.property('fsync', true);
expect(db.writeConcern).to.have.property('j', true);
expect(db.writeConcern).to.have.property('wtimeoutMS', 1000);
expect(db.writeConcern).to.have.property('journal', true);

expect(db).to.have.property('s');
expect(db.s).to.have.property('readPreference');
Expand Down
2 changes: 1 addition & 1 deletion test/integration/uri-options/uri.test.js
Expand Up @@ -66,7 +66,7 @@ describe('URI', function () {
const client = this.configuration.newClient('mongodb://127.0.0.1:27017/?fsync=true');
client.connect((err, client) => {
var db = client.db(this.configuration.db);
expect(db.writeConcern.fsync).to.be.true;
expect(db.writeConcern.journal).to.be.true;
client.close(done);
});
}
Expand Down

0 comments on commit 1f880ea

Please sign in to comment.