diff --git a/scout-brain/lib/actions/token.js b/scout-brain/lib/actions/token.js index f0629f73a06..1a5e02d2bb3 100644 --- a/scout-brain/lib/actions/token.js +++ b/scout-brain/lib/actions/token.js @@ -3,7 +3,6 @@ var jwt = require('jsonwebtoken'); var boom = require('boom'); var debug = require('debug')('scout-brain:token'); var _ = require('underscore'); -var assert = require('assert'); var getDeployment = require('./deployment').get; var createSession = require('./session').create; @@ -26,6 +25,7 @@ function verify(token, fn) { } function mount(tokenData, ctx, next) { + debug('mounting token data'); if (!tokenData.session_id) { return next(boom.badRequest('Bad token: missing session id')); } @@ -57,23 +57,19 @@ function mount(tokenData, ctx, next) { 'to `' + ctx.instance_id + '` but it is not in ' + 'deployment `' + tokenData.deployment_id + '`')); } + } - debug('getting connection for session', tokenData.session_id); - getSession(tokenData.session_id, function(err, connection) { - if (err) return next(err); - - ctx.mongo = connection; - return next(); - }); + debug('getting connection for session', tokenData.session_id); + getSession(tokenData.session_id, function(err, connection) { + if (err) return next(err); - } else { - getSession(tokenData.session_id, function(err, connection) { - if (err) return next(err); + if (!connection) { + return next(boom.forbidden('No session for this token.')); + } - ctx.mongo = connection; - return next(); - }); - } + ctx.mongo = connection; + return next(); + }); }); } diff --git a/scout-client/lib/client.js b/scout-client/lib/client.js index 0ebbb47bc4f..55acf237bb6 100644 --- a/scout-client/lib/client.js +++ b/scout-client/lib/client.js @@ -124,8 +124,8 @@ Client.prototype.connect = function() { return this; } this.token = new Token(this.config) - .on('readable', this.onTokenReadable.bind(this)) - .on('error', this.onTokenError.bind(this)); + .on('data', this.onTokenReadable.bind(this)) + .on('error', this.onError.bind(this)); return this; }; @@ -558,7 +558,7 @@ Client.prototype.read = function(path, params, fn) { if (!this.readable) { debug('%s not readable. queueing read', this._id, path, params); - return this.on('readable', this.read.bind(this, path, params, fn)); + return this.once('readable', this.read.bind(this, path, params, fn)); } if (typeof params === 'function') { @@ -618,14 +618,14 @@ Client.prototype.ender = function(fn) { * @api private */ Client.prototype.onTokenReadable = function() { - debug('token now readable'); - this.readable = true; - this.context.set(this.token.session); if (!this.io) { this._initSocketio(); } - this.emit('readable', this.token.session); - debug('emitted readable on client'); + debug('authenticating with scout-server socket.io transport...'); + this.io.emit('authenticate', { + token: this.token.toString() + }); + if (!this.original) { this.emit('change'); @@ -642,22 +642,10 @@ Client.prototype.onTokenReadable = function() { } }; -/** - * We couldn't get a token. - * - * @api private - */ -Client.prototype.onTokenError = function(err) { - debug('Could not get token. Server not running?', err); - this.emit('error', err); -}; - Client.prototype._initSocketio = function() { if (this.io) return; - this.io = socketio(this.config.scout, { - query: 'token=' + this.token.toString() - }); + this.io = socketio(this.config.scout); this.io.on('reconnecting', this.emit.bind(this, 'reconnecting')) .on('reconnect', this.onReconnect.bind(this)) .on('reconnect_attempt', this.emit.bind(this, 'reconnect_attempt')) @@ -665,7 +653,16 @@ Client.prototype._initSocketio = function() { .on('disconnect', this.emit.bind(this, 'disconnect')); this.io.on('connect', function() { debug('connected to scout-server socket'); - }); + this.io.on('authenticated', function() { + debug('Success! Now authenticated with scout-server socket.io transport'); + + this.readable = true; + this.context.set(this.token.session); + debug('now ready for use'); + this.emit('readable', this.token.session); + }.bind(this)); + }.bind(this)); + this.io.on('error', this.onError.bind(this)); }; /** @@ -689,11 +686,12 @@ Client.prototype.onUnload = function() { * @param {Error} err * @api private */ -Client.prototype.onTokenError = function(err) { - this.dead = err; - if (err >= 500) { - this.dead.message += ' (scout-server dead at ' + this.config.scout + '?)'; +Client.prototype.onError = function(err) { + if (err.message === 'Origin is not allowed by Access-Control-Allow-Origin') { + err.message = 'scout-server not running at ' + this.config.scout + '?'; } + // @todo: Exponential back-off to retry connecting. + this.dead = err; this.emit('error', err); }; @@ -701,8 +699,6 @@ Client.prototype.onReconnect = function() { debug('reconnected. getting new token'); this.token = new Token(this.config) .on('readable', function() { - this.readable = true; - this.context.set(this.token.session); this._initSocketio(); }.bind(this)) .on('error', this.emit.bind(this, 'error')); @@ -736,7 +732,7 @@ Client.prototype.createReadStream = function(_id, data) { return debug('proxy already transferred'); } }); - this.on('readable', function() { + this.once('readable', function() { debug('client readable'); var src = client.createReadStream(_id, data); src.on('data', proxy.emit.bind(proxy, 'data')); @@ -752,6 +748,10 @@ Client.prototype.createReadStream = function(_id, data) { data = data || {}; var stream = ss.createStream(this.io); ss(this.io).emit(_id, stream, data); - return stream.pipe(EJSON.createParseStream()); + + var parser = EJSON.createParseStream(); + var res = stream.pipe(parser); + stream.on('error', res.emit.bind(res, 'error')); + return res; } }; diff --git a/scout-client/lib/token.js b/scout-client/lib/token.js index bbc9bcce5ee..d2da80bac75 100644 --- a/scout-client/lib/token.js +++ b/scout-client/lib/token.js @@ -11,18 +11,11 @@ function Token(config) { this.config = config; this.expirationRedLine = 15 * 1000; this.session = {}; - this.readable = false; process.nextTick(function() { - this.bake(function(err, res) { + this.bake(function(err) { if (err) return this.emit('error', err); - - this.session = res; this.schedule(); - - this.readable = true; - debug('emit readable!'); - this.emit('readable'); }.bind(this)); }.bind(this)); } @@ -47,12 +40,12 @@ Token.prototype.close = function(fn) { clearTimeout(this.refreshTimeout); debug('closing token'); request.del(this.config.scout + '/api/v1/token') - .set('Accept', 'application/json') - .set('Authorization', 'Bearer ' + this.session.token) - .end(function(err, res) { - debug('response from token close'); - fn(err, res); - }); + .set('Accept', 'application/json') + .set('Authorization', 'Bearer ' + this.session.token) + .end(function(err, res) { + debug('response from token close'); + fn(err, res); + }); }; Token.prototype.bake = function(done) { @@ -60,8 +53,9 @@ Token.prototype.bake = function(done) { seed: this.config.seed }; - if (this.config.timeout) + if (this.config.timeout) { payload.timeout = this.config.timeout; + } if (this.config.auth) { Object.keys(this.config.auth).map(function(name) { @@ -73,36 +67,43 @@ Token.prototype.bake = function(done) { } debug('getting token for', this.config.seed, payload); request.post(this.config.scout + '/api/v1/token') - .send(payload) - .set('Accept', 'application/json') - .end(function(err, res) { - if (err) return done(err); - - if (!err && res.status >= 400) { - err = new Error(res.body ? res.body.message : res.text); - err.code = res.status; - Error.captureStackTrace(err, Token.prototype.bake); - return done(err); - } - - if (!res.body.expires_at || !res.body.created_at) { - return done(new Error('Malformed response. Missing expires_at or created_at')); - } - - if (new Date(res.body.expires_at) - Date.now() < (1 * 60 * 1000)) { - return done(new Error('Got an expires that is less than a minute from now.')); - } - - done(null, res.body); - }.bind(this)); + .send(payload) + .set('Accept', 'application/json') + .end(function(err, res) { + if (err) { + if (res && res.body) { + err.message += ': ' + res.body.message; + } + console.error('Error getting token:', err); + return done(err); + } + + if (!err && res.status >= 400) { + err = new Error(res.body ? res.body.message : res.text); + err.code = res.status; + Error.captureStackTrace(err, Token.prototype.bake); + return done(err); + } + + if (!res.body.expires_at || !res.body.created_at) { + return done(new Error('Malformed response. Missing expires_at or created_at')); + } + + if (new Date(res.body.expires_at) - Date.now() < (1 * 60 * 1000)) { + return done(new Error('Got an expires that is less than a minute from now.')); + } + + this.session = res.body; + this.emit('data', this.session); + + done(null, res.body); + }.bind(this)); }; Token.prototype.refresh = function() { - this.bake(function(err, res) { + this.bake(function(err) { if (err) return this.emit('error', err); - if (!res) return this.emit('error', new Error('Empty response with no error')); - this.session = res; debug('token refreshed successfully'); return this.schedule(); }.bind(this)); @@ -114,5 +115,6 @@ Token.prototype.schedule = function() { return this; } var ms = (new Date(this.session.expires_at) - Date.now()) - this.expirationRedLine; + debug('scheduling token refresh %dms from now', ms); this.refreshTimeout = setTimeout(this.refresh.bind(this), ms); }; diff --git a/scout-server/lib/index.js b/scout-server/lib/index.js index b1ad9261aec..5bb7069c785 100644 --- a/scout-server/lib/index.js +++ b/scout-server/lib/index.js @@ -14,29 +14,30 @@ var urldecode = require('body-parser').urlencoded({ app.server = require('http').createServer(app); app.config = require('mongoscope-config'); -if (process.env.NODE_ENV = 'development') { - app.use(require('connect-livereload')({ - port: 35729, - include: ['./'] - })); - var livereload = require('tiny-lr')(); - var watch = require('watch'); - - livereload.listen(35729, '127.0.0.1'); - - watch.watchTree(__dirname + '/../', { - filter: function(filename) { - return !(/node_modules/.test(filename)); - }, - ignoreDotFiles: true - }, function(files) { - livereload.changed({ - body: { - files: files - } - }); - }); -} +// @todo: this should be moved to scout-electron now. +// if (process.env.NODE_ENV = 'development') { +// app.use(require('connect-livereload')({ +// port: 35729, +// include: ['./'] +// })); +// var livereload = require('tiny-lr')(); +// var watch = require('watch'); + +// livereload.listen(35729, '127.0.0.1'); + +// watch.watchTree(__dirname + '/../', { +// filter: function(filename) { +// return !(/node_modules/.test(filename)); +// }, +// ignoreDotFiles: true +// }, function(files) { +// livereload.changed({ +// body: { +// files: files +// } +// }); +// }); +// } app.use(require('./middleware/watch-event-loop-blocking')); diff --git a/scout-server/lib/io.js b/scout-server/lib/io.js index c3b98121d91..b64bdfbae0f 100644 --- a/scout-server/lib/io.js +++ b/scout-server/lib/io.js @@ -18,12 +18,16 @@ var _idToDocument = require('./streams/id-to-document'); var EJSON = require('mongodb-extended-json'); var typedParams = require('./middleware/typed-params'); -io.use(require('socketio-jwt').authorize({ + +io.on('connection', require('socketio-jwt').authorize({ secret: config.get('token:secret').toString('utf-8'), - handshake: true -})); + timeout: 15000 +})).on('authenticated', function(socket) { + debug('authenticated with token data', socket.decoded_token); +}); function prepare(socket, req, done) { + debug('preparing socket.io request'); req.params = _.extend({ ns: req.ns, size: req.size, @@ -35,10 +39,7 @@ function prepare(socket, req, done) { var tasks = {}; tasks.token = function(next) { - brain.loadToken(socket.decoded_token, req, function() { - debug('load token returned', arguments); - next(); - }); + brain.loadToken(socket.decoded_token, req, next); }; if (req.params.ns) { @@ -55,8 +56,8 @@ function prepare(socket, req, done) { done(); }); } - async.series(tasks, function(err) { + debug('socket.io request now prepared'); if (err) return done(err); done(); }); @@ -65,7 +66,9 @@ function prepare(socket, req, done) { io.on('connection', function(socket) { ss(socket).on('collection:sample', function(stream, req) { - prepare(socket, req, function() { + prepare(socket, req, function(err) { + if (err) return stream.emit('error', err); + debug('collection:sample got req %j', Object.keys(req)); var db = req.mongo.db(req.params.database_name); createSampleStream(db, req.params.collection_name, { @@ -79,5 +82,4 @@ io.on('connection', function(socket) { .pipe(stream); }); }); - debug('token data %j', socket.decoded_token); }); diff --git a/scout-server/lib/middleware/mongodb-boom.js b/scout-server/lib/middleware/mongodb-boom.js index 197fe8d6f91..6931062f28c 100644 --- a/scout-server/lib/middleware/mongodb-boom.js +++ b/scout-server/lib/middleware/mongodb-boom.js @@ -32,6 +32,8 @@ function decodeDriverError(err, msg, fn) { err = boom.badRequest(msg); } else if (/(target namespace exists|already exists)/.test(err.message)) { return boom.conflict('Collection already exists'); + } else if (/server .* sockets closed/.test(msg)) { + err = boom.serverTimeout('Too many connections to MongoDB'); } else { // Have a case where we're not properly validating invalid // replicaset commands on a deployment with no replicaset.else if (/valid replicaset|No primary found in set/.test(msg)) { diff --git a/scout-ui/src/home/index.js b/scout-ui/src/home/index.js index 4e0294d8b2e..a53dbd43f63 100644 --- a/scout-ui/src/home/index.js +++ b/scout-ui/src/home/index.js @@ -25,9 +25,14 @@ var CollectionView = AmpersandView.extend({ app.statusbar.watch(this, this.schema); this.schema.ns = this.model._id; + this.listenTo(this.schema, 'error', this.onError); this.schema.fetch(); }, template: require('./collection.jade'), + onError: function(schema, err) { + // @todo: Figure out a good way to handle this (server is probably crashed). + console.error('Error getting schema: ', err); + }, subviews: { fields: { waitFor: 'schema.fields', diff --git a/scout-ui/src/models/index.js b/scout-ui/src/models/index.js index a6d1e9a0d73..441db8719b5 100644 --- a/scout-ui/src/models/index.js +++ b/scout-ui/src/models/index.js @@ -17,6 +17,14 @@ window.scout = client; var wrapError = require('./wrap-error'); +/** + * Catch-all for any client errors so we just log them instead of + * stopping the app. + */ +client.on('error', function(err) { + console.error(err); +}); + var SampledSchema = Schema.extend({ fetch: function(options) { options = _.defaults((options || {}), { @@ -28,7 +36,7 @@ var SampledSchema = Schema.extend({ wrapError(this, options); var model = this; - var detect = this.stream() + var parser = this.stream() .on('error', function(err) { options.error(err, 'error', err.message); }) @@ -40,7 +48,9 @@ var SampledSchema = Schema.extend({ model.trigger('request', model, {}, options); process.nextTick(function() { - client.sample(model.ns, options).pipe(detect); + client.sample(model.ns, options) + .on('error', parser.emit.bind(parser, 'error')) + .pipe(parser); }); } });