Skip to content

Commit

Permalink
solved a race condition in unsubscribe within subscribes after receiv…
Browse files Browse the repository at this point in the history
…ing a message and unsubscribing inside message received callback.
  • Loading branch information
stephenlb committed May 23, 2013
1 parent d0e5eb9 commit eb8eb88
Show file tree
Hide file tree
Showing 21 changed files with 410 additions and 1,620 deletions.
50 changes: 23 additions & 27 deletions core/pubnub-common.js
Expand Up @@ -199,7 +199,7 @@ function PN_API(setup) {
, PUB_QUEUE = [] , PUB_QUEUE = []
, SUB_CALLBACK = 0 , SUB_CALLBACK = 0
, SUB_CHANNEL = 0 , SUB_CHANNEL = 0
, SUB_RECEIVER = [] , SUB_RECEIVER = 0
, SUB_RESTORE = 0 , SUB_RESTORE = 0
, SUB_BUFF_WAIT = 0 , SUB_BUFF_WAIT = 0
, TIMETOKEN = 0 , TIMETOKEN = 0
Expand All @@ -219,11 +219,18 @@ function PN_API(setup) {
} }


function each_channel(callback) { function each_channel(callback) {
var count = 0;

each( generate_channel_list(CHANNELS), function(channel) { each( generate_channel_list(CHANNELS), function(channel) {
var chan = CHANNELS[channel]; var chan = CHANNELS[channel];

if (!chan) return; if (!chan) return;
callback(chan);
count++;
(callback||function(){})(chan);
} ); } );

return count;
} }


// Announce Leave Event // Announce Leave Event
Expand Down Expand Up @@ -421,9 +428,6 @@ function PN_API(setup) {
if (READY) SELF['LEAVE']( channel, 0 ); if (READY) SELF['LEAVE']( channel, 0 );
CHANNELS[channel] = 0; CHANNELS[channel] = 0;
} ); } );

// ReOpen Connection if Any Channels Left
if (READY) CONNECT();
}, },


/* /*
Expand All @@ -434,16 +438,16 @@ function PN_API(setup) {
*/ */
'subscribe' : function( args, callback ) { 'subscribe' : function( args, callback ) {
var channel = args['channel'] var channel = args['channel']
, callback = callback || args['callback'] , callback = callback || args['callback']
, callback = callback || args['message'] , callback = callback || args['message']
, connect = args['connect'] || function(){} , connect = args['connect'] || function(){}
, reconnect = args['reconnect'] || function(){} , reconnect = args['reconnect'] || function(){}
, disconnect = args['disconnect'] || function(){} , disconnect = args['disconnect'] || function(){}
, presence = args['presence'] || 0 , presence = args['presence'] || 0
, noheresync = args['noheresync'] || 0 , noheresync = args['noheresync'] || 0
, backfill = args['backfill'] || 0 , backfill = args['backfill'] || 0
, sub_timeout = args['timeout'] || SUB_TIMEOUT , sub_timeout = args['timeout'] || SUB_TIMEOUT
, windowing = args['windowing'] || SUB_WINDOWING , windowing = args['windowing'] || SUB_WINDOWING
, restore = args['restore']; , restore = args['restore'];


// Restore Enabled? // Restore Enabled?
Expand Down Expand Up @@ -544,7 +548,8 @@ function PN_API(setup) {
if (!channels) return; if (!channels) return;


// Connect to PubNub Subscribe Servers // Connect to PubNub Subscribe Servers
SUB_RECEIVER.push( xdr({ _reset_offline();
SUB_RECEIVER = xdr({
timeout : sub_timeout, timeout : sub_timeout,
callback : jsonp, callback : jsonp,
fail : function() { SELF['time'](_test_connection) }, fail : function() { SELF['time'](_test_connection) },
Expand Down Expand Up @@ -605,20 +610,13 @@ function PN_API(setup) {


timeout( CONNECT, windowing ); timeout( CONNECT, windowing );
} }
})); });
}
function CLOSE_PREVIOUS_SUB() {
while (SUB_RECEIVER.length) {
(SUB_RECEIVER.shift())();
}
} }


CONNECT = function() { CONNECT = function() {
CLOSE_PREVIOUS_SUB();
_connect(); _connect();
}; };



// Reduce Status Flicker // Reduce Status Flicker
if (!READY) return READY_BUFFER.push(CONNECT); if (!READY) return READY_BUFFER.push(CONNECT);


Expand Down Expand Up @@ -684,9 +682,7 @@ function PN_API(setup) {
} }


function _reset_offline() { function _reset_offline() {
while (SUB_RECEIVER.length) { SUB_RECEIVER && SUB_RECEIVER();
(SUB_RECEIVER.shift())();
}
} }


