diff --git a/src/tateyama/endpoint/common/worker_common.h b/src/tateyama/endpoint/common/worker_common.h index 6930ac46..1ff54ad5 100644 --- a/src/tateyama/endpoint/common/worker_common.h +++ b/src/tateyama/endpoint/common/worker_common.h @@ -103,7 +103,7 @@ class worker_common { } [[nodiscard]] bool is_quiet() { - return !has_incomplete_response() && !has_incomplete_resultset(); + return !has_incomplete_response() && !has_incomplete_resultset() && terminated(); } /** diff --git a/src/tateyama/endpoint/ipc/bootstrap/ipc_listener.h b/src/tateyama/endpoint/ipc/bootstrap/ipc_listener.h index 65e4f4e7..e77f7296 100644 --- a/src/tateyama/endpoint/ipc/bootstrap/ipc_listener.h +++ b/src/tateyama/endpoint/ipc/bootstrap/ipc_listener.h @@ -145,14 +145,10 @@ class ipc_listener { auto& worker = workers_.at(index); worker->register_worker_in_context(worker); worker->run(); - std::shared_ptr worker_ptr{}; { - std::unique_lock lock(mtx_workers_); - worker_ptr = std::move(worker); - } - { - std::unique_lock lock(mtx_undertakers_); - undertakers_.emplace(std::move(worker_ptr)); + std::unique_lock lock_w(mtx_workers_); + std::unique_lock lock_u(mtx_undertakers_); + undertakers_.emplace(std::move(worker)); } connection_queue.disconnect(index); }); @@ -229,21 +225,23 @@ class ipc_listener { void confirm_workers_termination() { bool message_output{false}; while (true) { - bool worker_remain{false}; - for (auto&& worker : workers_) { - std::shared_ptr worker_ptr = worker; - if (worker_ptr) { - if (!message_output) { // message output for the first worker only - VLOG_LP(log_trace) << "wait for remaining worker thread, session id = " << worker_ptr->session_id(); - message_output = true; + { + std::unique_lock lock(mtx_workers_); + bool worker_remain{false}; + for (auto& worker : workers_) { + if (worker) { + if (!message_output) { // message output for the first worker only + VLOG_LP(log_trace) << "wait for remaining worker thread, session id = " << worker->session_id(); + message_output = true; + } + worker_remain = true; } - worker_remain = true; - std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + if (!worker_remain) { + break; } } - if (!worker_remain) { - break; - } + std::this_thread::sleep_for(std::chrono::milliseconds(20)); } while (!care_undertakers()) { std::this_thread::sleep_for(std::chrono::milliseconds(20)); diff --git a/src/tateyama/endpoint/stream/bootstrap/stream_listener.h b/src/tateyama/endpoint/stream/bootstrap/stream_listener.h index 9909d5e4..68d66ba9 100644 --- a/src/tateyama/endpoint/stream/bootstrap/stream_listener.h +++ b/src/tateyama/endpoint/stream/bootstrap/stream_listener.h @@ -164,14 +164,10 @@ class stream_listener { auto& worker = workers_.at(index); worker->register_worker_in_context(worker); worker->run(); - std::shared_ptr worker_ptr{}; { - std::unique_lock lock(mtx_workers_); - worker_ptr = std::move(worker); - } - { - std::unique_lock lock(mtx_undertakers_); - undertakers_.emplace(std::move(worker_ptr)); + std::unique_lock lock_w(mtx_workers_); + std::unique_lock lock_u(mtx_undertakers_); + undertakers_.emplace(std::move(worker)); } }); session_id++; @@ -227,20 +223,23 @@ class stream_listener { void confirm_workers_termination() { bool message_output{false}; while (true) { - bool worker_remain{false}; - for (auto&& worker : workers_) { - std::shared_ptr worker_ptr = worker; - if (worker_ptr) { - if (!message_output) { - VLOG_LP(log_trace) << "wait for remaining worker thread, session id = " << worker->session_id(); - message_output = true; + { + std::unique_lock lock(mtx_workers_); + bool worker_remain{false}; + for (auto& worker : workers_) { + if (worker) { + if (!message_output) { // message output for the first worker only + VLOG_LP(log_trace) << "wait for remaining worker thread, session id = " << worker->session_id(); + message_output = true; + } + worker_remain = true; } - std::this_thread::sleep_for(std::chrono::milliseconds(20)); + } + if (!worker_remain) { + break; } } - if (!worker_remain) { - break; - } + std::this_thread::sleep_for(std::chrono::milliseconds(20)); } while (!care_undertakers()) { std::this_thread::sleep_for(std::chrono::milliseconds(20));