Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Broadcast feature for XPub socket #401

Merged
merged 9 commits into from Nov 15, 2015

Conversation

buybackoff
Copy link
Contributor

These changes add a broadcast feature that is similar to SignalR's Clients.Others. If the broadcast option is enabled and an XSub socket sends a topic prefixed with 2, then that socket will never be matched on a subsequent Send by XPub. With this feature it is easy to turn a XPub into a message bus, when N XSub clients send a message to every other one - without additional Dealer/Router socket and without complicated check if a sender receives its own message back.

Without XPubBroadcast set to true, there is no change in behavior, so this change is backward compatible.

This PR depends on #400. It also removes unneeded copying of data in XSub by using the offset feature from #400.


namespace NetMQ.Tests
{
namespace NetMQ.Tests {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change the style of the project. Can you fix this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure! Actually that test was not related to the PR, but to another issue I asked about. I accidentally added it.

@somdoron
Copy link
Member

Very nice. I think you have a bug, after a subscriber send a broadcast message this subscriber won't receive regular publisher messages. You should set the m_lastPipeIsBroadcast to false after calling XSend (it also good idea to nullify the m_lastPipe to avoid keeping a reference to terminated pipe.

@buybackoff
Copy link
Contributor Author

It is set to false when there is no more message parts here: https://github.com/buybackoff/netmq/blob/feature_broadcast/src/NetMQ/Core/Patterns/XPub.cs#L325

}

sub.Close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why you are not closing the Msg? there is cases the messages should be closed (it is not always enqueued)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@somdoron
Copy link
Member

Also can you rebased it on master so I will only display relevant commits? As this is a big change I want to make sure it doesn't break anything

@buybackoff
Copy link
Contributor Author

An implicit requirement is that a broadcast message is sent from withing publisher's ReceiveReady event. Given that poller is single-threaded, is it possible that a regular send could call XSend after a publisher receives a message but before the publisher sends it inside the event?

private void _publisher_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var socket = e.Socket;
            var id = new Msg();
            id.InitEmpty();
            socket.Receive(ref id);
            Debug.Assert(id.HasMore);
            var request = new Msg();
            request.InitEmpty();
            socket.Receive(ref request);

             // could regular pub call XSend here? They are inside a single thread, it should not be possible

            // resend a message to all clients that are subscribed to the id, except the sender
            _publisher.Send(ref id, true);
            _publisher.Send(ref request, false);
        }

}
}
else if (m_broadcastEnabled && size > 0 && sub.HasMore && sub[0] == 2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you force the HasMore?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because only the first part with the topic is prefixed. The second part, in general, could have 2 as the first byte.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that subscribe/unsubscribe should also work only with the first message. Now if the second message with payload starts with 0/1, a weird thing will happen.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that is not the correct way to check if this is the first frame. What if you have 3 frames? what if it is one frame?

So current NetMQ implementation doesn't support multi-messages from xsub to xpub (ZeroMQ is actually do supporting). Anyway I suggest we keep at the beginning for 1 frame and later add support to more frames.

@somdoron
Copy link
Member

It is not possible...

m_lastPipeIsBroadcast = true;
sub.Offset = sub.Offset + 1;
sub.Size = sub.Size - 1;
m_pending.Enqueue(sub);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we keep the prefix so it behave like other messages? like sub and unsub and the rest.

That way how can we tell if it a broadcast message, subscribe, unsubscribe or something else?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regular subscribe/unsubscribe do not have a second frame. Regular send does not add a prefix to a topic. So in regular send, the first frame is a topic, and the second frame is a payload. In the case of the broadcast, topic is always prefixed with 2, but subscribers should not care if a message is a regular one or a broadcasted one. We only exclude the sender in case of a broadcast message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we keep the 2 prefix here, Match won't work and the subsequent logic will be very complicated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PubSub can also work with one frame only... the first part being a topic is not in the code or RFC. just convenient.

Anyway I'm talking about the XPub receive...

So before this change, if you call XPub receive you can get:
If it a subscribe it will be prefix with 0, ubsubscribe with 1 and the rest with something else

Now you actually get a message that you cannot tell what type is it, because you removed the prefix.

The user code that call for Send should remove the prefix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I remove prefix only when sub[0] == 2, so it doesn't affect sub/unsub messages. The only thing it does is marking the sender (last pipe) for exclusion, but the prefix is not needed for anything else. After these lines, the message should look like a regular one to everyone. The sender just won't receive it back.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, don't change the match logic. It will affect too many areas. The user should just remove the prefix before calling the send.

So the user check the prefix byte, if it is 2, remove the prefix and send the message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean that we should change topic here:

private void _publisher_ReceiveReady(object sender, NetMQSocketEventArgs e)
        {
            var socket = e.Socket;
            var id = new Msg();
            id.InitEmpty();
            socket.Receive(ref id);
            Debug.Assert(id.HasMore);
            var request = new Msg();
            request.InitEmpty();
            socket.Receive(ref request);

             // Remove the first byte if it is 2

            // resend a message to all clients that are subscribed to the id, except the sender
            _publisher.Send(ref id, true);
            _publisher.Send(ref request, false);
        }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes. We can also add a method to Msg to do this quickly. Like TrimPrefix.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is trivial with these lines

sub.Offset = sub.Offset + 1;
sub.Size = sub.Size - 1;

I have committed the change and updated the test.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is internal, but we can fix this later if needed.

somdoron added a commit that referenced this pull request Nov 15, 2015
Broadcast feature for XPub socket
@somdoron somdoron merged commit 6d9f56e into zeromq:master Nov 15, 2015
@buybackoff
Copy link
Contributor Author

Yay! 😄 Now the clone pattern in NetMQ could be achieved much simpler than with several sockets like shown in the Guide.

@somdoron
Copy link
Member

Great contribution, thanks.
So go ahead and write clone example with this new feature and send a PR :-)

Also consider writing a documentation for this on our documentation page.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants