Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 11 additions & 15 deletions scout-brain/lib/actions/token.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ var jwt = require('jsonwebtoken');
var boom = require('boom');
var debug = require('debug')('scout-brain:token');
var _ = require('underscore');
var assert = require('assert');

var getDeployment = require('./deployment').get;
var createSession = require('./session').create;
Expand All @@ -26,6 +25,7 @@ function verify(token, fn) {
}

function mount(tokenData, ctx, next) {
debug('mounting token data');
if (!tokenData.session_id) {
return next(boom.badRequest('Bad token: missing session id'));
}
Expand Down Expand Up @@ -57,23 +57,19 @@ function mount(tokenData, ctx, next) {
'to `' + ctx.instance_id + '` but it is not in ' +
'deployment `' + tokenData.deployment_id + '`'));
}
}

debug('getting connection for session', tokenData.session_id);
getSession(tokenData.session_id, function(err, connection) {
if (err) return next(err);

ctx.mongo = connection;
return next();
});
debug('getting connection for session', tokenData.session_id);
getSession(tokenData.session_id, function(err, connection) {
if (err) return next(err);

} else {
getSession(tokenData.session_id, function(err, connection) {
if (err) return next(err);
if (!connection) {
return next(boom.forbidden('No session for this token.'));
}

ctx.mongo = connection;
return next();
});
}
ctx.mongo = connection;
return next();
});
});
}

Expand Down
60 changes: 30 additions & 30 deletions scout-client/lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ Client.prototype.connect = function() {
return this;
}
this.token = new Token(this.config)
.on('readable', this.onTokenReadable.bind(this))
.on('error', this.onTokenError.bind(this));
.on('data', this.onTokenReadable.bind(this))
.on('error', this.onError.bind(this));
return this;
};

