Skip to content

Commit

Permalink
[client-app] (deltas P6) Split local and cloud deltas
Browse files Browse the repository at this point in the history
Summary:
This commit splits apart the `AccountDeltaConnection` class, which was
in charge of listening to both cloud /and/ local deltas by way of an
artificial interface, `DeltaStreamingInMemoryConnection`.

Splitting this into 2 modules with separate responsibilities will hopefully
make this code easier to reason about and reduce some cruft and unnecessary
indirection.

Specifically, this commit makes it so:

- `DeltaConnectionManager` is only in charge of starting and ending `DeltaStreamingConnection`s, which are solely in charge of listening to deltas from the cloud api
- `LocalSyncDeltaEmitter` no longer unnecessarily emits events for the `deltas` package to listen to but rather directly processes and saves those deltas from the K2 db to edgehill.db
- `LocalSyncDeltaEmitter` is also in charge of keeping track of the latest received cursor, under its own JSONBlob key in edgehill.db. This migrates localSync cursors saved under the old key.
- `LocalSyncDeltaEmitter` is now instantiated and managed from within the `SyncProcessManager` as opposed to the `SyncWorker`. Apart from removing extra state from the `SyncWorker`, this removes dependencies on the client-app environment from the sync-worker.
- `DeltaStreamingInMemoryConnection` and `AccountDeltaConnection` are now gone

(Sorry for the big diff! This one was a little hard to split up without landing something broken)

Depends on D4121

Test Plan: manual + unit tests planned in upcoming diff

Reviewers: halla, mark, evan, spang

Reviewed By: evan

Differential Revision: https://phab.nylas.com/D4122
  • Loading branch information
jstejada committed Mar 8, 2017
1 parent 049a3c2 commit d60a23c
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 281 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import _ from 'underscore';
import {AccountStore} from 'nylas-exports'
import AccountDeltaConnection from './account-delta-connection';
import DeltaStreamingConnection from './delta-streaming-connection';


class DeltaConnectionStore {
constructor() {
this._accountConnections = [];
this._connections = [];
this._unsubscribe = () => {}
}

Expand All @@ -18,10 +18,6 @@ class DeltaConnectionStore {
this._unsubscribe()
}

_existingConnectionsForAccount(account) {
return _.find(this._accountConnections, c => c.account().id === account.id);
}

async _ensureConnections() {
if (NylasEnv.inSpecMode()) { return; }

Expand All @@ -33,25 +29,27 @@ class DeltaConnectionStore {
this._isBuildingDeltaConnections = true;

try {
const originalConnections = this._accountConnections;
const currentConnections = []
const currentConnections = this._connections;
const nextConnections = []
for (const account of AccountStore.accounts()) {
const existingDeltaConnection = this._existingConnectionsForAccount(account)
if (existingDeltaConnection) {
currentConnections.push(existingDeltaConnection);
const existingConnection = (
currentConnections
.find(conn => conn.account().id === account.id)
)
if (existingConnection) {
nextConnections.push(existingConnection);
continue
}

const newDeltaConnection = new AccountDeltaConnection(account);
await newDeltaConnection.loadStateFromDatabase()
newDeltaConnection.start()
currentConnections.push(newDeltaConnection);
const newDeltaConnection = new DeltaStreamingConnection(account);
await newDeltaConnection.start()
nextConnections.push(newDeltaConnection);
}
const oldDeltaConnections = _.difference(originalConnections, currentConnections);
const oldDeltaConnections = _.difference(currentConnections, nextConnections);
for (const deltaConnection of oldDeltaConnections) {
deltaConnection.end()
}
this._accountConnections = currentConnections;
this._connections = nextConnections;
} finally {
this._isBuildingDeltaConnections = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ const BASE_RETRY_DELAY = 1000;
class DeltaStreamingConnection {
constructor(account) {
this._account = account
this._state = {cursor: null, status: null}
this._state = null
this._longConnection = null
this._writeStateDebounced = _.debounce(this._writeState, 100)
this._unsubscribers = []
Expand All @@ -33,8 +33,15 @@ class DeltaStreamingConnection {
}
}

start() {
account() {
return this._account
}

async start() {
try {
if (!this._state) {
this._state = await this._loadState()
}
const {cursor = 0} = this._state
this._longConnection = new NylasLongConnection({
api: N1CloudAPI,
Expand Down Expand Up @@ -70,28 +77,11 @@ class DeltaStreamingConnection {
}

end() {
this._state = null
this._disposeListeners()
this._longConnection.end()
}

async loadStateFromDatabase() {
let json = await DatabaseStore.findJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`)

if (!json) {
// Migrate from old storage key
const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`)
if (!oldState) { return; }
const {deltaCursors = {}, deltaStatus = {}} = oldState
json = {
cursor: deltaCursors.n1Cloud || null,
status: deltaStatus.n1Cloud || null,
}
}

if (!json) { return }
this._state = json;
}

_setupListeners() {
this._unsubscribers = [
Actions.retryDeltaConnection.listen(this.restart, this),
Expand All @@ -104,17 +94,6 @@ class DeltaStreamingConnection {
this._unsubscribers = []
}

_writeState() {
return DatabaseStore.inTransaction(t =>
t.persistJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`, this._state)
);
}

_setCursor = (cursor) => {
this._state.cursor = cursor;
this._writeStateDebounced();
}

_onOnlineStatusChanged = () => {
if (OnlineStatusStore.isOnline()) {
this.restart()
Expand Down Expand Up @@ -175,6 +154,38 @@ class DeltaStreamingConnection {

setTimeout(() => this.restart(), this._backoffScheduler.nextDelay());
}

_setCursor = (cursor) => {
this._state.cursor = cursor;
this._writeStateDebounced();
}

async _loadState() {
const json = await DatabaseStore.findJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`)
if (json) {
return json
}

// Migrate from old storage key
const oldState = await DatabaseStore.findJSONBlob(`NylasSyncWorker:${this._account.id}`)
if (!oldState) {
return {cursor: null, status: null};
}

const {deltaCursors = {}, deltaStatus = {}} = oldState
return {
cursor: deltaCursors.n1Cloud,
status: deltaStatus.n1Cloud,
}
}

async _writeState() {
if (!this._state) { return }
await DatabaseStore.inTransaction(t =>
t.persistJSONBlob(`DeltaStreamingConnectionStatus:${this._account.id}`, this._state)
);
}

}

export default DeltaStreamingConnection

This file was deleted.

2 changes: 1 addition & 1 deletion packages/client-app/src/nylas-env.es6
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ export default class NylasEnvConstructor {

this.timer = remote.getGlobal('application').timer;

this.localSyncEmitter = new Emitter();
this.globalWindowEmitter = new Emitter();

if (!this.inSpecMode()) {
this.actionBridge = new ActionBridge(ipcRenderer);
Expand Down

0 comments on commit d60a23c

Please sign in to comment.