Skip to content
This repository has been archived by the owner on Mar 21, 2018. It is now read-only.

Commit

Permalink
Always launch a new process for the UA service.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mossop committed May 25, 2016
1 parent 403045b commit 63e4dd4
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 110 deletions.
46 changes: 10 additions & 36 deletions app/main/browser.js
Expand Up @@ -37,30 +37,27 @@ import * as hotkeys from './hotkeys';
import * as menu from './menu/index';
import * as instrument from '../services/instrument';
import registerAboutPages from './about-pages';
import { ProfileStorage } from '../services/storage';
import * as userAgentService from './user-agent-service';
const profileStoragePromise = ProfileStorage.open(path.join(__dirname, '..', '..'));
import userAgentClient from '../shared/user-agent-client';
import { UI_DIR, fileUrl } from './util';
import BUILD_CONFIG from '../../build-config';

import WebSocket from 'ws';
import * as endpoints from '../shared/constants/endpoints';

const BrowserWindow = electron.BrowserWindow; // create native browser window.
const app = electron.app; // control application life.
const ipc = electron.ipcMain;
const globalShortcut = electron.globalShortcut;
const userAgentPromise = userAgentClient.connect();

/**
* Top-level list of Browser Windows.
*/
const browserWindowIds = [];

async function makeBrowserWindow(): Promise<electron.BrowserWindow> {
const profileStorage = await profileStoragePromise;
const userAgent = await userAgentPromise;

// TODO: don't abuse the storage layer's session ID generation to produce scopes.
const scope = await profileStorage.startSession();
const scope = await userAgent.startSession();

// Create the browser window.
const browser = new BrowserWindow({
Expand Down Expand Up @@ -216,41 +213,18 @@ ipc.on('open-menu', (event) => {
menu.buildWindowMenu(menuData).popup(bw);
});

let ws = undefined;

const menuData = {};

profileStoragePromise.then(async function(profileStorage) {
const { reused } = await userAgentService.start(profileStorage,
endpoints.UA_SERVICE_PORT,
{ debug: false, allowReuse: true });

if (reused) {
// In the future, let's do something like test a /status or /heartbeat endpoint for a version
// and well-known string to be sure that we're actually connecting to a UA service.
userAgentPromise.then(async function(userAgent) {
if (userAgent.clientCount > 1) {
console.log('Using an already running User Agent service ' +
`(running on ${endpoints.UA_SERVICE_PORT}).`);
`(running on ${endpoints.UA_SERVICE_PORT} with ` +
`${userAgent.clientCount} active clients).`);
} else {
console.log(`Started a new User Agent service (running on ${endpoints.UA_SERVICE_PORT}).`);
}

ws = new WebSocket(`${endpoints.UA_SERVICE_WS}/diffs`);

ws.on('open', () => {
// Nothing for now.
});

ws.on('message', (data, flags) => {
// flags.binary will be set if a binary data is received.
// flags.masked will be set if the data was masked.
if (flags.binary) {
return;
}
const command = JSON.parse(data);
if (!command) {
return;
}

userAgent.on('diff', (command) => {
if (command.type === 'initial') {
menuData.recentBookmarks = command.payload.recentStars;
menu.buildAppMenu(menuData);
Expand All @@ -262,4 +236,4 @@ profileStoragePromise.then(async function(profileStorage) {
menu.buildAppMenu(menuData);
}
});
});
}, console.error);
86 changes: 44 additions & 42 deletions app/main/user-agent-service.js
Expand Up @@ -64,23 +64,41 @@ function configure(app: any, storage: ProfileStorage) {
return { stars, recentStars };
}

const diffsClients = [];
router.ws('/diffs', async function(ws, _req) {
diffsClients.push(ws);

ws.send(JSON.stringify({ type: 'initial', payload: await initial() }));
const wsClients = [];
router.ws('/ws', async function(ws, _req) {
wsClients.push(ws);

ws.send(JSON.stringify({
message: 'protocol',
version: 'v1',
clientCount: wsClients.length,
}));

ws.send(JSON.stringify({
message: 'diff',
type: 'initial',
payload: await initial(),
}));

ws.on('close', () => {
const index = diffsClients.indexOf(ws);
const index = wsClients.indexOf(ws);
if (index > -1) {
diffsClients.splice(index, 1);
wsClients.splice(index, 1);
}

if (wsClients.length === 0) {
// When there are no more clients close the service.
stop();
}
});
});

function sendDiff(diff) {
diffsClients.forEach((ws) => {
ws.send(JSON.stringify(diff));
wsClients.forEach((ws) => {
ws.send(JSON.stringify({
message: 'diff',
...diff,
}));
});
}

Expand All @@ -93,7 +111,7 @@ function configure(app: any, storage: ProfileStorage) {
}

router.post('/session/start', wrap(async function(req, res) {
req.checkBody('scope').notEmpty().isInt();
req.checkBody('scope').optional().isInt();
req.checkBody('ancestor').optional().isInt();

const errors = req.validationErrors();
Expand Down Expand Up @@ -258,6 +276,19 @@ function configure(app: any, storage: ProfileStorage) {
});
}

let server;
function stop() {
return new Promise((res, rej) => {
server.close(err => {
if (err) {
rej(err);
} else {
res();
}
});
});
}

/**
* Exposes a function that starts up an Express static server
* to the `./fixtures` directory on port 8080.
Expand All @@ -269,20 +300,7 @@ function configure(app: any, storage: ProfileStorage) {
export function start(storage: ProfileStorage,
port: number,
options: ?Object = {}) {
const { debug, allowReuse } = options;

let server;
function stop() {
return new Promise((res, rej) => {
server.close(err => {
if (err) {
rej(err);
} else {
res();
}
});
});
}
const { debug } = options;

return new Promise((resolve) => {
const app = express();
Expand All @@ -297,24 +315,8 @@ export function start(storage: ProfileStorage,
// Sadly, app.listen does not return the HTTP server just yet.
// Therefore, we extract it manually below.
app.listen(port, '127.0.0.1', () => {
resolve({ app, getWss, stop, reused: false });
server = getWss()._server;
resolve();
});
server = getWss()._server;

if (allowReuse) {
// Let's assume we're already running our service.
server.on('error', (e) => {
if (e.code !== 'EADDRINUSE') {
throw e;
}
});

getWss().on('error', (e) => {
if (e.code !== 'EADDRINUSE') {
throw e;
}
resolve({ app, getWss, reused: true });
});
}
});
}
2 changes: 1 addition & 1 deletion app/shared/constants/endpoints.js
Expand Up @@ -12,4 +12,4 @@

export const UA_SERVICE_PORT = 9090;
export const UA_SERVICE_HTTP = `http://localhost:${UA_SERVICE_PORT}/v1`;
export const UA_SERVICE_WS = `ws://localhost:${UA_SERVICE_PORT}/v1`;
export const UA_SERVICE_WS = `ws://localhost:${UA_SERVICE_PORT}/v1/ws`;
134 changes: 134 additions & 0 deletions app/shared/user-agent-client.js
@@ -0,0 +1,134 @@
/*
Copyright 2016 Mozilla
Licensed under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of the
License at http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed
under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
CONDITIONS OF ANY KIND, either express or implied. See the License for the
specific language governing permissions and limitations under the License.
*/

import path from 'path';
import EventEmitter from 'events';
import WebSocket from 'ws';
import request from 'request';
import { setTimeout } from 'timers';
import childProcess from 'child_process';
import * as endpoints from './constants/endpoints';

function uaRequest(service, options) {
return new Promise((resolve, reject) => {
request[options.method.toLowerCase()]({
url: `${endpoints.UA_SERVICE_HTTP}${service}`,
headers: {
Accept: 'application/json',
'Content-Type': 'application/json',
},
}, (err, response, body) => {
if (err) {
reject(err);
return;
}

resolve(JSON.parse(body));
});
});
}

class UserAgent extends EventEmitter {
constructor(ws, clientCount) {
super();

this.ws = ws;
this.clientCount = clientCount;

this.ws.on('message', (data) => {
data = JSON.parse(data);
const message = data.message;
delete data.message;
this.emit(message, data);
});
}

startSession() {
return uaRequest('/session/start', {
method: 'POST',
});
}
}

const timeout = (ms) => new Promise(resolve => setTimeout(resolve, ms));

function startService() {
// Start a UA service
const exec = 'node';
const args = ['server.js'];

console.log('Starting UA process');
childProcess.spawn(exec, args, {
detached: true,
cwd: path.join(__dirname, '..', 'main'),
});
}

function attemptConnect() {
return new Promise((resolve, reject) => {
const ws = new WebSocket(`${endpoints.UA_SERVICE_WS}`);

ws.on('open', () => {
console.log('Connected to UA process');

// The first message should be protocol information
ws.once('message', (data) => {
data = JSON.parse(data);

if (data.message !== 'protocol' || data.version !== 'v1') {
console.error(`Incorrect message ${JSON.stringify(data)}`);
reject();
return;
}
resolve(new UserAgent(ws, data.clientCount));
});
});

ws.on('error', (e) => {
reject(e);
});
});
}

async function backoffConnect() {
let timedout = false;
const timer = setTimeout(() => timedout = true, 5000);
let delay = 200;

while (!timedout) {
try {
const ua = await attemptConnect();
clearTimeout(timer);
return ua;
} catch (e) {
await timeout(delay);
delay *= 2;
}
}

throw new Error('Unable to connect to the UA service.');
}

export default {
async connect() {
console.log('Attempting to connect');
try {
return await attemptConnect();
} catch (e) {
if (e.code === 'ECONNREFUSED') {
startService();
return await backoffConnect();
}
throw e;
}
},
};

0 comments on commit 63e4dd4

Please sign in to comment.