Skip to content

Commit

Permalink
Implemented publish / subscribe messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
Tiago Alves committed Oct 16, 2014
1 parent f5448be commit ecef57c
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 5 deletions.
44 changes: 41 additions & 3 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var sharingEnd = require('./middleware/fileShare').sharingEnd;
var sharingCancel = require('./middleware/fileShare').sharingCancel;

var mediaStream = require('./middleware/mediaStream');
var publishMessage = require('./middleware/publishMessage');
var signalingMessage = require('./middleware/signalingMessage');

var RpcManager = require('./rpcManager/RpcManager');
Expand Down Expand Up @@ -222,6 +223,7 @@ MuzzleyClient.prototype.__getMiddleware = function (type) {
middlewareFunctions.push(this.idlingMonitor.middleware);
middlewareFunctions.push(hb);
middlewareFunctions.push(this.rpcManager.handleResponse.bind(this.rpcManager));
middlewareFunctions.push(publishMessage);
middlewareFunctions.push(transformControl);
middlewareFunctions.push(setupComponent);
middlewareFunctions.push(activityTerminated);
Expand All @@ -238,6 +240,7 @@ MuzzleyClient.prototype.__getMiddleware = function (type) {
middlewareFunctions.push(this.idlingMonitor.middleware);
middlewareFunctions.push(hb);
middlewareFunctions.push(this.rpcManager.handleResponse.bind(this.rpcManager));
middlewareFunctions.push(publishMessage);
middlewareFunctions.push(participantJoin);
middlewareFunctions.push(widgetAction);
middlewareFunctions.push(participantQuit);
Expand Down Expand Up @@ -513,7 +516,7 @@ MuzzleyClient.prototype.initApp = function(options, callback) {
loginApp: {
token: options.token
},
// FIXME! VERY BAD DESIGN DECISION
// TODO FIXME! VERY BAD DESIGN DECISION
create: options.create
};
return this.__initCommon(TYPE_APP, opts, callback);
Expand All @@ -527,7 +530,7 @@ MuzzleyClient.prototype.initUser = function(options, callback) {
loginUser: {
token: options.token
},
// FIXME! VERY BAD DESIGN DECISION
// TODO FIXME! VERY BAD DESIGN DECISION
join: options.join
};
return this.__initCommon(TYPE_USER, opts, callback);
Expand Down Expand Up @@ -574,8 +577,8 @@ MuzzleyClient.prototype.__initCommon = function(type, options, callback) {
}
return callback(err);
}

if (!performJoinOrCreate) {
self.trigger('connect'); // Backward compatibility
return callback();
}

Expand Down Expand Up @@ -688,6 +691,34 @@ MuzzleyClient.prototype.create = function (options, callback) {
});
};

MuzzleyClient.prototype.subscribe = function (options, callback) {
var self = this;
callback = callback || function () {};

var channel = options._channel || new Channel({ remoteCalls: self.remoteCalls });
self.remoteCalls.subscribe(options, function (err, muzzData) {
if (err) {
channel.trigger('error', err);
return callback(err);
}

self.__handleSubscribeResponse(options, channel, muzzData.d, callback);
});
return channel;
};

MuzzleyClient.prototype.publish = function (options, callback) {
var self = this;
callback = callback || function () {};

self.remoteCalls.publish(options, function (err, muzzData) {
if (err) {
return callback(err);
}
return callback(null, muzzData);
});
};

