Skip to content

Commit

Permalink
Merge pull request #191 from project-tsurugi/wip/i_75_partially_revert
Browse files Browse the repository at this point in the history
revert portions of e118bd2
  • Loading branch information
t-horikawa committed Feb 22, 2024
2 parents 34ed47a + cb7a548 commit cb61b46
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 97 deletions.
70 changes: 3 additions & 67 deletions src/tateyama/endpoint/common/worker_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,19 +61,12 @@ class worker_common {
stream,
};

worker_common(connection_type con, std::size_t session_id, std::string_view conn_info, std::shared_ptr<tateyama::session::resource::bridge> session)
worker_common(connection_type con, std::size_t session_id, std::string_view conn_info, [[maybe_unused]] const std::shared_ptr<tateyama::session::resource::bridge>& session)
: connection_type_(con),
session_id_(session_id),
session_info_(session_id, connection_label(con), conn_info),
session_(std::move(session)),
session_variable_set_(variable_declarations()),
session_context_(std::make_shared<tateyama::session::resource::session_context_impl>(session_info_, session_variable_set_))
{
if (session_) {
session_->register_session(session_context_);
}
session_info_(session_id, connection_label(con), conn_info) {
}
worker_common(connection_type con, std::size_t id, std::shared_ptr<tateyama::session::resource::bridge> session) : worker_common(con, id, "", std::move(session)) {
worker_common(connection_type con, std::size_t id, const std::shared_ptr<tateyama::session::resource::bridge>& session) : worker_common(con, id, "", session) {
}
void invoke(std::function<void(void)> func) {
task_ = std::packaged_task<void()>(std::move(func));
Expand All @@ -88,7 +81,6 @@ class worker_common {
const connection_type connection_type_; // NOLINT
const std::size_t session_id_; // NOLINT
session_info_impl session_info_; // NOLINT

// for ipc endpoint only
std::string connection_info_{}; // NOLINT
// for stream endpoint only
Expand All @@ -99,9 +91,6 @@ class worker_common {
std::future<void> future_; // NOLINT
std::thread thread_{}; // NOLINT

// for session management
const std::shared_ptr<tateyama::session::resource::bridge> session_; // NOLINT

bool handshake(tateyama::api::server::request* req, tateyama::api::server::response* res) {
if (req->service_id() != tateyama::framework::service_id_endpoint_broker) {
LOG(INFO) << "request received is not handshake";
Expand Down Expand Up @@ -181,53 +170,7 @@ class worker_common {
record.release_message();
}

bool endpoint_service([[maybe_unused]] const std::shared_ptr<tateyama::api::server::request>& req,
const std::shared_ptr<tateyama::api::server::response>& res,
std::size_t slot) {
auto data = req->payload();
tateyama::proto::endpoint::request::Request rq{};
if(! rq.ParseFromArray(data.data(), static_cast<int>(data.size()))) {
std::string error_message{"request parse error"};
LOG(INFO) << error_message;
notify_client(res.get(), tateyama::proto::diagnostics::Code::INVALID_REQUEST, error_message);
return false;
}
if(rq.command_case() != tateyama::proto::endpoint::request::Request::kCancel) {
std::stringstream ss;
ss << "bad request (cancel in endpoint): " << rq.command_case();
LOG(INFO) << ss.str();
notify_client(res.get(), tateyama::proto::diagnostics::Code::INVALID_REQUEST, ss.str());
return false;
}
{
std::lock_guard<std::mutex> lock(mtx_responses_);
if (auto itr = responses_.find(slot); itr != responses_.end()) {
if (auto ptr = itr->second.lock(); ptr) {
ptr->set_cancel(res);
}
}
}
return true;
}

void register_response(std::size_t slot, const std::shared_ptr<tateyama::endpoint::common::response>& response) noexcept {
std::lock_guard<std::mutex> lock(mtx_responses_);
if (auto itr = responses_.find(slot); itr != responses_.end()) {
responses_.erase(itr);
}
responses_.emplace(slot, response);
}
void remove_response(std::size_t slot) noexcept {
std::lock_guard<std::mutex> lock(mtx_responses_);
responses_.erase(slot);
}

private:
tateyama::session::session_variable_set session_variable_set_;
const std::shared_ptr<tateyama::session::resource::session_context_impl> session_context_;
std::map<std::size_t, std::weak_ptr<tateyama::endpoint::common::response>> responses_{};
std::mutex mtx_responses_{};

std::string_view connection_label(connection_type con) {
switch (con) {
case connection_type::ipc:
Expand All @@ -238,13 +181,6 @@ class worker_common {
return "";
}
}

[[nodiscard]] std::vector<std::tuple<std::string, tateyama::session::session_variable_set::variable_type, tateyama::session::session_variable_set::value_type>> variable_declarations() const noexcept {
return {
{ "example_integer", tateyama::session::session_variable_type::signed_integer, static_cast<std::int64_t>(0) }
};
}

};

} // tateyama::endpoint::common
21 changes: 5 additions & 16 deletions src/tateyama/endpoint/ipc/bootstrap/worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,11 @@ void Worker::run()
}

auto request = std::make_shared<ipc_request>(*wire_, h, database_info_, session_info_);
std::size_t index = h.get_idx();
auto response = std::make_shared<ipc_response>(wire_, h.get_idx(), [this, index](){remove_response(index);});
if (request->service_id() != tateyama::framework::service_id_endpoint_broker) {
register_response(index, static_cast<std::shared_ptr<tateyama::endpoint::common::response>>(response));
if (!service_(static_cast<std::shared_ptr<tateyama::api::server::request>>(request),
static_cast<std::shared_ptr<tateyama::api::server::response>>(std::move(response)))) {
VLOG_LP(log_info) << "terminate worker because service returns an error";
break;
}
} else {
if (!endpoint_service(static_cast<std::shared_ptr<tateyama::api::server::request>>(request),
static_cast<std::shared_ptr<tateyama::api::server::response>>(std::move(response)),
index)) {
VLOG_LP(log_info) << "terminate worker because endpoint service returns an error";
break;
}
auto response = std::make_shared<ipc_response>(wire_, h.get_idx());
if (!service_(static_cast<std::shared_ptr<tateyama::api::server::request>>(request),
static_cast<std::shared_ptr<tateyama::api::server::response>>(std::move(response)))) {
VLOG_LP(log_info) << "terminate worker because service returns an error";
break;
}
request->dispose();
request = nullptr;
Expand Down
18 changes: 4 additions & 14 deletions src/tateyama/endpoint/stream/bootstrap/stream_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,20 +75,10 @@ void stream_worker::run()

auto request = std::make_shared<stream_request>(*session_stream_, payload, database_info_, session_info_);
auto response = std::make_shared<stream_response>(session_stream_, slot);
if (request->service_id() != tateyama::framework::service_id_endpoint_broker) {
register_response(slot, static_cast<std::shared_ptr<tateyama::endpoint::common::response>>(response));
if(!service_(static_cast<std::shared_ptr<tateyama::api::server::request>>(request),
static_cast<std::shared_ptr<tateyama::api::server::response>>(std::move(response)))) {
VLOG_LP(log_info) << "terminate worker because service returns an error";
break;
}
} else {
if (!endpoint_service(static_cast<std::shared_ptr<tateyama::api::server::request>>(request),
static_cast<std::shared_ptr<tateyama::api::server::response>>(std::move(response)),
slot)) {
VLOG_LP(log_info) << "terminate worker because endpoint service returns an error";
break;
}
if(!service_(static_cast<std::shared_ptr<tateyama::api::server::request>>(request),
static_cast<std::shared_ptr<tateyama::api::server::response>>(std::move(response)))) {
VLOG_LP(log_info) << "terminate worker because service returns an error";
break;
}
request = nullptr;
}
Expand Down

0 comments on commit cb61b46

Please sign in to comment.