Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeche committed Jan 1, 2016
0 parents commit 1144cc1
Show file tree
Hide file tree
Showing 8 changed files with 784 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
node_modules
197 changes: 197 additions & 0 deletions index.js
@@ -0,0 +1,197 @@
'use strict';

const connect = require('./lib/client');
const ed = require('./lib/editor');
const diffFactory = require('./lib/diff');
const pkg = require('./package.json');

const EDITOR_ID = 'atom';

module.exports.activate = function() {
connect(pkg.config.websocketUrl, (err, client) => {
if (err) {
return console.error('Unable to setup LiveStyle connection:', err);
}

console.info('LiveStyle client connected');

client
.on('open', () => {
debug('connection opened');
clientId(client);
editorId(client);
})
.on('client-connect', () => editorId(client))
.on('identify-client', () => clientId(client))
.on('patcher-connect', () => initialContent(client, atom.workspace.getActiveTextEditor()))
.on('incoming-updates', data => {
let editor = ed.editorForUri(data.uri);
if (editor) {
client.send('apply-patch', ed.payload(editor, {
'patches': data.patches
}));
}
})
.on('patch', data => {
let editor = ed.editorForUri(data.uri);
if (editor) {
updateContent(client, editor, data);
}
})
.on('request-files', data => {
// TODO implement preprocessor dependency fetcher
})
.on('request-unsaved-changes', data => {
let files = data.files || [];
files.forEach(uri => {
let editor = ed.editorForUri(uri);
if (editor) {
sendUnsavedChanges(client, editor);
}
});
})
// supress 'error' event since in Node.js, in most cases it means unhandled exception
.on('error', err => console.error(err));

// observe editor life cycle
let diff = diffFactory(client);
let refresh = () => scheduleRefreshFiles(client);
atom.workspace.observeTextEditors(editor => {
refresh();
let justLoaded = true;
let callbacks = [
editor.onDidChange(() => {
if (ed.syntax(editor)) {
if (justLoaded) {
debug('set initial content for new editor');
initialContent(client, editor);
justLoaded = false;
} else if (!ed.isLocked(editor)) {
debug('editor did change');
diff(editor);
}
}
}),
editor.onDidSave(refresh),
editor.observeGrammar(refresh)
];
editor.onDidDestroy(() => {
callbacks.forEach(c => c.dispose());
callbacks = null;
refresh();
});
});
});
};

module.exports.deactivate = function() {
// TODO close server
console.log('deactivate');
};

/**
* Updates content of given editor with patched content from LiveStyle
* @param {TextEditor} editor
* @param {Object} payload
*/
function updateContent(client, editor, payload) {
if (!payload) {
return;
}

ed.lock(editor);
// unlock after some timeout to ensure that `onDidChange` event didn't
// triggered 'calculate-diff' event
setTimeout(() => ed.unlock(editor), 10);

if (payload.ranges.length && payload.hash === ed.hash(editor)) {
// integrity check: if editor content didn't changed since last patch
// request (e.g. content hashes are match), apply incremental updates

let buf = editor.getBuffer();
editor.transact(() => {
if (editor.hasMultipleCursors()) {
// reset multiple selections into a single cursor pos
let pos = editor.getCursorBufferPosition();
editor.setCursorBufferPosition(pos, {autoscroll: false});
}

let opt = {undo: 'skip'};
payload.ranges.forEach(r => {
buf.setTextInRange([
buf.positionForCharacterIndex(r[0]),
buf.positionForCharacterIndex(r[1])
], r[2], opt);
});

// select last range
let lastRange = payload.ranges[payload.ranges.length - 1];
editor.setSelectedBufferRange([
buf.positionForCharacterIndex(lastRange[0]),
buf.positionForCharacterIndex(lastRange[0] + lastRange[2].length)
]);
});
} else {
// user changed content since last patch request: replace whole content
editor.setText(payload.content || '');
}

// update initial content for current view in LiveStyle cache
initialContent(client, editor);
}

function sendUnsavedChanges(client, editor) {
if (editor.isModified()) {
var previous = editor.getBuffer().cachedDiskContents || '';
debug('send unsaved changes for', ed.fileUri(editor));
client.send('calculate-diff', ed.payload(editor, {previous}));
}
}

////////////////////////////////////////

function editorId(client) {
debug('send editor id');
client.send('editor-connect', {
id: EDITOR_ID,
title: 'Atom'
});
scheduleRefreshFiles(client);
}

function clientId(client) {
debug('send client id');
client.send('client-id', {id: EDITOR_ID});
}

function initialContent(client, editor) {
let syntax = ed.syntax(editor);
if (syntax) {
client.send('initial-content', ed.payload(editor));
}
}

function refreshFiles(client) {
let files = ed.all().map(editor => ed.fileUri(editor)).filter(unique);
debug('send file list', files);
client.send('editor-files', {id: EDITOR_ID, files});
}

var _refreshFilesTimer = null;
function scheduleRefreshFiles(client) {
if (!_refreshFilesTimer) {
_refreshFilesTimer = setImmediate(() => {
refreshFiles(client);
_refreshFilesTimer = null;
});
}
}