Expand Down Expand Up @@ -558,7 +558,7 @@ Client.prototype.read = function(path, params, fn) {

if (!this.readable) {
debug('%s not readable. queueing read', this._id, path, params);
return this.on('readable', this.read.bind(this, path, params, fn));
return this.once('readable', this.read.bind(this, path, params, fn));
}

if (typeof params === 'function') {
Expand Down Expand Up @@ -618,14 +618,14 @@ Client.prototype.ender = function(fn) {
* @api private
*/
Client.prototype.onTokenReadable = function() {
debug('token now readable');
this.readable = true;
this.context.set(this.token.session);
if (!this.io) {
this._initSocketio();
}
this.emit('readable', this.token.session);
debug('emitted readable on client');
debug('authenticating with scout-server socket.io transport...');
this.io.emit('authenticate', {
token: this.token.toString()
});


if (!this.original) {
this.emit('change');
Expand All @@ -642,30 +642,27 @@ Client.prototype.onTokenReadable = function() {
}
};

/**
* We couldn't get a token.
*
* @api private
*/
Client.prototype.onTokenError = function(err) {
debug('Could not get token. Server not running?', err);
this.emit('error', err);
};

Client.prototype._initSocketio = function() {
if (this.io) return;

this.io = socketio(this.config.scout, {
query: 'token=' + this.token.toString()
});
this.io = socketio(this.config.scout);
this.io.on('reconnecting', this.emit.bind(this, 'reconnecting'))
.on('reconnect', this.onReconnect.bind(this))
.on('reconnect_attempt', this.emit.bind(this, 'reconnect_attempt'))
.on('reconnect_failed', this.emit.bind(this, 'reconnect_failed'))
.on('disconnect', this.emit.bind(this, 'disconnect'));
this.io.on('connect', function() {
debug('connected to scout-server socket');
});
this.io.on('authenticated', function() {
debug('Success! Now authenticated with scout-server socket.io transport');

this.readable = true;
this.context.set(this.token.session);
debug('now ready for use');
this.emit('readable', this.token.session);
}.bind(this));
}.bind(this));
this.io.on('error', this.onError.bind(this));
};

/**
Expand All @@ -689,20 +686,19 @@ Client.prototype.onUnload = function() {
* @param {Error} err
* @api private
*/
Client.prototype.onTokenError = function(err) {
this.dead = err;
if (err >= 500) {
this.dead.message += ' (scout-server dead at ' + this.config.scout + '?)';
Client.prototype.onError = function(err) {
if (err.message === 'Origin is not allowed by Access-Control-Allow-Origin') {
err.message = 'scout-server not running at ' + this.config.scout + '?';
}
// @todo: Exponential back-off to retry connecting.
this.dead = err;
this.emit('error', err);
};

Client.prototype.onReconnect = function() {
debug('reconnected. getting new token');
this.token = new Token(this.config)
.on('readable', function() {
this.readable = true;
this.context.set(this.token.session);
this._initSocketio();
}.bind(this))
.on('error', this.emit.bind(this, 'error'));
Expand Down Expand Up @@ -736,7 +732,7 @@ Client.prototype.createReadStream = function(_id, data) {
return debug('proxy already transferred');
}
});
this.on('readable', function() {
this.once('readable', function() {
debug('client readable');
var src = client.createReadStream(_id, data);
src.on('data', proxy.emit.bind(proxy, 'data'));
Expand All @@ -752,6 +748,10 @@ Client.prototype.createReadStream = function(_id, data) {
data = data || {};
var stream = ss.createStream(this.io);
ss(this.io).emit(_id, stream, data);
return stream.pipe(EJSON.createParseStream());

var parser = EJSON.createParseStream();
var res = stream.pipe(parser);
stream.on('error', res.emit.bind(res, 'error'));
return res;
}
};
82 changes: 42 additions & 40 deletions scout-client/lib/token.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,11 @@ function Token(config) {
this.config = config;
this.expirationRedLine = 15 * 1000;
this.session = {};
this.readable = false;

process.nextTick(function() {
this.bake(function(err, res) {
this.bake(function(err) {
if (err) return this.emit('error', err);

this.session = res;
this.schedule();

this.readable = true;
debug('emit readable!');
this.emit('readable');
}.bind(this));
}.bind(this));
}
Expand All @@ -47,21 +40,22 @@ Token.prototype.close = function(fn) {
clearTimeout(this.refreshTimeout);
debug('closing token');
request.del(this.config.scout + '/api/v1/token')
.set('Accept', 'application/json')
.set('Authorization', 'Bearer ' + this.session.token)
.end(function(err, res) {
debug('response from token close');
fn(err, res);
});
.set('Accept', 'application/json')
.set('Authorization', 'Bearer ' + this.session.token)
.end(function(err, res) {
debug('response from token close');
fn(err, res);
});
};

Token.prototype.bake = function(done) {
var payload = {
seed: this.config.seed
};

if (this.config.timeout)
if (this.config.timeout) {
payload.timeout = this.config.timeout;
}

if (this.config.auth) {
Object.keys(this.config.auth).map(function(name) {
Expand All @@ -73,36 +67,43 @@ Token.prototype.bake = function(done) {
}
debug('getting token for', this.config.seed, payload);
request.post(this.config.scout + '/api/v1/token')
.send(payload)
.set('Accept', 'application/json')
.end(function(err, res) {
if (err) return done(err);

if (!err && res.status >= 400) {
err = new Error(res.body ? res.body.message : res.text);
err.code = res.status;
Error.captureStackTrace(err, Token.prototype.bake);
return done(err);
}

if (!res.body.expires_at || !res.body.created_at) {
return done(new Error('Malformed response. Missing expires_at or created_at'));
}

if (new Date(res.body.expires_at) - Date.now() < (1 * 60 * 1000)) {
return done(new Error('Got an expires that is less than a minute from now.'));
}

done(null, res.body);
}.bind(this));
.send(payload)
.set('Accept', 'application/json')
.end(function(err, res) {
if (err) {
if (res && res.body) {
err.message += ': ' + res.body.message;
}
console.error('Error getting token:', err);
return done(err);
}

if (!err && res.status >= 400) {
err = new Error(res.body ? res.body.message : res.text);
err.code = res.status;
Error.captureStackTrace(err, Token.prototype.bake);
return done(err);
}

if (!res.body.expires_at || !res.body.created_at) {
return done(new Error('Malformed response. Missing expires_at or created_at'));
}

if (new Date(res.body.expires_at) - Date.now() < (1 * 60 * 1000)) {
return done(new Error('Got an expires that is less than a minute from now.'));
}

this.session = res.body;
this.emit('data', this.session);

done(null, res.body);
}.bind(this));
};

Token.prototype.refresh = function() {
this.bake(function(err, res) {
this.bake(function(err) {
if (err) return this.emit('error', err);
if (!res) return this.emit('error', new Error('Empty response with no error'));

this.session = res;
debug('token refreshed successfully');
return this.schedule();
}.bind(this));
Expand All @@ -114,5 +115,6 @@ Token.prototype.schedule = function() {
return this;
}
var ms = (new Date(this.session.expires_at) - Date.now()) - this.expirationRedLine;
debug('scheduling token refresh %dms from now', ms);
this.refreshTimeout = setTimeout(this.refresh.bind(this), ms);
};
47 changes: 24 additions & 23 deletions scout-server/lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,30 @@ var urldecode = require('body-parser').urlencoded({
app.server = require('http').createServer(app);
app.config = require('mongoscope-config');

if (process.env.NODE_ENV = 'development') {
app.use(require('connect-livereload')({
port: 35729,
include: ['./']
}));
var livereload = require('tiny-lr')();
var watch = require('watch');

livereload.listen(35729, '127.0.0.1');

watch.watchTree(__dirname + '/../', {
filter: function(filename) {
return !(/node_modules/.test(filename));
},
ignoreDotFiles: true
}, function(files) {
livereload.changed({
body: {
files: files
}
});
});
}
// @todo: this should be moved to scout-electron now.
// if (process.env.NODE_ENV = 'development') {
// app.use(require('connect-livereload')({
// port: 35729,
// include: ['./']
// }));
// var livereload = require('tiny-lr')();
// var watch = require('watch');

// livereload.listen(35729, '127.0.0.1');

// watch.watchTree(__dirname + '/../', {
// filter: function(filename) {
// return !(/node_modules/.test(filename));
// },
// ignoreDotFiles: true
// }, function(files) {
// livereload.changed({
// body: {
// files: files
// }
// });
// });
// }


app.use(require('./middleware/watch-event-loop-blocking'));
Expand Down
Loading