
azmq hangs on outstanding async_receive calls #99

Closed
ahundt opened this issue Jul 29, 2015 · 34 comments

ahundt commented Jul 29, 2015

The following minimal example may issue a few more receive calls than there are messages. The expected behavior is that it should be possible to shut down and destroy the sockets and io_service while an async_receive call is still outstanding. However, the actual behavior is that the application hangs and cannot exit, due to what I believe is an azmq bug:

#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

#include <azmq/socket.hpp>

/// Send messages between a client and server asynchronously. 
///
/// @see overview of zmq socket types https://sachabarbs.wordpress.com/2014/08/21/zeromq-2-the-socket-types-2/
/// @see bounce is based on https://github.com/zeromq/azmq/blob/master/test/socket/main.cpp
void bounce(std::shared_ptr<azmq::socket> sendP, std::shared_ptr<azmq::socket> receiveP, bool shouldReceive = true) {
    std::shared_ptr<std::atomic<int>> recv_countP(std::make_shared<std::atomic<int>>(0));
    std::shared_ptr<std::atomic<int>> send_countP(std::make_shared<std::atomic<int>>(0));

    constexpr int messagesToSend = 1000;
    int receiveAttempts = 0;

    for (int x = 0; (x<messagesToSend) || ((*recv_countP) < messagesToSend); ++x) {

        /////////////////////////////////////////
        // Client sends to server asynchronously!
        {
            // NOTE: the buffer refers to the loop variable x, which stays alive for the whole loop
            sendP->async_send(boost::asio::buffer(&x, sizeof(x)), [x, sendP, send_countP](boost::system::error_code const& ec, size_t bytes_transferred) {
                if (ec) std::cout << "SendFlatBuffer error! todo: figure out how to handle this\n";
                else {
                    std::cout << "sent: " << x << "\n";
                    (*send_countP)++;
                }
            });
        }

        //////////////////////////////////////////////
        // Server receives from client asynchronously!
        while
        (
                  shouldReceive && (*recv_countP < *send_countP)
               && (*recv_countP < messagesToSend)
               && (receiveAttempts < (*send_countP)-(*recv_countP))
        )
        {
            std::shared_ptr<int> recvBufP(std::make_shared<int>(0));
            BOOST_VERIFY(*recvBufP == 0);
            receiveP->async_receive(boost::asio::buffer(recvBufP.get(), sizeof(*recvBufP)),
                [recvBufP, receiveP, recv_countP, messagesToSend](boost::system::error_code const& ec, size_t bytes_transferred) {
                    if (ec) std::cout << "start_async_receive_buffers error! todo: figure out how to handle this\n";
                    else std::cout << "received: " << *recvBufP << " recv_count:" << *recv_countP << "\n";
                    (*recv_countP)++;
                });
            receiveAttempts++;
        }
        receiveAttempts = 0;

        //std::this_thread::sleep_for( std::chrono::milliseconds(1) );
    }
    sendP->get_io_service().stop();
    receiveP->get_io_service().stop();
    sendP.reset();
    receiveP.reset();
}


int main(int argc, char* argv[])
{

    std::string localhost("127.0.0.1");
    std::string localport("9998");
    std::string remotehost("127.0.0.1");
    std::string remoteport("9998");

    std::cout << "argc: " << argc << "\n";

    if (argc != 5 && argc != 1 && argc != 3)
    {
      std::cerr << "Usage: " << argv[0] << " <localip> <localport> <remoteip> <remoteport>\n";
      return 1;
    }

    bool shouldReceive = true;

    if(argc == 3){
      remotehost = std::string(argv[1]);
      remoteport = std::string(argv[2]);
      shouldReceive = false;
    }

    if(argc == 5){
      localhost = std::string(argv[1]);
      localport = std::string(argv[2]);
      remotehost = std::string(argv[3]);
      remoteport = std::string(argv[4]);
      shouldReceive = true;
    }

    std::cout << "using: "  << argv[0] << " ";
    if(shouldReceive) std::cout <<  localhost << " " << localport << " ";
    std::cout <<  remotehost << " " << remoteport << "\n";
    {
    boost::asio::io_service io_service;
    std::shared_ptr<azmq::socket> sendsocket = std::make_shared<azmq::socket>(io_service, ZMQ_DEALER);
    std::shared_ptr<azmq::socket> recvsocket = std::make_shared<azmq::socket>(io_service, ZMQ_DEALER);
    recvsocket->bind("tcp://" + localhost + ":" + localport);
    sendsocket->connect("tcp://"+ remotehost + ":" + remoteport);



    std::thread thr(bounce,sendsocket,recvsocket,shouldReceive);
    std::thread ios_t;

    {
        boost::asio::io_service::work work(io_service);
        ios_t = std::thread([&] {
            io_service.run();
        });
        sendsocket.reset();
        recvsocket.reset();
        thr.join();
    }

    io_service.stop();
    ios_t.join();
    }

  return 0;
}

I'm running azmq d1f609d on OS X, with ZeroMQ from Homebrew (zeromq: stable 4.1.2 (bottled), HEAD).

oliora commented Aug 8, 2015

@ahundt I've tried to understand the code and I'm a bit confused. What is the purpose of having three threads (main, thr and ios_t), of using boost::asio::io_service::work, and of calling io_service::stop() on the same io_service three times? It would probably help if you added a textual description of your scenario.

ahundt commented Aug 8, 2015

  • ios_t handles sending/receiving on the socket, running the socket's io_service.
  • thr calls bounce, handling initiating async calls to keep the socket's io_service busy.
  • main() thread actually doesn't really do anything but wait for the others to complete to verify the program exits properly.

I could probably just call bounce() from the main() thread just before io_service.stop() and drop thr, but that's not important; the issue is that the program never exits!

Looking at it again, I think an earlier version also didn't necessarily quit after n iterations. Instead the main() thread waited on ctrl-c. 3 threads instead of 2 is an artifact of that.

Also, the reason I created a boost::asio::io_service::work object is easy to answer: sometimes io_service.run() exited immediately because there wasn't any work in the queue for ios_t yet!
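
A minimal sketch of that work-object idiom on its own (separate from the code above, names are illustrative):

#include <boost/asio.hpp>
#include <thread>

int main() {
    boost::asio::io_service ios;
    // Without a work object, run() returns as soon as the handler queue is empty,
    // which can happen before any async operation has been started.
    boost::asio::io_service::work keep_running(ios);
    std::thread ios_t([&ios] { ios.run(); });

    // ... post async work from other threads here ...

    ios.stop();    // or destroy keep_running and let run() drain the remaining handlers
    ios_t.join();
    return 0;
}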

ahundt commented Aug 8, 2015

Also, this test tries to do something very simple:

  • send n integer messages to itself
  • receive n integer messages (but may call receive more than n times)
  • exit gracefully when done

The reason receive could be called more times than send is that the sender could die unexpectedly if the two ends are on separate machines! Even in that case, the program should exit properly when the time comes to do so, for example if I had set a timer that decided no more sends would occur.

oliora commented Aug 9, 2015

@ahundt regarding signal handling, to handle it properly you need to register your signal handler. Note that in this signal handler you should do nothing except setting an atomic variable.
I recommend you to check http://www.linuxprogrammingblog.com/all-about-linux-signals and http://stackoverflow.com/questions/17942034/simple-linux-signal-handling first.
C++ API to handle signals: http://en.cppreference.com/w/cpp/utility/program/signal
Handling signals with Boost.Asio: http://www.boost.org/doc/libs/1_58_0/doc/html/boost_asio/overview/signals.html
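
A minimal sketch of the Boost.Asio route (standalone; the handler runs on an io_service thread rather than in async-signal context, so it may do more than set an atomic flag):

#include <boost/asio.hpp>
#include <csignal>
#include <iostream>

int main() {
    boost::asio::io_service io_service;
    boost::asio::signal_set signals(io_service, SIGINT, SIGTERM);
    signals.async_wait([&](boost::system::error_code const& ec, int signo) {
        if (!ec) {
            std::cout << "caught signal " << signo << ", stopping\n";
            io_service.stop();
        }
    });
    io_service.run();   // returns once stop() is called
    return 0;
}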

oliora commented Aug 9, 2015

@ahundt regarding the scenario, I have more questions:

  • should the application quit after receiving all the messages that were sent, or after a signal is received?

ahundt commented Aug 10, 2015

I expect the io_service to be destroyed successfully and the application to quit even if there are outstanding async_receive calls. Essentially, you should be able to stop the application at any time should a user request it, modulo some short delay if necessary.

ahundt commented Aug 10, 2015

I believe standard asio exhibits this behavior. If I do something like the following pseudocode:

#include <boost/asio.hpp>

void my_handler(boost::system::error_code const&, std::size_t) {}

int main() {
    {
        boost::asio::io_service ios;
        boost::asio::ip::udp::socket sock(ios, boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), 0));
        // ...snip...
        char buf[4];
        boost::asio::ip::udp::endpoint sender;
        sock.async_receive_from(boost::asio::buffer(buf), sender, my_handler);
        ios.poll_one();   // run_one() would block here waiting for a datagram
    }   // ios and sock are destroyed with the receive still outstanding
    return 0;
}

The program should exit successfully. I think something similar to the above, though it might need some changes to make it correct, would not exit with azmq because the io_service destruction would fail due to the outstanding async_receive call.

oliora commented Aug 10, 2015

@ahundt signal handling is not as simple as you think, especially in a multi-threaded process. Please read the links that I posted above.

ahundt commented Aug 11, 2015

I'm confused, how do signals relate to this issue? The problem is that azmq hangs when there are outstanding async_receive calls. I have no problems with signal handlers.

Sorry for the stray comments where I mention signal handlers. Those should have been removed, and I have taken them out now.

ahundt commented Aug 11, 2015

Could you try running the code I posted originally for the "minimal example"? It will never exit due to a bug in azmq. It will hang at the } right above return 0;.

ahundt commented Aug 11, 2015

I've updated the original description with details about the expected behavior and the actual behavior of the code. Please let me know if the issue is still unclear. I'm sorry that I made it so confusing before.

oliora commented Aug 11, 2015

Thank you. I'm glad it's clear now that it's not related to signals at all :)

oliora commented Aug 11, 2015

The reason the code hangs is not an AZMQ bug: the shared_ptrs to the sockets are captured in the lambdas, so the sockets have still not been destroyed at the moment the ZMQ context is terminated, and ZMQ context termination blocks while sockets still exist (http://api.zeromq.org/4-1:zmq-term).
To get rid of the hang, just remove sendP and receiveP from the lambda capture lists. If you need to use the sockets inside the lambdas, capture a weak_ptr to the socket rather than the shared_ptr.
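
A minimal sketch of the weak_ptr variant, using the names from the example at the top (receive side only; the send side is analogous):

std::weak_ptr<azmq::socket> weakReceiveP = receiveP;   // capture this instead of the shared_ptr
receiveP->async_receive(boost::asio::buffer(recvBufP.get(), sizeof(*recvBufP)),
    [recvBufP, weakReceiveP, recv_countP](boost::system::error_code const& ec, size_t bytes_transferred) {
        auto sock = weakReceiveP.lock();   // fails once the socket has been destroyed
        if (!sock || ec) return;           // exit gracefully instead of touching a dead socket
        std::cout << "received: " << *recvBufP << "\n";
        (*recv_countP)++;
    });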

oliora commented Aug 11, 2015

Issue #100 created. Using socket::close() in this situation would help avoid the hang.

ahundt commented Aug 11, 2015

ah yes, good point w.r.t. weak pointers! thanks! I also like how you described issue #100; that sounds like a very appropriate feature to have.

ahundt commented Aug 11, 2015

You may want some version of my code here, perhaps simplified, to serve as a unit test or an example once socket::close() is implemented.

oliora commented Aug 12, 2015

You should understand that reading with a timeout works the same for an azmq socket as for a standard asio socket. There are several ways to implement it, and the solution depends heavily on the way you do networking in your application. Usually you need a timer (like steady_timer), but in some cases you can skip it and wait with a timeout on a future.

Sorry, I don't have time to check links.
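
A minimal sketch of the timer approach (standalone; azmq has no socket::close() yet, see issue #100, so the timeout handler below just stops the io_service):

#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <array>
#include <chrono>

int main() {
    boost::asio::io_service ios;
    azmq::socket sock(ios, ZMQ_DEALER);
    sock.connect("tcp://127.0.0.1:9998");

    std::array<char, 64> buf;
    boost::asio::steady_timer deadline(ios);
    deadline.expires_from_now(std::chrono::milliseconds(500));

    sock.async_receive(boost::asio::buffer(buf),
        [&](boost::system::error_code const& ec, size_t bytes_transferred) {
            if (!ec) deadline.cancel();    // data arrived in time, disarm the timeout
        });

    deadline.async_wait([&](boost::system::error_code const& ec) {
        if (!ec) ios.stop();               // timed out; once #100 lands, this is where close() would go
    });

    ios.run();
    return 0;
}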

rodgert commented Aug 12, 2015

Can I close this issue out then?

I will implement close() in the next week or so.

ahundt commented Aug 16, 2015

sure, I haven't tested code with weak_ptr yet, but if I come across an issue I'll just make a note here when the time comes. Thanks for the help!

rodgert commented Aug 16, 2015

In general, with Asio under C++11 I tend to use the idiom of capturing by weak_ptr<> in async tasks, then using the failure of weak_ptr<>.lock() to acquire a shared_ptr<> as a signal to exit the async task gracefully.

We can re-open if necessary, and I am hopeful that I will have a patch adding .close() and .flush() to the socket implementation either this week or next.


ahundt commented Aug 16, 2015

Great! Is there an example of your typical usage style, perhaps in the tests? I'd love to use it as a reference.

ahundt commented Aug 16, 2015

I might even be able to fix up my code at the top and make it into an example you could include for new users.

rodgert commented Aug 16, 2015

Unfortunately all code I have with examples like this is the IP of my former employer. I will try to put something together and add it to the samples shortly (or perhaps a unit test for close/flush when I get to that).


ahundt commented Mar 19, 2016

@rodgert bump: I'm still dealing with this issue and would love it if I could get a quick example!

zmb3 commented Apr 13, 2016

Is a timer necessary in order to read with a timeout? Should sock.set_option(azmq::socket::rcv_timeo(500)) work?

oliora commented Apr 14, 2016

@zmb3 this will work only for a synchronous receive. For an asynchronous one, a separate mechanism like an asio timer is required.

oliora commented Apr 14, 2016

@ahundt looks like @rodgert is busy with other projects now. Are you asking about an example of using weak_ptr to gracefully handle socket destruction in an async operation completion handler?

zmb3 commented Apr 14, 2016

Thanks @oliora - I eventually came to the same conclusion.

ahundt commented Apr 15, 2016

@oliora yeah an example with weak_ptr would be perfect.

For the async receive, presumably a timer would keep getting extended on each successful receive, and then if it does fire it would call .close(), correct? Thanks for your feedback!

oliora commented Apr 19, 2016

@ahundt the on-timer action is application specific. Simple apps would likely just call .close(). Apps that have some protocol layer on top of ZeroMQ may use a different approach. E.g. our app has a layer that implements a request-response model with multiple requests active at a time. The app doesn't close the connection on a specific response timeout; it just removes the timed-out request from the list of active requests, so the response (if it comes later) will be ignored.
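
A compressed sketch of that kind of layer (all names are hypothetical, and a single-threaded io_service is assumed so no locking is shown); the socket's receive handler would call complete() with the request id carried in the reply:

#include <boost/asio.hpp>
#include <chrono>
#include <functional>
#include <map>
#include <memory>
#include <string>

class request_table {
public:
    using handler = std::function<void(std::string const&)>;

    explicit request_table(boost::asio::io_service& ios) : ios_(ios) {}

    // Register a pending request; if no reply arrives within `timeout`, forget about it.
    void add(int id, handler h, std::chrono::milliseconds timeout) {
        auto timer = std::make_shared<boost::asio::steady_timer>(ios_, timeout);
        pending_[id] = entry{std::move(h), timer};
        timer->async_wait([this, id](boost::system::error_code const& ec) {
            if (!ec) pending_.erase(id);   // timed out: a late reply will simply be ignored
        });
    }

    // Called from the socket's receive handler when a reply carrying `id` arrives.
    void complete(int id, std::string const& reply) {
        auto it = pending_.find(id);
        if (it == pending_.end()) return;  // unknown or already timed-out request
        it->second.timer->cancel();
        handler h = std::move(it->second.h);
        pending_.erase(it);
        h(reply);
    }

private:
    struct entry { handler h; std::shared_ptr<boost::asio::steady_timer> timer; };
    boost::asio::io_service& ios_;
    std::map<int, entry> pending_;
};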

rodgert commented May 17, 2016

Sorry, been off the air for a while. Where does this stand? Is there an issue here?

@ahundt The documentation on std::shared_ptr and std::weak_ptr at http://en.cppreference.com/w/cpp/memory seems fairly self-evident about how they work, and they don't really have anything to do with AZMQ per se. My point originally was that if you hold a socket via std::shared_ptr (or perhaps some accompanying state type, e.g. a "server"), then instead of capturing that shared_ptr in a lambda passed as a handler (thus bumping the reference count and inadvertently keeping the instance alive), you capture it as a std::weak_ptr. The lambda capture will then no longer prevent the socket from being destroyed. When the lambda is invoked it will usually carry an error_code indicating that the operation was terminated, and even if for some reason it doesn't indicate an error, the failure of std::weak_ptr::lock() to return the instance you wanted in the handler indicates that the socket has been destroyed, so the handler should just exit gracefully.
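
Something like this sketch (the server type and its members are hypothetical, not part of azmq):

#include <azmq/socket.hpp>
#include <boost/asio.hpp>
#include <array>
#include <memory>

struct server : std::enable_shared_from_this<server> {
    azmq::socket socket_;
    std::array<char, 256> buf_;

    explicit server(boost::asio::io_service& ios) : socket_(ios, ZMQ_DEALER) {}

    void start_receive() {
        std::weak_ptr<server> self = shared_from_this();   // weak, so the pending op doesn't pin us
        socket_.async_receive(boost::asio::buffer(buf_),
            [self](boost::system::error_code const& ec, size_t bytes_transferred) {
                auto me = self.lock();
                if (!me || ec) return;     // server already destroyed, or op aborted: exit gracefully
                // ... handle me->buf_ ...
                me->start_receive();       // re-arm for the next message
            });
    }
};

With this, resetting the last shared_ptr<server> destroys the socket even while a receive is outstanding, and an already-queued handler simply returns.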

ahundt commented May 18, 2016

yes, I think I understand this now. I have more time at the moment, so over the next week I plan to try modifying my test to pass a weak_ptr into the lambda capture, then, when the lambda is called and only if it is still valid, convert it back to a shared_ptr. This should allow things to exit. I'll test this out in a small test function.

I don't remember my original expectations, but I guess my expectation may have been that all the handlers would be called with an error code when things were shutting down.

rodgert commented May 18, 2016

Termination is hard :)

Ideally all handlers should get called with an error code, but only if the socket actually gets destroyed.