function unique(value, i, array) {
return array.indexOf(value) === i;
}

function debug() {
let args = Array.prototype.slice.call(arguments, 0);
console.log.apply(console, ['%cLiveStyle', 'font-weight:bold;color:green'].concat(args));
}
105 changes: 105 additions & 0 deletions lib/client.js
@@ -0,0 +1,105 @@
/**
* A minimal LiveStyle server client. Unlike existing
* `livestyle/client`, this one will not reconnect when connection
* in dropped. Instead, it will start its own WeSocket server instance.
*/
'use strict';

const WebSocket = require('ws');
const parseUrl = require('url').parse;
const extend = require('xtend');
const createServer = require('./server');

var errCount = 0;
var defaultOptions = {
reconnectOnClose: true,
maxRetries: 5
};

var connect = module.exports = function(url, options, callback) {
if (typeof options === 'function') {
callback = options;
options = {};
}

callback = callback || noop;
options = extend(defaultOptions, options || {});

debug('connecting to', url);
var client = new WebSocket(url);

return client
.on('message', onMessage)
.once('open', function() {
debug('connection opened');
errCount = 0;
callback(null, wrapClient(client));
})
.once('close', function() {
// reconnect if connection dropped
var reconnect = !this._destroyed && options.reconnectOnClose;
debug('connection closed', reconnect ? ', reconnecting' : '');
if (reconnect) {
connect(url, options, callback);
}
})
.once('error', function(err) {
debug(err);
if (err.code === 'ECONNREFUSED') {
// ECONNREFUSED means there’s no active LiveStyle
// server, we should create our own instance and reconnect again
errCount++;
if (errCount < options.maxRetries) {
return createServer(parseUrl(url).port, function() {
this.removeListener('error', callback);
var c = connect(url, options, callback);
c.server = this;
})
.once('error', callback);
}
}

// unknown error, aborting
callback(err);
});
};

function noop() {}

function onMessage(message) {
try {
message = JSON.parse(message);
} catch (err) {
return debug('Error parsing message: %s', err.message);
}

this.emit('message-receive', message.name, message.data);
this.emit(message.name, message.data);
}

function wrapClient(client) {
var _send = client.send;
client.send = function(name, data) {
try {
_send.call(this, JSON.stringify({name, data}));
} catch(err) {
debug('Error while sending message:', err);
client.emit('error', err);
}
};

client.destroy = function() {
this.close();
this._destroyed = true;
if (this.server) {
this.server.destroy();
this.server = null;
}
};
return client;
}

function debug() {
let args = Array.prototype.slice.call(arguments, 0);
console.log.apply(console, ['%cLiveStyle Client', 'font-weight:bold;color:orange'].concat(args));
}
86 changes: 86 additions & 0 deletions lib/diff.js
@@ -0,0 +1,86 @@
/**
* Performs diff requests to LiveStyle patcher.
* When `diff()` method is called, sends `calculate-diff` request to patching
* server and wait until either `diff` or `error` response is received.
* Until that all other `diff()` requests are queued to lower the pressure
* to patcher and save system resources
*/
'use strict';

const ed = require('./editor');

// Duration, in milliseconds, after which performing diff lock considered obsolete
const waitTimeout = 10000;

module.exports = function(client) {
const _state = {
lockedBy: null,
created: 0,
pending: []
};

let nextQueued = release => {
if (release) {
debug('Release diff lock');
_state.lockedBy = null;
}

// make sure current command lock is still valid
if (_state.lockedBy && _state.created + waitTimeout < Date.now()) {
debug('Waiting response is obsolete, reset');
_state.lockedBy = null;
}

if (!_state.lockedBy && _state.pending.length) {
let uri = _state.pending.shift();
let editor = ed.editorForUri(uri);
if (!editor) {
// looks like view for pending diff is already closed, move to next one
debug('No view, move to next queued diff item');
return nextQueued();
}

debug('Send "calculate-diff" message');
_state.lockedBy = uri;
_state.created = Date.now();
client.send('calculate-diff', ed.payload(editor));
} else {
debug('Diff locked, waiting for response');
}
};

client
.on('diff', data => {
debug('Got diff response for', data.uri);
if (_state.lockedBy === data.uri) {
debug('Release diff lock, move to next item');
nextQueued(true);
}
})
.on('error', data => {
if (typeof data !== 'object' || data instanceof Error) {
// a system error, do not handle
return;
}

let origin = data.origin || {};
if (origin.name === 'calculate-diff' && _state.lockedBy && _state.lockedBy === origin.uri) {
nextQueued(true);
}
});

return function(editor) {
let uri = ed.fileUri(editor);
if (_state.pending.indexOf(uri) === -1) {
debug('Pending patch request for', uri);
_state.pending.push(uri);
}

nextQueued();
};
};

function debug() {
let args = Array.prototype.slice.call(arguments, 0);
console.log.apply(console, ['%cLiveStyle Patcher', 'font-weight:bold;color:blue'].concat(args));
}

0 comments on commit 1144cc1

Please sign in to comment.