Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 90 additions & 1 deletion tests/context.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include <catch.hpp>
#include <zmq.hpp>
#include <zmq_addon.hpp>

#if (__cplusplus >= 201703L)
static_assert(std::is_nothrow_swappable<zmq::context_t>::value,
Expand All @@ -18,6 +18,26 @@ TEST_CASE("context create, close and destroy", "[context]")
CHECK(NULL == (void *) context);
}

TEST_CASE("context shutdown", "[context]")
{
zmq::context_t context;
context.shutdown();
CHECK(NULL != (void *) context);
context.close();
CHECK(NULL == (void *) context);
}

TEST_CASE("context shutdown often", "[context]")
{
zmq::context_t context;
context.shutdown();
context.shutdown();
context.shutdown();
CHECK(NULL != (void *) context);
context.close();
CHECK(NULL == (void *) context);
}

#ifdef ZMQ_CPP11
TEST_CASE("context swap", "[context]")
{
Expand All @@ -26,4 +46,73 @@ TEST_CASE("context swap", "[context]")
using std::swap;
swap(context1, context2);
}

TEST_CASE("context - use socket after shutdown", "[context]")
{
zmq::context_t context;
zmq::socket_t sock(context, zmq::socket_type::rep);
context.shutdown();
try
{
sock.connect("inproc://test");
zmq::message_t msg;
sock.recv(msg, zmq::recv_flags::dontwait);
REQUIRE(false);
}
catch (const zmq::error_t& e)
{
REQUIRE(e.num() == ETERM);
}
}

TEST_CASE("context - create socket after shutdown", "[context]")
{
zmq::context_t context;
context.shutdown();
// sockets created after shutdown dont return ETERM
zmq::socket_t sock(context, zmq::socket_type::rep);
sock.connect("inproc://test");
zmq::message_t msg;
sock.recv(msg, zmq::recv_flags::dontwait);
}

TEST_CASE("context - create socket after shutdown - shutdown_guard", "[context]")
{
zmq::context_t context;
zmq::shutdown_guard sg{context};
try
{
zmq::socket_t sock(context, zmq::socket_type::rep);
sock.connect("inproc://test");
zmq::message_t msg;
sock.recv(msg); // blocks until shutdown
}
catch (const zmq::error_t& e)
{
REQUIRE(e.num() == ETERM);
}
}

TEST_CASE("context - create socket in thread after shutdown - shutdown_guard", "[context]")
{
zmq::context_t context;
auto thread = std::thread([&]{
try
{
// this may or may not start before the shutdown() but that is OK
std::this_thread::sleep_for(std::chrono::milliseconds(5));
zmq::socket_t sock(context, zmq::socket_type::rep);
sock.connect("inproc://test");
zmq::message_t msg;
sock.recv(msg); // blocks until shutdown
}
catch (const zmq::error_t& e)
{
REQUIRE(e.num() == ETERM);
}
});

zmq::shutdown_guard sg{context};
thread.join();
}
#endif
14 changes: 14 additions & 0 deletions zmq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ class context_t

~context_t() ZMQ_NOTHROW { close(); }

// Terminates context (see also shutdown()).
void close() ZMQ_NOTHROW
{
if (ptr == ZMQ_NULLPTR)
Expand All @@ -691,6 +692,19 @@ class context_t
ptr = ZMQ_NULLPTR;
}

// Shutdown context in preparation for termination (close()).
// Causes all blocking socket operations and any further
// operations to return with ETERM.
// Operation on sockets constructed after
// this call will however NOT return with ETERM.
void shutdown() ZMQ_NOTHROW
{
if (ptr == ZMQ_NULLPTR)
return;
int rc = zmq_ctx_shutdown(ptr);
ZMQ_ASSERT(rc == 0);
}

// Be careful with this, it's probably only useful for
// using the C api together with an existing C++ api.
// Normally you should never need to use this.
Expand Down
59 changes: 59 additions & 0 deletions zmq_addon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@
#include <sstream>
#include <stdexcept>
#ifdef ZMQ_CPP11
#include <condition_variable>
#include <functional>
#include <mutex>
#include <thread>
#include <unordered_map>
#endif

Expand Down Expand Up @@ -572,6 +575,62 @@ class active_poller_t
#endif // defined(ZMQ_BUILD_DRAFT_API) && defined(ZMQ_CPP11) && defined(ZMQ_HAVE_POLLER)


#ifdef ZMQ_CPP11
/* A context shutdown scope guard.

Starts a context shutdown loop in another thread
until destructed or explicitly stopped.
Useful for unblocking blocking function calls
and stopping further socket operations
in multi-threaded applications before terminating
the context. Example usage:

{
zmq::shutdown_guard sg{ctx};
threads.join(); // join all threads using sockets
} // stops sg
ctx.close(); // terminate context
*/
class shutdown_guard
{
public:
// Call ctx.shutdown() repeatedly on iv intervals until stopped.
explicit shutdown_guard(zmq::context_t& ctx,
std::chrono::microseconds iv = std::chrono::microseconds(5000))
{
thrd = std::thread([&, iv]{
while (true)
{
ctx.shutdown();
std::unique_lock<std::mutex> lock{mtx};
if (cv.wait_for(lock, iv, [this]{ return do_stop; }))
return; // stop requested
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I somehow don't get the point on the use case for this. Why does this need to be a loop which might call shutdown multiple times? When would a single call to shutdown not be sufficient?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might not be required if 1 socket is created before shutdown, see zeromq/libzmq#3792. A better workaround/hack is maybe to create a temporary socket in context_t::shutdown before calling zmq_ctx_shutdown?

});
}

~shutdown_guard() noexcept { stop(); }

void stop() noexcept
{
if (!thrd.joinable())
return;
{
std::lock_guard<std::mutex> lock{mtx};
do_stop = true;
}
cv.notify_one();
thrd.join();
}

private:
std::thread thrd;
std::mutex mtx;
std::condition_variable cv;
bool do_stop{false};
};
#endif

} // namespace zmq

#endif // __ZMQ_ADDON_HPP_INCLUDED__