Skip to content

Commit

Permalink
fix(start): transport started when HTTP server started
Browse files Browse the repository at this point in the history
  • Loading branch information
thepian committed Sep 16, 2015
1 parent 6070de8 commit 159e8d5
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 91 deletions.
22 changes: 9 additions & 13 deletions lib/socketstream.js
Expand Up @@ -188,25 +188,21 @@ function stream(httpServer) {
api.log.info('Starting SocketStream %s in %s mode...'.green, version, env);
var server = getServer();
server.httpServer = httpServer;
}

var loaded = false;

function load() {
if (loaded) {return;}
loaded = true;

var server = getServer();

// Bind responders to websocket
ws.load(server.httpServer, server.responders, server.eventTransport, api.session.options);
}

// Append SocketStream middleware to stack
http.load(client.options.dirs['static'], client.options.dirs['assets'], session.store.get(), session.options);
var loaded = false;

// Load Client Asset Manager
client.load();
function load() {
if (!loaded) {
// Append SocketStream middleware to stack
http.load(client.options.dirs['static'], client.options.dirs['assets'], session.store.get(), session.options);

// Load Client Asset Manager
client.load();
}
}

function unload() {
Expand Down
14 changes: 7 additions & 7 deletions lib/websocket/event_dispatcher.js
Expand Up @@ -7,9 +7,12 @@ var subscriptions = require('./subscriptions');

module.exports = function(eventTransport, wsTransport, emitter) {
return eventTransport.listen(function(obj) {
var cb, send;
send = wsTransport.event();
cb = (function() {
var send = wsTransport.event();

// Emit message to the event responder (always Responder ID 0)
return emitter.emit('0', obj, {}, andThen());

function andThen() {
switch (obj.t) {
case 'all':
return function(msg) {
Expand All @@ -28,10 +31,7 @@ module.exports = function(eventTransport, wsTransport, emitter) {
return sendToMultiple(send, msg, obj.users, 'user');
};
}
})();

// Emit message to the event responder (always Responder ID 0)
return emitter.emit('0', obj, {}, cb);
}
});
};

Expand Down
10 changes: 5 additions & 5 deletions lib/websocket/index.js
Expand Up @@ -9,26 +9,26 @@ var EventEmitter2 = require('eventemitter2').EventEmitter2,
});