/**
* Private. Handles a channel creation response.
*
Expand Down Expand Up @@ -794,6 +825,13 @@ MuzzleyClient.prototype.__handleCreateResponse = function (options, channel, res
// }
};

MuzzleyClient.prototype.__handleSubscribeResponse = function (options, channel, responseData, callback) {
var self = this;
self.__handleChannelCreation(channel, responseData.channel);
channel.trigger('subscribe');
return callback(null, channel);
};

MuzzleyClient.prototype.__handleChannelReconnect = function () {
var self = this;
var channels = this.channelManager.getAll();
Expand Down
71 changes: 71 additions & 0 deletions lib/middleware/publishMessage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
var messageTypes = require('../utils/messageTypes');
var PubSubMessage = require('../models/PubSubMessage');

//
// publishMessage middleware
// Intercepts custom publish messages
//
// Emits events:
// - message: function (type, data[, callback])
// - callback: function (success[, message, data]).
// Only if the message is an RPC request.
function publishMessage(muzzData, next){
var _this = this;

muzzData = muzzData || {};
if (!muzzData.h || !muzzData.h.t || !muzzData.d) {
return next(muzzData);
}

if (muzzData.a === 'publish') {
var msgType = muzzData.h.t;

var channel = _this.channelManager.get(muzzData.h.ch);

var pubSubMessage = new PubSubMessage({ raw: muzzData });

if (channel) {
switch (msgType) {
case messageTypes.MESSAGE_TYPE_REQUEST:
// RPC message, requires a response
channel.trigger('message', pubSubMessage, function (success, message, data) {

if (typeof success === 'boolean') {

if (typeof message === 'object' && arguments.length === 2) {
// (success, data) provided
data = message;
message = '';
}

} else if (typeof success === 'string') {
// (message, data) provided
data = message;
message = success;
success = true;
} else if (typeof success === 'object') {
// (data) provided
data = success;
message = '';
success = true;
} else {
// No arguments provided
data = {};
message = '';
success = true;
}

_this.remoteCalls.response(muzzData.h, success, message, data);
});
return;
case messageTypes.MESSAGE_TYPE_SIGNAL:
channel.trigger('message', pubSubMessage);
return;
}
}
}

return next(muzzData);
}

module.exports = publishMessage;
4 changes: 3 additions & 1 deletion lib/models/Channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ Channel.prototype.getCreationOptions = function () {
return this._creationOptions;
};


Channel.prototype.unsubscribe = function (callback) {
this._remoteCalls.unsubscribe(callback);
};

exports = module.exports = Channel;
32 changes: 32 additions & 0 deletions lib/models/PubSubMessage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
var PubSubMessage = function (options) {
this.raw = options.raw;
};

PubSubMessage.prototype.getRawMessage = function () {
return this.raw;
};

PubSubMessage.prototype.getRawMessageData = function () {
return this.getRawMessage().d;
};

PubSubMessage.prototype.getNamespace = function () {
return this.getRawMessageData().ns;
};

PubSubMessage.prototype.getPayload = function () {
return this.getRawMessageData().p;
};

/**
* Get the information of the user that published the message.
* This info is only available for messages published by users
* and not apps.
*
* @return {object} The user info.
*/
PubSubMessage.prototype.getUser = function () {
return this.getRawMessageData().u;
};

exports = module.exports = PubSubMessage;
44 changes: 44 additions & 0 deletions lib/remoteCalls/RemoteCalls.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,50 @@ RemoteCalls.prototype.init = function (options, callback) {
this.__rpcRequest(initMessage, callback);
};

RemoteCalls.prototype.__getSubscribeMessage = function (options) {
var message = {
a: 'subscribe',
d: {
ns: options.namespace,
p: options.payload
}
};
return message;
};

RemoteCalls.prototype.subscribe = function (options, callback) {
var message = this.__getSubscribeMessage(options);
this.__rpcRequest(message, callback);
};

RemoteCalls.prototype.__getUnsubscribeMessage = function () {
var message = {
a: 'unsubscribe'
};
return message;
};

RemoteCalls.prototype.unsubscribe = function (callback) {
var message = this.__getUnsubscribeMessage();
this.__rpcRequest(message, callback);
};

RemoteCalls.prototype.__getPublishMessage = function (options) {
var message = {
a: 'publish',
d: {
ns: options.namespace,
p: options.payload
}
};
return message;
};

RemoteCalls.prototype.publish = function (options, callback) {
var message = this.__getPublishMessage(options);
this.__rpcRequest(message, callback);
};

/**
* Sends the `changeWidget` signaling message to one or multiple participants.
* function (widget[, params][, pid], callback).
Expand Down
4 changes: 4 additions & 0 deletions lib/rpcManager/RpcManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ RpcManager.prototype.handleResponse = function (muzzData, next) {
RpcManager.prototype.makeRequest = function (message, responseCallback){
var self = this;

if (typeof responseCallback !== 'function') {
responseCallback = function () {};
}

var correlationId = self.cidCount += 1;

// Timeout
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "muzzley-client",
"version": "0.4.0",
"version": "0.5.0",
"protocolVersion": "2.0.0",
"author": "muzzley <support@muzzley.com>",
"description": "The muzzley client library",
Expand Down

0 comments on commit ecef57c

Please sign in to comment.