Skip to content

Commit

Permalink
Merge pull request #203 from project-tsurugi/revise/ipc_memory
Browse files Browse the repository at this point in the history
revise ipc memory stats
  • Loading branch information
t-horikawa committed May 23, 2024
2 parents fee0b5e + eeabd6b commit e7c77f5
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 52 deletions.
5 changes: 3 additions & 2 deletions src/tateyama/endpoint/ipc/bootstrap/ipc_listener.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ class ipc_listener {
// set maximum thread size to status objects
status_->set_maximum_sessions(threads);

// set memory usage per session to ipc_metrics
ipc_metrics_.memory_usage(server_wire_container_impl::memory_usage(datachannel_buffer_size_, max_datachannel_buffers_));
// set memory usage parameters to ipc_metrics
ipc_metrics_.set_memory_parameters(connection_container::fixed_memory_size(threads),
server_wire_container_impl::proportional_memory_size(datachannel_buffer_size_, max_datachannel_buffers_));

// output configuration to be used
LOG(INFO) << tateyama::endpoint::common::ipc_endpoint_config_prefix
Expand Down
21 changes: 11 additions & 10 deletions src/tateyama/endpoint/ipc/bootstrap/server_wires_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ class server_wire_container_impl : public server_wire_container
unrestricted_permissions.set_unrestricted();

managed_shared_memory_ =
std::make_unique<boost::interprocess::managed_shared_memory>(boost::interprocess::create_only, name_.c_str(), memory_usage(datachannel_buffer_size_, max_datachannel_buffers), nullptr, unrestricted_permissions);
std::make_unique<boost::interprocess::managed_shared_memory>(boost::interprocess::create_only, name_.c_str(), proportional_memory_size(datachannel_buffer_size_, max_datachannel_buffers), nullptr, unrestricted_permissions);
auto req_wire = managed_shared_memory_->construct<tateyama::common::wire::unidirectional_message_wire>(tateyama::common::wire::request_wire_name)(managed_shared_memory_.get(), request_buffer_size);
auto res_wire = managed_shared_memory_->construct<tateyama::common::wire::unidirectional_response_wire>(tateyama::common::wire::response_wire_name)(managed_shared_memory_.get(), response_buffer_size);
status_provider_ = managed_shared_memory_->construct<tateyama::common::wire::status_provider>(tateyama::common::wire::status_provider_name)(managed_shared_memory_.get(), mutex_file);
Expand Down Expand Up @@ -558,7 +558,7 @@ class server_wire_container_impl : public server_wire_container
response_wire_.notify_shutdown();
}

