Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Held ops #384

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
96 changes: 79 additions & 17 deletions lib/client/doc.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ function Doc(connection, collection, id) {
// This is a list of {[create:{...}], [del:true], [op:...], callbacks:[...]}
this.pendingOps = [];

this._heldOpId = 1;
this._heldOps = {};
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know that the other props on this class aren't prefixed, but I'd like to keep things "private" where possible


// The OT type of this document. An uncreated document has type `null`
this.type = null;

Expand Down Expand Up @@ -342,13 +345,15 @@ Doc.prototype._handleOp = function(err, message) {
return;
}

if (this.inflightOp) {
var transformErr = transformX(this.inflightOp, message);
if (transformErr) return this._hardRollback(transformErr);
var opsToTransform = [];
if (this.inflightOp) opsToTransform.push(this.inflightOp);
opsToTransform = opsToTransform.concat(this.pendingOps);
for (var id in this._heldOps) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where my understanding of OT starts to get pushed a bit — does the transformation ordering matter here? I'd have normally assumed no, but I think transformX mutates the server op, so order does matter (I confirmed this trivially by moving the inflight op to after pending, and a couple of tests broke). I just don't know how (because held ops are temporally separate from pending ops). It could be that it's only important for create and del?

opsToTransform.push(this._heldOps[id]);
}

for (var i = 0; i < this.pendingOps.length; i++) {
var transformErr = transformX(this.pendingOps[i], message);
for (var i = 0; i < opsToTransform.length; i++) {
var transformErr = transformX(opsToTransform[i], message);
if (transformErr) return this._hardRollback(transformErr);
}

Expand Down Expand Up @@ -712,18 +717,9 @@ Doc.prototype._submit = function(op, source, callback) {
// Locally submitted ops must always have a truthy source
if (!source) source = true;

// The op contains either op, create, delete, or none of the above (a no-op).
if ('op' in op) {
if (!this.type) {
var err = new ShareDBError(
ERROR_CODE.ERR_DOC_DOES_NOT_EXIST,
'Cannot submit op. Document has not been created. ' + this.collection + '.' + this.id
);
if (callback) return callback(err);
return this.emit('error', err);
}
// Try to normalize the op. This removes trailing skip:0's and things like that.
if (this.type.normalize) op.op = this.type.normalize(op.op);
var error = this._normalizeOp(op);
if (error) return this._callbackOrEmit(error, callback);
}

try {
Expand All @@ -741,6 +737,19 @@ Doc.prototype._submit = function(op, source, callback) {
});
};

// Try to normalize the op. This removes trailing skip:0's and things like that.
Doc.prototype._normalizeOp = function(op) {
if (!this.type) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This isn't really "normalising", but it's common code. Open to naming improvements on the method.

var err = new ShareDBError(
ERROR_CODE.ERR_DOC_DOES_NOT_EXIST,
'Cannot submit op. Document has not been created. ' + this.collection + '.' + this.id
);
return err;
}

if (this.type.normalize) op.op = this.type.normalize(op.op);
};

Doc.prototype._pushOp = function(op, callback) {
if (this.applyStack) {
// If we are in the process of incrementally applying an operation, don't
Expand All @@ -760,6 +769,12 @@ Doc.prototype._pushOp = function(op, callback) {
op.type = this.type;
op.callbacks = [callback];
this.pendingOps.push(op);

if (op.del) this._heldOps = {};
for (var id in this._heldOps) {
var transformErr = transformX(op, this._heldOps[id]);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, my understanding of transformX isn't great. It's going to mutate op. But we're at the end of the function, so I assume that's okay?

if (transformErr) return this._hardRollback(transformErr);
}
};

Doc.prototype._popApplyStack = function(to) {
Expand Down Expand Up @@ -828,7 +843,8 @@ Doc.prototype.submitOp = function(component, options, callback) {
options = null;
}
var op = {op: component};
var source = options && options.source;
options = this._initOpSubmitOptions(options);
var source = options.source;
this._submit(op, source, callback);
};

Expand Down Expand Up @@ -885,6 +901,40 @@ Doc.prototype.del = function(options, callback) {
this._submit(op, source, callback);
};

Doc.prototype.submitHeldOp = function(component, options) {
var op = {
op: component,
type: this.type
};
options = this._initOpSubmitOptions(options);
var source = options.source;

var error = this._normalizeOp(op);
if (error) return this._callbackOrEmit(error, callback);

var heldOpId = this._heldOpId++;
this._heldOps[heldOpId] = op;

try {
this._otApply(op, source);
} catch (error) {
return this._hardRollback(error);
}

var doc = this;
return function(callback) {
var heldOp = doc._heldOps[heldOpId];
if (!heldOp || !heldOp.op) return callback();
delete doc._heldOps[heldOpId];
doc._pushOp(heldOp, callback);

// The call to flush is delayed so if multiple submit callbacks are called
// synchronously, all the ops are combined before being sent to the server.
process.nextTick(function() {
doc.flush();
});
};
};

// Stops the document from sending any operations to the server.
Doc.prototype.pause = function() {
Expand Down Expand Up @@ -968,6 +1018,7 @@ Doc.prototype._hardRollback = function(err) {
this.version = null;
this.inflightOp = null;
this.pendingOps = [];
this._heldOps = {};

// Fetch the latest version from the server to get us back into a working state
var doc = this;
Expand Down Expand Up @@ -997,3 +1048,14 @@ Doc.prototype._clearInflightOp = function(err) {

if (err && !called) return this.emit('error', err);
};

Doc.prototype._initOpSubmitOptions = function(options) {
options = options || {};
options.source = options.source || true;
return options;
};

Doc.prototype._callbackOrEmit = function(error, callback) {
if (callback) return callback(error);
if (error) this.emit(error);
};
150 changes: 150 additions & 0 deletions test/client/doc.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
var Backend = require('../../lib/backend');
var expect = require('chai').expect;
var async = require('async');
var sinon = require('sinon');
var util = require('../util');
var errorHandler = util.errorHandler;

describe('Doc', function() {
beforeEach(function() {
Expand Down Expand Up @@ -418,4 +421,151 @@ describe('Doc', function() {
], done);
});
});

describe('held ops', function() {
var connection1;
var connection2;
var doc1;
var doc2;

beforeEach(function(done) {
connection1 = this.backend.connect();
connection2 = this.backend.connect();
doc1 = connection1.get('books', '1984');
doc2 = connection2.get('books', '1984');

async.series([
doc1.subscribe.bind(doc1),
doc1.create.bind(doc1, {title: 'Nineteen Eightyfour'}),
doc2.subscribe.bind(doc2)
], done);
});

it('applies a held op locally', function() {
doc1.submitHeldOp({p: ['author'], oi: 'George Orwell'});
expect(doc1.data.author).to.equal('George Orwell');
});

it('does not send the op to a remote client', function(done) {
sinon.spy(connection1, 'send');
doc2.on('op', function() {
done(new Error('Should not have received op'));
});
doc1.submitHeldOp({p: ['author'], oi: 'George Orwell'});
expect(connection1.send.called).to.be.false;
done();
});

it('flushes a held op to a remote client', function(done) {
var flush = doc1.submitHeldOp({p: ['author'], oi: 'George Orwell'});
flush(errorHandler(done));
doc2.on('op', function() {
expect(doc2.data.author).to.equal('George Orwell');
done();
});
});

it('transforms held ops by remote ops', function(done) {
var flush = doc1.submitHeldOp({p: ['title', 19], si: ': A Novel'});
async.series([
doc2.submitOp.bind(doc2, {p: ['title', 15], si: '-'}),
doc1.fetch.bind(doc1),
flush,
doc2.fetch.bind(doc2),
function(next) {
expect(doc1.data.title).to.equal('Nineteen Eighty-four: A Novel');
expect(doc2.data.title).to.equal(doc1.data.title);
next();
}
], done);
});

it('transforms held ops by local ops synchronously', function(done) {
var flush = doc1.submitHeldOp({p: ['title', 19], si: ': A Novel'});
doc1.submitOp({p: ['title', 15], si: '-'}, errorHandler(done));
flush(errorHandler(done));
var count = 0;
doc2.on('op', function() {
if (++count < 2) return;
expect(doc2.data.title).to.equal('Nineteen Eighty-four: A Novel');
done();
});
});

it('agrees with a remote client about a conflicting local insert', function(done) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also add a test case where the submit op has an index higher than the held op. This currently fails, because the submitted op should be transformed against held ops when being submitted, but they aren't.

var flush = doc1.submitHeldOp({p: ['title', 0], si: '1'});
doc1.submitOp({p: ['title', 0], si: '2'}, errorHandler(done));
flush(errorHandler(done));
doc2.on('op', function() {
expect(doc1.data.title).to.equal('21Nineteen Eightyfour');
expect(doc2.data.title).to.equal(doc1.data.title);
done();
});
});

it('submits other ops whilst holding ops', function(done) {
doc1.submitHeldOp({p: ['author'], oi: 'George Orwell'});
async.series([
doc1.submitOp.bind(doc1, {p: ['title', 19], si: ': A Novel'}),
doc2.fetch.bind(doc2),
function(next) {
expect(doc1.data).to.eql({
title: 'Nineteen Eightyfour: A Novel',
author: 'George Orwell'
});
expect(doc2.data).to.eql({
title: 'Nineteen Eightyfour: A Novel'
});
next();
}
], done);
});

it('holds multiple ops and flushes them one at a time', function(done) {
var flush1 = doc1.submitHeldOp({p: ['author'], oi: 'George Orwell'});
var flush2 = doc1.submitHeldOp({p: ['title', 19], si: ': A Novel'});

async.series([
flush2,
doc2.fetch.bind(doc2),
function(next) {
expect(doc2.data).to.eql({
title: 'Nineteen Eightyfour: A Novel'
});
next();
},
flush1,
doc2.fetch.bind(doc2),
function(next) {
expect(doc2.data).to.eql({
title: 'Nineteen Eightyfour: A Novel',
author: 'George Orwell'
});
next();
}
], done);
});

it('does not apply the flush function twice', function(done) {
var flush = doc1.submitHeldOp({p: ['title', 0], si: 'foo '});
async.series([
flush,
flush,
function(next) {
expect(doc1.data.title).to.equal('foo Nineteen Eightyfour');
next();
}
], done);
});

it('clears held ops when a document is deleted', function(done) {
var flush = doc1.submitHeldOp({p: ['author'], oi: 'George Orwell'});
doc1.del(errorHandler(done));
flush(function(error) {
if (error) return done(error);
expect(doc1.data).to.be.undefined;
done();
});
});
});
});