if (!UUID) UUID = SELF['uuid'](); if (!UUID) UUID = SELF['uuid']();
Expand Down
50 changes: 23 additions & 27 deletions modern/pubnub.js
Expand Up @@ -200,7 +200,7 @@ function PN_API(setup) {
, PUB_QUEUE = [] , PUB_QUEUE = []
, SUB_CALLBACK = 0 , SUB_CALLBACK = 0
, SUB_CHANNEL = 0 , SUB_CHANNEL = 0
, SUB_RECEIVER = [] , SUB_RECEIVER = 0
, SUB_RESTORE = 0 , SUB_RESTORE = 0
, SUB_BUFF_WAIT = 0 , SUB_BUFF_WAIT = 0
, TIMETOKEN = 0 , TIMETOKEN = 0
Expand All @@ -220,11 +220,18 @@ function PN_API(setup) {
} }


function each_channel(callback) { function each_channel(callback) {
var count = 0;

each( generate_channel_list(CHANNELS), function(channel) { each( generate_channel_list(CHANNELS), function(channel) {
var chan = CHANNELS[channel]; var chan = CHANNELS[channel];

if (!chan) return; if (!chan) return;
callback(chan);
count++;
(callback||function(){})(chan);
} ); } );

return count;
} }


// Announce Leave Event // Announce Leave Event
Expand Down Expand Up @@ -422,9 +429,6 @@ function PN_API(setup) {
if (READY) SELF['LEAVE']( channel, 0 ); if (READY) SELF['LEAVE']( channel, 0 );
CHANNELS[channel] = 0; CHANNELS[channel] = 0;
} ); } );

// ReOpen Connection if Any Channels Left
if (READY) CONNECT();
}, },


/* /*
Expand All @@ -435,16 +439,16 @@ function PN_API(setup) {
*/ */
'subscribe' : function( args, callback ) { 'subscribe' : function( args, callback ) {
var channel = args['channel'] var channel = args['channel']
, callback = callback || args['callback'] , callback = callback || args['callback']
, callback = callback || args['message'] , callback = callback || args['message']
, connect = args['connect'] || function(){} , connect = args['connect'] || function(){}
, reconnect = args['reconnect'] || function(){} , reconnect = args['reconnect'] || function(){}
, disconnect = args['disconnect'] || function(){} , disconnect = args['disconnect'] || function(){}
, presence = args['presence'] || 0 , presence = args['presence'] || 0
, noheresync = args['noheresync'] || 0 , noheresync = args['noheresync'] || 0
, backfill = args['backfill'] || 0 , backfill = args['backfill'] || 0
, sub_timeout = args['timeout'] || SUB_TIMEOUT , sub_timeout = args['timeout'] || SUB_TIMEOUT
, windowing = args['windowing'] || SUB_WINDOWING , windowing = args['windowing'] || SUB_WINDOWING
, restore = args['restore']; , restore = args['restore'];


// Restore Enabled? // Restore Enabled?
Expand Down Expand Up @@ -545,7 +549,8 @@ function PN_API(setup) {
if (!channels) return; if (!channels) return;


// Connect to PubNub Subscribe Servers // Connect to PubNub Subscribe Servers
SUB_RECEIVER.push( xdr({ _reset_offline();
SUB_RECEIVER = xdr({
timeout : sub_timeout, timeout : sub_timeout,
callback : jsonp, callback : jsonp,
fail : function() { SELF['time'](_test_connection) }, fail : function() { SELF['time'](_test_connection) },
Expand Down Expand Up @@ -606,20 +611,13 @@ function PN_API(setup) {


timeout( CONNECT, windowing ); timeout( CONNECT, windowing );
} }
})); });
}
function CLOSE_PREVIOUS_SUB() {
while (SUB_RECEIVER.length) {
(SUB_RECEIVER.shift())();
}
} }


CONNECT = function() { CONNECT = function() {
CLOSE_PREVIOUS_SUB();
_connect(); _connect();
}; };



// Reduce Status Flicker // Reduce Status Flicker
if (!READY) return READY_BUFFER.push(CONNECT); if (!READY) return READY_BUFFER.push(CONNECT);


Expand Down Expand Up @@ -685,9 +683,7 @@ function PN_API(setup) {
} }


function _reset_offline() { function _reset_offline() {
while (SUB_RECEIVER.length) { SUB_RECEIVER && SUB_RECEIVER();
(SUB_RECEIVER.shift())();
}
} }


if (!UUID) UUID = SELF['uuid'](); if (!UUID) UUID = SELF['uuid']();
Expand Down

0 comments on commit eb8eb88

Please sign in to comment.