Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

inproc pub/sub must bind before connect with libuv #4463

Open
aeiouaoeiuv opened this issue Nov 21, 2022 · 0 comments
Open

inproc pub/sub must bind before connect with libuv #4463

aeiouaoeiuv opened this issue Nov 21, 2022 · 0 comments

Comments

@aeiouaoeiuv
Copy link

Issue description

inproc pub/sub must bind before connect was fixed mentioned by this comment since version 4.0.
When I am trying to use inproc pub/sub with libuv, this bug comes out again.

Environment

  • libzmq version (commit hash if unreleased): 4.3.4
  • OS: ubuntu 22.04 amd64
  • zmq.hpp: 4.9.0
  • libuv: 1.42.0

Minimal test code / Steps to reproduce the issue

1.main.cpp

#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>

#include "uv.h"
#include "zmq.hpp"

using namespace std::placeholders;

class Job {
public:
  void Run() {
    std::thread(&Job::ThreadPub, this).detach();             // pub thread runs first
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::thread(&Job::ThreadSub, this).detach();             // sub thread runs after

    while (true) {
      std::this_thread::sleep_for(std::chrono::seconds(100));
    }
  }

private:
  zmq::context_t ctx_;

  void ThreadPub() {
    std::cout << "pub thread start" << std::endl;
    zmq::socket_t sock(ctx_, zmq::socket_type::pub);
    sock.bind("inproc://what");

    while (true) {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      zmq::const_buffer zmq_buf("topicabcd", 9);
      sock.send(zmq_buf, zmq::send_flags::none);
      std::cout << "pub data" << std::endl;
    }
  }

  static void PollCallback(uv_poll_t *handle, int status, int events) {
    auto sock = static_cast<zmq::socket_t *>(handle->data);
    char buf[256];
    while (true) {
      auto events = sock->get(zmq::sockopt::events);
      if (!(events & ZMQ_POLLIN)) {
        return;
      }

      zmq::mutable_buffer buf(&buf, sizeof(buf));
      auto recv_result = sock->recv(buf, zmq::recv_flags::none);

      std::cout << "sub msg length: " << recv_result.value().size << std::endl;
    }
  }

  void ThreadSub() {
    std::cout << "sub thread start" << std::endl;
    auto sock = std::make_shared<zmq::socket_t>(ctx_, zmq::socket_type::sub);

    sock->connect("inproc://what");
    std::string topic = "topic";
    sock->set(zmq::sockopt::subscribe, topic);
    int fd = sock->get(zmq::sockopt::fd);

    uv_loop_t loop;
    uv_loop_init(&loop);
    uv_poll_t poll_handle;
    poll_handle.data = sock.get();
    uv_poll_init(&loop, &poll_handle, fd);
    uv_poll_start(&poll_handle, UV_READABLE, PollCallback);
    uv_run(&loop, UV_RUN_DEFAULT);
  }
};

int main(int argc, char *argv[]) {
  Job job;
  job.Run();

  return 0;
}

test.tar.gz is a demo can quickly test this bug. libuv.so(v1.42.0) and libzmq.a(v4.3.4) are also added for compilation.

What's the actual result? (include assertion message & call stack if applicable)

the main.cpp above runs output like this:

# ./build/test
pub thread start
pub data
sub thread start
pub data
pub data
......

That means sub thread subscribe nothing.
But, if we start ThreadSub() first before ThreadPub(), ThreadSub() started to subscribe data.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant