diff --git a/.gitignore b/.gitignore index ea1b8228..4ef38753 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,6 @@ *.DS_Store node_modules coverage + +# IDEs +.vscode diff --git a/README.md b/README.md index 7491b67d..4b79f050 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,143 @@ For a full list of supported collection and cursor methods, see `collectionOperationsMap`, `cursorTransformsMap` and `cursorOperationsMap` in index.js +## `getOps` without strict linking + +There is a `getOpsWithoutStrictLinking` flag, which can be set to +`true` to speed up `getOps` under certain circumstances, but with +potential risks to the integrity of the results. Read below for +more detail. + +### Introduction + +ShareDB has to deal with concurrency issues. In particular, here we +discuss the issue of submitting multiple competing ops against a +version of a document. + +For example, if I have a version of a document at v1, and I +simultaneously submit two ops (from different servers, say) against +this snapshot, then we need to handle the fact that only one of +these ops can be accepted as canonical and applied to the snapshot. + +This issue is dealt with through **optimistic locking**. Even if +you are only asking for a subset of the ops, under the default +behaviour, `getOps` will fetch **all** the ops up to the current +version. + +### Optimistic locking and linked ops + +`sharedb-mongo` deals with its concurrency issue with multiple op +submissions with optimistic locking. Here's an example of its +behaviour: + +- my doc exists at v1 +- two simultaneous v1 ops are submitted to ShareDB +- both ops are committed to the database +- one op is applied to the snapshot, and the updated snapshot is + written to the database +- the second op finds that its updated snapshot conflicts with + the committed snapshot, and the snapshot is rejected, but the + committed op **remains in the database** + +In reality, `sharedb-mongo` attempts to clean up this failed op, +but there's still the small chance that the server crashes +before it can do so, meaning that we may have multiple ops +lingering in the database with the same version. + +Because some non-canonical ops may exist in the database, we +cannot just perform a naive fetch of all the ops associated with +a document, because it may return multiple ops with the same +version (where one was successfully applied, and one was not). + +In order to return a valid set of canonical ops, the optimistic +locking has a notion of **linked ops**. That is, each op will +point back to the op that it built on top of, and ultimately +the current snapshot points to the op that committed it to the +database. + +Because of this, we can work backwards from the current snapshot, +following the trail of op links all the way back to get a chain +of canonical, valid, linked ops. This way, even if a spurious +op exists in the database, no other op will point to it, and it +will be correctly ignored. + +This approach has a big down-side: it forces us to fetch all the +ops up to the current version. This might be fine if you want +all ops, or are fetching very recent ops, but can have a large +impact on performance if you only want ops 1-10 of a 10,000 +op document, because you actually have to fetch all the ops. + +### Dropping strict linking + +In order to speed up the performance of `getOps`, you can set +`getOpsWithoutStrictLinking: true`. This will attempt to fetch +the bare minimum ops, whilst still trying to maintain op +integrity. + +The assumption that underpins this approach is that any op +that exists with a unique combination of `d` (document ID) +and `v` (version), **is a valid op**. In other words, it +had no conflicts and can be considered canonical. + +Consider a document with some ops, including some spurious, +failed ops: + +- v1: unique +- v2: unique +- v3: collision 3 +- v3: collision 3 +- v4: collision 4 +- v4: collision 4 +- v5: unique +- v6: unique +... +- v1000: unique + +If I want to fetch ops v1-v3, then we: + +- look up v4 +- find that v4 is not unique +- look up v5 +- see that v5 is unique and therefore assumed valid +- look backwards from v5 for a chain of valid ops, avoiding + the spurious commits for v4 and v3. +- This way we don't need to fetch all the ops from v5 to the + current version. + +In the case where a valid op cannot be determined, we still +fall back to fetching all ops and working backwards from the +current version. + +### Limitations + +#### Integrity + +Attempting to infer a canonical op can be dangerous compared +to simply following the valid op chain from the snapshot, +which is - by definition - canonical. + +This alternative behaviour should be safe, but should be used +with caution, because we are attempting to _infer_ a canonical +op, which may have unforeseen corner cases that return an +**invalid set of ops**. + +This may be especially true if the ops are modified outside +of `sharedb-mongo` (eg by setting a TTL, or manually updating +them). + +#### Recent ops + +There are cases where this flag may slow down behaviour. In +the case of attempting to fetch very recent ops, setting this +flag may make extra database round-trips where fetching the +snapshot would have been faster. + +#### `getOpsBulk` and `getOpsToSnapshot` + +This flag **only** applies to `getOps`, and **not** to the +similar `getOpsBulk` and `getOpsToSnapshot` methods, whose +performance will remain unchanged. + ## Error codes Mongo errors are passed back directly. Additional error codes: diff --git a/index.js b/index.js index dea9f590..bc43cb07 100644 --- a/index.js +++ b/index.js @@ -1,6 +1,7 @@ var async = require('async'); var mongodb = require('mongodb'); var DB = require('sharedb').DB; +var OpLinkValidator = require('./op-link-validator'); module.exports = ShareDbMongo; @@ -44,6 +45,12 @@ function ShareDbMongo(mongo, options) { // data in the mongo database. this.allowAggregateQueries = options.allowAllQueries || options.allowAggregateQueries || false; + // Setting this flag to true will attempt to infer a canonical op link for + // getOps rather than using the snapshot as the op link. This allows us to + // not fetch all ops to present when asking only for a subset. + // For more details on this, see the README. + this.getOpsWithoutStrictLinking = options.getOpsWithoutStrictLinking || false; + // Track whether the close method has been called this.closed = false; @@ -339,7 +346,8 @@ ShareDbMongo.prototype.getOpsToSnapshot = function(collectionName, id, from, sna var err = ShareDbMongo.missingLastOperationError(collectionName, id); return callback(err); } - this._getOps(collectionName, id, from, options, function(err, ops) { + var to = null; + this._getOps(collectionName, id, from, to, options, function(err, ops) { if (err) return callback(err); var filtered = getLinkedOps(ops, null, snapshot._opLink); var err = checkOpsFrom(collectionName, id, filtered, from); @@ -350,18 +358,22 @@ ShareDbMongo.prototype.getOpsToSnapshot = function(collectionName, id, from, sna ShareDbMongo.prototype.getOps = function(collectionName, id, from, to, options, callback) { var self = this; - this._getSnapshotOpLink(collectionName, id, function(err, doc) { + this._getOpLink(collectionName, id, to, function(err, opLink) { if (err) return callback(err); - if (doc) { - if (isCurrentVersion(doc, from)) { + // We need to fetch slightly more ops than requested in order to work backwards along + // linked ops to provide only valid ops + var fetchOpsTo = null; + if (opLink) { + if (isCurrentVersion(opLink, from)) { return callback(null, []); } - var err = doc && checkDocHasOp(collectionName, id, doc); + var err = opLink && checkDocHasOp(collectionName, id, opLink); if (err) return callback(err); + if (self.getOpsWithoutStrictLinking) fetchOpsTo = opLink._v; } - self._getOps(collectionName, id, from, options, function(err, ops) { + self._getOps(collectionName, id, from, fetchOpsTo, options, function(err, ops) { if (err) return callback(err); - var filtered = filterOps(ops, doc, to); + var filtered = filterOps(ops, opLink, to); var err = checkOpsFrom(collectionName, id, filtered, from); if (err) return callback(err); callback(null, filtered); @@ -540,16 +552,24 @@ function getLinkedOps(ops, to, link) { return linkedOps.reverse(); } -function getOpsQuery(id, from) { - return (from == null) ? - {d: id} : - {d: id, v: {$gte: from}}; +function getOpsQuery(id, from, to) { + from = from == null ? 0 : from; + var query = { + d: id, + v: { $gte: from } + }; + + if (to != null) { + query.v.$lt = to; + } + + return query; } -ShareDbMongo.prototype._getOps = function(collectionName, id, from, options, callback) { +ShareDbMongo.prototype._getOps = function(collectionName, id, from, to, options, callback) { this.getOpCollection(collectionName, function(err, opCollection) { if (err) return callback(err); - var query = getOpsQuery(id, from); + var query = getOpsQuery(id, from, to); // Exclude the `d` field, which is only for use internal to livedb-mongo. // Also exclude the `m` field, which can be used to store metadata on ops // for tracking purposes @@ -600,6 +620,77 @@ function readOpsBulk(stream, callback) { }); } +ShareDbMongo.prototype._getOpLink = function(collectionName, id, to, callback) { + if (!this.getOpsWithoutStrictLinking) return this._getSnapshotOpLink(collectionName, id, callback); + + var db = this; + this.getOpCollection(collectionName, function (error, collection) { + if (error) return callback(error); + + // If to is null, we want the most recent version, so just return the + // snapshot link, which is more efficient than cursoring + if (to == null) { + return db._getSnapshotOpLink(collectionName, id, callback); + } + + var query = { + d: id, + v: { $gte: to } + }; + + var projection = { + _id: 0, + v: 1, + o: 1 + }; + + var cursor = collection.find(query).sort({ v: 1 }).project(projection); + + getFirstOpWithUniqueVersion(cursor, null, function (error, op) { + if (error) return callback(error); + if (op) return callback(null, { _o: op.o, _v: op.v }); + + // If we couldn't find an op to link back from, then fall back to using the current + // snapshot, which is guaranteed to have a link to a valid op. + db._getSnapshotOpLink(collectionName, id, callback); + }); + }); +}; + +// When getting ops, we need to consider the case where an op is committed to the database, +// but its application to the snapshot is subsequently rejected. This can leave multiple ops +// with the same values for 'd' and 'v', and means that we may return multiple ops for a single +// version if we just perform a naive 'find' operation. +// To avoid this, we try to fetch the first op from 'to' which has a unique 'v', and then we +// work backwards from that op using the linked op 'o' field to get a valid chain of ops. +// See the README for more details. +function getFirstOpWithUniqueVersion(cursor, opLinkValidator, callback) { + opLinkValidator = opLinkValidator || new OpLinkValidator(); + + var opWithUniqueVersion = opLinkValidator.opWithUniqueVersion(); + + if (opWithUniqueVersion || opLinkValidator.isAtEndOfList()) { + var error = null; + return closeCursor(cursor, callback, error, opWithUniqueVersion); + } + + cursor.next(function (error, op) { + if (error) { + return closeCursor(cursor, callback, error); + } + + opLinkValidator.push(op); + getFirstOpWithUniqueVersion(cursor, opLinkValidator, callback); + }); +} + +function closeCursor(cursor, callback, error, returnValue) { + cursor.close(function (closeError) { + error = error || closeError; + callback(error, returnValue); + }); +} + ShareDbMongo.prototype._getSnapshotOpLink = function(collectionName, id, callback) { this.getCollection(collectionName, function(err, collection) { if (err) return callback(err); diff --git a/op-link-validator.js b/op-link-validator.js new file mode 100644 index 00000000..82f61a90 --- /dev/null +++ b/op-link-validator.js @@ -0,0 +1,78 @@ +/** + * This is a class for determining an op with a unique version number + * when presented with an **ordered** series of ops. + * + * For example, consider the following chain of op versions: + * 1 -> 1 -> 2 -> 2 -> 3 -> 4 + * If we want to find the first unique version, we must consider a + * window of three versions. For example, if we consider the first + * three versions: + * 1 -> 1 -> 2 + * Then we know that 1 is not unique. We don't know if 2 is unique + * yet, because we don't know what comes next. Therefore we push + * one more version and check again: + * 1 -> 2 -> 2 + * Again we now see that 2 is not unique, so we keep pushing ops + * until we reach the final window: + * 2 -> 3 -> 4 + * From here, **assuming the ops are well ordered** we can safely + * see that v3 is unique. We cannot make the same assumption of + * v4, because we don't know what comes next. + * + * Note that we also assume that the chain starts with **all** + * of the copies of an op version. That is that if we are provided + * 1 -> 2 + * Then v1 is unique (because there are no other v1s). + * + * Similarly, if a null op is pushed into the class, it is assumed + * to be the end of the chain, and hence a unique version can be + * inferred, eg with this chain: + * 5 -> 6 -> null + * We say that 6 is unique, because we've reached the end of the + * list + */ +function OpLinkValidator() { + this.currentOp = undefined; + this.previousOp = undefined; + this.oneBeforePreviousOp = undefined; +} + +OpLinkValidator.prototype.push = function (op) { + this.oneBeforePreviousOp = this.previousOp; + this.previousOp = this.currentOp; + this.currentOp = op; +}; + +OpLinkValidator.prototype.opWithUniqueVersion = function () { + return this._previousVersionWasUnique() ? this.previousOp : null; +}; + +OpLinkValidator.prototype.isAtEndOfList = function () { + // We ascribe a special meaning to a current op of null + // being that we're at the end of the list, because this + // is the value that the Mongo cursor will return when + // the cursor is exhausted + return this.currentOp === null; +}; + +OpLinkValidator.prototype._previousVersionWasUnique = function () { + const previousVersion = this._previousVersion(); + + return typeof previousVersion === 'number' + && previousVersion !== this._currentVersion() + && previousVersion !== this._oneBeforePreviousVersion(); +}; + +OpLinkValidator.prototype._currentVersion = function () { + return this.currentOp && this.currentOp.v; +}; + +OpLinkValidator.prototype._previousVersion = function () { + return this.previousOp && this.previousOp.v; +}; + +OpLinkValidator.prototype._oneBeforePreviousVersion = function () { + return this.oneBeforePreviousOp && this.oneBeforePreviousOp.v; +}; + +module.exports = OpLinkValidator; diff --git a/package.json b/package.json index a546abee..36c4c093 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,8 @@ "expect.js": "^0.3.1", "istanbul": "^0.4.2", "mocha": "^2.3.3", - "sharedb-mingo-memory": "^1.0.2" + "sharedb-mingo-memory": "^1.0.2", + "sinon": "^6.1.5" }, "scripts": { "test": "node_modules/.bin/mocha", diff --git a/test/test_get_ops_without_strict_linking.js b/test/test_get_ops_without_strict_linking.js new file mode 100644 index 00000000..20a05376 --- /dev/null +++ b/test/test_get_ops_without_strict_linking.js @@ -0,0 +1,192 @@ +var expect = require('expect.js'); +var mongodb = require('mongodb'); +var ShareDbMongo = require('..'); +var getQuery = require('sharedb-mingo-memory/get-query'); +var sinon = require('sinon'); + +var mongoUrl = process.env.TEST_MONGO_URL || 'mongodb://localhost:27017/test'; + +function create(callback) { + var db = ShareDbMongo({ + mongo: function(shareDbCallback) { + mongodb.connect(mongoUrl, function(err, mongo) { + if (err) return callback(err); + mongo.dropDatabase(function(err) { + if (err) return callback(err); + shareDbCallback(null, mongo); + callback(null, db, mongo); + }); + }); + }, + getOpsWithoutStrictLinking: true, + }); +}; + +require('sharedb/test/db')({create: create, getQuery: getQuery}); + +describe('getOpsWithoutStrictLinking: true', function () { + beforeEach(function (done) { + var self = this; + create(function (err, db, mongo) { + if (err) return done(err); + self.db = db; + self.mongo = mongo; + done(); + }); + }); + + afterEach(function (done) { + this.db.close(done); + }); + + describe('a chain of ops', function () { + var db + var mongo; + var id; + var collection; + + beforeEach(function (done) { + db = this.db; + mongo = this.mongo; + id = 'document1'; + collection = 'testcollection'; + + sinon.spy(db, '_getOps'); + sinon.spy(db, '_getSnapshotOpLink'); + + var ops = [ + { v: 0, create: {} }, + { v: 1, p: ['foo'], oi: 'bar' }, + { v: 2, p: ['foo'], oi: 'baz' }, + { v: 3, p: ['foo'], oi: 'qux' } + ]; + + commitOpChain(db, mongo, collection, id, ops, done); + }); + + it('fetches ops 0-1 without fetching all ops', function (done) { + db.getOps(collection, id, 0, 2, null, function (error, ops) { + if (error) return done(error); + expect(ops.length).to.be(2); + expect(ops[0].v).to.be(0); + expect(ops[1].v).to.be(1); + expect(db._getSnapshotOpLink.notCalled).to.be(true); + expect(db._getOps.calledOnceWith(collection, id, 0, 2)).to.be(true); + done(); + }); + }); + + it('fetches ops 0-1 when v1 has a spurious duplicate', function (done) { + var spuriousOp = { v: 1, d: id, p: ['foo'], oi: 'corrupt', o: null }; + + callInSeries([ + function (next) { + mongo.collection('o_' + collection).insert(spuriousOp, next); + }, + function (result, next) { + db.getOps(collection, id, 0, 2, null, next); + }, + function (ops, next) { + expect(ops.length).to.be(2); + expect(ops[1].oi).to.be('bar'); + expect(db._getSnapshotOpLink.notCalled).to.be(true); + expect(db._getOps.calledOnceWith(collection, id, 0, 2)).to.be(true); + next(); + }, + done + ]); + }); + + it('fetches ops 0-1 when the next op v2 has a spurious duplicate', function (done) { + var spuriousOp = { v: 2, d: id, p: ['foo'], oi: 'corrupt', o: null }; + + callInSeries([ + function (next) { + mongo.collection('o_' + collection).insert(spuriousOp, next); + }, + function (result, next) { + db.getOps(collection, id, 0, 2, null, next); + }, + function (ops, next) { + expect(ops.length).to.be(2); + expect(ops[1].oi).to.be('bar'); + expect(db._getSnapshotOpLink.notCalled).to.be(true); + expect(db._getOps.calledOnceWith(collection, id, 0, 3)).to.be(true); + next(); + }, + done + ]); + }); + + it('fetches ops 0-1 when all the ops have spurious duplicates', function (done) { + var spuriousOps = [ + { v: 0, d: id, p: ['foo'], oi: 'corrupt', o: null }, + { v: 1, d: id, p: ['foo'], oi: 'corrupt', o: null }, + { v: 2, d: id, p: ['foo'], oi: 'corrupt', o: null }, + { v: 3, d: id, p: ['foo'], oi: 'corrupt', o: null }, + ]; + + callInSeries([ + function (next) { + mongo.collection('o_' + collection).insertMany(spuriousOps, next); + }, + function (result, next) { + db.getOps(collection, id, 0, 2, null, next); + }, + function (ops, next) { + expect(ops.length).to.be(2); + expect(ops[0].create).to.eql({}); + expect(ops[1].oi).to.be('bar'); + expect(db._getSnapshotOpLink.calledOnce).to.be(true); + next(); + }, + done, + ]) + }); + }); +}); + +function commitOpChain(db, mongo, collection, id, ops, previousOpId, version, callback) { + if (typeof previousOpId === 'function') { + callback = previousOpId; + previousOpId = undefined; + version = 0; + } + + ops = ops.slice(); + var op = ops.shift(); + + if (!op) { + return callback(); + } + + var snapshot = { id: id, v: version + 1, type: 'json0', data: {}, m: null, _opLink: previousOpId }; + db.commit(collection, id, op, snapshot, null, function (error) { + if (error) return callback(error); + mongo.collection('o_' + collection).find({ d: id, v: version }).next(function (error, op) { + if (error) return callback(error); + commitOpChain(db, mongo, collection, id, ops, op._id, ++version, callback); + }); + }); +} + +function callInSeries(callbacks, args) { + if (!callbacks.length) return; + args = args || []; + var error = args.shift(); + + if (error) { + var finalCallback = callbacks[callbacks.length - 1]; + return finalCallback(error); + } + + var callback = callbacks.shift(); + if (callbacks.length) { + args.push(function () { + var args = Array.from(arguments); + callInSeries(callbacks, args); + }); + } + + callback.apply(callback, args); +} diff --git a/test/test_op_link_validator.js b/test/test_op_link_validator.js new file mode 100644 index 00000000..bfc23af7 --- /dev/null +++ b/test/test_op_link_validator.js @@ -0,0 +1,159 @@ +var OpLinkValidator = require('../op-link-validator'); +var expect = require('expect.js'); + +describe('OpLinkValidator', function () { + it('starts with no unique op', function () { + var validator = new OpLinkValidator(); + var opWithUniqueVersion = validator.opWithUniqueVersion(); + expect(opWithUniqueVersion).to.be(null); + }); + + it('starts not at the end of the list', function () { + var validator = new OpLinkValidator(); + expect(validator.isAtEndOfList()).to.be(false); + }); + + it('has no unique op with just one op', function () { + var op = {v: 1}; + var validator = new OpLinkValidator(); + + validator.push(op); + var opWithUniqueVersion = validator.opWithUniqueVersion(); + + expect(opWithUniqueVersion).to.be(null); + }); + + it('has a unique op with just two different ops', function () { + var op1 = {v: 1}; + var op2 = {v: 2}; + var validator = new OpLinkValidator(); + + validator.push(op1); + validator.push(op2); + var opWithUniqueVersion = validator.opWithUniqueVersion(); + + expect(opWithUniqueVersion).to.be(op1); + }); + + it('does not have a uniquye op with just two identical ops', function () { + var op1 = { v: 1 }; + var op2 = { v: 1 }; + var validator = new OpLinkValidator(); + + validator.push(op1); + validator.push(op2); + var opWithUniqueVersion = validator.opWithUniqueVersion(); + + expect(opWithUniqueVersion).to.be(null); + }); + + it('has a unique op with three ops with different versions', function () { + var op1 = {v: 1}; + var op2 = {v: 2}; + var op3 = {v: 3}; + var validator = new OpLinkValidator(); + + validator.push(op1); + validator.push(op2); + validator.push(op3); + var opWithUniqueVersion = validator.opWithUniqueVersion(); + + expect(opWithUniqueVersion).to.be(op2); + }); + + it('is not at the end of the list with three ops', function () { + var op1 = { v: 1 }; + var op2 = { v: 2 }; + var op3 = { v: 3 }; + var validator = new OpLinkValidator(); + + validator.push(op1); + validator.push(op2); + validator.push(op3); + + expect(validator.isAtEndOfList()).to.be(false); + }); + + it('does not have a unique op with three ops with the same version', function () { + var op = { v: 1 }; + var validator = new OpLinkValidator(); + + validator.push(op); + validator.push(op); + validator.push(op); + var opWithUniqueVersion = validator.opWithUniqueVersion(); + + expect(opWithUniqueVersion).to.be(null); + }); + + it('does not have a unique op if the first two ops are the same', function () { + var op1 = { v: 1 }; + var op2 = { v: 1 }; + var op3 = { v: 2 }; + var validator = new OpLinkValidator(); + + validator.push(op1); + validator.push(op2); + validator.push(op3); + var opWithUniqueVersion = validator.opWithUniqueVersion(); + + expect(opWithUniqueVersion).to.be(null); + }); + + it('does not have a unique op if the last two ops are the same', function () { + var op1 = { v: 1 }; + var op2 = { v: 2 }; + var op3 = { v: 2 }; + var validator = new OpLinkValidator(); + + validator.push(op1); + validator.push(op2); + validator.push(op3); + var opWithUniqueVersion = validator.opWithUniqueVersion(); + + expect(opWithUniqueVersion).to.be(null); + }); + + it('has a unique op in a long chain', function () { + var op1 = { v: 1 }; + var op2 = { v: 1 }; + var op3 = { v: 1 }; + var op4 = { v: 2 }; + var op5 = { v: 2 }; + var op6 = { v: 3 }; + var op7 = { v: 4 }; + var validator = new OpLinkValidator(); + + validator.push(op1); + validator.push(op2); + validator.push(op3); + validator.push(op4); + validator.push(op5); + validator.push(op6); + validator.push(op7); + var opWithUniqueVersion = validator.opWithUniqueVersion(); + + expect(opWithUniqueVersion).to.be(op6); + }); + + it('has a unique op with two ops and a current op of null', function () { + var op1 = { v: 1 }; + var op2 = { v: 2 }; + var op3 = null; + var validator = new OpLinkValidator(); + + validator.push(op1); + validator.push(op2); + validator.push(op3); + var opWithUniqueVersion = validator.opWithUniqueVersion(); + + expect(opWithUniqueVersion).to.be(op2); + }); + + it('is at the end of the list with a current op of null', function () { + var op = null; + var validator = new OpLinkValidator(); + validator.push(op); + expect(validator.isAtEndOfList()).to.be(true); + }); +});