Skip to content

Commit

Permalink
Merge 8e46271 into 95c81b8
Browse files Browse the repository at this point in the history
  • Loading branch information
alecgibson committed Jan 23, 2019
2 parents 95c81b8 + 8e46271 commit 80dedc8
Show file tree
Hide file tree
Showing 17 changed files with 1,111 additions and 78 deletions.
23 changes: 23 additions & 0 deletions README.md
Expand Up @@ -269,6 +269,27 @@ Get a read-only snapshot of a document at the requested version.
}
```

`connection.fetchSnapshotByTimestamp(collection, id, timestamp, callback): void;`
Get a read-only snapshot of a document at the requested version.

* `collection` _(String)_
Collection name of the snapshot
* `id` _(String)_
ID of the snapshot
* `timestamp` _(number) [optional]_
The timestamp of the desired snapshot. The returned snapshot will be the latest snapshot before the provided timestamp
* `callback` _(Function)_
Called with `(error, snapshot)`, where `snapshot` takes the following form:

```javascript
{
id: string; // ID of the snapshot
v: number; // version number of the snapshot
type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
data: any; // the snapshot
}
```

### Class: `ShareDB.Doc`

`doc.type` _(String_)
Expand Down Expand Up @@ -464,3 +485,5 @@ The `41xx` and `51xx` codes are reserved for use by ShareDB DB adapters, and the
* 5018 - Required QueryEmitter listener not assigned
* 5019 - getMilestoneSnapshot MilestoneDB method unimplemented
* 5020 - saveMilestoneSnapshot MilestoneDB method unimplemented
* 5021 - getMilestoneSnapshotAtOrBeforeTime MilestoneDB method unimplemented
* 5022 - getMilestoneSnapshotAtOrAfterTime MilestoneDB method unimplemented
6 changes: 6 additions & 0 deletions lib/agent.js
Expand Up @@ -303,6 +303,8 @@ Agent.prototype._handleMessage = function(request, callback) {
return this._submit(request.c, request.d, op, callback);
case 'nf':
return this._fetchSnapshot(request.c, request.d, request.v, callback);
case 'nt':
return this._fetchSnapshotByTimestamp(request.c, request.d, request.ts, callback);
default:
callback({code: 4000, message: 'Invalid or unknown message'});
}
Expand Down Expand Up @@ -589,3 +591,7 @@ Agent.prototype._createOp = function(request) {
Agent.prototype._fetchSnapshot = function (collection, id, version, callback) {
this.backend.fetchSnapshot(this, collection, id, version, callback);
};

Agent.prototype._fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
this.backend.fetchSnapshotByTimestamp(this, collection, id, timestamp, callback);
};
113 changes: 83 additions & 30 deletions lib/backend.js
Expand Up @@ -81,7 +81,9 @@ Backend.prototype.SNAPSHOT_TYPES = {
// The current snapshot is being fetched (eg through backend.fetch)
current: 'current',
// A specific snapshot is being fetched by version (eg through backend.fetchSnapshot)
byVersion: 'byVersion'
byVersion: 'byVersion',
// A specific snapshot is being fetch by timestamp (eg through backend.fetchSnapshotByTimestamp)
byTimestamp: 'byTimestamp'
};

Backend.prototype._shimDocAction = function() {
Expand Down Expand Up @@ -627,6 +629,8 @@ Backend.prototype.fetchSnapshot = function(agent, index, id, version, callback)

Backend.prototype._fetchSnapshot = function (collection, id, version, callback) {
var db = this.db;
var backend = this;

this.milestoneDb.getMilestoneSnapshot(collection, id, version, function (error, milestoneSnapshot) {
if (error) return callback(error);

Expand All @@ -637,49 +641,98 @@ Backend.prototype._fetchSnapshot = function (collection, id, version, callback)
db.getOps(collection, id, from, version, null, function (error, ops) {
if (error) return callback(error);

var type = null;
var data;
var fetchedVersion = 0;

if (milestoneSnapshot) {
type = types.map[milestoneSnapshot.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = milestoneSnapshot.data;
fetchedVersion = milestoneSnapshot.v;
}
backend._buildSnapshotFromOps(id, milestoneSnapshot, ops, function (error, snapshot) {
if (error) return callback(error);

for (var index = 0; index < ops.length; index++) {
var op = ops[index];
fetchedVersion = op.v + 1;

if (op.create) {
type = types.map[op.create.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = type.create(op.create.data);
} else if (op.del) {
data = undefined;
type = null;
} else {
data = type.apply(data, op.op);
if (version > snapshot.v) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}
}

type = type ? type.uri : null;
callback(null, snapshot);
});
});
});
};

if (version > fetchedVersion) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}
Backend.prototype.fetchSnapshotByTimestamp = function (agent, index, id, timestamp, callback) {
var start = Date.now();
var backend = this;
var projection = this.projections[index];
var collection = projection ? projection.target : index;
var request = {
agent: agent,
index: index,
collection: collection,
id: id,
timestamp: timestamp
};

var snapshot = new Snapshot(id, fetchedVersion, type, data, null);
this._fetchSnapshotByTimestamp(collection, id, timestamp, function (error, snapshot) {
if (error) return callback(error);
var snapshotProjection = backend._getSnapshotProjection(backend.db, projection);
var snapshots = [snapshot];
var snapshotType = backend.SNAPSHOT_TYPES.byTimestamp;
backend._sanitizeSnapshots(agent, snapshotProjection, collection, snapshots, snapshotType, function (error) {
if (error) return callback(error);
backend.emit('timing', 'fetchSnapshot', Date.now() - start, request);
callback(null, snapshot);
});
});
};

Backend.prototype._fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
var db = this.db;
var milestoneDb = this.milestoneDb;
var backend = this;

var milestoneSnapshot;
var from = 0;
var to = null;

milestoneDb.getMilestoneSnapshotAtOrBeforeTime(collection, id, timestamp, function (error, snapshot) {
if (error) return callback(error);
milestoneSnapshot = snapshot;
if (snapshot) from = snapshot.v;

milestoneDb.getMilestoneSnapshotAtOrAfterTime(collection, id, timestamp, function (error, snapshot) {
if (error) return callback(error);
if (snapshot) to = snapshot.v;

var options = {metadata: true};
db.getOps(collection, id, from, to, options, function (error, ops) {
if (error) return callback(error);
filterOpsInPlaceBeforeTimestamp(ops, timestamp);
backend._buildSnapshotFromOps(id, milestoneSnapshot, ops, callback);
});
});
});
};

Backend.prototype._buildSnapshotFromOps = function (id, startingSnapshot, ops, callback) {
var snapshot = startingSnapshot || new Snapshot(id, 0, null, undefined, null);
var error = ot.applyOps(snapshot, ops);
callback(error, snapshot);
};

function pluckIds(snapshots) {
var ids = [];
for (var i = 0; i < snapshots.length; i++) {
ids.push(snapshots[i].id);
}
return ids;
}

function filterOpsInPlaceBeforeTimestamp(ops, timestamp) {
if (timestamp === null) {
return;
}

for (var i = 0; i < ops.length; i++) {
var op = ops[i];
var opTimestamp = op.m && op.m.ts;
if (opTimestamp > timestamp) {
ops.length = i;
return;
}
}
}
34 changes: 32 additions & 2 deletions lib/client/connection.js
@@ -1,6 +1,7 @@
var Doc = require('./doc');
var Query = require('./query');
var SnapshotRequest = require('./snapshot-request');
var SnapshotVersionRequest = require('./snapshot-request/snapshot-version-request');
var SnapshotTimestampRequest = require('./snapshot-request/snapshot-timestamp-request');
var emitter = require('../emitter');
var ShareDBError = require('../error');
var types = require('../types');
Expand Down Expand Up @@ -233,6 +234,7 @@ Connection.prototype.handleMessage = function(message) {
return this._handleBulkMessage(message, '_handleUnsubscribe');

case 'nf':
case 'nt':
return this._handleSnapshotFetch(err, message);

case 'f':
Expand Down Expand Up @@ -634,7 +636,35 @@ Connection.prototype.fetchSnapshot = function(collection, id, version, callback)
}

var requestId = this.nextSnapshotRequestId++;
var snapshotRequest = new SnapshotRequest(this, requestId, collection, id, version, callback);
var snapshotRequest = new SnapshotVersionRequest(this, requestId, collection, id, version, callback);
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
snapshotRequest.send();
};

/**
* Fetch a read-only snapshot at a given timestamp
*
* @param collection - the collection name of the snapshot
* @param id - the ID of the snapshot
* @param timestamp (optional) - the timestamp to fetch
* @param callback - (error, snapshot) => void, where snapshot takes the following schema:
*
* {
* id: string; // ID of the snapshot
* v: number; // version number of the snapshot
* type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
* data: any; // the snapshot
* }
*
*/
Connection.prototype.fetchSnapshotByTimestamp = function (collection, id, timestamp, callback) {
if (typeof timestamp === 'function') {
callback = timestamp;
timestamp = null;
}

var requestId = this.nextSnapshotRequestId++;
var snapshotRequest = new SnapshotTimestampRequest(this, requestId, collection, id, timestamp, callback);
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
snapshotRequest.send();
};
Expand Down
@@ -1,25 +1,19 @@
var Snapshot = require('../snapshot');
var util = require('../util');
var emitter = require('../emitter');
var Snapshot = require('../../snapshot');
var emitter = require('../../emitter');

module.exports = SnapshotRequest;

function SnapshotRequest(connection, requestId, collection, id, version, callback) {
function SnapshotRequest(connection, requestId, collection, id, callback) {
emitter.EventEmitter.call(this);

if (typeof callback !== 'function') {
throw new Error('Callback is required for SnapshotRequest');
}

if (!util.isValidVersion(version)) {
throw new Error('Snapshot version must be a positive integer or null');
}

this.requestId = requestId;
this.connection = connection;
this.id = id;
this.collection = collection;
this.version = version;
this.callback = callback;

this.sent = false;
Expand All @@ -31,15 +25,7 @@ SnapshotRequest.prototype.send = function () {
return;
}

var message = {
a: 'nf',
id: this.requestId,
c: this.collection,
d: this.id,
v: this.version,
};

this.connection.send(message);
this.connection.send(this._message());
this.sent = true;
};

Expand All @@ -61,6 +47,8 @@ SnapshotRequest.prototype._handleResponse = function (error, message) {
return this.callback(error);
}

var snapshot = new Snapshot(this.id, message.v, message.type, message.data, null);
var metadata = message.meta ? message.meta : null;
var snapshot = new Snapshot(this.id, message.v, message.type, message.data, metadata);

this.callback(null, snapshot);
};
26 changes: 26 additions & 0 deletions lib/client/snapshot-request/snapshot-timestamp-request.js
@@ -0,0 +1,26 @@
var SnapshotRequest = require('./snapshot-request');
var util = require('../../util');

module.exports = SnapshotTimestampRequest;

function SnapshotTimestampRequest(connection, requestId, collection, id, timestamp, callback) {
SnapshotRequest.call(this, connection, requestId, collection, id, callback);

if (!util.isValidTimestamp(timestamp)) {
throw new Error('Snapshot timestamp must be a positive integer or null');
}

this.timestamp = timestamp;
}

SnapshotTimestampRequest.prototype = Object.create(SnapshotRequest.prototype);

SnapshotTimestampRequest.prototype._message = function () {
return {
a: 'nt',
id: this.requestId,
c: this.collection,
d: this.id,
ts: this.timestamp,
};
};
26 changes: 26 additions & 0 deletions lib/client/snapshot-request/snapshot-version-request.js
@@ -0,0 +1,26 @@
var SnapshotRequest = require('./snapshot-request');
var util = require('../../util');

module.exports = SnapshotVersionRequest;

function SnapshotVersionRequest (connection, requestId, collection, id, version, callback) {
SnapshotRequest.call(this, connection, requestId, collection, id, callback);

if (!util.isValidVersion(version)) {
throw new Error('Snapshot version must be a positive integer or null');
}

this.version = version;
}

SnapshotVersionRequest.prototype = Object.create(SnapshotRequest.prototype);

SnapshotVersionRequest.prototype._message = function () {
return {
a: 'nf',
id: this.requestId,
c: this.collection,
d: this.id,
v: this.version,
};
};

0 comments on commit 80dedc8

Please sign in to comment.