From cb78434b629d85690148a44700d8e758b68be0c6 Mon Sep 17 00:00:00 2001
From: Andrew Imm Open Event - When you call query.subscribe(), we send a subscribe request to
+ * the LiveQuery server, when we get the confirmation from the LiveQuery server,
+ * this event will be emitted. When the client loses WebSocket connection to the
+ * LiveQuery server, we will try to auto reconnect the LiveQuery server. If we
+ * reconnect the LiveQuery server and successfully resubscribe the ParseQuery,
+ * you'll also get this event.
+ *
+ *
+ * var p1 = Parse.Promise.as(1);
+ * var p2 = Parse.Promise.as(2);
+ * var p3 = Parse.Promise.as(3);
+ *
+ * Parse.Promise.all([p1, p2, p3]).then(function([r1, r2, r3]) {
+ * console.log(r1); // prints 1
+ * console.log(r2); // prints 2
+ * console.log(r3); // prints 3
+ * });
+ *
+ * @method all
+ * @param {Iterable} promises an iterable of promises to wait for.
+ * @static
+ * @return {Parse.Promise} the new promise.
+ */
+ static all(promises) {
+ let total = 0;
+ let objects = [];
+
+ for (let p of promises) {
+ objects[total++] = p;
+ }
+
+ if (total === 0) {
+ return ParsePromise.as([]);
+ }
+
+ let hadError = false;
+ let promise = new ParsePromise();
+ let resolved = 0;
+ let results = [];
+ objects.forEach((object, i) => {
+ if (ParsePromise.is(object)) {
+ object.then((result) => {
+ if (hadError) {
+ return false;
+ }
+ results[i] = result;
+ resolved++;
+ if (resolved >= total) {
+ promise.resolve(results);
+ }
+ }, (error) => {
+ // Reject immediately
+ promise.reject(error);
+ hadError = true;
+ });
+ } else {
+ results[i] = object;
+ resolved++;
+ if (!hadError && resolved >= total) {
+ promise.resolve(results);
+ }
+ }
+ });
+
+ return promise;
+ }
+
+ /**
+ * Returns a new promise that is immediately fulfilled when any of the
+ * promises in the iterable argument are resolved or rejected. If the
+ * first promise to complete is resolved, the returned promise will be
+ * resolved with the same value. Likewise, if the first promise to
+ * complete is rejected, the returned promise will be rejected with the
+ * same reason.
+ *
+ * @method race
+ * @param {Iterable} promises an iterable of promises to wait for.
+ * @static
+ * @return {Parse.Promise} the new promise.
+ */
+ static race(promises) {
+ let completed = false;
+ let promise = new ParsePromise();
+ for (let p of promises) {
+ if (ParsePromise.is(p)) {
+ p.then((result) => {
+ if (completed) {
+ return;
+ }
+ completed = true;
+ promise.resolve(result);
+ }, (error) => {
+ if (completed) {
+ return;
+ }
+ completed = true;
+ promise.reject(error);
+ });
+ } else if (!completed) {
+ completed = true;
+ promise.resolve(p);
+ }
+ }
+
+ return promise;
+ }
+
/**
* Runs the given asyncFunction repeatedly, as long as the predicate
* function returns a truthy value. Stops repeating if asyncFunction returns
diff --git a/src/__tests__/ParsePromise-test.js b/src/__tests__/ParsePromise-test.js
index f5a56d3b0..8e03c4e54 100644
--- a/src/__tests__/ParsePromise-test.js
+++ b/src/__tests__/ParsePromise-test.js
@@ -14,6 +14,18 @@ var ParsePromise = require('../ParsePromise');
var asyncHelper = require('./test_helpers/asyncHelper');
describe('Promise', () => {
+ it('can disable A+ compliance', () => {
+ ParsePromise.disableAPlusCompliant();
+ expect(ParsePromise.isPromisesAPlusCompliant()).toBe(false);
+ });
+
+ it('can enable A+ compliance', () => {
+ ParsePromise.enableAPlusCompliant();
+ expect(ParsePromise.isPromisesAPlusCompliant()).toBe(true);
+ });
+});
+
+function promiseTests() {
it('can be initially resolved', () => {
var promise = ParsePromise.as('foo');
promise.then((result) => {
@@ -428,6 +440,40 @@ describe('Promise', () => {
});
});
+ it('runs catch callbacks on error', () => {
+ var promise = ParsePromise.error('foo');
+ promise.fail((error) => {
+ expect(error).toBe('foo');
+ }).then((result) => {
+ if (ParsePromise.isPromisesAPlusCompliant()) {
+ expect(result).toBe(undefined);
+ } else {
+ // This should not be reached
+ expect(true).toBe(false);
+ }
+ }, (error) => {
+ if (ParsePromise.isPromisesAPlusCompliant()) {
+ // This should not be reached
+ expect(true).toBe(false);
+ } else {
+ expect(error).toBe(undefined);
+ }
+ });
+ });
+
+ it('does not run catch callbacks on success', () => {
+ var promise = ParsePromise.as('foo');
+ promise.catch((error) => {
+ // This should not be reached
+ expect(true).toBe(false);
+ }).then((result) => {
+ expect(result).toBe('foo');
+ }, (error) => {
+ // This should not be reached
+ expect(true).toBe(false);
+ });
+ });
+
it('operates asynchonously', () => {
var triggered = false;
ParsePromise.as().then(() => {
@@ -518,4 +564,116 @@ describe('Promise', () => {
done();
});
}));
+
+ it('resolves Promise.all with the set of resolved results', asyncHelper((done) => {
+ let firstSet = [
+ new ParsePromise(),
+ new ParsePromise(),
+ new ParsePromise(),
+ new ParsePromise()
+ ];
+
+ let secondSet = [5,6,7];
+
+ ParsePromise.all([]).then((results) => {
+ expect(results).toEqual([]);
+
+ return ParsePromise.all(firstSet);
+ }).then((results) => {
+ expect(results).toEqual([1,2,3,4]);
+ return ParsePromise.all(secondSet);
+ }).then((results) => {
+ expect(results).toEqual([5,6,7]);
+ done();
+ });
+ firstSet[0].resolve(1);
+ firstSet[1].resolve(2);
+ firstSet[2].resolve(3);
+ firstSet[3].resolve(4);
+ }));
+
+ it('rejects Promise.all with the first rejected promise', asyncHelper((done) => {
+ let promises = [
+ new ParsePromise(),
+ new ParsePromise(),
+ new ParsePromise()
+ ];
+
+ ParsePromise.all(promises).then(() => {
+ // this should not be reached
+ }, (error) => {
+ expect(error).toBe('an error');
+ done();
+ });
+ promises[0].resolve(1);
+ promises[1].reject('an error');
+ promises[2].resolve(3);
+ }));
+
+ it('resolves Promise.race with the first resolved result', asyncHelper((done) => {
+ let firstSet = [
+ new ParsePromise(),
+ new ParsePromise(),
+ new ParsePromise()
+ ];
+
+ let secondSet = [4, 5, ParsePromise.error()];
+
+ ParsePromise.race(firstSet).then((result) => {
+ expect(result).toBe(2);
+
+ return ParsePromise.race(secondSet);
+ }).then((result) => {
+ expect(result).toBe(4);
+ done();
+ });
+ firstSet[1].resolve(2);
+ firstSet[0].resolve(1);
+ }));
+
+ it('rejects Promise.race with the first rejected reason', asyncHelper((done) => {
+ let promises = [
+ new ParsePromise(),
+ new ParsePromise(),
+ new ParsePromise()
+ ];
+
+ ParsePromise.race(promises).fail((error) => {
+ expect(error).toBe('error 2');
+ done();
+ });
+ promises[1].reject('error 2');
+ promises[0].resolve('error 1');
+ }));
+
+ it('can implement continuations', asyncHelper((done) => {
+ let count = 0;
+ let loop = () => {
+ count++;
+ return ParsePromise.as();
+ }
+ ParsePromise._continueWhile(
+ () => { return count < 5 },
+ loop
+ ).then(() => {
+ expect(count).toBe(5);
+ done();
+ });
+ }));
+}
+
+describe('Promise (A Compliant)', () => {
+ beforeEach(() => {
+ ParsePromise.disableAPlusCompliant();
+ });
+
+ promiseTests();
+});
+
+describe('Promise (A+ Compliant)', () => {
+ beforeEach(() => {
+ ParsePromise.enableAPlusCompliant();
+ });
+
+ promiseTests();
});
From 984a095ca1b292ba54699320ef55411529c13885 Mon Sep 17 00:00:00 2001
From: Andrew Imm
+ * let Parse = require('parse/node');
+ * let LiveQueryClient = Parse.LiveQueryClient;
+ * let client = new LiveQueryClient({
+ * applicationId: '',
+ * serverURL: '',
+ * javascriptKey: '',
+ * masterKey: ''
+ * });
+ *
+ *
+ * Open - When we establish the WebSocket connection to the LiveQuery server, you'll get this event.
+ *
+ * client.on('open', () => {
+ *
+ * });
+ *
+ * Close - When we lose the WebSocket connection to the LiveQuery server, you'll get this event.
+ *
+ * client.on('close', () => {
+ *
+ * });
+ *
+ * Error - When some network error or LiveQuery server error happens, you'll get this event.
+ *
+ * client.on('error', (error) => {
+ *
+ * });
+ *
+ *
+ */
+export default class LiveQueryClient extends events.EventEmitter {
+ attempts: number;
+ id: number;
+ requestId: number;
+ applicationId: string;
+ serverURL: string;
+ javascriptKey: ?string;
+ masterKey: ?string;
+ sessionToken: ?string;
+ connectPromise: Object;
+ subscriptions: Map;
+ socket: any;
+ state: string;
+
+ constructor({
+ applicationId,
+ serverURL,
+ javascriptKey,
+ masterKey,
+ sessionToken
+ }: LiveQueryConstructorArg) {
+ super();
+
+ if (!serverURL || serverURL.indexOf('ws') !== 0) {
+ throw new Error('You need to set a proper Parse LiveQuery server url before using LiveQueryClient');
+ }
+
+ this.attempts = 1;;
+ this.id = 0;
+ this.requestId = 1;
+ this.serverURL = serverURL;
+ this.applicationId = applicationId;
+ this.javascriptKey = javascriptKey;
+ this.masterKey = masterKey;
+ this.sessionToken = sessionToken;
+ this.connectPromise = new ParsePromise();
+ this.subscriptions = new Map();
+ this.state = CLIENT_STATE.INITIALIZED;
+ }
+
+ shouldOpen(): any {
+ return this.state === CLIENT_STATE.INITIALIZED || this.state === CLIENT_STATE.DISCONNECTED;
+ }
+
+ /**
+ * Subscribes to a ParseQuery
+ *
+ * If you provide the sessionToken, when the LiveQuery server gets ParseObject's
+ * updates from parse server, it'll try to check whether the sessionToken fulfills
+ * the ParseObject's ACL. The LiveQuery server will only send updates to clients whose
+ * sessionToken is fit for the ParseObject's ACL. You can check the LiveQuery protocol
+ * here for more details. The subscription you get is the same subscription you get
+ * from our Standard API.
+ *
+ * @method subscribe
+ * @param {Object} query - the ParseQuery you want to subscribe to
+ * @param {string} sessionToken (optional)
+ * @return {Object} subscription
+ */
+ subscribe(query: Object, sessionToken: ?string): Object {
+ if (!query) {
+ return;
+ }
+ let where = query.toJSON().where;
+ let className = query.className;
+ let subscribeRequest = {
+ op: OP_TYPES.SUBSCRIBE,
+ requestId: this.requestId,
+ query: {
+ className,
+ where
+ }
+ };
+
+ if (sessionToken) {
+ subscribeRequest.sessionToken = sessionToken;
+ }
+
+ let subscription = new LiveQuerySubscription(this.requestId, query, sessionToken);
+ this.subscriptions.set(this.requestId, subscription);
+ this.requestId += 1;
+ this.connectPromise.then(() => {
+ this.socket.send(JSON.stringify(subscribeRequest));
+ });
+
+ // adding listener so process does not crash
+ // best practice is for developer to register their own listener
+ subscription.on('error', () => {});
+
+ return subscription;
+ }
+
+ /**
+ * After calling unsubscribe you'll stop receiving events from the subscription object.
+ *
+ * @method unsubscribe
+ * @param {Object} subscription - subscription you would like to unsubscribe from.
+ */
+ unsubscribe(subscription: Object) {
+ if (!subscription) {
+ return;
+ }
+
+ this.subscriptions.delete(subscription.id);
+ let unsubscribeRequest = {
+ op: OP_TYPES.UNSUBSCRIBE,
+ requestId: subscription.id
+ }
+ this.connectPromise.then(() => {
+ this.socket.send(JSON.stringify(unsubscribeRequest));
+ });
+ }
+
+ /**
+ * After open is called, the LiveQueryClient will try to send a connect request
+ * to the LiveQuery server.
+ *
+ * @method open
+ */
+ open() {
+ let WebSocketImplementation = this._getWebSocketImplementation();
+ if (!WebSocketImplementation) {
+ this.emit(CLIENT_EMMITER_TYPES.ERROR, 'Can not find WebSocket implementation');
+ return;
+ }
+
+ if (this.state !== CLIENT_STATE.RECONNECTING) {
+ this.state = CLIENT_STATE.CONNECTING;
+ }
+
+ // Get WebSocket implementation
+ this.socket = new WebSocketImplementation(this.serverURL);
+
+ // Bind WebSocket callbacks
+ this.socket.onopen = () => {
+ this._handleWebSocketOpen();
+ };
+
+ this.socket.onmessage = (event) => {
+ this._handleWebSocketMessage(event);
+ };
+
+ this.socket.onclose = () => {
+ this._handleWebSocketClose();
+ };
+
+ this.socket.onerror = (error) => {
+ console.log("error on socket");
+ this._handleWebSocketError(error);
+ };
+ }
+
+ resubscribe() {
+ this.subscriptions.forEach((subscription, requestId) => {
+ let query = subscription.query;
+ let where = query.toJSON().where;
+ let className = query.className;
+ let sessionToken = subscription.sessionToken;
+ let subscribeRequest = {
+ op: OP_TYPES.SUBSCRIBE,
+ requestId,
+ query: {
+ className,
+ where
+ }
+ };
+
+ if (sessionToken) {
+ subscribeRequest.sessionToken = sessionToken;
+ }
+
+ this.connectPromise.then(() => {
+ this.socket.send(JSON.stringify(subscribeRequest));
+ });
+ });
+ }
+
+ /**
+ * This method will close the WebSocket connection to this LiveQueryClient,
+ * cancel the auto reconnect and unsubscribe all subscriptions based on it.
+ *
+ * @method close
+ */
+ close() {
+ if (this.state === CLIENT_STATE.INITIALIZED || this.state === CLIENT_STATE.DISCONNECTED) {
+ return;
+ }
+ this.state = CLIENT_STATE.DISCONNECTED;
+ this.socket.close();
+ // Notify each subscription about the close
+ for (let subscription of this.subscriptions.values()) {
+ subscription.emit(SUBSCRIPTION_EMMITER_TYPES.CLOSE);
+ }
+ this._handleReset();
+ this.emit(CLIENT_EMMITER_TYPES.CLOSE);
+ }
+
+ _getWebSocketImplementation(): any {
+ let WebSocketImplementation;
+ if (process.env.PARSE_BUILD === 'node') {
+ WebSocketImplementation = require('ws');
+ } else if (process.env.PARSE_BUILD === 'browser') {
+ if (window.WebSocket) {
+ WebSocketImplementation = WebSocket;
+ }
+ }
+ return WebSocketImplementation;
+ }
+
+ // ensure we start with valid state if connect is called again after close
+ _handleReset() {
+ this.attempts = 1;;
+ this.id = 0;
+ this.requestId = 1;
+ this.connectPromise = new ParsePromise();
+ this.subscriptions = new Map();
+ }
+
+ _handleWebSocketOpen() {
+ this.attempts = 1;
+ let connectRequest = {
+ op: OP_TYPES.CONNECT,
+ applicationId: this.applicationId,
+ javascriptKey: this.javascriptKey,
+ masterKey: this.masterKey,
+ sessionToken: this.sessionToken
+ };
+ this.socket.send(JSON.stringify(connectRequest));
+ }
+
+ _handleWebSocketMessage(event: any) {
+ let data = event.data;
+ if (typeof data === 'string') {
+ data = JSON.parse(data);
+ }
+ let subscription = null;
+ if (data.requestId) {
+ subscription =
+ this.subscriptions.get(data.requestId);
+ }
+ switch(data.op) {
+ case OP_EVENTS.CONNECTED:
+ if (this.state === CLIENT_STATE.RECONNECTING) {
+ this.resubscribe();
+ }
+ this.emit(CLIENT_EMMITER_TYPES.OPEN);
+ this.id = data.clientId;
+ this.connectPromise.resolve();
+ this.state = CLIENT_STATE.CONNECTED;
+ break;
+ case OP_EVENTS.SUBSCRIBED:
+ if (subscription) {
+ subscription.emit(SUBSCRIPTION_EMMITER_TYPES.OPEN);
+ }
+ break;
+ case OP_EVENTS.ERROR:
+ if (data.requestId) {
+ if (subscription) {
+ subscription.emit(SUBSCRIPTION_EMMITER_TYPES.ERROR, data.error);
+ }
+ } else {
+ this.emit(CLIENT_EMMITER_TYPES.ERROR, data.error);
+ }
+ break;
+ case OP_EVENTS.UNSUBSCRIBED:
+ // We have already deleted subscription in unsubscribe(), do nothing here
+ break;
+ default:
+ // create, update, enter, leave, delete cases
+ let className = data.object.className;
+ // Delete the extrea __type and className fields during transfer to full JSON
+ delete data.object.__type;
+ delete data.object.className;
+ let parseObject = new ParseObject(className);
+ parseObject._finishFetch(data.object);
+ if (!subscription) {
+ break;
+ }
+ subscription.emit(data.op, parseObject);
+ }
+ }
+
+ _handleWebSocketClose() {
+ if (this.state === CLIENT_STATE.DISCONNECTED) {
+ return;
+ }
+ this.state = CLIENT_STATE.CLOSED;
+ this.emit(CLIENT_EMMITER_TYPES.CLOSE);
+ // Notify each subscription about the close
+ for (let subscription of this.subscriptions.values()) {
+ subscription.emit(SUBSCRIPTION_EMMITER_TYPES.CLOSE);
+ }
+ this._handleReconnect();
+ }
+
+ _handleWebSocketError(error: any) {
+ this.emit(CLIENT_EMMITER_TYPES.ERROR, error);
+ for (let subscription of this.subscriptions.values()) {
+ subscription.emit(SUBSCRIPTION_EMMITER_TYPES.ERROR);
+ }
+ this._handleReconnect();
+ }
+
+ _handleReconnect() {
+ // if closed or currently reconnecting we stop attempting to reconnect
+ if (this.state === CLIENT_STATE.DISCONNECTED || this.state === CLIENT_STATE.RECONNECTING) {
+ return;
+ }
+ this.state = CLIENT_STATE.RECONNECTING;
+ let time = generateInterval(this.attempts);
+ console.info('attempting to reconnect after ' + time + 'ms');
+ setTimeout((() => {
+ this.attempts++;
+ this.connectPromise = new ParsePromise();
+ this.open();
+ }).bind(this), time);
+ }
+}
diff --git a/src/LiveQuerySubscription.js b/src/LiveQuerySubscription.js
new file mode 100644
index 000000000..7c326a43a
--- /dev/null
+++ b/src/LiveQuerySubscription.js
@@ -0,0 +1,109 @@
+/**
+ * Copyright (c) 2015-present, Parse, LLC.
+ * All rights reserved.
+ *
+ * This source code is licensed under the BSD-style license found in the
+ * LICENSE file in the root directory of this source tree. An additional grant
+ * of patent rights can be found in the PATENTS file in the same directory.
+ *
+ */
+
+import events from 'events';
+import CoreManager from './CoreManager';
+
+/**
+ * Creates a new LiveQuery Subscription.
+ * Extends events.EventEmitter
+ * cloud functions.
+ *
+ * @constructor
+ * @param {string} id - subscription id
+ * @param {string} query - query to subscribe to
+ * @param {string} sessionToken - optional session token
+ *
+ *
+ * subscription.on('open', () => {
+ *
+ * });
Create Event - When a new ParseObject is created and it fulfills the ParseQuery you subscribe, + * you'll get this event. The object is the ParseObject which is created. + * + *
+ * subscription.on('create', (object) => { + * + * });+ * + *
Update Event - When an existing ParseObject which fulfills the ParseQuery you subscribe + * is updated (The ParseObject fulfills the ParseQuery before and after changes), + * you'll get this event. The object is the ParseObject which is updated. + * Its content is the latest value of the ParseObject. + * + *
+ * subscription.on('update', (object) => { + * + * });+ * + *
Enter Event - When an existing ParseObject's old value doesn't fulfill the ParseQuery + * but its new value fulfills the ParseQuery, you'll get this event. The object is the + * ParseObject which enters the ParseQuery. Its content is the latest value of the ParseObject. + * + *
+ * subscription.on('enter', (object) => { + * + * });+ * + * + *
Update Event - When an existing ParseObject's old value fulfills the ParseQuery but its new value + * doesn't fulfill the ParseQuery, you'll get this event. The object is the ParseObject + * which leaves the ParseQuery. Its content is the latest value of the ParseObject. + * + *
+ * subscription.on('leave', (object) => { + * + * });+ * + * + *
Delete Event - When an existing ParseObject which fulfills the ParseQuery is deleted, you'll + * get this event. The object is the ParseObject which is deleted. + * + *
+ * subscription.on('delete', (object) => { + * + * });+ * + * + *
Close Event - When the client loses the WebSocket connection to the LiveQuery + * server and we stop receiving events, you'll get this event. + * + *
+ * subscription.on('close', () => { + * + * });+ * + * + */ +export default class Subscription extends events.EventEmitter { + constructor(id, query, sessionToken) { + super(); + this.id = id; + this.query = query; + this.sessionToken = sessionToken; + } + + /** + * @method unsubscribe + */ + unsubscribe() { + var liveQueryClient = CoreManager.getLiveQueryController().getDefaultLiveQueryClient(); + liveQueryClient.unsubscribe(this); + this.emit('close'); + } +} diff --git a/src/Parse.js b/src/Parse.js index 341f5a46e..eaef49f8f 100644 --- a/src/Parse.js +++ b/src/Parse.js @@ -80,6 +80,14 @@ Object.defineProperty(Parse, 'serverURL', { CoreManager.set('SERVER_URL', value); } }); +Object.defineProperty(Parse, 'liveQueryServerURL', { + get() { + return CoreManager.get('LIVEQUERY_SERVER_URL'); + }, + set(value) { + CoreManager.set('LIVEQUERY_SERVER_URL', value); + } +}); /** End setters **/ Parse.ACL = require('./ParseACL'); @@ -110,6 +118,8 @@ Parse.Role = require('./ParseRole'); Parse.Session = require('./ParseSession'); Parse.Storage = require('./Storage'); Parse.User = require('./ParseUser'); +Parse.LiveQuery = require('./ParseLiveQuery'); +Parse.LiveQueryClient = require('./LiveQueryClient'); Parse._request = function(...args) { return CoreManager.getRESTController().request.apply(null, args); diff --git a/src/ParseLiveQuery.js b/src/ParseLiveQuery.js new file mode 100644 index 000000000..5aec9643e --- /dev/null +++ b/src/ParseLiveQuery.js @@ -0,0 +1,152 @@ +import events from 'events'; +import url from 'url'; +import LiveQueryClient from './LiveQueryClient'; +import CoreManager from './CoreManager'; + +function open() { + var LiveQueryController = CoreManager.getLiveQueryController(); + LiveQueryController.open(); +} + +function close() { + var LiveQueryController = CoreManager.getLiveQueryController(); + LiveQueryController.close(); +} + +/** + * + * We expose three events to help you monitor the status of the WebSocket connection: + * + *
Open - When we establish the WebSocket connection to the LiveQuery server, you'll get this event. + * + *
+ * Parse.LiveQuery.on('open', () => { + * + * });+ * + *
Close - When we lose the WebSocket connection to the LiveQuery server, you'll get this event. + * + *
+ * Parse.LiveQuery.on('close', () => { + * + * });+ * + *
Error - When some network error or LiveQuery server error happens, you'll get this event. + * + *
+ * Parse.LiveQuery.on('error', (error) => { + * + * });+ * + * @class Parse.LiveQuery + * @static + * + */ +let LiveQuery = new events.EventEmitter(); + +/** + * After open is called, the LiveQuery will try to send a connect request + * to the LiveQuery server. + * + * @method open + */ +LiveQuery.open = open; + +/** + * When you're done using LiveQuery, you can call Parse.LiveQuery.close(). + * This function will close the WebSocket connection to the LiveQuery server, + * cancel the auto reconnect, and unsubscribe all subscriptions based on it. + * If you call query.subscribe() after this, we'll create a new WebSocket + * connection to the LiveQuery server. + * + * @method close + */ + +LiveQuery.close = close; +// Register a default onError callback to make sure we do not crash on error +LiveQuery.on('error', () => { +}); + +export default LiveQuery; + +let getSessionToken = () => { + let currentUser = CoreManager.getUserController().currentUser(); + let sessionToken; + if (currentUser) { + sessionToken = currentUser.getSessionToken(); + } + return sessionToken; +}; + +let getLiveQueryClient = () => { + return CoreManager.getLiveQueryController().getDefaultLiveQueryClient(); +}; + +let defaultLiveQueryClient; +let DefaultLiveQueryController = { + setDefaultLiveQueryClient(liveQueryClient: any) { + defaultLiveQueryClient = liveQueryClient; + }, + getDefaultLiveQueryClient(): any { + if (defaultLiveQueryClient) { + return defaultLiveQueryClient; + } + + let liveQueryServerURL = CoreManager.get('LIVEQUERY_SERVER_URL'); + + if (liveQueryServerURL && liveQueryServerURL.indexOf('ws') !== 0) { + throw new Error('You need to set a proper Parse LiveQuery server url before using LiveQueryClient'); + } + + // If we can not find Parse.liveQueryServerURL, we try to extract it from Parse.serverURL + if (!liveQueryServerURL) { + let host = url.parse(CoreManager.get('SERVER_URL')).host; + liveQueryServerURL = 'ws://' + host; + CoreManager.set('LIVEQUERY_SERVER_URL', liveQueryServerURL); + } + + let applicationId = CoreManager.get('APPLICATION_ID'); + let javascriptKey = CoreManager.get('JAVASCRIPT_KEY'); + let masterKey = CoreManager.get('MASTER_KEY'); + // Get currentUser sessionToken if possible + defaultLiveQueryClient = new LiveQueryClient({ + applicationId, + serverURL: liveQueryServerURL, + javascriptKey, + masterKey, + sessionToken: getSessionToken(), + }); + // Register a default onError callback to make sure we do not crash on error + defaultLiveQueryClient.on('error', (error) => { + LiveQuery.emit('error', error); + }); + defaultLiveQueryClient.on('open', () => { + LiveQuery.emit('open'); + }); + defaultLiveQueryClient.on('close', () => { + LiveQuery.emit('close'); + }); + return defaultLiveQueryClient; + }, + open() { + let liveQueryClient = getLiveQueryClient(); + liveQueryClient.open(); + }, + close() { + let liveQueryClient = getLiveQueryClient(); + liveQueryClient.close(); + }, + subscribe(query: any): any { + let liveQueryClient = getLiveQueryClient(); + if (liveQueryClient.shouldOpen()) { + liveQueryClient.open(); + } + return liveQueryClient.subscribe(query, getSessionToken()); + }, + unsubscribe(subscription: any) { + let liveQueryClient = getLiveQueryClient(); + return liveQueryClient.unsubscribe(subscription); + } +}; + +CoreManager.setLiveQueryController(DefaultLiveQueryController); diff --git a/src/ParseQuery.js b/src/ParseQuery.js index 63296f239..e84b77757 100644 --- a/src/ParseQuery.js +++ b/src/ParseQuery.js @@ -987,6 +987,17 @@ export default class ParseQuery { return this; } + /** + * Subscribe this query to get liveQuery updates + * @method subscribe + * @return {LiveQuerySubscription} Returns the liveQuerySubscription, it's an event emitter + * which can be used to get liveQuery updates. + */ + subscribe(): any { + let controller = CoreManager.getLiveQueryController(); + return controller.subscribe(this); + } + /** * Constructs a Parse.Query that is the OR of the passed in queries. For * example: diff --git a/src/__tests__/LiveQueryClient-test.js b/src/__tests__/LiveQueryClient-test.js new file mode 100644 index 000000000..fcf57f730 --- /dev/null +++ b/src/__tests__/LiveQueryClient-test.js @@ -0,0 +1,416 @@ +/** + * Copyright (c) 2015-present, Parse, LLC. + * All rights reserved. + * + * This source code is licensed under the BSD-style license found in the + * LICENSE file in the root directory of this source tree. An additional grant + * of patent rights can be found in the PATENTS file in the same directory. + */ + +jest.dontMock('../LiveQueryClient'); +jest.dontMock('../arrayContainsObject'); +jest.dontMock('../canBeSerialized'); +jest.dontMock('../CoreManager'); +jest.dontMock('../decode'); +jest.dontMock('../encode'); +jest.dontMock('../equals'); +jest.dontMock('../escape'); +jest.dontMock('../ObjectStateMutations'); +jest.dontMock('../parseDate'); +jest.dontMock('../ParseError'); +jest.dontMock('../ParseFile'); +jest.dontMock('../ParseGeoPoint'); +jest.dontMock('../ParseObject'); +jest.dontMock('../ParseOp'); +jest.dontMock('../ParsePromise'); +jest.dontMock('../RESTController'); +jest.dontMock('../SingleInstanceStateController'); +jest.dontMock('../TaskQueue'); +jest.dontMock('../unique'); +jest.dontMock('../UniqueInstanceStateController'); +jest.dontMock('../unsavedChildren'); +jest.dontMock('../ParseACL'); +jest.dontMock('../ParseQuery'); +jest.dontMock('../LiveQuerySubscription'); + +var LiveQueryClient = require('../LiveQueryClient'); +var ParseObject = require('../ParseObject'); +var ParseQuery = require('../ParseQuery'); +var events = require('events'); + +describe('LiveQueryClient', () => { + it('can connect to server', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + // Mock _getWebSocketImplementation + liveQueryClient._getWebSocketImplementation = function() { + return jest.genMockFunction(); + } + // Mock handlers + liveQueryClient._handleWebSocketOpen = jest.genMockFunction(); + liveQueryClient._handleWebSocketMessage = jest.genMockFunction(); + liveQueryClient._handleWebSocketClose = jest.genMockFunction(); + liveQueryClient._handleWebSocketError = jest.genMockFunction(); + + liveQueryClient.open(); + + // Verify inner state + expect(liveQueryClient.state).toEqual('connecting'); + // Verify handlers + liveQueryClient.socket.onopen({}); + expect(liveQueryClient._handleWebSocketOpen).toBeCalled(); + liveQueryClient.socket.onmessage({}); + expect(liveQueryClient._handleWebSocketMessage).toBeCalled(); + liveQueryClient.socket.onclose(); + expect(liveQueryClient._handleWebSocketClose).toBeCalled(); + liveQueryClient.socket.onerror(); + expect(liveQueryClient._handleWebSocketError).toBeCalled(); + }); + + it('can handle WebSocket open message', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + liveQueryClient.socket = { + send: jest.genMockFunction() + }; + + liveQueryClient._handleWebSocketOpen(); + + expect(liveQueryClient.socket.send).toBeCalled(); + var messageStr = liveQueryClient.socket.send.mock.calls[0][0]; + var message = JSON.parse(messageStr); + expect(message.op).toEqual('connect'); + expect(message.applicationId).toEqual('applicationId'); + expect(message.javascriptKey).toEqual('javascriptKey'); + expect(message.masterKey).toEqual('masterKey'); + expect(message.sessionToken).toEqual('sessionToken'); + }); + + it('can handle WebSocket connected response message', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + var data = { + op: 'connected', + clientId: 1 + }; + var event = { + data: JSON.stringify(data) + } + // Register checked in advance + var isChecked = false; + liveQueryClient.on('open', function(dataAgain) { + isChecked = true; + }); + + liveQueryClient._handleWebSocketMessage(event); + + expect(isChecked).toBe(true); + expect(liveQueryClient.id).toBe(1); + expect(liveQueryClient.connectPromise._resolved).toBe(true); + expect(liveQueryClient.state).toEqual('connected'); + }); + + it('can handle WebSocket subscribed response message', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + // Add mock subscription + var subscription = new events.EventEmitter(); + liveQueryClient.subscriptions.set(1, subscription); + var data = { + op: 'subscribed', + clientId: 1, + requestId: 1 + }; + var event = { + data: JSON.stringify(data) + } + // Register checked in advance + var isChecked = false; + subscription.on('open', function(dataAgain) { + isChecked = true; + }); + + liveQueryClient._handleWebSocketMessage(event); + + expect(isChecked).toBe(true); + }); + + it('can handle WebSocket error response message', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + var data = { + op: 'error', + clientId: 1, + error: 'error' + }; + var event = { + data: JSON.stringify(data) + } + // Register checked in advance + var isChecked = false; + liveQueryClient.on('error', function(error) { + isChecked = true; + expect(error).toEqual('error'); + }); + + liveQueryClient._handleWebSocketMessage(event); + + expect(isChecked).toBe(true); + }); + + it('can handle WebSocket event response message', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + // Add mock subscription + var subscription = new events.EventEmitter(); + liveQueryClient.subscriptions.set(1, subscription); + let object = new ParseObject('Test'); + object.set('key', 'value'); + var data = { + op: 'create', + clientId: 1, + requestId: 1, + object: object._toFullJSON() + }; + var event = { + data: JSON.stringify(data) + } + // Register checked in advance + var isChecked = false; + subscription.on('create', function(parseObject) { + isChecked = true; + expect(parseObject.get('key')).toEqual('value'); + expect(parseObject.get('className')).toBeUndefined(); + expect(parseObject.get('__type')).toBeUndefined(); + }); + + liveQueryClient._handleWebSocketMessage(event); + + expect(isChecked).toBe(true); + }); + + it('can handle WebSocket close message', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + // Add mock subscription + var subscription = new events.EventEmitter(); + liveQueryClient.subscriptions.set(1, subscription); + // Register checked in advance + var isChecked = false; + subscription.on('close', function() { + isChecked = true; + }); + var isCheckedAgain = false; + liveQueryClient.on('close', function() { + isCheckedAgain = true; + }); + + liveQueryClient._handleWebSocketClose(); + + expect(isChecked).toBe(true); + expect(isCheckedAgain).toBe(true); + }); + + it('can handle reconnect', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + + liveQueryClient.open = jest.genMockFunction(); + + let attempts = liveQueryClient.attempts; + liveQueryClient._handleReconnect(); + expect(liveQueryClient.state).toEqual('reconnecting'); + + jest.runOnlyPendingTimers(); + + expect(liveQueryClient.attempts).toEqual(attempts + 1); + expect(liveQueryClient.open).toBeCalled(); + }); + + it('can handle WebSocket error message', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + var error = {}; + var isChecked = false; + liveQueryClient.on('error', function(errorAgain) { + isChecked = true; + expect(errorAgain).toEqual(error); + }); + + liveQueryClient._handleWebSocketError(error); + + expect(isChecked).toBe(true); + }); + + it('can subscribe', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + liveQueryClient.socket = { + send: jest.genMockFunction() + }; + var query = new ParseQuery('Test'); + query.equalTo('key', 'value'); + + var subscription = liveQueryClient.subscribe(query); + liveQueryClient.connectPromise.resolve(); + + expect(subscription).toBe(liveQueryClient.subscriptions.get(1)); + expect(liveQueryClient.requestId).toBe(2); + var messageStr = liveQueryClient.socket.send.mock.calls[0][0]; + var message = JSON.parse(messageStr); + expect(message).toEqual({ + op: 'subscribe', + requestId: 1, + query: { + className: 'Test', + where: { + key: 'value' + } + } + }); + }); + + it('can unsubscribe', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + liveQueryClient.socket = { + send: jest.genMockFunction() + }; + var subscription = { + id: 1 + } + liveQueryClient.subscriptions.set(1, subscription); + + liveQueryClient.unsubscribe(subscription); + liveQueryClient.connectPromise.resolve(); + + expect(liveQueryClient.subscriptions.size).toBe(0); + var messageStr = liveQueryClient.socket.send.mock.calls[0][0]; + var message = JSON.parse(messageStr); + expect(message).toEqual({ + op: 'unsubscribe', + requestId: 1 + }); + }); + + it('can resubscribe', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + liveQueryClient.socket = { + send: jest.genMockFunction() + }; + var query = new ParseQuery('Test'); + query.equalTo('key', 'value'); + var subscription = liveQueryClient.subscribe(query); + liveQueryClient.connectPromise.resolve(); + + liveQueryClient.resubscribe(); + + expect(liveQueryClient.requestId).toBe(2); + var messageStr = liveQueryClient.socket.send.mock.calls[0][0]; + var message = JSON.parse(messageStr); + expect(message).toEqual({ + op: 'subscribe', + requestId: 1, + query: { + className: 'Test', + where: { + key: 'value' + } + } + }); + }); + + it('can close', () => { + var liveQueryClient = new LiveQueryClient({ + applicationId: 'applicationId', + serverURL: 'ws://test', + javascriptKey: 'javascriptKey', + masterKey: 'masterKey', + sessionToken: 'sessionToken' + }); + liveQueryClient.state = 'connected'; + liveQueryClient.socket = { + close: jest.genMockFunction() + } + var subscription = new events.EventEmitter(); + liveQueryClient.subscriptions.set(1, subscription); + // Register checked in advance + var isChecked = false; + subscription.on('close', function() { + isChecked = true; + }); + var isCheckedAgain = false; + liveQueryClient.on('close', function() { + isCheckedAgain = true; + }); + + liveQueryClient.close(); + + expect(liveQueryClient.subscriptions.size).toBe(0); + expect(isChecked).toBe(true); + expect(isCheckedAgain).toBe(true); + expect(liveQueryClient.socket.close).toBeCalled(); + expect(liveQueryClient.state).toBe('disconnected'); + }); +}); From b918929fc43979ae4e832aac130d670d7caa1493 Mon Sep 17 00:00:00 2001 From: Andrew Imm