pub/sub bugs #43

Closed
ak47xp opened this issue Jul 17, 2010 · 5 comments

ak47xp commented Jul 17, 2010

There are bugs in the v2.0.7 code that result in problems with receiving multi-part PUB/SUB messages as described in the article. The simplest case is a two-part message with the first part containing a topic and the second part containing a payload.

  1. The first received message is lost because the topic part is skipped over in the code that accounts for a new peer (pgm_receiver.cpp):

// We have to move data to the beginning of the first message.
data += offset;
received -= offset;

The payload part is then treated as the topic part and gets discarded by the filter.

  2. If the publisher terminates without waiting for the zmq thread to drain its send queue and then restarts, the topic part of the last message from the old instance may be concatenated with the payload part of the first message from the new instance. The topic part of the new message is ignored and its payload is attached to the stale topic part, which apparently can linger in the decoder. Note that this can assign the payload to the wrong topic if the restarted publisher sends to a different topic than before.
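
The second failure mode can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: `Frame`, `ToyAssembler`, `on_sender_restart` and `replay_restart` are hypothetical names invented for illustration, none of this is libzmq code, and the real decoder may well behave differently. The model assumes a receiver that collects parts until one arrives without the "more" flag, and that applies the subscription filter to the first part only.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// One wire frame: a message part plus a flag saying more parts follow.
struct Frame {
    std::string data;
    bool more;
};

using Message = std::vector<std::string>;

// Toy assembler (hypothetical, not libzmq code): collects frames into a
// multipart message and delivers it when a frame with more == false arrives.
class ToyAssembler {
public:
    explicit ToyAssembler(std::string subscription)
        : subscription_(std::move(subscription)) {}

    // Signals that the sender was restarted. Passing drop_partial == false
    // models a receiver that keeps the half-built message of the old sender.
    void on_sender_restart(bool drop_partial) {
        if (drop_partial)
            pending_.clear();
    }

    void push(const Frame& f) {
        pending_.push_back(f.data);
        assert(!pending_.empty());
        if (f.more)
            return;
        // The subscription filter is a prefix match on the first part only.
        if (pending_.front().compare(0, subscription_.size(), subscription_) == 0)
            delivered_.push_back(pending_);
        pending_.clear();
    }

    const std::vector<Message>& delivered() const { return delivered_; }

private:
    std::string subscription_;
    Message pending_;
    std::vector<Message> delivered_;
};

// Replays the restart scenario: the old sender dies after its topic part,
// then a new sender publishes one complete two-part message.
std::vector<Message> replay_restart(bool drop_partial) {
    ToyAssembler a("test");
    a.push({"test", true});       // old sender: topic part only
    a.on_sender_restart(drop_partial);
    a.push({"testing", true});    // new sender: topic part
    a.push({"payload", false});   // new sender: payload part
    return a.delivered();
}
```

Calling `replay_restart(false)` yields a single three-part message headed by the stale topic, while `replay_restart(true)` yields the two-part message the new sender actually sent. Dropping partial state on restart is one possible policy, not necessarily the one the library should adopt.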
hurtonm (Member) commented Jul 17, 2010

Can you please pull from my repository and test?
http://github.com/hurtonm/zeromq2/commits/multipart_messages_encoder_fix

ak47xp (Author) commented Jul 17, 2010

This seems to have fixed the head-loss issue; however, the concatenation issue is still there.

Receiver program:

#include <cstdlib>
#include <cstring>
#include <iostream>
#include "zmq.hpp"
#define NUM_MSG         2048
int main (int argc, char* argv[])
{
        int i, num;
        long more = 1;
        size_t more_size = sizeof(more);

        if (argc < 2) {
                std::cout << "Receiver arguments: <topic> [<count>]" << std::endl;
                return 1;
        }

        num = (argc < 3) ? NUM_MSG : atoi(argv[2]);

        zmq::context_t ctx (1);
        zmq::socket_t s (ctx, ZMQ_SUB);
        s.bind ("epgm://eth0;224.10.10.10:5555");

        s.setsockopt (ZMQ_SUBSCRIBE, argv[1], std::strlen(argv[1]));

        std::cout << "Subscribed to [" << argv[1] << "], length " << std::strlen(argv[1]) << std::endl;

        for (i = 0; i < num; i++) {
                zmq::message_t hdr;
                zmq::message_t msg;

                s.recv (&hdr);
                s.getsockopt(ZMQ_RCVMORE, &more, &more_size);
                if (more) {
                        s.recv (&msg);
                        std::cout << "Topic [" << ((const char *)hdr.data()) << "] : "
                                  << i << " = " << ((const char *)msg.data())  << std::endl;
                } else
                        std::cout << "Topic [" << argv[1] << "] : "
                                  << i << " header only [" << ((const char *)hdr.data()) << "]" << std::endl;
        }
        return 0;
}

Sender program:

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <sstream>
#include "zmq.hpp"
#define NUM_MSG         1
int main (int argc, char* argv[])
{
    int  i, num;
    long rate = 10000;

    if (argc < 3) {
        std::cout << "Sender arguments: <topic> <payload> [<count>]" << std::endl;
        return 1;
    }

    num =  (argc < 4) ? NUM_MSG : atoi(argv[3]);

    zmq::context_t ctx (1);
    zmq::socket_t s (ctx, ZMQ_PUB);
    s.setsockopt (ZMQ_RATE, &rate, sizeof (rate));
    s.connect ("epgm://eth0;224.10.10.10:5555");

    for (i = 0; i < num; i++) {
           std::ostringstream oss;
           oss << argv[2] << "[" << i << "]";
           zmq::message_t hdr (argv[1], std::strlen(argv[1]) + 1, NULL);
           zmq::message_t msg (oss.str().size() + 1);

           strcpy((char*)msg.data(), oss.str().c_str());

           std::cout << "Sending '" << ((const char*)msg.data()) <<
                        "' to [" << ((const char*)hdr.data()) << "]" << std::endl;
           s.send (hdr, ZMQ_SNDMORE);
           s.send (msg);
    }

    return 0;
}

Test:

Start the receiver so that it waits for a large number of messages, then have the sender transmit the same number. Once in a while the receiver does not get all of the sent messages. When this happens, restart the sender to send a single message to a different topic.

Start receiver:

[ak@linux src]$ /tmp/zrm test 16384

Start sender:

[ak@linux zmq]$ /tmp/zsm test 1111111111111111111111111111 16384 >& /dev/zero

Receiver output until it stops:
...
Topic [test] : 12436 = 11111111111111111111111111111[12436]
Topic [test] : 12437 = 11111111111111111111111111111[12437]
Topic [test] : 12438 = 11111111111111111111111111111[12438]

Then run the sender again with a different topic:

[ak@linux zmq]$ /tmp/zsm testing 222222222222222222222222222 1 >& /dev/zero

Receiver:
...
Topic [test] : 12439 = testing
Topic [test] : 12440 header only [22222222222222222222222222222[0]]
Topic [testing] : 12441 = 22222222222222222222222222222[1]

hurtonm (Member) commented Jul 17, 2010

Thanks for elaborating. Will look into this.

@tsaubergine

Hi -

I am also seeing this issue: the first N-1 parts of an N-part multipart message are lost on the first message sent over epgm.

Is this issue still being looked into, or should I post another example? I am using zeromq version 2.1.4.

@tsaubergine

Here's a basic C++ example that shows this problem. Start pgmtest2 (the subscriber), then start pgmtest1. pgmtest2 should show

01
02 
03
01 
02
03

but instead shows

03
01 
02
03 

pgmtest1.cpp (publisher)

#include <zmq.hpp>
#include <cassert>
#include <cstring>
#include <exception>
#include <iostream>
#include <stdint.h>

int main (int argc, char *argv[])
{
    //  Prepare our context and publisher
    zmq::context_t context (1);
    zmq::socket_t publisher (context, ZMQ_PUB);

    try
    {
        publisher.connect("epgm://eth0;239.255.7.15:11142");
    }
    catch(std::exception& e)
    {
        std::cout << "error: " << e.what() << std::endl;
    }

    int64_t rate = 100;
    publisher.setsockopt(ZMQ_RATE, &rate, sizeof(rate));

    //  Send two three-part messages; part k carries the single byte k
    for(int i = 0, n = 2; i < n; ++i)
    {
        zmq_msg_t part1;
        int rc = zmq_msg_init_size (&part1, 1);
        assert (rc == 0);
        /* Fill in message content with unsigned char 1 */
        memset (zmq_msg_data (&part1), 1, 1);
        /* Send the first part; more parts follow */
        rc = zmq_send (publisher, &part1, ZMQ_SNDMORE);
        assert (rc == 0);

        zmq_msg_t part2;
        rc = zmq_msg_init_size (&part2, 1);
        assert (rc == 0);
        /* Fill in message content with unsigned char 2 */
        memset (zmq_msg_data (&part2), 2, 1);
        /* Send the second part; more parts follow */
        rc = zmq_send (publisher, &part2, ZMQ_SNDMORE);
        assert (rc == 0);

        zmq_msg_t part3;
        rc = zmq_msg_init_size (&part3, 1);
        assert (rc == 0);
        /* Fill in message content with unsigned char 3 */
        memset (zmq_msg_data (&part3), 3, 1);
        /* Send the final part of the message */
        rc = zmq_send (publisher, &part3, 0);
        assert (rc == 0);
    }

    return 0;
}

pgmtest2.cpp (subscriber)

#include <zmq.hpp>
#include <exception>
#include <iostream>
#include <sstream>
#include <string>
#include <stdio.h>

int main (int argc, char *argv[])
{
    zmq::context_t context (1);

    //  Socket to talk to server
    zmq::socket_t subscriber (context, ZMQ_SUB);

    try
    {
        subscriber.connect("epgm://eth0;239.255.7.15:11142");
    }
    catch(std::exception& e)
    {
        std::cout << "error: " << e.what() << std::endl;
    }

    //  Empty subscription: accept every message
    subscriber.setsockopt(ZMQ_SUBSCRIBE, 0, 0);

    for(;;)
    {
        //  Each recv returns one message part
        zmq::message_t update;
        subscriber.recv(&update);

        //  Print the first byte of the part as a number
        std::string in(static_cast<const char*>(update.data()), update.size());
        std::cout << static_cast<int>(in[0]) << std::endl;
    }

    return 0;
}
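
Until the transport is fixed, one application-side way to cope with the truncated first message is to group received parts by the "more" flag (read through ZMQ_RCVMORE, as in the receiver program earlier in this thread) and to discard any message that does not have the expected number of parts. The sketch below is a stand-alone model of that idea, not libzmq code: `Part`, `group_parts` and `keep_complete` are hypothetical names, and the fixed part count of 3 is an assumption taken from the pgmtest1 publisher.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// One received part plus the "more" flag reported right after receiving it.
struct Part {
    std::string data;
    bool more;
};

using Message = std::vector<std::string>;

// Groups a flat sequence of parts into messages. A message ends at the
// first part whose "more" flag is false.
std::vector<Message> group_parts(const std::vector<Part>& parts) {
    std::vector<Message> messages;
    Message current;
    for (const Part& p : parts) {
        current.push_back(p.data);
        if (!p.more) {
            messages.push_back(current);
            current.clear();
        }
    }
    // Parts left in `current` belong to an unfinished message and are dropped.
    return messages;
}

// Keeps only messages with the part count the application expects
// (hypothetical policy; 3 matches the pgmtest1 publisher).
std::vector<Message> keep_complete(const std::vector<Message>& messages,
                                   std::size_t expected_parts) {
    assert(expected_parts > 0);
    std::vector<Message> kept;
    for (const Message& m : messages)
        if (m.size() == expected_parts)
            kept.push_back(m);
    return kept;
}
```

Fed the observed sequence (a lone final part followed by a full three-part message), `group_parts` returns two messages and `keep_complete(..., 3)` keeps only the full one. This check only works when the part count is fixed and known in advance, and it cannot detect a truncated message that happens to have the expected length.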
