Skip to content

Commit

Permalink
Writer threads must still complete loops, even if they error.
Browse files Browse the repository at this point in the history
There have been some lock-ups recently running planet-dump-ng in production (#24). Thanks to thread backtraces, it seems that a writer thread was dying (although there was no output?) and therefore no longer participating in the barrier to pump data from the reader, so the whole program was locking up.

The new behaviour is for the dying thread to still participate in pumping messages, but without the calls to the output writer. If the reader thread encounters an exception, it will abort. Hopefully this means that if a single writer dies, we get all the other output, and if the reader dies then we get a crash instead of a hang.
  • Loading branch information
zerebubuth committed Oct 11, 2021
1 parent 091209d commit b8d80d1
Showing 1 changed file with 21 additions and 10 deletions.
31 changes: 21 additions & 10 deletions src/copy_elements.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -252,22 +252,27 @@ template <> inline void write_elements<relation>(output_writer &writer, control_
template <typename T>
void writer_thread(int thread_index,
boost::exception_ptr exc,
boost::shared_ptr<output_writer> writer,
boost::shared_ptr<output_writer> writer,
boost::shared_ptr<control_block<T> > blk) {
const size_t block_size = block_size_trait<T>::value;

try {
do {
blk->pre_swap_barrier.wait();
blk->post_swap_barrier.wait();

write_elements<T>(*writer, *blk);


// if write_elements previously threw an exception, then don't call it
// again. but we need to continue going through the barrier loops, or all
// the other threads will lock up waiting for this thread.
if (exc == boost::exception_ptr()) {
write_elements<T>(*writer, *blk);
}

} while (blk->elements.size() == block_size);

} catch (...) {
exc = boost::current_exception();
std::cerr << "EXCEPTION: writer_thread(" << thread_index << "): "
std::cerr << "EXCEPTION: writer_thread(" << thread_index << "): "
<< boost::diagnostic_information(exc) << std::endl;
}

Expand All @@ -280,7 +285,8 @@ void writer_thread(int thread_index,
// this is a difficult case to handle - it's possible for locking
// to fail, but unless we signal the condition variable then the
// program would hang. instead, treat this as a fatal error.
std::cerr << "Thread " << thread_index << " failed to lock mutex!\n";
std::cerr << "Thread " << thread_index << " failed to lock mutex!"
<< std::endl;
abort();
}
}
Expand Down Expand Up @@ -320,17 +326,21 @@ void extract_users(std::map<int64_t, std::string> &display_name_map) {
}

template <typename T>
void reader_thread(int thread_index,
boost::exception_ptr exc,
void reader_thread(int thread_index,
boost::exception_ptr exc,
boost::shared_ptr<control_block<T> > blk) {
try {
thread_writer<T> writer(blk);
extract_element<T>(writer);

} catch (...) {
exc = boost::current_exception();
std::cerr << "EXCEPTION: reader_thread(" << thread_index << "): "
std::cerr << "EXCEPTION: reader_thread(" << thread_index << "): "
<< boost::diagnostic_information(exc) << std::endl;
// if the reader thread failed, we can't make any progress, and it's
// unlikely that the writer threads can recover from this safely, so
// just explode.
abort();
}

try {
Expand All @@ -342,7 +352,8 @@ void reader_thread(int thread_index,
// this is a difficult case to handle - it's possible for locking
// to fail, but unless we signal the condition variable then the
// program would hang. instead, treat this as a fatal error.
std::cerr << "Thread " << thread_index << " failed to lock mutex!\n";
std::cerr << "Thread " << thread_index << " failed to lock mutex!"
<< std::endl;
abort();
}
}
Expand Down

0 comments on commit b8d80d1

Please sign in to comment.