Skip to content

Commit

Permalink
Merge pull request #457 from project-tsurugi/work
Browse files Browse the repository at this point in the history
updated wire.h to be the same as tateyama
  • Loading branch information
t-horikawa committed Jul 2, 2024
2 parents 05843fe + ca9b5e2 commit b236f56
Showing 1 changed file with 29 additions and 12 deletions.
41 changes: 29 additions & 12 deletions modules/ipc/src/main/native/include/wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -1012,30 +1012,41 @@ class connection_queue
}
void push(std::size_t sid, std::size_t admin_slots = 0) {
boost::interprocess::scoped_lock lock(mutex_);
queue_.at(index(pushed_.load() + admin_slots)) = sid;
pushed_.fetch_add(1);
if (admin_slots > 0 && is_admin(sid)) {
queue_.at(index(pushed_.load() + admin_slots)) = reset_admin(sid);
admin_slots_in_use_.fetch_sub(1, std::memory_order_release);
pushed_.fetch_add(1);
} else {
queue_.at(index(pushed_.load() + admin_slots)) = sid;
pushed_.fetch_add(1, std::memory_order_release);
}
std::atomic_thread_fence(std::memory_order_acq_rel);
condition_.notify_one();
}
[[nodiscard]] std::size_t try_pop() {
boost::interprocess::scoped_lock lock(mutex_); // trade off
auto current = poped_.load();
while (true) {
if (pushed_.load() <= current) {
throw std::runtime_error("no request available");
auto ps = pushed_.load(std::memory_order_acquire);
if ((ps + admin_slots_in_use_.load()) <= current) {
throw std::runtime_error("no request slot is available for normal request");
}
if (poped_.compare_exchange_strong(current, current + 1)) {
return queue_.at(index(current));
}
}
}
[[nodiscard]] std::size_t try_pop(std::uint8_t admin_slots) {
boost::interprocess::scoped_lock lock(mutex_);
auto current = poped_.load();
while (true) {
if ((pushed_.load() + admin_slots) <= current) {
throw std::runtime_error("no request available");
auto ps = pushed_.load(std::memory_order_acquire);
if ((ps + (admin_slots - admin_slots_in_use_.load())) <= current) {
throw std::runtime_error("no request slot is available for admin request");
}
if (poped_.compare_exchange_strong(current, current + 1)) {
return queue_.at(index(current));
admin_slots_in_use_.fetch_add(1);
return set_admin(queue_.at(index(current)));
}
}
}
Expand Down Expand Up @@ -1064,7 +1075,8 @@ class connection_queue
}
private:
boost::interprocess::vector<std::size_t, long_allocator> queue_;
std::size_t capacity_;
std::uint32_t capacity_;
std::atomic_uint8_t admin_slots_in_use_{0};
boost::interprocess::interprocess_mutex mutex_{};
boost::interprocess::interprocess_condition condition_{};

Expand Down Expand Up @@ -1136,6 +1148,11 @@ class connection_queue

using element_allocator = boost::interprocess::allocator<element, boost::interprocess::managed_shared_memory::segment_manager>;
constexpr static std::size_t session_id_indicating_error = UINT64_MAX;
constexpr static std::size_t admin_bit = 1ULL << 63UL;

static std::size_t set_admin(std::size_t slot) { return slot | admin_bit; }
static std::size_t reset_admin(std::size_t slot) { return slot & ~admin_bit; }
static bool is_admin(std::size_t slot) { return (slot & admin_bit) != 0; }

/**
* @brief Construct a new object.
Expand Down Expand Up @@ -1165,7 +1182,7 @@ class connection_queue
return sid;
}
std::size_t wait(std::size_t sid, std::int64_t timeout = 0) {
auto& entry = v_requested_.at(sid);
auto& entry = v_requested_.at(reset_admin(sid));
try {
auto rtnv = entry.wait(timeout);
entry.reuse();
Expand All @@ -1176,7 +1193,7 @@ class connection_queue
}
}
bool check(std::size_t sid) {
return v_requested_.at(sid).check();
return v_requested_.at(reset_admin(sid)).check();
}
std::size_t listen() {
if (q_requested_.wait(terminate_)) {
Expand All @@ -1190,12 +1207,12 @@ class connection_queue
// either accept() or reject() must be called
void accept(std::size_t sid, std::size_t session_id) {
q_requested_.pop();
v_requested_.at(sid).accept(session_id);
v_requested_.at(reset_admin(sid)).accept(session_id);
}
// either accept() or reject() must be called
void reject(std::size_t sid) {
q_requested_.pop();
v_requested_.at(sid).reject();
v_requested_.at(reset_admin(sid)).reject();
q_free_.push(sid, admin_slots_);
}
void disconnect(std::size_t sid) {
Expand Down

0 comments on commit b236f56

Please sign in to comment.