static std::size_t memory_usage(std::size_t datachannel_buffer_size, std::size_t max_datachannel_buffers) {
static std::size_t proportional_memory_size(std::size_t datachannel_buffer_size, std::size_t max_datachannel_buffers) {
return (datachannel_buffer_size + data_channel_overhead) * max_datachannel_buffers + (request_buffer_size + response_buffer_size) + total_overhead;
}

Expand Down Expand Up @@ -599,16 +599,16 @@ inline void server_wire_container_impl::resultset_wire_container_impl::write_com
class connection_container
{
public:
explicit connection_container(std::string_view name, std::size_t n) : name_(name) {
explicit connection_container(std::string_view name, std::size_t threads) : name_(name) {
boost::interprocess::shared_memory_object::remove(name_.c_str());
try {
boost::interprocess::permissions unrestricted_permissions;
unrestricted_permissions.set_unrestricted();

managed_shared_memory_ =
std::make_unique<boost::interprocess::managed_shared_memory>(boost::interprocess::create_only, name_.c_str(), request_queue_size(n), nullptr, unrestricted_permissions);
std::make_unique<boost::interprocess::managed_shared_memory>(boost::interprocess::create_only, name_.c_str(), fixed_memory_size(threads), nullptr, unrestricted_permissions);
managed_shared_memory_->destroy<tateyama::common::wire::connection_queue>(tateyama::common::wire::connection_queue::name);
connection_queue_ = managed_shared_memory_->construct<tateyama::common::wire::connection_queue>(tateyama::common::wire::connection_queue::name)(n, managed_shared_memory_->get_segment_manager());
connection_queue_ = managed_shared_memory_->construct<tateyama::common::wire::connection_queue>(tateyama::common::wire::connection_queue::name)(threads, managed_shared_memory_->get_segment_manager());
}
catch(const boost::interprocess::interprocess_exception& ex) {
using namespace std::literals::string_view_literals;
Expand Down Expand Up @@ -642,18 +642,19 @@ class connection_container
return connection_queue_->session_id_accepted();
}

static std::size_t fixed_memory_size(std::size_t n) {
std::size_t size = initial_size + (n * per_size); // exact size
size += initial_size / 2; // a little bit of leeway
return ((size / 4096) + 1) * 4096; // round up to the page size
}

private:
std::string name_;
std::unique_ptr<boost::interprocess::managed_shared_memory> managed_shared_memory_{};
tateyama::common::wire::connection_queue* connection_queue_;

static constexpr std::size_t initial_size = 720; // obtained by experiment
static constexpr std::size_t per_size = 112; // obtained by experiment
std::size_t request_queue_size(std::size_t n) {
std::size_t size = initial_size + (n * per_size); // exact size
size += initial_size / 2; // a little bit of leeway
return ((size / 4096) + 1) * 4096; // round up to the page size
}
};

}; // namespace tateyama::common::wire
67 changes: 27 additions & 40 deletions src/tateyama/endpoint/ipc/metrics/ipc_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,66 +50,53 @@ class ipc_metrics {
// for ipc_memory
class ipc_memory_aggregator : public tateyama::metrics::metrics_aggregator {
public:
void add(tateyama::metrics::metrics_metadata const& metadata, double value) override {
if (metadata.key() == "ipc_session_count") {
session_count_ = value;
} else if (metadata.key() == "memory_usage_per_session") {
memory_per_session_ = value;
}
ipc_memory_aggregator(std::size_t fixed, std::size_t proportional)
: fixed_(static_cast<double>(fixed)), proportional_(static_cast<double>(proportional)) {
}
ipc_memory_aggregator() = delete;
void add(tateyama::metrics::metrics_metadata const&, double value) override {
ipc_session_count_ = value;
}
result_type aggregate() override {
if(tateyama::metrics::service::ipc_correction) {
return (session_count_ - 1.0) * memory_per_session_;
return fixed_ + (ipc_session_count_ - 1.0) * proportional_;
}
return session_count_ * memory_per_session_;
return fixed_ + ipc_session_count_ * proportional_;
}
private:
double memory_per_session_{};
double session_count_{};
double fixed_;
double proportional_{};
double ipc_session_count_{};
};

public:
explicit ipc_metrics(tateyama::framework::environment& env)
: metrics_store_(env.resource_repository().find<::tateyama::metrics::resource::bridge>()->metrics_store()),
session_count_(metrics_store_.register_item(session_count_metadata_)),
ipc_memory_usage_(metrics_store_.register_item(ipc_memory_usage_metada_))
{
metrics_store_.register_aggregation(tateyama::metrics::metrics_aggregation{"session_count", "number of active sessions", [](){return std::make_unique<session_count_aggregator>();}});
metrics_store_.register_aggregation(tateyama::metrics::metrics_aggregation{"ipc_buffer_size", "allocated buffer size for all IPC sessions", [](){return std::make_unique<ipc_memory_aggregator>();}});
}
session_count_slot_(metrics_store_.register_item(tateyama::metrics::metrics_metadata{"ipc_session_count"s, "number of active ipc sessions"s,
std::vector<std::tuple<std::string, std::string>> {},
std::vector<std::string> {"session_count"s, "ipc_buffer_size"s},
false})) {
metrics_store_.register_aggregation(tateyama::metrics::metrics_aggregation{"session_count", "number of active sessions", [](){return std::make_unique<session_count_aggregator>();}});
}

private:
tateyama::metrics::metrics_store& metrics_store_;
tateyama::metrics::metrics_item_slot& session_count_slot_;

tateyama::metrics::metrics_metadata session_count_metadata_ {
"ipc_session_count"s, "number of active ipc sessions"s,
std::vector<std::tuple<std::string, std::string>> {},
std::vector<std::string> {"session_count"s, "ipc_buffer_size"s},
false
};
tateyama::metrics::metrics_metadata ipc_memory_usage_metada_ {
"memory_usage_per_session"s, "memory usage per session"s,
std::vector<std::tuple<std::string, std::string>> {},
std::vector<std::string> {"ipc_buffer_size"s},
false
};

// have to be placed after corresponding metrics_metadata definition
tateyama::metrics::metrics_item_slot& session_count_;
tateyama::metrics::metrics_item_slot& ipc_memory_usage_;

std::atomic_long count_{};
std::atomic_long session_count_{};

void memory_usage(std::size_t bytes) noexcept {
ipc_memory_usage_ = static_cast<double>(bytes);
void set_memory_parameters(std::size_t f, std::size_t p) noexcept {
metrics_store_.register_aggregation(tateyama::metrics::metrics_aggregation{"ipc_buffer_size",
"allocated buffer size for all IPC sessions",
[f, p](){return std::make_unique<ipc_memory_aggregator>(f, p);}});
}
void increase() noexcept {
count_++;
session_count_ = static_cast<double>(count_.load());
session_count_++;
session_count_slot_ = static_cast<double>(session_count_.load());
}
void decrease() noexcept {
count_--;
session_count_ = static_cast<double>(count_.load());
session_count_--;
session_count_slot_ = static_cast<double>(session_count_.load());
}

friend class tateyama::endpoint::ipc::bootstrap::ipc_listener;
Expand Down

0 comments on commit e7c77f5

Please sign in to comment.