Skip to content

Commit

Permalink
make session smoke test pass
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed May 24, 2024
1 parent e7c77f5 commit aac9d98
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 26 deletions.
5 changes: 5 additions & 0 deletions src/tateyama/endpoint/common/response.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
#include <atomic>
#include <memory>

#include <glog/logging.h>
#include <tateyama/logging.h>
#include <tateyama/logging_helper.h>

#include <tateyama/api/server/response.h>

namespace tateyama::endpoint::common {
Expand All @@ -38,6 +42,7 @@ class response : public tateyama::api::server::response {
}

void cancel() noexcept {
VLOG_LP(log_trace) << "set cancel flag for session " << session_id_;
cancel_ = true;
if (data_channel_) {
release_channel(*data_channel_);
Expand Down
8 changes: 6 additions & 2 deletions src/tateyama/endpoint/common/worker_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ class worker_common {
* @brief dispose the session_elements in the session_store.
*/
void dispose_session_store() {
session_store_.dispose();
if (!dispose_done_) {
session_store_.dispose();
dispose_done_ = true;
}
}

protected:
Expand Down Expand Up @@ -341,7 +344,7 @@ class worker_common {
}
}

bool is_shuttingdown() {
bool check_shutdown_request() {
auto sr = session_context_->shutdown_request();
if ((sr == tateyama::session::shutdown_request_type::forceful) && !cancel_requested_to_all_responses_) {
foreach_reqreses([](tateyama::endpoint::common::response& r){ r.cancel(); });
Expand Down Expand Up @@ -425,6 +428,7 @@ class worker_common {
std::map<std::size_t, std::pair<std::shared_ptr<tateyama::api::server::request>, std::shared_ptr<tateyama::endpoint::common::response>>> reqreses_{};
std::mutex mtx_reqreses_{};
bool cancel_requested_to_all_responses_{};
bool dispose_done_{};

std::string_view connection_label(connection_type con) {
switch (con) {
Expand Down
20 changes: 11 additions & 9 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ void Worker::do_work() {
try {
hdr = request_wire_container_->peep();
} catch (std::runtime_error &ex) {
VLOG_LP(log_trace) << "cought exception: " << ex.what();
care_reqreses();
if (is_shuttingdown() && is_completed()) {
wire_->get_response_wire().notify_shutdown();
VLOG_LP(log_info) << "terminate worker because shutdown completed";
if (check_shutdown_request() && is_completed()) {
VLOG_LP(log_trace) << "terminate worker thread for session " << session_id_ << ", as it has received a shutdown request from outside the communication partner";
break;
}
continue;
Expand All @@ -67,9 +67,8 @@ void Worker::do_work() {
dispose_session_store();
request_shutdown(tateyama::session::shutdown_request_type::forceful);
care_reqreses();
if (is_shuttingdown() && is_completed()) {
wire_->get_response_wire().notify_shutdown();
VLOG_LP(log_info) << "terminate worker because shutdown completed";
if (check_shutdown_request() && is_completed()) {
VLOG_LP(log_trace) << "terminate worker thread for session " << session_id_ << ", as disconnection is requested and the subsequent shutdown process is completed";
break;
}
continue;
Expand Down Expand Up @@ -105,14 +104,14 @@ void Worker::do_work() {
break;

default:
if (!is_shuttingdown()) {
if (!check_shutdown_request()) {
if (!service_(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
VLOG_LP(log_info) << "terminate worker because service returns an error";
exit_frag = true;
}
} else {
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "");
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "this session is already shutdown");
}
break;

Expand All @@ -129,18 +128,21 @@ void Worker::do_work() {
break;
}
}
dispose_session_store();
wire_->get_response_wire().notify_shutdown();
VLOG_LP(log_trace) << "destroy session wire: session_id = " << std::to_string(session_id_);
#ifdef ENABLE_ALTIMETER
tateyama::endpoint::altimeter::session_end(database_info_, session_info_);
#endif
VLOG(log_debug_timing_event) << "/:tateyama:timing:session:finished " << session_id_;
}

// Processes shutdown requests from outside the communication partner.
bool Worker::terminate(tateyama::session::shutdown_request_type type) {
VLOG_LP(log_trace) << "send terminate request: session_id = " << std::to_string(session_id_);

auto rv = request_shutdown(type);
wire_->notify_shutdown();
wire_->get_request_wire()->notify();
return rv;
}

Expand Down
7 changes: 1 addition & 6 deletions src/tateyama/endpoint/ipc/bootstrap/server_wires_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ class server_wire_container_impl : public server_wire_container
}
std::size_t read_point() override { return wire_->read_point(); }
void dispose() override { wire_->dispose(); }
void notify() { wire_->notify(); }
void notify() override { wire_->notify(); }

// for mainly client, except for terminate request from server
void write(const char* from, const std::size_t len, tateyama::common::wire::message_header::index_type index) {
Expand Down Expand Up @@ -553,11 +553,6 @@ class server_wire_container_impl : public server_wire_container
return garbage_collector_impl_.get();
}

void notify_shutdown() {
request_wire_.notify();
response_wire_.notify_shutdown();
}

static std::size_t proportional_memory_size(std::size_t datachannel_buffer_size, std::size_t max_datachannel_buffers) {
return (datachannel_buffer_size + data_channel_overhead) * max_datachannel_buffers + (request_buffer_size + response_buffer_size) + total_overhead;
}
Expand Down
1 change: 1 addition & 0 deletions src/tateyama/endpoint/ipc/server_wires.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class server_wire_container
virtual std::string_view payload() = 0;
virtual void read(char *) = 0;
virtual void dispose() = 0;
virtual void notify() = 0;
};
class response_wire_container {
public:
Expand Down
17 changes: 12 additions & 5 deletions src/tateyama/endpoint/ipc/wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,7 @@ class unidirectional_message_wire : public simple_wire<message_header> {
return {message_header::terminate_request, 0};
}
if (onetime_notification_.load()) {
onetime_notification_.store(false);
return {message_header::terminate_request, 0};
throw std::runtime_error("received shutdown request from outside the communication partner");
}
boost::interprocess::scoped_lock lock(m_mutex_);
wait_for_read_ = true;
Expand Down Expand Up @@ -473,7 +472,7 @@ class unidirectional_response_wire : public simple_wire<response_header> {
}

while (true) {
if (closed_.load()) {
if (closed_.load() || shutdown_.load()) {
header_received_ = response_header(0, 0, 0);
return header_received_;
}
Expand All @@ -485,7 +484,7 @@ class unidirectional_response_wire : public simple_wire<response_header> {
wait_for_read_ = true;
std::atomic_thread_fence(std::memory_order_acq_rel);

if (!c_empty_.timed_wait(lock, boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(timeout))), [this](){ return (stored() >= response_header::size) || closed_.load(); })) {
if (!c_empty_.timed_wait(lock, boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(timeout))), [this](){ return (stored() >= response_header::size) || closed_.load() || shutdown_.load(); })) {
wait_for_read_ = false;
throw std::runtime_error("response has not been received within the specified time");
}
Expand Down Expand Up @@ -520,6 +519,10 @@ class unidirectional_response_wire : public simple_wire<response_header> {
closed_.store(true);
std::atomic_thread_fence(std::memory_order_acq_rel);
if (wait_for_write_) {
boost::interprocess::scoped_lock lock(m_mutex_);
c_full_.notify_one();
}
if (wait_for_read_) {
boost::interprocess::scoped_lock lock(m_mutex_);
c_empty_.notify_one();
}
Expand All @@ -544,8 +547,12 @@ class unidirectional_response_wire : public simple_wire<response_header> {
/**
* @brief notify client of the client of the shutdown
*/
void notify_shutdown() noexcept {
void notify_shutdown() {
shutdown_.store(true);
if (wait_for_read_) {
boost::interprocess::scoped_lock lock(m_mutex_);
c_empty_.notify_one();
}
}

private:
Expand Down
9 changes: 5 additions & 4 deletions src/tateyama/endpoint/stream/bootstrap/stream_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,14 @@ void stream_worker::do_work()
break;

default:
if (!is_shuttingdown()) {
if (!check_shutdown_request()) {
if(service_(std::dynamic_pointer_cast<tateyama::api::server::request>(request),
std::dynamic_pointer_cast<tateyama::api::server::response>(response))) {
continue;
}
VLOG_LP(log_info) << "terminate worker because service returns an error";
} else {
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "");
notify_client(response.get(), tateyama::proto::diagnostics::SESSION_CLOSED, "this session is already shutdown");
continue;
}
}
Expand All @@ -126,23 +126,24 @@ void stream_worker::do_work()

case tateyama::endpoint::stream::stream_socket::await_result::timeout:
care_reqreses();
if (is_shuttingdown() && is_completed()) {
if (check_shutdown_request() && is_completed()) {
VLOG_LP(log_trace) << "received and completed shutdown request: session_id = " << std::to_string(session_id_);
break;
}
continue;

case tateyama::endpoint::stream::stream_socket::await_result::termination_request:
dispose_session_store();
request_shutdown(tateyama::session::shutdown_request_type::forceful);
session_stream_->send_session_bye_ok();
continue;

default: // some error
dispose_session_store();
break;
}
break;
}
dispose_session_store();
session_stream_->close();

#ifdef ENABLE_ALTIMETER
Expand Down

1 comment on commit aac9d98

@t-horikawa
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.