Skip to content

Commit

Permalink
Merge 2cb16ee into cc0e338
Browse files Browse the repository at this point in the history
  • Loading branch information
maxfortun committed Jan 31, 2023
2 parents cc0e338 + 2cb16ee commit 25ee7c9
Show file tree
Hide file tree
Showing 5 changed files with 235 additions and 8 deletions.
10 changes: 10 additions & 0 deletions docs/middleware/op-submission.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ backend.use('submit', (context, next) => {
if (!userCanChangeDoc(userId, id)) {
return next(new Error('Unauthorized'))
}

// add custom metadata to the op
Object.assign(context.op.m, context.agent.custom);
// Explicitly specify which metadata fields to be included when storing the op
context.opMetadataProjection = { userId: true };

next()
})
```
Expand All @@ -61,6 +67,10 @@ backend.use('apply', (context, next) => {
if (userId !== ownerId) {
return next(new Error('Unauthorized'))
}

// Add op metadata to snapshot before snapshot is stored
Object.assign(context.snapshot.m, context.op.m);

next()
})
```
Expand Down
1 change: 1 addition & 0 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ Agent.prototype._sendOp = function(collection, id, op) {
if ('op' in op) message.op = op.op;
if (op.create) message.create = op.create;
if (op.del) message.del = true;
if (op.m) message.m = op.m;

this.send(message);
};
Expand Down
12 changes: 5 additions & 7 deletions lib/client/doc.js
Original file line number Diff line number Diff line change
Expand Up @@ -600,9 +600,7 @@ Doc.prototype._otApply = function(op, source) {
);
}

// NB: If we need to add another argument to this event, we should consider
// the fact that the 'op' event has op.src as its 3rd argument
this.emit('before op batch', op.op, source);
this.emit('before op batch', op.op, source, op.src, {op: op});

// Iteratively apply multi-component remote operations and rollback ops
// (source === false) for the default JSON0 OT type. It could use
Expand Down Expand Up @@ -637,24 +635,24 @@ Doc.prototype._otApply = function(op, source) {
this._setData(this.type.apply(this.data, componentOp.op));
this.emit('op', componentOp.op, source, op.src);
}
this.emit('op batch', op.op, source);
this.emit('op batch', op.op, source, op.src, {op: op});
// Pop whatever was submitted since we started applying this op
this._popApplyStack(stackLength);
return;
}

// The 'before op' event enables clients to pull any necessary data out of
// the snapshot before it gets changed
this.emit('before op', op.op, source, op.src);
this.emit('before op', op.op, source, op.src, {op: op});
// Apply the operation to the local data, mutating it in place
this._setData(this.type.apply(this.data, op.op));
// Emit an 'op' event once the local data includes the changes from the
// op. For locally submitted ops, this will be synchronously with
// submission and before the server or other clients have received the op.
// For ops from other clients, this will be after the op has been
// committed to the database and published
this.emit('op', op.op, source, op.src);
this.emit('op batch', op.op, source);
this.emit('op', op.op, source, op.src, {op: op});
this.emit('op batch', op.op, source, op.src, {op: op});
return;
}

Expand Down
34 changes: 33 additions & 1 deletion lib/submit-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ SubmitRequest.prototype.commit = function(callback) {
var op = request.op;
op.c = request.collection;
op.d = request.id;
op.m = undefined;
op.m = request._metadataProjection();
// Needed for agent to detect if it can ignore sending the op back to
// the client that submitted it in subscriptions
if (request.collection !== request.index) op.i = request.index;
Expand All @@ -185,6 +185,38 @@ SubmitRequest.prototype.commit = function(callback) {
});
};

SubmitRequest.prototype._metadataProjection = function() {
var request = this;

// Default behavior
if (!request.opMetadataProjection) {
return undefined;
}

// Granular projection
if (typeof request.opMetadataProjection === 'object') {
return request._granularMetadataProjection();
}

// Full projection
return request.op.m;
};

// Specify top level fields to beincluded in a granular projection
SubmitRequest.prototype._granularMetadataProjection = function() {
var request = this;
var metadataProjection = {};
for (var key in request.opMetadataProjection) {
var doProject = request.opMetadataProjection[key];
if (doProject ) {
metadataProjection[key] = request.op.m[key];
}
}

return metadataProjection;
};


SubmitRequest.prototype.retry = function(callback) {
this.retries++;
if (this.maxRetries != null && this.retries > this.maxRetries) {
Expand Down
186 changes: 186 additions & 0 deletions test/client/submit.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ var numberType = require('./number-type');
types.register(deserializedType.type);
types.register(deserializedType.type2);
types.register(numberType.type);
var util = require('../util');
var errorHandler = util.errorHandler;

module.exports = function() {
describe('client submit', function() {
Expand Down Expand Up @@ -1210,5 +1212,189 @@ module.exports = function() {
});
});
});

