diff --git a/client/src/events.js b/client/src/events.js index 16d2f9431..adceec8b7 100644 --- a/client/src/events.js +++ b/client/src/events.js @@ -16,12 +16,12 @@ function DisposableEvent(setupFunc) { function wrappedListener(eventhandler) { let remover = listener(eventhandler) - let wrappedRemover = () => { + registry.push(remover) + + return () => { removeFromArray(registry, remover) remover() } - registry.push(remover) - return wrappedRemover } wrappedListener.dispose = emptyAndCallAll(registry) @@ -43,32 +43,29 @@ function DisposableEvent(setupFunc) { // dispose: () => {/* disposes ham and eggs then does console log*/}, // } function MultiEvent(initializer) { - let registry = [] let multiEvent = {} - for (let propName in initializer) { - if (propName === 'dispose') { - continue - } - let event = DisposableEvent(initializer[propName]) - multiEvent[propName] = event - registry.push(event.dispose) - } + let registry = [] + let cleanupEvents = emptyAndCallAll(registry) - let disposeAll + // If the user specified a disposal function, we pass them the event // cleaner and return whatever they want to return if (initializer.dispose !== undefined) { - disposeAll = () => initializer.dispose(cleanupEvents) + multiEvent.dispose = () => initializer.dispose(cleanupEvents) } else { - disposeAll = cleanupEvents + multiEvent.dispose = cleanupEvents } - multiEvent.dispose = disposeAll - Object.keys(multiEvent).forEach(key => { - // Cleaning up any event will clean up all events - if (key !== 'dispose') { - multiEvent[key].dispose = disposeAll + + for (let key in initializer) { + if (key === 'dispose') { + continue } - }) + multiEvent[key] = DisposableEvent(initializer[key]) + registry.push(multiEvent[key].dispose) + // Cleaning up any event will clean up all events + multiEvent[key].dispose = multiEvent.dispose + } + return multiEvent } @@ -98,10 +95,6 @@ function removeFromArray(registry, callback) { function emptyAndCallAll(registry) { return () => { - let func = registry.pop() - while (func !== undefined) { - func() - func = registry.pop() - } + registry.splice(0).forEach((func) => func()) } } diff --git a/client/src/index.js b/client/src/index.js index 3f75d4f8e..112e5e265 100644 --- a/client/src/index.js +++ b/client/src/index.js @@ -5,8 +5,9 @@ require('babel-polyfill') const { setImmediate } = require('./utility.js') const { MultiEvent, promiseOnEvents } = require('./events.js') const { Collection, TermBase } = require('./ast.js') +const Subscription = require('./subscription.js') -const { WebSocket, Rx } = require('./shim.js') +const { WebSocket } = require('./shim.js') const { serialize, deserialize } = require('./serialization.js') module.exports = Fusion @@ -260,113 +261,3 @@ function FusionSocket(host, secure, path) { return connectedPromise.then(() => ws.send(protoMessage)) } } - - -// This is the object returned for changefeed queries -function Subscription({ onResponse, - onError, - endSubscription, - onConnected, - onDisconnected, - userOptions: userOptions = {} } = {}) { - let sub = {} - let broadcastAdded, - broadcastRemoved, - broadcastChanged, - broadcastSynced, - broadcastCompleted - sub.onConnected = onConnected - sub.onDisconnected = onDisconnected - sub.onError = onError - - Object.assign(sub, MultiEvent({ - onAdded(broadcast) { broadcastAdded = broadcast }, - onRemoved(broadcast) { broadcastRemoved = broadcast }, - onChanged(broadcast) { broadcastChanged = broadcast }, - onSynced(broadcast) { broadcastSynced = broadcast }, - onCompleted(broadcast) { broadcastCompleted = broadcast }, - dispose(cleanupSubscriptionEvents) { - return endSubscription().then(() => { - setImmediate(() => { - cleanupSubscriptionEvents() - onResponse.dispose() - onError.dispose() - }) - }) - }, - })) - - Object.keys(userOptions).forEach(key => { - switch (key) { - case 'onAdded': - case 'onRemoved': - case 'onChanged': - case 'onSynced': - case 'onError': - case 'onConnected': - case 'onDisconnected': - case 'onCompleted': - sub[key](userOptions[key]) - } - }) - - let isAdded = c => c.new_val != null && c.old_val == null - let isRemoved = c => c.new_val == null && c.old_val != null - let isChanged = c => c.new_val != null && c.old_val != null - - onResponse(response => { - // Response won't be an error since that's handled by the Fusion - // object - if (response.data !== undefined) { - response.data.forEach(change => { - if (isChanged(change)) { - if (sub.onChanged.listenerCount() == 0) { - broadcastRemoved(change.old_val) - broadcastAdded(change.new_val) - } else { - broadcastChanged(change) - } - } else if (isAdded(change)) { - broadcastAdded(change.new_val) - } else if (isRemoved(change)) { - broadcastRemoved(change.old_val) - } else { - console.error('Unknown object received on subscription: ', change) - } - }) - } - if (response.state === 'synced') { - broadcastSynced('synced') - } - if (response.state === 'complete') { - broadcastCompleted('complete') - } - }) - - // If the Rx module is available, create observables - if (Rx) { - Object.assign(sub, { - observeChanged: observe(sub.onChanged, onError, sub.onCompleted), - observeAdded: observe(sub.onAdded, onError, sub.onCompleted), - observeRemoved: observe(sub.onRemoved, onError, sub.onCompleted), - observeConnected: observe(sub.onConnected, onError, sub.onCompleted), - observeDisconnected: observe(sub.onDisconnected, onError, sub.onCompleted), - observeSynced: observe(sub.onSynced, onError, sub.onCompleted), - }) - } - - return sub - - function observe(next, error, completed, dispose = sub.dispose) { - return (maybeDispose = dispose) => Rx.Observable.create(observer => { - let disposeEvent = next(val => observer.onNext(val)) - let disposeError = error(err => observer.onError(err)) - let disposeCompleted = completed(() => observer.onCompleted()) - return () => maybeDispose(function cleanup() { - disposeEvent() - disposeError() - disposeCompleted() - }) - }) - } -} diff --git a/client/src/subscription.js b/client/src/subscription.js new file mode 100644 index 000000000..07e791995 --- /dev/null +++ b/client/src/subscription.js @@ -0,0 +1,165 @@ +'use strict' + +require('babel-polyfill') + +const { setImmediate } = require('./utility.js') +const { MultiEvent } = require('./events.js') +const { Rx } = require('./shim.js') + +module.exports = Subscription + +// This is the object returned for changefeed queries +function Subscription({ onResponse, + onError, + endSubscription, + onConnected, + onDisconnected, + userOptions: userOptions = {} } = {}) { + let sub = {} + let broadcastAdded, + broadcastRemoved, + broadcastChanged, + broadcastValue, + broadcastSynced, + broadcastCompleted + sub.onConnected = onConnected + sub.onDisconnected = onDisconnected + sub.onError = onError + + Object.assign(sub, MultiEvent({ + onAdded(broadcast) { broadcastAdded = broadcast }, + onRemoved(broadcast) { broadcastRemoved = broadcast }, + onChanged(broadcast) { broadcastChanged = broadcast }, + onValue(broadcast) { broadcastValue = broadcast }, + onSynced(broadcast) { broadcastSynced = broadcast }, + onCompleted(broadcast) { broadcastCompleted = broadcast }, + dispose(cleanupSubscriptionEvents) { + return endSubscription().then(() => { + setImmediate(() => { + cleanupSubscriptionEvents() + onResponse.dispose() + onError.dispose() + }) + }) + } + })) + + StateManager(sub, broadcastValue) + + Object.keys(userOptions).forEach(key => { + switch (key) { + case 'onAdded': + case 'onRemoved': + case 'onChanged': + case 'onValue': + case 'onSynced': + case 'onError': + case 'onConnected': + case 'onDisconnected': + case 'onCompleted': + sub[key](userOptions[key]) + } + }) + + let isAdded = c => c.new_val != null && c.old_val == null + let isRemoved = c => c.new_val == null && c.old_val != null + let isChanged = c => c.new_val != null && c.old_val != null + + onResponse(response => { + // Response won't be an error since that's handled by the Fusion + // object + if (response.data !== undefined) { + response.data.forEach(change => { + if (isChanged(change)) { + if (sub.onChanged.listenerCount() == 0) { + broadcastRemoved(change.old_val) + broadcastAdded(change.new_val) + } else { + broadcastChanged(change) + } + } else if (isAdded(change)) { + broadcastAdded(change.new_val) + } else if (isRemoved(change)) { + broadcastRemoved(change.old_val) + } else { + console.error('Unknown object received on subscription: ', change) + } + }) + } + if (response.state === 'synced') { + broadcastSynced('synced') + } + if (response.state === 'complete') { + broadcastCompleted('complete') + } + }) + + // If the Rx module is available, create observables + if (Rx) { + Object.assign(sub, { + observeChanged: observe(sub.onChanged, onError, sub.onCompleted), + observeAdded: observe(sub.onAdded, onError, sub.onCompleted), + observeRemoved: observe(sub.onRemoved, onError, sub.onCompleted), + observeValue: observe(sub.onValue, onError, sub.onCompleted), + observeConnected: observe(sub.onConnected, onError, sub.onCompleted), + observeDisconnected: observe(sub.onDisconnected, onError, sub.onCompleted), + observeSynced: observe(sub.onSynced, onError, sub.onCompleted), + }) + } + + return sub + + function observe(next, error, completed, dispose = sub.dispose) { + return (maybeDispose = dispose) => Rx.Observable.create(observer => { + let disposeEvent = next(val => observer.onNext(val)) + let disposeError = error(err => observer.onError(err)) + let disposeCompleted = completed(() => observer.onCompleted()) + return () => maybeDispose(function cleanup() { + disposeEvent() + disposeError() + disposeCompleted() + }) + }) + } +} + +function StateManager (subscription, emitter) { + let data = [] + + function changeHandler (row) { + let i = findIndex(data, (val) => val.id === row.id) + if (i === -1) { + data.push(row) + } + else { + data[i] = row; + } + + emitter(data.slice(0), i, row) + } + + function removeHandler (row) { + let i = findIndex(data, (val) => val.id === row.id) + if (i !== -1) { + data.splice(i, 1) + } + + emitter(data.slice(0), i, row) + } + + subscription.onAdded(changeHandler) + subscription.onChanged((change) => changeHandler(change.new_val)) + subscription.onRemoved(removeHandler) +} + +function findIndex(arr, predicate) { + let len = arr.length; + + for (let i = 0; i < len; i++) { + let value = arr[i]; + if (predicate(value, i, arr)) { + return i; + } + } + return -1; +}