Merged
3 changes: 3 additions & 0 deletions .gitignore
@@ -2,3 +2,6 @@
*.DS_Store
node_modules
coverage

# IDEs
.vscode
137 changes: 137 additions & 0 deletions README.md
@@ -83,6 +83,143 @@ For a full list of supported collection and cursor methods, see
`collectionOperationsMap`, `cursorTransformsMap` and
`cursorOperationsMap` in index.js

## `getOps` without strict linking

There is a `getOpsWithoutStrictLinking` flag, which can be set to
`true` to speed up `getOps` under certain circumstances, but with
potential risks to the integrity of the results. Read below for
more detail.

### Introduction

ShareDB has to deal with concurrency issues. In particular, here we
discuss the issue of submitting multiple competing ops against a
version of a document.

For example, if I have a version of a document at v1, and I
simultaneously submit two ops (from different servers, say) against
this snapshot, then we need to handle the fact that only one of
these ops can be accepted as canonical and applied to the snapshot.

This issue is dealt with through **optimistic locking**. As a
consequence of how this is implemented, under the default
behaviour `getOps` will fetch **all** the ops up to the current
version, even if you are only asking for a subset of the ops.

### Optimistic locking and linked ops

`sharedb-mongo` handles concurrent op submissions with
optimistic locking. Here's an example of its behaviour:

- my doc exists at v1
- two simultaneous v1 ops are submitted to ShareDB
- both ops are committed to the database
- one op is applied to the snapshot, and the updated snapshot is
  written to the database
- the second op finds that its updated snapshot conflicts with
  the committed snapshot, and the snapshot is rejected, but the
  committed op **remains in the database**

In reality, `sharedb-mongo` attempts to clean up this failed op,
but there is still a small chance that the server crashes
before it can do so, meaning that we may have multiple ops
lingering in the database with the same version.

Because some non-canonical ops may exist in the database, we
cannot just perform a naive fetch of all the ops associated with
a document, because it may return multiple ops with the same
version (where one was successfully applied, and one was not).

In order to return a valid set of canonical ops, the optimistic
locking has a notion of **linked ops**. That is, each op will
point back to the op that it built on top of, and ultimately
the current snapshot points to the op that committed it to the
database.

Because of this, we can work backwards from the current snapshot,
following the trail of op links all the way back to get a chain
of canonical, valid, linked ops. This way, even if a spurious
op exists in the database, no other op will point to it, and it
will be correctly ignored.
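
The backwards walk described above can be sketched as follows. This is a minimal illustration, not the actual `sharedb-mongo` implementation: the `followOpLinks` helper is hypothetical, and the example assumes that each op has an `_id` and that its `o` field holds the `_id` of the op it was built on.

```javascript
// Hypothetical helper: walks backwards along op links, starting from the
// op that the snapshot points to, and keeps only the ops on that chain.
// Assumes `ops` is sorted by ascending version, that each op has an `_id`,
// and that `o` holds the `_id` of the op it was built on (null for the first).
function followOpLinks(ops, snapshotLink) {
  var link = snapshotLink;
  var linkedOps = [];

  for (var i = ops.length - 1; i >= 0 && link != null; i--) {
    var op = ops[i];
    if (op._id !== link) continue; // nothing points at this op: skip it
    linkedOps.push(op);
    link = op.o; // move one step back along the chain
  }

  return linkedOps.reverse();
}

var ops = [
  {_id: 'a', v: 1, o: null},
  {_id: 'b', v: 2, o: 'a'},
  {_id: 'c', v: 2, o: 'a'}, // spurious op left behind by a rejected submit
  {_id: 'd', v: 3, o: 'b'}
];

var canonicalIds = followOpLinks(ops, 'd').map(function (op) {
  return op._id;
});
console.log(canonicalIds);
```

In this sketch the spurious op `c` shares a version with `b`, but it is dropped because no later op links to it.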

This approach has a big downside: it forces us to fetch all the
ops up to the current version. This might be fine if you want
all ops, or are fetching very recent ops, but it can have a large
impact on performance if you only want ops 1-10 of a 10,000
op document, because you still have to fetch all 10,000 ops.
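
To illustrate the cost, here is a sketch of the two query shapes involved. `buildOpsQuery` is a hypothetical helper written for this example, and the document ID is a placeholder; `d` and `v` are the document ID and version fields stored on each op. Without an upper bound on `v`, the query matches every op from `from` up to the current version.

```javascript
// Hypothetical helper for illustration: builds a Mongo query for the ops
// of one document, optionally bounded above by `to` (exclusive).
function buildOpsQuery(id, from, to) {
  var query = {
    d: id,
    v: {$gte: from == null ? 0 : from}
  };
  if (to != null) query.v.$lt = to;
  return query;
}

// No upper bound: matches everything from v1 to the current version
var unbounded = buildOpsQuery('doc1', 1);
// With an upper bound: matches v1 to v10 only
var bounded = buildOpsQuery('doc1', 1, 11);

console.log(JSON.stringify(unbounded));
console.log(JSON.stringify(bounded));
```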

### Dropping strict linking

To speed up `getOps`, you can set
`getOpsWithoutStrictLinking: true`. This will attempt to fetch
the bare minimum of ops, whilst still trying to maintain op
integrity.
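
A minimal sketch of enabling the flag. The connection string and the `options` variable name are placeholders, and the constructor call is an assumed usage that is left commented out so the snippet does not need a running database.

```javascript
var options = {
  getOpsWithoutStrictLinking: true
};

// Assumed usage, passing the options as the second constructor argument:
// var db = require('sharedb-mongo')('mongodb://localhost:27017/test', options);

// The flag stays off unless it is explicitly enabled
var enabled = options.getOpsWithoutStrictLinking || false;
console.log(enabled);
```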

The assumption that underpins this approach is that any op
that exists with a unique combination of `d` (document ID)
and `v` (version) **is a valid op**. In other words, it
had no conflicts and can be considered canonical.

Consider a document with some ops, including some spurious,
failed ops:

- v1: unique
- v2: unique
- v3: collision 3
- v3: collision 3
- v4: collision 4
- v4: collision 4
- v5: unique
- v6: unique
...
- v1000: unique

To fetch ops v1-v3, we:

- look up v4
- find that v4 is not unique
- look up v5
- see that v5 is unique and therefore assumed valid
- look backwards from v5 for a chain of valid ops, avoiding
  the spurious commits for v4 and v3

This way we don't need to fetch all the ops from v5 to the
current version.

In the case where a valid op cannot be determined, we still
fall back to fetching all ops and working backwards from the
current version.
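
The search for a starting point can be sketched like this. `firstUniqueVersion` is a hypothetical helper for illustration only; it works on a plain array of version numbers sorted in ascending order rather than on a database cursor. It returns `null` when no unique version exists, which corresponds to the fallback case described above.

```javascript
// Hypothetical helper: returns the first version that appears exactly once
// in an ascending list of op versions, or null if every version collides.
function firstUniqueVersion(sortedVersions) {
  for (var i = 0; i < sortedVersions.length; i++) {
    var version = sortedVersions[i];
    var sameAsPrevious = i > 0 && version === sortedVersions[i - 1];
    var sameAsNext = i < sortedVersions.length - 1 &&
      version === sortedVersions[i + 1];
    if (!sameAsPrevious && !sameAsNext) return version;
  }
  return null;
}

// Versions at or after v4 in the example document
var found = firstUniqueVersion([4, 4, 5, 6]);
// Every version collides, so there is nothing to link back from
var notFound = firstUniqueVersion([3, 3, 4, 4]);

console.log(found, notFound);
```

Because the list is sorted, comparing each version with its immediate neighbours is enough to detect duplicates, so the search can stop at the first unique version instead of reading every remaining op.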

### Limitations

#### Integrity

Inferring a canonical op is riskier than simply following the
valid op chain from the snapshot, which is - by definition -
canonical.

This alternative behaviour should be safe, but use it with
caution: the inference may have unforeseen corner cases that
return an **invalid set of ops**.

This may be especially true if the ops are modified outside
of `sharedb-mongo` (e.g. by setting a TTL, or manually updating
them).

#### Recent ops

There are cases where this flag may make `getOps` slower. When
fetching very recent ops, setting this flag may add extra
database round-trips where fetching the snapshot would have
been faster.

#### `getOpsBulk` and `getOpsToSnapshot`

This flag **only** applies to `getOps`, and **not** to the
similar `getOpsBulk` and `getOpsToSnapshot` methods, whose
performance will remain unchanged.

## Error codes

Mongo errors are passed back directly. Additional error codes:
117 changes: 104 additions & 13 deletions index.js
@@ -1,6 +1,7 @@
var async = require('async');
var mongodb = require('mongodb');
var DB = require('sharedb').DB;
var OpLinkValidator = require('./op-link-validator');

module.exports = ShareDbMongo;

@@ -44,6 +45,12 @@ function ShareDbMongo(mongo, options) {
// data in the mongo database.
this.allowAggregateQueries = options.allowAllQueries || options.allowAggregateQueries || false;

// Setting this flag to true will attempt to infer a canonical op link for
// getOps rather than using the snapshot as the op link. This allows us to
// avoid fetching all ops up to the present version when asking only for a
// subset. For more details on this, see the README.
this.getOpsWithoutStrictLinking = options.getOpsWithoutStrictLinking || false;

// Track whether the close method has been called
this.closed = false;

@@ -339,7 +346,8 @@ ShareDbMongo.prototype.getOpsToSnapshot = function(collectionName, id, from, sna
var err = ShareDbMongo.missingLastOperationError(collectionName, id);
return callback(err);
}
this._getOps(collectionName, id, from, options, function(err, ops) {
var to = null;
this._getOps(collectionName, id, from, to, options, function(err, ops) {
if (err) return callback(err);
var filtered = getLinkedOps(ops, null, snapshot._opLink);
var err = checkOpsFrom(collectionName, id, filtered, from);
@@ -350,18 +358,22 @@ ShareDbMongo.prototype.getOpsToSnapshot = function(collectionName, id, from, sna

ShareDbMongo.prototype.getOps = function(collectionName, id, from, to, options, callback) {
var self = this;
this._getSnapshotOpLink(collectionName, id, function(err, doc) {
this._getOpLink(collectionName, id, to, function(err, opLink) {
if (err) return callback(err);
if (doc) {
if (isCurrentVersion(doc, from)) {
// We need to fetch slightly more ops than requested in order to work backwards along
// linked ops to provide only valid ops
var fetchOpsTo = null;
if (opLink) {
if (isCurrentVersion(opLink, from)) {
return callback(null, []);
}
var err = doc && checkDocHasOp(collectionName, id, doc);
var err = opLink && checkDocHasOp(collectionName, id, opLink);
if (err) return callback(err);
if (self.getOpsWithoutStrictLinking) fetchOpsTo = opLink._v;
}
self._getOps(collectionName, id, from, options, function(err, ops) {
self._getOps(collectionName, id, from, fetchOpsTo, options, function(err, ops) {
if (err) return callback(err);
var filtered = filterOps(ops, doc, to);
var filtered = filterOps(ops, opLink, to);
var err = checkOpsFrom(collectionName, id, filtered, from);
if (err) return callback(err);
callback(null, filtered);
@@ -540,16 +552,24 @@ function getLinkedOps(ops, to, link) {
return linkedOps.reverse();
}

function getOpsQuery(id, from) {
return (from == null) ?
{d: id} :
{d: id, v: {$gte: from}};
function getOpsQuery(id, from, to) {
from = from == null ? 0 : from;
var query = {
d: id,
v: { $gte: from }
};

if (to != null) {
query.v.$lt = to;
}

return query;
}

ShareDbMongo.prototype._getOps = function(collectionName, id, from, options, callback) {
ShareDbMongo.prototype._getOps = function(collectionName, id, from, to, options, callback) {
this.getOpCollection(collectionName, function(err, opCollection) {
if (err) return callback(err);
var query = getOpsQuery(id, from);
var query = getOpsQuery(id, from, to);
// Exclude the `d` field, which is only for use internal to livedb-mongo.
// Also exclude the `m` field, which can be used to store metadata on ops
// for tracking purposes
@@ -600,6 +620,77 @@ function readOpsBulk(stream, callback) {
});
}

ShareDbMongo.prototype._getOpLink = function(collectionName, id, to, callback) {
if (!this.getOpsWithoutStrictLinking) return this._getSnapshotOpLink(collectionName, id, callback);

var db = this;
this.getOpCollection(collectionName, function (error, collection) {
if (error) return callback(error);

// If to is null, we want the most recent version, so just return the
// snapshot link, which is more efficient than cursoring
if (to == null) {
return db._getSnapshotOpLink(collectionName, id, callback);
}

var query = {
d: id,
v: { $gte: to }
};

var projection = {
_id: 0,
v: 1,
o: 1
};

var cursor = collection.find(query).sort({ v: 1 }).project(projection);

getFirstOpWithUniqueVersion(cursor, null, function (error, op) {
if (error) return callback(error);
if (op) return callback(null, { _o: op.o, _v: op.v });

// If we couldn't find an op to link back from, then fall back to using the current
// snapshot, which is guaranteed to have a link to a valid op.
db._getSnapshotOpLink(collectionName, id, callback);
});
});
};

// When getting ops, we need to consider the case where an op is committed to the database,
// but its application to the snapshot is subsequently rejected. This can leave multiple ops
// with the same values for 'd' and 'v', and means that we may return multiple ops for a single
// version if we just perform a naive 'find' operation.
// To avoid this, we try to fetch the first op from 'to' which has a unique 'v', and then we
// work backwards from that op using the linked op 'o' field to get a valid chain of ops.
// See the README for more details.
function getFirstOpWithUniqueVersion(cursor, opLinkValidator, callback) {
opLinkValidator = opLinkValidator || new OpLinkValidator();

var opWithUniqueVersion = opLinkValidator.opWithUniqueVersion();

if (opWithUniqueVersion || opLinkValidator.isAtEndOfList()) {
var error = null;
return closeCursor(cursor, callback, error, opWithUniqueVersion);
}

cursor.next(function (error, op) {
if (error) {
return closeCursor(cursor, callback, error);
}

opLinkValidator.push(op);
getFirstOpWithUniqueVersion(cursor, opLinkValidator, callback);
});
}

function closeCursor(cursor, callback, error, returnValue) {
cursor.close(function (closeError) {
error = error || closeError;
callback(error, returnValue);
});
}

ShareDbMongo.prototype._getSnapshotOpLink = function(collectionName, id, callback) {
this.getCollection(collectionName, function(err, collection) {
if (err) return callback(err);
78 changes: 78 additions & 0 deletions op-link-validator.js
@@ -0,0 +1,78 @@
/**
* This is a class for determining an op with a unique version number
* when presented with an **ordered** series of ops.
*
* For example, consider the following chain of op versions:
* 1 -> 1 -> 2 -> 2 -> 3 -> 4
* If we want to find the first unique version, we must consider a
* window of three versions. For example, if we consider the first
* three versions:
* 1 -> 1 -> 2
* Then we know that 1 is not unique. We don't know if 2 is unique
* yet, because we don't know what comes next. Therefore we push
* one more version and check again:
* 1 -> 2 -> 2
* Again we now see that 2 is not unique, so we keep pushing ops
* until we reach the final window:
* 2 -> 3 -> 4
* From here, **assuming the ops are well ordered** we can safely
* see that v3 is unique. We cannot make the same assumption of
* v4, because we don't know what comes next.
*
* Note that we also assume that the chain starts with **all**
* of the copies of an op version. That is, if we are provided
* 1 -> 2
* Then v1 is unique (because there are no other v1s).
*
* Similarly, if a null op is pushed into the class, it is assumed
* to be the end of the chain, and hence a unique version can be
* inferred, eg with this chain:
* 5 -> 6 -> null
* We say that 6 is unique, because we've reached the end of the
* list.
*/
function OpLinkValidator() {
this.currentOp = undefined;
this.previousOp = undefined;
this.oneBeforePreviousOp = undefined;
}

OpLinkValidator.prototype.push = function (op) {
this.oneBeforePreviousOp = this.previousOp;
this.previousOp = this.currentOp;
this.currentOp = op;
};

OpLinkValidator.prototype.opWithUniqueVersion = function () {
return this._previousVersionWasUnique() ? this.previousOp : null;
};

OpLinkValidator.prototype.isAtEndOfList = function () {
// We ascribe a special meaning to a current op of null
// being that we're at the end of the list, because this
// is the value that the Mongo cursor will return when
// the cursor is exhausted
return this.currentOp === null;
};

OpLinkValidator.prototype._previousVersionWasUnique = function () {
var previousVersion = this._previousVersion();

return typeof previousVersion === 'number'
&& previousVersion !== this._currentVersion()
&& previousVersion !== this._oneBeforePreviousVersion();
};

OpLinkValidator.prototype._currentVersion = function () {
return this.currentOp && this.currentOp.v;
};

OpLinkValidator.prototype._previousVersion = function () {
return this.previousOp && this.previousOp.v;
};

OpLinkValidator.prototype._oneBeforePreviousVersion = function () {
return this.oneBeforePreviousOp && this.oneBeforePreviousOp.v;
};

module.exports = OpLinkValidator;