Permalink
Browse files

Extract engineio connection handler to ClientSession state model

  • Loading branch information...
1 parent 9d9c904 commit 1b4be6ce8f4eda0df8692c033a69db959cdcd5ff jden committed Dec 15, 2015
View
@@ -1,6 +1,7 @@
language: node_js
node_js:
- '0.10'
+- '4.2'
services:
- redis-server
script:
@@ -9,19 +9,37 @@ function ClientSession (name, id, accountName, version, transport) {
this.createdAt = Date.now()
this.lastModified = Date.now()
this.name = name
+ this.accountName = accountName
this.id = id
this.subscriptions = {}
this.presences = {}
- this.version = version
+ this.version = version || '0.0.0'
EventEmitter.call(this)
this.transport = transport
this._setupTransport()
+ clientsById[this.id] = this
}
+
inherits(ClientSession, EventEmitter)
ClientSession.prototype._initialize = function (set) {
+ var self = this
+
+ if (set) {
+ Object.keys(set).forEach(function (key) {
+ self[key] = set[key]
+ })
+ }
+
+ if (this.state.can('initialize')) {
+ this.state.initialize()
+ }
+ this.lastModified = Date.now()
+}
+ClientSession.prototype._cleanup = function () {
+ delete clientsById[this.id]
}
// Instance methods
@@ -34,7 +52,9 @@ ClientSession.prototype._initialize = function (set) {
// clientSession.send(message)
ClientSession.prototype.send = function (message) {
- this.transport.send(message)
+ var data = JSON.stringify(message)
+ log.info('#socket.message.outgoing', this.id, data)
+ this.transport.send(data)
}
// Persist subscriptions and presences when not already persisted in memory
@@ -77,11 +97,75 @@ ClientSession.prototype.readData = function (cb) {
ClientSession.prototype._setupTransport = function () {
var self = this
- this.transport && this.transport.on && this.transport.on('message', function emitClientMessage (message) {
- self.emit('message', message)
+ if (!this.transport || !this.transport.on) {
+ return
+ }
+
+ this.transport.on('message', function emitClientMessage (message) {
+ var decoded = self._decodeIncomingMessage(message)
+ if (!decoded) {
+ log.warn('#socket.message.incoming.decode - could not decode')
+ return
+ }
+ log.info('#socket.message.incoming', self.id, decoded)
+
+ switch (self.state.current) {
+ case 'initializing':
+ case 'ready':
+ self._initializeOnNameSync(decoded)
+ self.emit('message', decoded)
+ break
+ }
+ })
+
+ this.transport.on('close', function () {
+ log.info('#socket - disconnect', self.id)
+ self.state.end()
+ })
+}
+
+ClientSession.prototype._initializeOnNameSync = function (message) {
+ if (message.op !== 'nameSync') { return }
+
+ log.info('#socket.message - nameSync', message, this.id)
+
+ this.send({ op: 'ack', value: message && message.ack })
+
+ var association = message.options.association
+
+ ClientNamesById[association.id] = association.name
+ ClientIdsByName[association.name] = this
+
+ log.info('create: association name: ' + association.name +
+ '; association id: ' + association.id)
+
+ // (name, id, accountName, version, transport)
+
+ this._initialize({
+ name: association.name,
+ accountName: message.accountName,
+ clientVersion: message.options.clientVersion
})
}
+ClientSession.prototype._decodeIncomingMessage = function (message) {
+ var decoded
+ try {
+ decoded = JSON.parse(message)
+ } catch (e) {
+ log.warn('#clientSession.message - json parse error', e)
+ return
+ }
+
+ // Format check
+ if (!decoded || !decoded.op) {
+ log.warn('#socket.message - rejected', this.id, decoded)
+ return
+ }
+
+ return decoded
+}
+
ClientSession.prototype._logState = function () {
var subCount = Object.keys(this.subscriptions).length
var presCount = Object.keys(this.presences).length
@@ -153,23 +237,22 @@ function _cloneForStorage (messageIn) {
}
// TODO: move these to SessionManager
+var clientsById = {}
var ClientIdsByName = {} // keyed by name
var ClientNamesById = {} // keyed by id
// (String) => ClientSession
function getClientSessionBySocketId (id) {
- var name = ClientNamesById[id]
- if (name) {
- return ClientIdsByName[name]
- }
+ return clientsById[id]
}
// Set up client name/id association, and return new client instance
-function createClientSessionFromNameSyncMessage (message) {
+function createClientSessionFromNameSyncMessage (message, socket) {
var association = message.options.association
ClientNamesById[association.id] = association.name
+
var clientSession = new ClientSession(association.name, association.id,
- message.accountName, message.options.clientVersion)
+ message.accountName, message.options.clientVersion, socket)
ClientIdsByName[association.name] = clientSession
@@ -179,6 +262,11 @@ function createClientSessionFromNameSyncMessage (message) {
return clientSession
}
+function createClientSessionFromSocket (socket) {
+ return new ClientSession(undefined, socket.id, undefined, undefined, socket)
+}
+
module.exports = ClientSession
module.exports.get = getClientSessionBySocketId
module.exports.create = createClientSessionFromNameSyncMessage
+module.exports.createFromSocket = createClientSessionFromSocket
@@ -1,5 +1,4 @@
var StateMachine = require('javascript-state-machine')
-// var bind = require('lodash.bind')
var bind = function (fn, context) {
return function () {
@@ -16,15 +15,20 @@ module.exports.create = function createClientSessionStateMachine (clientSession)
{name: 'initialize', from: 'initializing', to: 'ready'},
{name: 'leave', from: 'ready', to: 'not ready'},
{name: 'comeback', from: 'not ready', to: 'ready'},
- {name: 'end', from: ['ready', 'not ready'], to: 'ended'},
- {name: 'abort', from: 'initializing', to: 'ended'}
+ {name: 'end', from: ['initializing', 'ready', 'not ready'], to: 'ended'}
],
callbacks: {
- oninitialize: bind(oninitialize, clientSession)
+ oninitialize: bind(oninitialize, clientSession),
+ onend: bind(onend, clientSession)
}
})
}
function oninitialize () {
- console.log('oninitialize', arguments)
+ this.emit('initialize')
+}
+
+function onend () {
+ this._cleanup()
+ this.emit('end')
}
@@ -1,7 +1,7 @@
var MiniEventEmitter = require('miniee')
var logging = require('minilog')('radar:resource')
var Stamper = require('../../stamper.js')
-
+var ClientSession = require('../../../client/client_session')
/*
Resources
@@ -100,11 +100,11 @@ Resource.prototype.redisIn = function (data) {
// Return a socket reference; eio server hash is "clients", not "sockets"
Resource.prototype.socketGet = function (id) {
logging.debug('DEPRECATED: use clientSessionGet instead')
- return this.server.socketServer.clients[id]
+ return this.getClientSession(id)
}
Resource.prototype.getClientSession = function (id) {
- return this.server.socketServer.clients[id]
+ return ClientSession.get(id)
}
Resource.prototype.ack = function (clientSession, sendAck) {
@@ -18,7 +18,7 @@ var LegacyAuthManager = function () {}
LegacyAuthManager.prototype.onMessage = function (clientSession, message, messageType, next) {
if (!this.isAuthorized(clientSession, message, messageType)) {
logging.warn('#clientSession.message - unauthorized', message, clientSession.id)
-
+ console.log('auth', clientSession.constructor.name)
clientSession.send({
op: 'err',
value: 'auth',
View
@@ -36,6 +36,7 @@
"async": "^1.5.0",
"callback_tracker": "0.1.0",
"engine.io": "1.4.2",
+ "javascript-state-machine": "^2.3.5",
"lodash.bind": "^3.1.0",
"miniee": "0.0.5",
"minilog": "2.0.8",
@@ -47,7 +48,9 @@
},
"devDependencies": {
"chai": "^3.4.1",
+ "chai-interface": "^2.0.2",
"mocha": "^2.3.3",
+ "proxyquire": "^1.7.3",
"radar_client": "^0.14.6",
"simple_sentinel": "^0.1.8",
"sinon": "^1.17.2",
Oops, something went wrong.

0 comments on commit 1b4be6c

Please sign in to comment.