describe('metadata projection', function() {
it('passed metadata to connect', function(done) {
var metadata = {username: 'user'};

this.backend.use('connect', function(request, next) {
Object.assign(request.agent.custom, request.req);
next();
});

var connection = this.backend.connect(undefined, metadata);
connection.on('connected', function() {
expect(connection.agent.custom).eql(metadata);
done();
});
});

it('passed metadata to submit', function(done) {
var metadata = {username: 'user'};

this.backend.use('connect', function(request, next) {
Object.assign(request.agent.custom, request.req);
next();
});

this.backend.use('submit', function(request) {
expect(request.agent.custom).eql(metadata);
done();
});

var connection = this.backend.connect(undefined, metadata);
var doc = null;
connection.on('connected', function() {
expect(connection.agent.custom).eql(metadata);
doc = connection.get('dogs', 'fido');
doc.create({name: 'fido'}, function() {
doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {source: 'trainer'}, errorHandler(done));
});
});
});

it('received local op without metadata', function(done) {
var metadata = {username: 'user'};

this.backend.use('connect', function(request, next) {
Object.assign(request.agent.custom, request.req);
next();
});

this.backend.use('submit', function(request, next) {
expect(request.agent.custom).eql(metadata);
Object.assign(request.op.m, request.agent.custom);
request.opMetadataProjection = {username: true};
next();
});

var connection = this.backend.connect(undefined, metadata);
var doc = null;
connection.on('connected', function() {
expect(connection.agent.custom).eql(metadata);
doc = connection.get('dogs', 'fido');
doc.create({name: 'fido'}, function() {
doc.on('op', function(op, source, src, context) {
if (src) {
return;
}
expect(context.op.m).equal(undefined);
done();
});
doc.submitOp([{p: ['tricks'], oi: ['fetch']}], {source: 'trainer'}, errorHandler(function() {}));
});
});
});

it('concurrent changes', function(done) {
this.backend.use('connect', function(request, next) {
expect(request.req).to.have.property('username');
Object.assign(request.agent.custom, request.req);
next();
});

this.backend.use('submit', function(request, next) {
expect(request.agent.custom).to.have.property('username');
Object.assign(request.op.m, request.agent.custom);
request.opMetadataProjection = {username: true};
next();
});

this.backend.use('apply', function(request, next) {
Object.assign(request.snapshot.m, request.op.m);
expect(request.op.m).to.have.property('username');
next();
});

this.backend.use('commit', function(request, next) {
expect(request.op.m).to.have.property('username');
next();
});

this.backend.use('afterWrite', function(request, next) {
expect(request.op.m).to.have.property('username');
next();
});

var subscriberCount = 10;
var subscriberOpCount = 10;

var metadatas = [];
for (var i = 0; i < subscriberCount; i++) {
metadatas[i] = {username: 'user-'+i};
}

var ops = [];
for (var i = 0; i < subscriberCount; i++) {
ops[i] = [];
for (var j = 0; j < subscriberOpCount; j++) {
ops[i].push({p: ['tricks '+i+' '+j], oi: 1});
}
}

var docs = [];

function submitOps() {
for (var j = 0; j < subscriberOpCount; j++) {
for (var i = 0; i < subscriberCount; i++) {
var doc = docs[i];
doc.submitOp([ops[i][j]], {source: 'src-'+i}, errorHandler(doneAfter));
}
}
}

function validateAndDone() {
var firstDoc = docs[0];
// validate that all documents across connections are in sync
for (var i = 1; i < subscriberCount; i++) {
var doc = docs[i];
expect(doc.data).eql(firstDoc.data);
}
done();
};

var submitOpsAfter = util.callAfter(subscriberCount - 1, submitOps);
var doneAfter = util.callAfter((subscriberCount * subscriberCount * subscriberOpCount) - 1, validateAndDone);

function getDoc(callback) {
var thisDoc = this;
thisDoc.fetch(function() {
if (!thisDoc.data) {
return thisDoc.create({}, function() {
thisDoc.subscribe(callback);
});
}
thisDoc.subscribe(callback);
});
}

for (var i = 0; i < subscriberCount; i++) {
var metadata = metadatas[i];

var connection = this.backend.connect(undefined, Object.assign({}, metadata));
connection.__test_metadata = Object.assign({}, metadata);
connection.__test_id = i;

connection.on('connected', function() {
var thisConnection = this;

expect(thisConnection.agent.custom).eql(thisConnection.__test_metadata);

thisConnection.doc = docs[thisConnection.__test_id] = thisConnection.get('dogs', 'fido');

thisConnection.doc.on('op', function(op, source, src, context) {
if (!src || !context) { // If I am the source there is no metadata to check
return doneAfter();
}
var id = op[0].p[0].split(' ')[1];
expect(context.op.m).eql(metadatas[id]);
doneAfter();
});

getDoc.bind(thisConnection.doc)(submitOpsAfter);
});
}
});
});
});
};

0 comments on commit 25ee7c9

Please sign in to comment.