Skip to content

Commit

Permalink
updated wire.h to be the same as tateyama
Browse files Browse the repository at this point in the history
  • Loading branch information
t-horikawa committed Apr 25, 2024
1 parent e1a550a commit 128abdb
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 16 deletions.
31 changes: 16 additions & 15 deletions modules/ipc/src/main/native/include/wire.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class message_header {
public:
using length_type = std::uint32_t;
using index_type = std::uint16_t;
static constexpr index_type null_request = 0xffff;
static constexpr index_type terminate_request = 0xffff;

static constexpr std::size_t size = sizeof(length_type) + sizeof(index_type);

Expand Down Expand Up @@ -376,18 +376,23 @@ class unidirectional_message_wire : public simple_wire<message_header> {

/**
* @brief wait a request message arives and peep the current header.
* @returnm the essage_header if request message has been received,
* otherwise, say timeout or termination requested, dummy request message whose length is 0 and index is message_header::null_request.
* @returnm the essage_header if request message has been received, for normal reception of request message.
* otherwise, dummy request message whose length is 0 and index is message_header::termination_request for termination request,
* and dummy request message whose length is 0 and index is message_header::timeout for timeout.
*/
message_header peep(const char* base) {
while (true) {
if(stored() >= message_header::size) {
copy_header(base);
return header_received_;
}
if (termination_requested_.load() || onetime_notification_.load()) {
if (termination_requested_.load()) {
termination_requested_.store(false);
return {message_header::terminate_request, 0};
}
if (onetime_notification_.load()) {
onetime_notification_.store(false);
return {message_header::null_request, 0};
return {message_header::terminate_request, 0};
}
boost::interprocess::scoped_lock lock(m_mutex_);
wait_for_read_ = true;
Expand All @@ -396,19 +401,11 @@ class unidirectional_message_wire : public simple_wire<message_header> {
boost::get_system_time() + boost::posix_time::microseconds(u_cap(u_round(watch_interval * 1000 * 1000))),
[this](){ return (stored() >= message_header::size) || termination_requested_.load() || onetime_notification_.load(); })) {
wait_for_read_ = false;
header_received_ = message_header(message_header::null_request, 0);
return header_received_;
throw std::runtime_error("request has not been received within the specified time");
}
wait_for_read_ = false;
}
}
/**
* @brief check if an termination request has been made
* @retrun true if terminate request has been made
*/
[[nodiscard]] bool terminate_requested() {
return termination_requested_.load();
}
/**
* @brief wake up the worker immediately.
*/
Expand Down Expand Up @@ -470,7 +467,11 @@ class unidirectional_response_wire : public simple_wire<response_header> {
/**
* @brief wait for response arrival and return its header.
*/
response_header await(const char* base, std::int64_t timeout) {
response_header await(const char* base, std::int64_t timeout = 0) {
if (timeout == 0) {
timeout = watch_interval * 1000 * 1000;
}

while (true) {
if (closed_.load()) {
header_received_ = response_header(0, 0, 0);
Expand Down
10 changes: 9 additions & 1 deletion modules/ipc/src/test/native/src/server_wireJNI.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,15 @@ JNIEXPORT jbyteArray JNICALL Java_com_tsurugidb_tsubakuro_channel_ipc_sql_Server
server_wire_container* container = reinterpret_cast<server_wire_container*>(static_cast<std::uintptr_t>(handle));

auto& wire = container->get_request_wire();
message_header h = wire.peep();
message_header h{};
while (true) {
try {
h = wire.peep();
break;
} catch (std::runtime_error&) {
continue;
}
}
std::size_t length = h.get_length();
jbyteArray dstj = env->NewByteArray(length);
if (dstj != nullptr) {
Expand Down

0 comments on commit 128abdb

Please sign in to comment.