Skip to content

Commit

Permalink
Merge e80fe46 into 08f3339
Browse files Browse the repository at this point in the history
  • Loading branch information
alecgibson committed Aug 9, 2018
2 parents 08f3339 + e80fe46 commit 61295b9
Show file tree
Hide file tree
Showing 11 changed files with 624 additions and 29 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
# Emacs
\#*\#

# VS Code
.vscode/

# Logs
logs
*.log
Expand Down
24 changes: 23 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ var socket = new WebSocket('ws://' + window.location.host);
var connection = new sharedb.Connection(socket);
```

The native Websocket object that you feed to ShareDB's `Connection` constructor **does not** handle reconnections.
The native Websocket object that you feed to ShareDB's `Connection` constructor **does not** handle reconnections.

The easiest way is to give it a WebSocket object that does reconnect. There are plenty of example on the web. The most important thing is that the custom reconnecting websocket, must have the same API as the native rfc6455 version.

Expand Down Expand Up @@ -227,6 +227,27 @@ changes. Returns a [`ShareDB.Query`](#class-sharedbquery) instance.
* `options.*`
All other options are passed through to the database adapter.

`connection.fetchSnapshot(collection, id, version, callback): void;`
Get a read-only snapshot of a document at the requested version.

* `collection` _(String)_
Collection name of the snapshot
* `id` _(String)_
ID of the snapshot
* `version` _(number) [optional]_
The version number of the desired snapshot
* `callback` _(Function)_
Called with `(error, snapshot)`, where `snapshot` takes the following form:

```javascript
{
id: string; // ID of the snapshot
v: number; // version number of the snapshot
type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
data: any; // the snapshot
}
```

### Class: `ShareDB.Doc`

`doc.type` _(String_)
Expand Down Expand Up @@ -375,6 +396,7 @@ Additional fields may be added to the error object for debugging context dependi
* 4021 - Invalid client id
* 4022 - Database adapter does not support queries
* 4023 - Cannot project snapshots of this type
* 4024 - Invalid version

### 5000 - Internal error

Expand Down
6 changes: 6 additions & 0 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,8 @@ Agent.prototype._handleMessage = function(request, callback) {
var op = this._createOp(request);
if (!op) return callback({code: 4000, message: 'Invalid op message'});
return this._submit(request.c, request.d, op, callback);
case 'nf':
return this._fetchSnapshot(request.c, request.d, request.v, callback);
default:
callback({code: 4000, message: 'Invalid or unknown message'});
}
Expand Down Expand Up @@ -582,3 +584,7 @@ Agent.prototype._createOp = function(request) {
return new DeleteOp(src, request.seq, request.v, request.del);
}
};

Agent.prototype._fetchSnapshot = function (collection, id, version, callback) {
this.backend.fetchSnapshot(this, collection, id, version, callback);
};
66 changes: 66 additions & 0 deletions lib/backend.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ var MemoryPubSub = require('./pubsub/memory');
var ot = require('./ot');
var projections = require('./projections');
var QueryEmitter = require('./query-emitter');
var Snapshot = require('./snapshot');
var StreamSocket = require('./stream-socket');
var SubmitRequest = require('./submit-request');
var types = require('./types');

var warnDeprecatedDoc = true;
var warnDeprecatedAfterSubmit = true;

Expand Down Expand Up @@ -580,6 +583,69 @@ Backend.prototype.getChannels = function(collection, id) {
];
};

Backend.prototype.fetchSnapshot = function(agent, index, id, version, callback) {
var start = Date.now();
var backend = this;
var projection = this.projections[index];
var collection = projection ? projection.target : index;
var request = {
agent: agent,
index: index,
collection: collection,
id: id,
version: version
};

this._fetchSnapshot(collection, id, version, function (error, snapshot) {
if (error) return callback(error);
var snapshotProjection = backend._getSnapshotProjection(backend.db, projection);
var snapshots = [snapshot];
backend._sanitizeSnapshots(agent, snapshotProjection, collection, snapshots, function (error) {
if (error) return callback(error);
backend.emit('timing', 'fetchSnapshot', Date.now() - start, request);
callback(null, snapshot);
});
});
};

Backend.prototype._fetchSnapshot = function (collection, id, version, callback) {
// Bypass backend.getOps so that we don't call _sanitizeOps. We want to avoid this, because:
// - we want to avoid the 'op' middleware, because we later use the 'readSnapshots' middleware in _sanitizeSnapshots
// - we handle the projection in _sanitizeSnapshots
this.db.getOps(collection, id, 0, version, null, function (error, ops) {
if (error) return callback(error);

var type = null;
var data;
var fetchedVersion = 0;

for (var index = 0; index < ops.length; index++) {
var op = ops[index];
fetchedVersion = op.v + 1;

if (op.create) {
type = types.map[op.create.type];
if (!type) return callback({ code: 4008, message: 'Unknown type' });
data = type.create(op.create.data);
} else if (op.del) {
data = undefined;
type = null;
} else {
data = type.apply(data, op.op);
}
}

type = type ? type.uri : null;

if (version > fetchedVersion) {
return callback({ code: 4024, message: 'Requested version exceeds latest snapshot version' });
}

var snapshot = new Snapshot(id, fetchedVersion, type, data, null);
callback(null, snapshot);
});
};

function pluckIds(snapshots) {
var ids = [];
for (var i = 0; i < snapshots.length; i++) {
Expand Down
66 changes: 63 additions & 3 deletions lib/client/connection.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
var Doc = require('./doc');
var Query = require('./query');
var SnapshotRequest = require('./snapshot-request');
var emitter = require('../emitter');
var ShareDBError = require('../error');
var types = require('../types');
Expand Down Expand Up @@ -33,13 +34,17 @@ function Connection(socket) {
// (created documents MUST BE UNIQUE)
this.collections = {};

// Each query is created with an id that the server uses when it sends us
// info about the query (updates, etc)
// Each query and snapshot request is created with an id that the server uses when it sends us
// info about the request (updates, etc)
this.nextQueryId = 1;
this.nextSnapshotRequestId = 1;

// Map from query ID -> query object.
this.queries = {};

// Map from snapshot request ID -> snapshot request
this._snapshotRequests = {};

// A unique message number for the given id
this.seq = 1;

Expand Down Expand Up @@ -226,6 +231,9 @@ Connection.prototype.handleMessage = function(message) {
case 'bu':
return this._handleBulkMessage(message, '_handleUnsubscribe');

case 'nf':
return this._handleSnapshotFetch(err, message);

case 'f':
var doc = this.getExisting(message.c, message.d);
if (doc) doc._handleFetch(err, message.data);
Expand Down Expand Up @@ -310,6 +318,11 @@ Connection.prototype._setState = function(newState, reason) {
docs[id]._onConnectionStateChanged();
}
}
// Emit the event to all snapshots
for (var id in this._snapshotRequests) {
var snapshotRequest = this._snapshotRequests[id];
snapshotRequest._onConnectionStateChanged();
}
this.endBulk();

this.emit(newState, reason);
Expand Down Expand Up @@ -523,7 +536,8 @@ Connection.prototype.createSubscribeQuery = function(collection, q, options, cal
Connection.prototype.hasPending = function() {
return !!(
this._firstDoc(hasPending) ||
this._firstQuery(hasPending)
this._firstQuery(hasPending) ||
this._firstSnapshotRequest()
);
};
function hasPending(object) {
Expand Down Expand Up @@ -552,6 +566,11 @@ Connection.prototype.whenNothingPending = function(callback) {
query.once('ready', this._nothingPendingRetry(callback));
return;
}
var snapshotRequest = this._firstSnapshotRequest();
if (snapshotRequest) {
snapshotRequest.once('ready', this._nothingPendingRetry(callback));
return;
}
// Call back when no pending operations
process.nextTick(callback);
};
Expand Down Expand Up @@ -584,3 +603,44 @@ Connection.prototype._firstQuery = function(fn) {
}
}
};

Connection.prototype._firstSnapshotRequest = function () {
for (var id in this._snapshotRequests) {
return this._snapshotRequests[id];
}
};

/**
* Fetch a read-only snapshot at a given version
*
* @param collection - the collection name of the snapshot
* @param id - the ID of the snapshot
* @param version (optional) - the version number to fetch
* @param callback - (error, snapshot) => void, where snapshot takes the following schema:
*
* {
* id: string; // ID of the snapshot
* v: number; // version number of the snapshot
* type: string; // the OT type of the snapshot, or null if it doesn't exist or is deleted
* data: any; // the snapshot
* }
*
*/
Connection.prototype.fetchSnapshot = function(collection, id, version, callback) {
if (typeof version === 'function') {
callback = version;
version = null;
}

var requestId = this.nextSnapshotRequestId++;
var snapshotRequest = new SnapshotRequest(this, requestId, collection, id, version, callback);
this._snapshotRequests[snapshotRequest.requestId] = snapshotRequest;
snapshotRequest.send();
};

Connection.prototype._handleSnapshotFetch = function (error, message) {
var snapshotRequest = this._snapshotRequests[message.id];
if (!snapshotRequest) return;
delete this._snapshotRequests[message.id];
snapshotRequest._handleResponse(error, message);
};
75 changes: 75 additions & 0 deletions lib/client/snapshot-request.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
var Snapshot = require('../snapshot');
var util = require('../util');
var emitter = require('../emitter');

module.exports = SnapshotRequest;

function SnapshotRequest(connection, requestId, collection, id, version, callback) {
emitter.EventEmitter.call(this);

if (typeof callback !== 'function') {
throw new Error('Callback is required for SnapshotRequest');
}

if (!this.isValidVersion(version)) {
throw new Error('Snapshot version must be a positive integer or null');
}

this.requestId = requestId;
this.connection = connection;
this.id = id;
this.collection = collection;
this.version = version;
this.callback = callback;

this.sent = false;
}
emitter.mixin(SnapshotRequest);

SnapshotRequest.prototype.isValidVersion = function (version) {
if (version === null) {
return true;
}

if (!util.isInteger(version)) {
return false;
}

return version >= 0;
}

SnapshotRequest.prototype.send = function () {
if (!this.connection.canSend) {
return;
}

var message = {
a: 'nf',
id: this.requestId,
c: this.collection,
d: this.id,
v: this.version,
};

this.connection.send(message);
this.sent = true;
};

SnapshotRequest.prototype._onConnectionStateChanged = function () {
if (this.connection.canSend && !this.sent) {
this.send();
} else if (!this.connection.canSend) {
this.sent = false;
}
};

SnapshotRequest.prototype._handleResponse = function (error, message) {
this.emit('ready');

if (error) {
return this.callback(error);
}

var snapshot = new Snapshot(this.id, message.v, message.type, message.data, null);
this.callback(null, snapshot);
};
16 changes: 4 additions & 12 deletions lib/db/memory.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
var DB = require('./index');
var Snapshot = require('../snapshot');

// In-memory ShareDB database
//
Expand Down Expand Up @@ -151,24 +152,15 @@ MemoryDB.prototype._getSnapshotSync = function(collection, id, includeMetadata)
var snapshot;
if (doc) {
var data = clone(doc.data);
var meta = (includeMetadata) ? clone(doc.m) : undefined;
snapshot = new MemorySnapshot(id, doc.v, doc.type, data, meta);
var meta = (includeMetadata) ? clone(doc.m) : null;
snapshot = new Snapshot(id, doc.v, doc.type, data, meta);
} else {
var version = this._getVersionSync(collection, id);
snapshot = new MemorySnapshot(id, version, null, undefined);
snapshot = new Snapshot(id, version, null, undefined, null);
}
return snapshot;
};

// `id`, and `v` should be on every returned snapshot
function MemorySnapshot(id, version, type, data, meta) {
this.id = id;
this.v = version;
this.type = type;
this.data = data;
if (meta) this.m = meta;
}

MemoryDB.prototype._getOpLogSync = function(collection, id) {
var collectionOps = this.ops[collection] || (this.ops[collection] = {});
return collectionOps[id] || (collectionOps[id] = []);
Expand Down
8 changes: 8 additions & 0 deletions lib/snapshot.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
module.exports = Snapshot;
function Snapshot(id, version, type, data, meta) {
this.id = id;
this.v = version;
this.type = type;
this.data = data;
this.m = meta;
}
7 changes: 7 additions & 0 deletions lib/util.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,10 @@ exports.hasKeys = function(object) {
for (var key in object) return true;
return false;
};

// https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Number/isInteger#Polyfill
exports.isInteger = Number.isInteger || function (value) {
return typeof value === 'number' &&
isFinite(value) &&
Math.floor(value) === value;
};

0 comments on commit 61295b9

Please sign in to comment.