module.exports = function(ss) {

// Return API
var transport = require('./transport')(ss, emitter);

return {
transport: transport,
unload: function() {},
load: function(httpServer, responders, eventTransport, sessionOptions) {
var thisTransport = transport.load(httpServer, sessionOptions);

// Dispatch incoming events to websocket clients
// Dispatch incoming events to websocket clients
require('./event_dispatcher')(eventTransport, thisTransport, emitter);

// Listen to incoming requests and invoke server.request
// Listen to incoming requests and invoke server.request
for (var id in responders) {
var responder = responders[id];
emitter.on(id, responder.interfaces.websocket);
}

// Return active WS transport
// Return active WS transport
return thisTransport;
}
};
Expand Down
25 changes: 12 additions & 13 deletions lib/websocket/transport.js
Expand Up @@ -5,28 +5,27 @@
'use strict';

module.exports = function(ss, emitter) {
var config, transport;
transport = null;
config = {};
var config = {}, transport;

return {
use: function(nameOrModule, cfg) {
var modPath;
if (cfg == null) {
cfg = {};
}
config = cfg;
return (transport = (function() {
if (typeof nameOrModule === 'function') {
return nameOrModule;
transport = null;

if (typeof nameOrModule === 'function') {
return nameOrModule;
} else {
modPath = './transports/' + nameOrModule;
if (require.resolve(modPath)) {
return (transport = require(modPath));
} else {
modPath = './transports/' + nameOrModule;
if (require.resolve(modPath)) {
return require(modPath);
} else {
throw new Error('Unable to find the \'' + nameOrModule + '\' websocket transport internally');
}
throw new Error('Unable to find the \'' + nameOrModule + '\' websocket transport internally');
}
})());
}
},
load: function(httpServer, sessionOptions) {
if (transport == null) {
Expand Down
101 changes: 50 additions & 51 deletions lib/websocket/transports/engineio/index.js
@@ -1,8 +1,7 @@
// Engine.IO server-side wrapper
'use strict';
var c = function(){console.log.apply(console, arguments);};
var
cookieParser = require('cookie-parser'),

var cookieParser = require('cookie-parser'),
fs = require('fs'),
qs = require('querystring'),
engine = require('engine.io'),
Expand All @@ -25,49 +24,55 @@ var processSession = function(socket, secret) {
};

module.exports = function(ss, messageEmitter, httpServer, config, sessionOptions){
var clientFileName, engineioClient, code, ws;
config = config || {};
config.server = config.server || {};
config.client = config.client || {};

clientFileName = '/client.js';

// Send Engine.IO client-side code
engineioClient = fs.readFileSync(__dirname + clientFileName, 'utf8');
var clientFileName = '/client.js';
var engineioClient = fs.readFileSync(__dirname + clientFileName, 'utf8');
ss.client.send('lib', 'engine.io-client', engineioClient, {minified: false}); // tested this with minified: true ; worked fine.

// Send socketstream-transport module
code = fs.readFileSync(__dirname + '/wrapper.js', 'utf8');
var code = fs.readFileSync(__dirname + '/wrapper.js', 'utf8');
ss.client.send('mod', 'socketstream-transport', code);

// Tell the SocketStream client to use this transport, passing any client-side config along to the wrapper
ss.client.send('code', 'transport', "require('socketstream').assignTransport(" + JSON.stringify(config.client) + ");");

// Return API for sending events
// Note the '0' message prefix (which signifies the responderId) is reserved for sending events
var api = { event: eventFn };

// don't set up server for CLI and test
if (httpServer == null) return;
if (httpServer == null) return api;

// Create a new Engine.IO server and bind to /ws
ws = engine.attach(httpServer, config.server);
var ws = engine.attach(httpServer, config.server);
// ws.installHandlers(httpServer, {prefix: '/ws'});

// Handle incoming connections
ws.on('connection', function(socket) {
ws.on('connection', onConnection);

return api;

function onConnection(socket) {

if (processSession(socket, sessionOptions.secret)) {
// Store this here before it gets cleaned up after the websocket upgrade
socket.remoteAddress = socket.request.connection.remoteAddress;
// Store this here before it gets cleaned up after the websocket upgrade
socket.remoteAddress = socket.request.connection.remoteAddress;

// Get real IP if behind proxy
var xForwardedFor = socket.request.headers['x-forwarded-for'];
if (xForwardedFor) {
socket.remoteAddress = xForwardedFor.split(',')[0];
}
// Get real IP if behind proxy
var xForwardedFor = socket.request.headers['x-forwarded-for'];
if (xForwardedFor) {
socket.remoteAddress = xForwardedFor.split(',')[0];
}

// Allow this connection to be addressed by the socket ID
openSocketsById[socket.id] = socket;
ss.session.find(socket.sessionId, socket.id, function(session){
socket.send('X|OK');
});
// Allow this connection to be addressed by the socket ID
openSocketsById[socket.id] = socket;
ss.session.find(socket.sessionId, socket.id, function(session){
socket.send('X|OK');
});

// changed from data
socket.on('message', function(msg) {
Expand Down Expand Up @@ -98,39 +103,33 @@ module.exports = function(ss, messageEmitter, httpServer, config, sessionOptions
}
});
}
});

// Return API for sending events
// Note the '0' message prefix (which signifies the responderId) is reserved for sending events
return {

event: function() {
}

return {
function eventFn() {
return {

// Send the same message to every open socket
all: function(msg) {
for (var id in openSocketsById) {
if (openSocketsById.hasOwnProperty(id)) {
openSocketsById[id].send('0|' + msg + '|null');
}
// Send the same message to every open socket
all: function(msg) {
for (var id in openSocketsById) {
if (openSocketsById.hasOwnProperty(id)) {
openSocketsById[id].send('0|' + msg + '|null');
}
},

// Send an event to a specific socket
// Note: 'meta' is typically the name of the channel
socketId: function(id, msg, meta) {
var socket = openSocketsById[id];
if (socket) {
return socket.send('0|' + msg + '|' + meta);
} else {
return false;
}

}

},

// Send an event to a specific socket
// Note: 'meta' is typically the name of the channel
socketId: function(id, msg, meta) {
var socket = openSocketsById[id];
if (socket) {
return socket.send('0|' + msg + '|' + meta);
} else {
return false;
}
}
}
}
}

function c(){console.log.apply(console, arguments);}

3 changes: 2 additions & 1 deletion test/unit/client/formatters.test.js
Expand Up @@ -185,7 +185,8 @@ var ss = require( '../../fixtures/socketstream'),
ss.api.bundler.packAssetSet('js', client,
function(files) {
files[3].content.should.equal('require.define("/abc/index",function(){window.a="formatter index.a"});');
var outs = logHook.off();
// var outs =
logHook.off();
//outs.should.match(/Minified .\/abc\/index.a from 0.121 KB to 0.076 KB/);
},
done);
Expand Down
2 changes: 1 addition & 1 deletion test/unit/tasks/index.test.js
Expand Up @@ -78,5 +78,5 @@ describe('start tasks plan', function () {

should(typeof plan.callback).be.equal('function');
plan.targets.should.eql(['pack-all']);
})
});
});

0 comments on commit 159e8d5

Please sign in to comment.