Skip to content

Commit

Permalink
readSnapshots rejection code review updates
Browse files Browse the repository at this point in the history
  • Loading branch information
ericyhwang committed Dec 13, 2019
1 parent 5bceb2d commit 3fe83f3
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 85 deletions.
16 changes: 7 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -602,26 +602,24 @@ An `Agent` is the representation of a client's `Connection` state on the server.
The `Agent` will be made available in all [middleware](#middlewares) requests. The `agent.custom` field is an object that can be used for storing arbitrary information for use in middleware. For example:

```javascript
backend.useMiddleware('connect', function (request, callback) {
backend.useMiddleware('connect', (request, callback) => {
// Best practice to clone to prevent mutating the object after connection.
// You may also want to consider a deep clone, depending on the shape of request.req.
Object.assign(request.agent.custom, request.req);
callback();
});

backend.useMiddleware('readSnapshots', function (request, callback) {
var connectionInfo = request.agent.custom;
var snapshots = request.snapshots;
backend.useMiddleware('readSnapshots', (request, callback) => {
const connectionInfo = request.agent.custom;
const snapshots = request.snapshots;

// Use the information provided at connection to determine if a user can access the snapshots.
// This should also be checked when fetching and submitting ops.
if (!userCanAccessCollection(connectionInfo, request.collection)) {
return callback(new Error('Not allowed to access collection ' + request.collection));
}
// Check each snapshot individually.
// In ES6, this could be `for (const snapshot of snapshots)`
for (var i = 0; i < snapshots.length; i++) {
var snapshot = snapshots[i];
for (const snapshot of snapshots) {
if (!userCanAccessSnapshot(connectionInfo, request.collection, snapshot)) {
request.rejectSnapshotRead(snapshot,
new Error('Not allowed to access snapshot in ' request.collection));
Expand All @@ -635,10 +633,10 @@ backend.useMiddleware('readSnapshots', function (request, callback) {
// potentially making some database request to check which documents they can access, or which
// roles they have, etc. If doing this asynchronously, make sure you call backend.connect
// after the permissions have been fetched.
var connectionInfo = getUserPermissions();
const connectionInfo = getUserPermissions();
// Pass info in as the second argument. This will be made available as request.req in the
// 'connection' middleware.
var connection = backend.connect(null, connectionInfo);
const connection = backend.connect(null, connectionInfo);
```
### Logging
Expand Down
45 changes: 19 additions & 26 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ Agent.prototype._querySubscribe = function(queryId, collection, query, options,
wait++;
this.backend.fetchBulk(this, collection, options.fetch, function(err, snapshotMap) {
if (err) return finish(err);
message = {data: getMapData(snapshotMap)};
message = getMapResult(snapshotMap);
finish();
});
}
Expand Down Expand Up @@ -463,12 +463,23 @@ function getResultsData(results) {
return items;
}

function getMapData(snapshotMap) {
function getMapResult(snapshotMap) {
var data = {};
var errorMap = {};
var hasSnapshotError = false;
for (var id in snapshotMap) {
data[id] = getSnapshotData(snapshotMap[id]);
var mapValue = snapshotMap[id];
// fetchBulk / subscribeBulk map data can have either a Snapshot or an object
// `{error: Error | string}` as a value.
if (mapValue.error) {
hasSnapshotError = true;
// Transform errors to serialization-friendly objects.
errorMap[id] = getReplyErrorObject(mapValue.error);
} else {
data[id] = getSnapshotData(mapValue);
}
}
return data;
return hasSnapshotError ? {data: data, errorMap: errorMap} : {data: data};
}

function getSnapshotData(snapshot) {
Expand Down Expand Up @@ -517,20 +528,11 @@ Agent.prototype._fetchOps = function(collection, id, version, callback) {
Agent.prototype._fetchBulk = function(collection, versions, callback) {
if (Array.isArray(versions)) {
this.backend.fetchBulk(this, collection, versions, function(err, snapshotMap) {
var isPartialError = err != null && err.code === ERROR_CODE.ERR_SNAPSHOT_READS_REJECTED;
if (err && !isPartialError) {
if (err) {
return callback(err);
}
if (snapshotMap) {
var result = {data: getMapData(snapshotMap)};
if (err && isPartialError) {
var errorMap = {};
for (var docId in err.idToError) {
// Transform Error instances to serialization-friendly objects.
errorMap[docId] = getReplyErrorObject(err.idToError[docId]);
}
result.errorMap = errorMap;
}
var result = getMapResult(snapshotMap);
callback(null, result);
} else {
callback();
Expand Down Expand Up @@ -581,8 +583,7 @@ Agent.prototype._subscribeBulk = function(collection, versions, callback) {
// See _subscribe() above. This function's logic should match but in bulk
var agent = this;
this.backend.subscribeBulk(this, collection, versions, function(err, streams, snapshotMap, opsMap) {
var isPartialError = err != null && err.code === ERROR_CODE.ERR_SNAPSHOT_READS_REJECTED;
if (err && !isPartialError) {
if (err) {
return callback(err);
}
if (opsMap) {
Expand All @@ -592,15 +593,7 @@ Agent.prototype._subscribeBulk = function(collection, versions, callback) {
agent._subscribeToStream(collection, id, streams[id]);
}
if (snapshotMap) {
var result = {data: getMapData(snapshotMap)};
if (err && isPartialError) {
var errorMap = {};
for (var docId in err.idToError) {
// Transform Error instances to serialization-friendly objects.
errorMap[docId] = getReplyErrorObject(err.idToError[docId]);
}
result.errorMap = errorMap;
}
var result = getMapResult(snapshotMap);
callback(null, result);
} else {
callback();
Expand Down
97 changes: 47 additions & 50 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ var ShareDBError = require('./error');
var Snapshot = require('./snapshot');
var StreamSocket = require('./stream-socket');
var SubmitRequest = require('./submit-request');
var ReadSnapshotsRequest = require('./readSnapshots-request');
var ReadSnapshotsRequest = require('./read-snapshots-request');

var ERROR_CODE = ShareDBError.CODES;

Expand Down Expand Up @@ -260,7 +260,7 @@ Backend.prototype._sanitizeSnapshots = function(agent, projection, collection, s
// Handle "partial rejection" - "readSnapshots" middleware functions can use
// `request.rejectSnapshotRead(snapshot, error)` to reject the read of a specific snapshot.
if (request.hasSnapshotRejection()) {
err = getReadSnapshotsError(snapshots, request._idToError);
err = request.getReadSnapshotsError();
}
if (err) {
callback(err);
Expand All @@ -270,42 +270,6 @@ Backend.prototype._sanitizeSnapshots = function(agent, projection, collection, s
});
};

/**
* Returns an overall error from "readSnapshots" based on the snapshot-specific errors.
*
* - If there's exactly one snapshot and it has an error, then that error is returned.
* - If there's more than one snapshot and at least one has an error, then an overall
* "ERR_SNAPSHOT_READS_REJECTED" is returned, with an `idToError` property.
*
* @param {Snapshot[]} snapshots
* @param {{[docId: string]: string | Error}} idToError
*/
function getReadSnapshotsError(snapshots, idToError) {
// If there are 0 snapshots, there can't be any snapshot-specific errors.
if (snapshots.length === 0) {
return;
}

// Single snapshot with error is treated as a full error.
if (snapshots.length === 1) {
var snapshotError = idToError[snapshots[0].id];
if (snapshotError) {
return snapshotError;
} else {
return;
}
}

// Errors in specific snapshots result in an overall ERR_SNAPSHOT_READS_REJECTED.
//
// fetchBulk and subscribeBulk know how to handle that special error by sending a doc-by-doc
// success/failure to the client. Other methods that don't or can't handle partial failures
// will treat it as a full rejection.
var err = new ShareDBError('ERR_SNAPSHOT_READS_REJECTED');
err.idToError = idToError;
return err;
}

Backend.prototype._getSnapshotProjection = function(db, projection) {
return (db.projectsSnapshots) ? null : projection;
};
Expand Down Expand Up @@ -427,6 +391,18 @@ Backend.prototype.fetch = function(agent, index, id, options, callback) {
});
};

/**
* Map of document id to Snapshot or error object.
* @typedef {{ [id: string]: Snapshot | { error: Error | string } }} SnapshotMap
*/

/**
* @param {Agent} agent
* @param {string} index
* @param {string[]} ids
* @param {*} options
* @param {(err?: Error | string, snapshotMap?: SnapshotMap) => void} callback
*/
Backend.prototype.fetchBulk = function(agent, index, ids, options, callback) {
if (typeof options === 'function') {
callback = options;
Expand Down Expand Up @@ -458,8 +434,9 @@ Backend.prototype.fetchBulk = function(agent, index, ids, options, callback) {
if (err) {
if (err.code === ERROR_CODE.ERR_SNAPSHOT_READS_REJECTED) {
for (var docId in err.idToError) {
delete snapshotMap[docId];
snapshotMap[docId] = {error: err.idToError[docId]};
}
err = undefined;
} else {
snapshotMap = undefined;
}
Expand Down Expand Up @@ -524,6 +501,26 @@ Backend.prototype.subscribe = function(agent, index, id, version, options, callb
});
};

/**
* Map of document id to pubsub stream.
* @typedef {{ [id: string]: Stream }} StreamMap
*/
/**
* Map of document id to array of ops for the doc.
* @typedef {{ [id: string]: Op[] }} OpsMap
*/

/**
* @param {Agent} agent
* @param {string} index
* @param {string[]} versions
* @param {(
* err?: Error | string | null,
* streams?: StreamMap,
* snapshotMap?: SnapshotMap | null
* opsMap?: OpsMap
* ) => void} callback
*/
Backend.prototype.subscribeBulk = function(agent, index, versions, callback) {
var start = Date.now();
var projection = this.projections[index];
Expand Down Expand Up @@ -554,17 +551,17 @@ Backend.prototype.subscribeBulk = function(agent, index, versions, callback) {
// If an array of ids, get current snapshots
backend.fetchBulk(agent, index, ids, function(err, snapshotMap) {
if (err) {
if (err.code === ERROR_CODE.ERR_SNAPSHOT_READS_REJECTED) {
// Partial error, destroy only streams for docs with errors.
for (var docId in err.idToError) {
streams[docId].destroy();
delete streams[docId];
}
} else {
// Full error, destroy all streams.
destroyStreams(streams);
streams = undefined;
snapshotMap = undefined;
// Full error, destroy all streams.
destroyStreams(streams);
streams = undefined;
snapshotMap = undefined;
}
for (var docId in snapshotMap) {
// The doc id could map to an object `{error: Error | string}`, which indicates that
// particular snapshot's read was rejected. Destroy the streams fur such docs.
if (snapshotMap[docId].error) {
streams[docId].destroy();
delete streams[docId];
}
}
backend.emit('timing', 'subscribeBulk.snapshot', Date.now() - start, request);
Expand Down
40 changes: 40 additions & 0 deletions lib/readSnapshots-request.js → lib/read-snapshots-request.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
var ShareDBError = require('./error');

module.exports = ReadSnapshotsRequest;

/**
Expand All @@ -18,6 +20,9 @@ function ReadSnapshotsRequest(collection, snapshots, snapshotType) {
this.agent = null;
this.backend = null;

/**
* Map of doc id to error: `{[docId: string]: string | Error}`
*/
this._idToError = null;
}

Expand Down Expand Up @@ -47,3 +52,38 @@ ReadSnapshotsRequest.prototype.rejectSnapshotRead = function(snapshot, error) {
ReadSnapshotsRequest.prototype.hasSnapshotRejection = function() {
return this._idToError != null;
};

/**
* Returns an overall error from "readSnapshots" based on the snapshot-specific errors.
*
* - If there's exactly one snapshot and it has an error, then that error is returned.
* - If there's more than one snapshot and at least one has an error, then an overall
* "ERR_SNAPSHOT_READS_REJECTED" is returned, with an `idToError` property.
*/
ReadSnapshotsRequest.prototype.getReadSnapshotsError = function() {
var snapshots = this.snapshots;
var idToError = this._idToError;
// If there are 0 snapshots, there can't be any snapshot-specific errors.
if (snapshots.length === 0) {
return;
}

// Single snapshot with error is treated as a full error.
if (snapshots.length === 1) {
var snapshotError = idToError[snapshots[0].id];
if (snapshotError) {
return snapshotError;
} else {
return;
}
}

// Errors in specific snapshots result in an overall ERR_SNAPSHOT_READS_REJECTED.
//
// fetchBulk and subscribeBulk know how to handle that special error by sending a doc-by-doc
// success/failure to the client. Other methods that don't or can't handle partial failures
// will treat it as a full rejection.
var err = new ShareDBError(ShareDBError.CODES.ERR_SNAPSHOT_READS_REJECTED);
err.idToError = idToError;
return err;
};

0 comments on commit 3fe83f3

Please sign in to comment.