Skip to content

Commit

Permalink
Fix bug with disconnected subscribers. The pipe queue was keeping the…
Browse files Browse the repository at this point in the history
…m, and automatic unsubscribe messages were failing in manual mode because m_lastPipe was not set correctly. "Tested in production."
  • Loading branch information
buybackoff committed Nov 17, 2015
1 parent 198a728 commit d53e7f8
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 16 deletions.
2 changes: 1 addition & 1 deletion src/NetMQ/Core/Patterns/Utils/MultiTrie.cs
Expand Up @@ -155,7 +155,7 @@ private bool RemoveHelper([NotNull] Pipe pipe, [NotNull] byte[] buffer, int buff
// Remove the subscription from this node.
if (m_pipes != null && m_pipes.Remove(pipe) && m_pipes.Count == 0)
{
func(null, buffer, bufferSize, arg);
func(pipe, buffer, bufferSize, arg);
m_pipes = null;
}

Expand Down
28 changes: 13 additions & 15 deletions src/NetMQ/Core/Patterns/XPub.cs
Expand Up @@ -75,8 +75,7 @@ public XPubSession([NotNull] IOThread ioThread, bool connect, [NotNull] SocketBa
/// List of pending (un)subscriptions, ie. those that were already
/// applied to the trie, but not yet received by the user.
/// </summary>
private readonly Queue<Msg> m_pendingMessages;
private readonly Queue<Pipe> m_pendingPipes;
private readonly Queue<KeyValuePair<Msg,Pipe>> m_pendingMessages;

private static readonly MultiTrie.MultiTrieDelegate s_markAsMatching;
private static readonly MultiTrie.MultiTrieDelegate s_sendUnsubscription;
Expand Down Expand Up @@ -107,7 +106,7 @@ static XPub()
unsubMsg[0] = 0;
unsubMsg.Put(data, 1, size);
self.m_pendingMessages.Enqueue(unsubMsg);
self.m_pendingMessages.Enqueue(new KeyValuePair<Msg, Pipe>(unsubMsg, pipe));
}
};
}
Expand All @@ -122,8 +121,7 @@ public XPub([NotNull] Ctx parent, int threadId, int socketId)

m_subscriptions = new MultiTrie();
m_distribution = new Distribution();
m_pendingMessages = new Queue<Msg>();
m_pendingPipes = new Queue<Pipe>();
m_pendingMessages = new Queue<KeyValuePair<Msg, Pipe>>();
}

/// <summary>
Expand Down Expand Up @@ -174,8 +172,7 @@ protected override void XReadActivated(Pipe pipe)
{
if (m_manual)
{
m_pendingMessages.Enqueue(sub);
m_pendingPipes.Enqueue(pipe);
m_pendingMessages.Enqueue(new KeyValuePair<Msg,Pipe>(sub, pipe));
}
else
{
Expand All @@ -187,7 +184,7 @@ protected override void XReadActivated(Pipe pipe)
// passed to used on next recv call.
if (m_options.SocketType == ZmqSocketType.Xpub && (unique || m_verbose))
{
m_pendingMessages.Enqueue(sub);
m_pendingMessages.Enqueue(new KeyValuePair<Msg, Pipe>(sub, null));
}
else
{
Expand All @@ -198,13 +195,13 @@ protected override void XReadActivated(Pipe pipe)
}
else if (m_broadcastEnabled && size > 0 && sub[0] == 2)
{
m_pendingMessages.Enqueue(sub);
m_pendingPipes.Enqueue(pipe);
m_pendingMessages.Enqueue(new KeyValuePair<Msg, Pipe>(sub, pipe));
isBroadcast = true;
}
else // process message unrelated to sub/unsub
{
m_pendingMessages.Enqueue(sub);
// pipe is null here, no special treatment
m_pendingMessages.Enqueue(new KeyValuePair<Msg, Pipe>(sub, null));
}

}
Expand Down Expand Up @@ -359,19 +356,20 @@ protected override bool XRecv(ref Msg msg)
if (m_pendingMessages.Count == 0)
return false;
msg.Close();
msg = m_pendingMessages.Dequeue();
var msgPipe = m_pendingMessages.Dequeue();
msg = msgPipe.Key;
// must check if m_lastPipe == null to avoid dequeue at the second frame of a broadcast message
if (m_pendingPipes.Count > 0 && m_lastPipe == null)
if (msgPipe.Value != null && m_lastPipe == null)
{
if (m_broadcastEnabled && msg[0] == 2)
{
m_lastPipeIsBroadcast = true;
m_lastPipe = m_pendingPipes.Dequeue();
m_lastPipe = msgPipe.Value;
}
if (m_manual && (msg[0] == 0 || msg[0] == 1))
{
m_lastPipeIsBroadcast = false;
m_lastPipe = m_pendingPipes.Dequeue();
m_lastPipe = msgPipe.Value;
}
}
return true;
Expand Down

0 comments on commit d53e7f8

Please sign in to comment.