Skip to content

Commit

Permalink
Be a bit more explicit about thread termination
Browse files Browse the repository at this point in the history
  • Loading branch information
Mikko Koppanen committed Sep 28, 2011
1 parent f186926 commit a98433d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 74 deletions.
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ SET(MODULE_NAME pzq)

SET(CMAKE_MODULE_PATH "${CMAKE_SOURCE_DIR}/cmake/" ${CMAKE_MODULE_PATH})

IF (APPLE)
IF(${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
IF(EXISTS "/opt/local/var/macports/")
LIST (APPEND CMAKE_PREFIX_PATH "/opt/local")
LIST (APPEND CMAKE_LIBRARY_PATH "/opt/local/lib")
Expand Down Expand Up @@ -33,7 +33,7 @@ TARGET_LINK_LIBRARIES(${MODULE_NAME} ${Boost_LIBRARIES})
TARGET_LINK_LIBRARIES(${MODULE_NAME} ${ZeroMQ_LIBRARIES})
TARGET_LINK_LIBRARIES(${MODULE_NAME} ${kyotocabinet_LIBRARIES})

IF (LINUX)
IF(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
TARGET_LINK_LIBRARIES(${MODULE_NAME} uuid)
ENDIF()

Expand Down
119 changes: 60 additions & 59 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,104 +125,105 @@ int main (int argc, char *argv [])
signal (SIGTERM, time_to_go);

// Init new zeromq context
zmq::context_t *context = new zmq::context_t (1);
zmq::context_t context (1);

{
pzq::device_t receiver, sender;

int linger = 1000;
uint64_t hwm = 1;
{
int linger = 1000;
uint64_t hwm = 1;

try {
// Wire the receiver
boost::shared_ptr<pzq::socket_t> receiver_in (new pzq::socket_t (*context, ZMQ_ROUTER));
boost::shared_ptr<pzq::socket_t> receiver_in (new pzq::socket_t (context, ZMQ_ROUTER));
receiver_in.get ()->setsockopt (ZMQ_LINGER, &linger, sizeof (int));
receiver_in.get ()->setsockopt (ZMQ_HWM, &hwm, sizeof (uint64_t));
receiver_in.get ()->bind (receiver_dsn.c_str ());

boost::shared_ptr<pzq::socket_t> receiver_out (new pzq::socket_t (*context, ZMQ_PAIR));
boost::shared_ptr<pzq::socket_t> receiver_out (new pzq::socket_t (context, ZMQ_PAIR));
receiver_out.get ()->setsockopt (ZMQ_LINGER, &linger, sizeof (int));
receiver_out.get ()->setsockopt (ZMQ_HWM, &hwm, sizeof (uint64_t));
receiver_out.get ()->bind ("inproc://receiver-inproc");

// Wire the sender
boost::shared_ptr<pzq::socket_t> sender_in (new pzq::socket_t (*context, ZMQ_PAIR));
boost::shared_ptr<pzq::socket_t> sender_in (new pzq::socket_t (context, ZMQ_PAIR));
sender_in.get ()->setsockopt (ZMQ_LINGER, &linger, sizeof (int));
sender_in.get ()->setsockopt (ZMQ_HWM, &hwm, sizeof (uint64_t));
sender_in.get ()->bind ("inproc://sender-inproc");

boost::shared_ptr<pzq::socket_t> sender_out (new pzq::socket_t (*context, ZMQ_DEALER));
boost::shared_ptr<pzq::socket_t> sender_out (new pzq::socket_t (context, ZMQ_DEALER));
sender_out.get ()->setsockopt (ZMQ_LINGER, &linger, sizeof (int));
sender_out.get ()->setsockopt (ZMQ_HWM, &hwm, sizeof (uint64_t));
sender_out.get ()->bind (sender_dsn.c_str ());

// Start the receiver device
receiver.set_sockets (receiver_in, receiver_out);
receiver.start ();

// Start the sender device
sender.set_sockets (sender_in, sender_out);
sender.start ();
} catch (std::exception &e) {
std::cerr << "Error starting listening sockets: " << e.what () << std::endl;
return 1;
}

boost::shared_ptr<pzq::datastore_t> store (new pzq::datastore_t ());
store.get ()->set_sync_divisor (sync_divisor);
store.get ()->open (filename, inflight_size);
try {
// Start the receiver device
receiver.set_sockets (receiver_in, receiver_out);
receiver.start ();

// Start the sender device
sender.set_sockets (sender_in, sender_out);
sender.start ();
} catch (std::exception &e) {
std::cerr << "Error starting listening sockets: " << e.what () << std::endl;
return 1;
}

try {
// Start the store manager
pzq::manager_t manager;
boost::shared_ptr<pzq::datastore_t> store (new pzq::datastore_t ());
store.get ()->set_sync_divisor (sync_divisor);
store.get ()->open (filename, inflight_size);

boost::shared_ptr<pzq::socket_t> manager_in (new pzq::socket_t (*context, ZMQ_PAIR));
boost::shared_ptr<pzq::socket_t> manager_in (new pzq::socket_t (context, ZMQ_PAIR));
manager_in.get ()->setsockopt (ZMQ_LINGER, &linger, sizeof (int));
manager_in.get ()->setsockopt (ZMQ_HWM, &hwm, sizeof (uint64_t));
manager_in.get ()->connect ("inproc://receiver-inproc");

boost::shared_ptr<pzq::socket_t> manager_out (new pzq::socket_t (*context, ZMQ_PAIR));
boost::shared_ptr<pzq::socket_t> manager_out (new pzq::socket_t (context, ZMQ_PAIR));
manager_out.get ()->setsockopt (ZMQ_LINGER, &linger, sizeof (int));
manager_out.get ()->setsockopt (ZMQ_HWM, &hwm, sizeof (uint64_t));
manager_out.get ()->connect ("inproc://sender-inproc");

boost::shared_ptr<pzq::socket_t> monitor (new pzq::socket_t (*context, ZMQ_ROUTER));
boost::shared_ptr<pzq::socket_t> monitor (new pzq::socket_t (context, ZMQ_ROUTER));
monitor.get ()->setsockopt (ZMQ_LINGER, &linger, sizeof (int));
monitor.get ()->setsockopt (ZMQ_HWM, &hwm, sizeof (uint64_t));
monitor.get ()->bind (monitor_dsn.c_str ());

// Reaper for expired messages
pzq::expiry_visitor_t reaper (store);
reaper.set_frequency (reaper_frequency);
reaper.set_ack_timeout (ack_timeout);
reaper.start ();

// Syncing to disk
pzq::sync_t sync (store);
sync.set_frequency (sync_frequency);
sync.start ();

manager.set_datastore (store);
manager.set_ack_timeout (ack_timeout);
manager.set_sockets (manager_in, manager_out, monitor);

manager.start ();

while (keep_running)
{
boost::this_thread::sleep (boost::posix_time::seconds (1));
try {
// Start the store manager
pzq::manager_t manager;

// Reaper for expired messages
pzq::expiry_visitor_t reaper (store);
reaper.set_frequency (reaper_frequency);
reaper.set_ack_timeout (ack_timeout);
reaper.start ();

// Syncing to disk
pzq::sync_t sync (store);
sync.set_frequency (sync_frequency);
sync.start ();

manager.set_datastore (store);
manager.set_ack_timeout (ack_timeout);
manager.set_sockets (manager_in, manager_out, monitor);

manager.start ();

while (keep_running)
{
boost::this_thread::sleep (boost::posix_time::seconds (1));
}
manager.stop ();
sender.stop ();
receiver.stop ();
reaper.stop ();
sync.stop ();
} catch (std::exception &e) {
std::cerr << "Error starting store manager: " << e.what () << std::endl;
return 1;
}
manager.stop ();
sender.stop ();
receiver.stop ();
reaper.stop ();
sync.stop ();
} catch (std::exception &e) {
std::cerr << "Error starting store manager: " << e.what () << std::endl;
return 1;
store.reset ();
}
store.reset ();
}
delete context;
return 0;
}
29 changes: 16 additions & 13 deletions src/thread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,20 @@ namespace pzq {
m_mutex.lock ();
m_running = false;
m_mutex.unlock ();

m_thread->interrupt ();

// Try to join the thread
if (!m_thread->timed_join (boost::posix_time::seconds (2))) {
pthread_t handle = m_thread->native_handle ();

// Try to send SIGINT to the thread
pthread_kill (handle, SIGINT);

if (!m_thread->timed_join (boost::posix_time::seconds (2))) {
pthread_cancel (handle);
}
}
}

void start ()
Expand All @@ -69,19 +83,8 @@ namespace pzq {

virtual ~thread_t ()
{
stop ();
m_thread->interrupt ();

// Try to join the thread
if (!m_thread->timed_join (boost::posix_time::seconds (2))) {
pthread_t handle = m_thread->native_handle ();

// Try to send SIGINT to the thread
pthread_kill (handle, SIGINT);

if (!m_thread->timed_join (boost::posix_time::seconds (2))) {
pthread_cancel (handle);
}
if (m_running) {
stop ();
}
delete m_thread;
}
Expand Down

0 comments on commit a98433d

Please sign in to comment.