Crash in zmq::msg_t::data { check () } During zmq::xsub_t::match Call #4735

hao47825148 opened this issue Aug 29, 2024 · 0 comments

Issue description

I encountered a crash in ZeroMQ inside zmq::msg_t::data, called from zmq::xsub_t::match. The GDB backtrace below shows the process aborting through zmq::zmq_abort after the check () assertion at src/msg.cpp:327 fails. The call originates from zmq_poll in my listener thread.

Backtrace:

#0  0x0000ffff929a3330 in raise () from /lib64/libc.so.6
#1  0x0000ffff92990b54 in abort () from /lib64/libc.so.6
#2  0x0000ffff96e842ac in zmq::zmq_abort (errmsg_=0xffff96ed6220 "check ()") at src/err.cpp:88
#3  0x0000ffff96e9b664 in zmq::msg_t::data (this=0xffff0e2b14a8) at src/msg.cpp:327
#4  0x0000ffff96ecd564 in zmq::xsub_t::match (this=0xffff0e2b0e00, msg_=0xffff0e2b14a8) at src/xsub.cpp:228
#5  0x0000ffff96ecd7bc in zmq::xsub_t::xhas_in (this=0xffff0e2b0e00) at src/xsub.cpp:212
#6  0x0000ffff96eb3b1c in zmq::socket_base_t::has_in (this=0xffff0e2b0e00) at src/socket_base.cpp:442
#7  0x0000ffff96eb3b1c in zmq::socket_base_t::has_in (this=0xffff0e2b0e00) at src/socket_base.cpp:442
#8  zmq::socket_base_t::getsockopt (this=0xffff0e2b0e00, 
    option_=option_@entry=15, optval_=optval_@entry=0xffff0bc1556c, 
    optvallen_=optvallen_@entry=0xffff0bc15570) at src/socket_base.cpp:459
        rc = <optimized out>
        sync_lock = {_mutex = 0x0}
#9  0x0000ffff96ecde10 in zmq_getsockopt (s_=<optimized out>, 
    option_=option_@entry=15, optval_=optval_@entry=0xffff0bc1556c, 
    optvallen_=optvallen_@entry=0xffff0bc15570) at src/zmq.cpp:267
        s = <optimized out>
#10 0x0000ffff96ecf630 in zmq_poll (items_=0xffff0bc156d0, nitems_=2, 
    timeout_=100) at src/zmq.cpp:949
        zmq_events_size = 4
        zmq_events = 0
        i = 1
        timeout = <optimized out>
        clock = {_last_tsc = 13320203365, _last_time = 13320}
        now = 0
        end = 0
        pollfds = {_static_buf = {{fd = 240, events = 1, revents = 1}, {
              fd = 242, events = 1, revents = 0}, {fd = -1711063040, 
              events = -1, revents = 0}, {fd = 2132691952, events = -1, 
              revents = 0}, {fd = -1711405236, events = -1, revents = 0}, {
              fd = -1711419368, events = -1, revents = 0}, {fd = -1711417008, 
              events = -1, revents = 0}, {fd = 197220080, events = -1, 
              revents = 0}, {fd = 0, events = 0, revents = 0}, {
              fd = 1852731235, events = 25445, revents = 116}, {
              fd = 1242584800, events = -1, revents = 0}, {fd = 1242584768, 
              events = -1, revents = 0}, {fd = 2020293344, events = -1, 
              revents = 0}, {fd = 2000, events = -256, revents = 0}, {
              fd = -1711063040, events = -1, revents = 0}, {fd = 248382608, 
              events = 0, revents = 0}}, _buf = 0xffff0bc15588}
        first_pass = true
        nevents = 0
#11 0x0000ffff99fe0894 in ZmqSubscriber ::listen_thread()  from /opt/gaia/lib/libtest.so

Environment

  • libzmq version (commit hash if unreleased): 4.3.2
  • OS: Linux
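
A note for reading frames #8 to #10 of the backtrace: option_=15 is the value of ZMQ_EVENTS in zmq.h, so zmq_poll was asking a socket for its pending events when the assertion fired. The local i = 1 with nitems_=2 may point at the second poll item, but I have not confirmed that against the 4.3.2 source, so treat it as a guess. The small sketch below decodes these numbers. The constant and function names are hypothetical and the values are copied from zmq.h so that it compiles without libzmq.

```cpp
#include <cassert>
#include <string>

// Numeric values as defined in zmq.h, repeated here under hypothetical
// names so the sketch does not depend on libzmq.
constexpr int kOptEvents = 15;  // ZMQ_EVENTS
constexpr short kPollIn = 1;    // ZMQ_POLLIN
constexpr short kPollOut = 2;   // ZMQ_POLLOUT
constexpr short kPollErr = 4;   // ZMQ_POLLERR

// Hypothetical helper: name the socket option seen in a backtrace frame.
// Only ZMQ_EVENTS is recognised; everything else is reported as "other".
std::string option_name(int option) {
    return option == kOptEvents ? "ZMQ_EVENTS" : "other";
}

// Hypothetical helper: render a poll event bitmask as readable text.
std::string describe_events(short events) {
    std::string out;
    auto append = [&out](const char *name) {
        if (!out.empty()) out += '|';
        out += name;
    };
    if (events & kPollIn) append("POLLIN");
    if (events & kPollOut) append("POLLOUT");
    if (events & kPollErr) append("POLLERR");
    return out.empty() ? std::string("none") : out;
}
```

With these helpers, option_name(15) gives "ZMQ_EVENTS", and the revents = 1 on the first pollfds entry decodes to "POLLIN".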

Minimal test code / Steps to reproduce the issue

class ZmqSubscriber {
public:
    ZmqSubscriber() {}

    // Initialization function, creates sockets in the main thread
    int init() {
        ZmqContext::ctx_lock();
        // Subscriber socket created in the main thread
        _subscriber = zmq::socket_t(_zmq_context, ZMQ_SUB);
        _subscriber.setsockopt(ZMQ_SNDHWM, 8192);
        _subscriber.setsockopt(ZMQ_RCVHWM, 8192);
        _subscriber.setsockopt(ZMQ_LINGER, 0);

        std::stringstream proc_ss;
        uint64_t this_p = (uint64_t)this;
        proc_ss << "inproc://event_" << this_p;
        // Event publisher socket created in the main thread
        _event_pub = zmq::socket_t(*ZmqContext::instance()->ctx(), ZMQ_PUB);
        _event_pub.bind(proc_ss.str());

        // Event subscriber socket created in the main thread
        _event_sub = zmq::socket_t(*ZmqContext::instance()->ctx(), ZMQ_SUB);
        _event_sub.connect(proc_ss.str());
        _event_sub.setsockopt(ZMQ_SUBSCRIBE, "", 0);
        _event_sub.setsockopt(ZMQ_LINGER, 0);

        ZmqContext::ctx_unlock();

        _running = true;
        // Listener thread started for asynchronous processing
        _listen_thread = os::Thread(&ZmqSubscriber::listen_thread, this);
        return 0;
    };

    int release() {
        if (_running) {
            _running = false;
            if (_listen_thread.joinable()) {
                _listen_thread.join();
            }
            _subscriber.close();
            _event_pub.close();
            _event_sub.close();
        }
        return 0;
    };

private:
    // Background listener thread to handle all incoming events
    void listen_thread() {
        
        zmq::pollitem_t zmq_pool_item[] = {{_subscriber, 0, ZMQ_POLLIN, 0},
                                           {_event_sub, 0, ZMQ_POLLIN, 0}};

        while (_running) {
            try {
                int rc = zmq::poll(zmq_pool_item, 2, 100);
                if (rc < 0) {
                    continue;
                }

                if (zmq_pool_item[0].revents & ZMQ_POLLIN) {
                    process_msg(); // Process received messages
                }
                if (zmq_pool_item[1].revents & ZMQ_POLLIN) {
                    process_event(); // Process received events
                }
            } catch (const std::exception &e) {
                break;
            }
        }
    };

    void process_msg();
    void process_event();

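private:
    zmq::socket_t _subscriber;   // Subscriber socket
    zmq::socket_t _event_pub;    // Event publisher socket
    zmq::socket_t _event_sub;    // Event subscriber socket
    // The declarations of _running, _listen_thread, and _zmq_context,
    // which are used above, are omitted from this snippet.
};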

In the ZmqSubscriber class, all three sockets are created in the main thread inside init(): a subscriber socket for receiving data, an event publisher socket for sending events, and an event subscriber socket for receiving internal events. The subscriber socket is configured with ZMQ_SNDHWM, ZMQ_RCVHWM, and ZMQ_LINGER. The event publisher is bound to a per-instance inproc:// endpoint; the event subscriber connects to that endpoint, subscribes to all messages, and sets ZMQ_LINGER to 0.

After the sockets are set up, init() starts listen_thread. This background thread handles all incoming traffic on the sockets: it polls _subscriber and _event_sub with zmq::poll using a 100 ms timeout, and calls process_msg() or process_event() when ZMQ_POLLIN is set on the corresponding item.
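
One detail of this start/stop handshake that the snippet does not show is how _running is declared. If it were a plain bool, writing it in release() while listen_thread reads it would be a data race in C++. Below is a minimal sketch of the same handshake with std::atomic<bool>, assuming std::thread in place of os::Thread and a short sleep standing in for zmq::poll. The class name StoppableLoop and its members are hypothetical and not part of my real code.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical stand-in for the start/stop logic of ZmqSubscriber.
class StoppableLoop {
public:
    ~StoppableLoop() { stop(); }

    // Sets the flag before the worker starts, mirroring init().
    void start() {
        _running.store(true);
        _worker = std::thread([this] { run(); });
    }

    // Clears the flag and joins the worker, mirroring release().
    void stop() {
        _running.store(false);
        if (_worker.joinable()) {
            _worker.join();
        }
    }

    bool running() const { return _running.load(); }
    unsigned long iterations() const { return _iterations.load(); }

private:
    void run() {
        while (_running.load()) {
            // Stand-in for zmq::poll(items, 2, 100): block briefly, then loop.
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            _iterations.fetch_add(1);
        }
    }

    std::atomic<bool> _running{false};          // shared between both threads
    std::atomic<unsigned long> _iterations{0};  // counts completed loop passes
    std::thread _worker;
};
```

Because stop() joins the worker before returning, anything done after it, such as closing the sockets, cannot overlap with a loop iteration.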

What's the actual result? (include assertion message & call stack if applicable)

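I know that ZeroMQ sockets are not thread-safe. Apart from socket creation and setup in init() and the close() calls in release(), which run only after the listener thread has been joined, all socket operations happen inside listen_thread.
Even so, the process still aborts with the check () assertion shown above.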
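
To double-check the single-thread claim, one option is a small ownership guard that records which thread is supposed to use a socket and is asserted at every call site. This is only a debugging sketch. ThreadOwner and its methods are hypothetical names, and nothing like it exists in libzmq or cppzmq as far as I know.

```cpp
#include <cassert>
#include <thread>

// Hypothetical debugging aid: remembers which thread owns a resource
// (for example a zmq::socket_t) so that call sites can assert on it.
class ThreadOwner {
public:
    // By default the constructing thread becomes the owner.
    explicit ThreadOwner(std::thread::id owner = std::this_thread::get_id())
        : _owner(owner) {}

    // Hand ownership to another thread, e.g. when a socket created in
    // init() is passed to the listener thread.
    void rebind(std::thread::id owner) { _owner = owner; }

    // True only when called from the owning thread.
    bool owned_by_caller() const {
        return _owner == std::this_thread::get_id();
    }

private:
    std::thread::id _owner;
};
```

In the class above, this would mean calling rebind(std::this_thread::get_id()) at the top of listen_thread() and asserting owned_by_caller() at the start of process_msg() and process_event(). A failing assertion would point at a call that reaches the sockets from another thread.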

Thank you in advance for your support and assistance!
