diff --git a/src/tateyama/endpoint/common/worker_common.h b/src/tateyama/endpoint/common/worker_common.h index 522688ea..44d51eda 100644 --- a/src/tateyama/endpoint/common/worker_common.h +++ b/src/tateyama/endpoint/common/worker_common.h @@ -61,19 +61,12 @@ class worker_common { stream, }; - worker_common(connection_type con, std::size_t session_id, std::string_view conn_info, std::shared_ptr session) + worker_common(connection_type con, std::size_t session_id, std::string_view conn_info, [[maybe_unused]] const std::shared_ptr& session) : connection_type_(con), session_id_(session_id), - session_info_(session_id, connection_label(con), conn_info), - session_(std::move(session)), - session_variable_set_(variable_declarations()), - session_context_(std::make_shared(session_info_, session_variable_set_)) - { - if (session_) { - session_->register_session(session_context_); - } + session_info_(session_id, connection_label(con), conn_info) { } - worker_common(connection_type con, std::size_t id, std::shared_ptr session) : worker_common(con, id, "", std::move(session)) { + worker_common(connection_type con, std::size_t id, const std::shared_ptr& session) : worker_common(con, id, "", session) { } void invoke(std::function func) { task_ = std::packaged_task(std::move(func)); @@ -88,7 +81,6 @@ class worker_common { const connection_type connection_type_; // NOLINT const std::size_t session_id_; // NOLINT session_info_impl session_info_; // NOLINT - // for ipc endpoint only std::string connection_info_{}; // NOLINT // for stream endpoint only @@ -99,9 +91,6 @@ class worker_common { std::future future_; // NOLINT std::thread thread_{}; // NOLINT - // for session management - const std::shared_ptr session_; // NOLINT - bool handshake(tateyama::api::server::request* req, tateyama::api::server::response* res) { if (req->service_id() != tateyama::framework::service_id_endpoint_broker) { LOG(INFO) << "request received is not handshake"; @@ -181,53 +170,7 @@ class worker_common { record.release_message(); } - bool endpoint_service([[maybe_unused]] const std::shared_ptr& req, - const std::shared_ptr& res, - std::size_t slot) { - auto data = req->payload(); - tateyama::proto::endpoint::request::Request rq{}; - if(! rq.ParseFromArray(data.data(), static_cast(data.size()))) { - std::string error_message{"request parse error"}; - LOG(INFO) << error_message; - notify_client(res.get(), tateyama::proto::diagnostics::Code::INVALID_REQUEST, error_message); - return false; - } - if(rq.command_case() != tateyama::proto::endpoint::request::Request::kCancel) { - std::stringstream ss; - ss << "bad request (cancel in endpoint): " << rq.command_case(); - LOG(INFO) << ss.str(); - notify_client(res.get(), tateyama::proto::diagnostics::Code::INVALID_REQUEST, ss.str()); - return false; - } - { - std::lock_guard lock(mtx_responses_); - if (auto itr = responses_.find(slot); itr != responses_.end()) { - if (auto ptr = itr->second.lock(); ptr) { - ptr->set_cancel(res); - } - } - } - return true; - } - - void register_response(std::size_t slot, const std::shared_ptr& response) noexcept { - std::lock_guard lock(mtx_responses_); - if (auto itr = responses_.find(slot); itr != responses_.end()) { - responses_.erase(itr); - } - responses_.emplace(slot, response); - } - void remove_response(std::size_t slot) noexcept { - std::lock_guard lock(mtx_responses_); - responses_.erase(slot); - } - private: - tateyama::session::session_variable_set session_variable_set_; - const std::shared_ptr session_context_; - std::map> responses_{}; - std::mutex mtx_responses_{}; - std::string_view connection_label(connection_type con) { switch (con) { case connection_type::ipc: @@ -238,13 +181,6 @@ class worker_common { return ""; } } - - [[nodiscard]] std::vector> variable_declarations() const noexcept { - return { - { "example_integer", tateyama::session::session_variable_type::signed_integer, static_cast(0) } - }; - } - }; } // tateyama::endpoint::common diff --git a/src/tateyama/endpoint/ipc/bootstrap/worker.cpp b/src/tateyama/endpoint/ipc/bootstrap/worker.cpp index 34592430..13a0471e 100644 --- a/src/tateyama/endpoint/ipc/bootstrap/worker.cpp +++ b/src/tateyama/endpoint/ipc/bootstrap/worker.cpp @@ -54,22 +54,11 @@ void Worker::run() } auto request = std::make_shared(*wire_, h, database_info_, session_info_); - std::size_t index = h.get_idx(); - auto response = std::make_shared(wire_, h.get_idx(), [this, index](){remove_response(index);}); - if (request->service_id() != tateyama::framework::service_id_endpoint_broker) { - register_response(index, static_cast>(response)); - if (!service_(static_cast>(request), - static_cast>(std::move(response)))) { - VLOG_LP(log_info) << "terminate worker because service returns an error"; - break; - } - } else { - if (!endpoint_service(static_cast>(request), - static_cast>(std::move(response)), - index)) { - VLOG_LP(log_info) << "terminate worker because endpoint service returns an error"; - break; - } + auto response = std::make_shared(wire_, h.get_idx()); + if (!service_(static_cast>(request), + static_cast>(std::move(response)))) { + VLOG_LP(log_info) << "terminate worker because service returns an error"; + break; } request->dispose(); request = nullptr; diff --git a/src/tateyama/endpoint/stream/bootstrap/stream_worker.cpp b/src/tateyama/endpoint/stream/bootstrap/stream_worker.cpp index 219e7f41..89272135 100644 --- a/src/tateyama/endpoint/stream/bootstrap/stream_worker.cpp +++ b/src/tateyama/endpoint/stream/bootstrap/stream_worker.cpp @@ -75,20 +75,10 @@ void stream_worker::run() auto request = std::make_shared(*session_stream_, payload, database_info_, session_info_); auto response = std::make_shared(session_stream_, slot); - if (request->service_id() != tateyama::framework::service_id_endpoint_broker) { - register_response(slot, static_cast>(response)); - if(!service_(static_cast>(request), - static_cast>(std::move(response)))) { - VLOG_LP(log_info) << "terminate worker because service returns an error"; - break; - } - } else { - if (!endpoint_service(static_cast>(request), - static_cast>(std::move(response)), - slot)) { - VLOG_LP(log_info) << "terminate worker because endpoint service returns an error"; - break; - } + if(!service_(static_cast>(request), + static_cast>(std::move(response)))) { + VLOG_LP(log_info) << "terminate worker because service returns an error"; + break; } request = nullptr; }