Skip to content

Commit

Permalink
Fix wrong usage of take_next_sample with read samples
Browse files Browse the repository at this point in the history
Signed-off-by: Ricardo González Moreno <ricardo@richiware.dev>
  • Loading branch information
richiware committed Jul 20, 2022
1 parent 2cc7df0 commit 3f796a2
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ typedef struct CustomClientResponse
{
eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
std::unique_ptr<eprosima::fastcdr::FastBuffer> buffer_;
eprosima::fastdds::dds::SampleInfo sample_info_ {};
} CustomClientResponse;

class ClientListener : public eprosima::fastdds::dds::DataReaderListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ typedef struct CustomServiceRequest
{
eprosima::fastrtps::rtps::SampleIdentity sample_identity_;
eprosima::fastcdr::FastBuffer * buffer_;
eprosima::fastdds::dds::SampleInfo sample_info_ {};

CustomServiceRequest()
: buffer_(nullptr)
Expand Down
27 changes: 17 additions & 10 deletions rmw_fastrtps_shared_cpp/src/rmw_request.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "fastcdr/FastBuffer.h"

#include "fastdds/rtps/common/WriteParams.h"
#include "fastdds/dds/core/StackAllocatedSequence.hpp"

#include "rmw/error_handling.h"
#include "rmw/rmw.h"
Expand Down Expand Up @@ -100,22 +101,24 @@ __rmw_take_request(
data.is_cdr_buffer = true;
data.data = request.buffer_;
data.impl = nullptr; // not used when is_cdr_buffer is true
if (info->request_reader_->take_next_sample(
&data,
&request.sample_info_) == ReturnCode_t::RETCODE_OK)
{
if (request.sample_info_.valid_data) {
request.sample_identity_ = request.sample_info_.sample_identity;

eprosima::fastdds::dds::StackAllocatedSequence<void *, 1> data_values;
const_cast<void **>(data_values.buffer())[0] = &data;
eprosima::fastdds::dds::SampleInfoSeq info_seq{1};

if (ReturnCode_t::RETCODE_OK == info->request_reader_->take(data_values, info_seq, 1)) {
if (info_seq[0].valid_data) {
request.sample_identity_ = info_seq[0].sample_identity;
// Use response subscriber guid (on related_sample_identity) when present.
const eprosima::fastrtps::rtps::GUID_t & reader_guid =
request.sample_info_.related_sample_identity.writer_guid();
info_seq[0].related_sample_identity.writer_guid();
if (reader_guid != eprosima::fastrtps::rtps::GUID_t::unknown()) {
request.sample_identity_.writer_guid() = reader_guid;
}

// Save both guids in the clients_endpoints map
const eprosima::fastrtps::rtps::GUID_t & writer_guid =
request.sample_info_.sample_identity.writer_guid();
info_seq[0].sample_identity.writer_guid();
info->pub_listener_->endpoint_add_reader_and_writer(reader_guid, writer_guid);

auto raw_type_support = dynamic_cast<rmw_fastrtps_shared_cpp::TypeSupport *>(
Expand All @@ -132,11 +135,15 @@ __rmw_take_request(
request_header->request_id.sequence_number =
((int64_t)request.sample_identity_.sequence_number().high) <<
32 | request.sample_identity_.sequence_number().low;
request_header->source_timestamp = request.sample_info_.source_timestamp.to_ns();
request_header->received_timestamp = request.sample_info_.source_timestamp.to_ns();
request_header->source_timestamp = info_seq[0].source_timestamp.to_ns();
request_header->received_timestamp = info_seq[0].source_timestamp.to_ns();
*taken = true;
}
}

info->request_reader_->return_loan(data_values, info_seq);
data_values.length(0);
info_seq.length(0);
}

delete request.buffer_;
Expand Down
23 changes: 15 additions & 8 deletions rmw_fastrtps_shared_cpp/src/rmw_response.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "fastcdr/Cdr.h"

#include "fastdds/rtps/common/WriteParams.h"
#include "fastdds/dds/core/StackAllocatedSequence.hpp"

#include "rmw/error_handling.h"
#include "rmw/rmw.h"
Expand Down Expand Up @@ -60,12 +61,14 @@ __rmw_take_response(
data.is_cdr_buffer = true;
data.data = response.buffer_.get();
data.impl = nullptr; // not used when is_cdr_buffer is true
if (info->response_reader_->take_next_sample(
&data,
&response.sample_info_) == ReturnCode_t::RETCODE_OK)
{
if (response.sample_info_.valid_data) {
response.sample_identity_ = response.sample_info_.related_sample_identity;

eprosima::fastdds::dds::StackAllocatedSequence<void *, 1> data_values;
const_cast<void **>(data_values.buffer())[0] = &data;
eprosima::fastdds::dds::SampleInfoSeq info_seq{1};

if (ReturnCode_t::RETCODE_OK == info->response_reader_->take(data_values, info_seq, 1)) {
if (info_seq[0].valid_data) {
response.sample_identity_ = info_seq[0].related_sample_identity;

if (response.sample_identity_.writer_guid() == info->reader_guid_ ||
response.sample_identity_.writer_guid() == info->writer_guid_)
Expand All @@ -79,8 +82,8 @@ __rmw_take_response(
if (raw_type_support->deserializeROSmessage(
deser, ros_response, info->response_type_support_impl_))
{
request_header->source_timestamp = response.sample_info_.source_timestamp.to_ns();
request_header->received_timestamp = response.sample_info_.reception_timestamp.to_ns();
request_header->source_timestamp = info_seq[0].source_timestamp.to_ns();
request_header->received_timestamp = info_seq[0].reception_timestamp.to_ns();
request_header->request_id.sequence_number =
((int64_t)response.sample_identity_.sequence_number().high) <<
32 | response.sample_identity_.sequence_number().low;
Expand All @@ -89,6 +92,10 @@ __rmw_take_response(
}
}
}

info->response_reader_->return_loan(data_values, info_seq);
data_values.length(0);
info_seq.length(0);
}

return RMW_RET_OK;
Expand Down
90 changes: 61 additions & 29 deletions rmw_fastrtps_shared_cpp/src/rmw_take.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rmw/rmw.h"

#include "fastdds/dds/subscriber/SampleInfo.hpp"
#include "fastdds/dds/core/StackAllocatedSequence.hpp"

#include "fastrtps/utils/collections/ResourceLimitedVector.hpp"

Expand Down Expand Up @@ -83,33 +84,50 @@ _take(
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "custom subscriber info is null", return RMW_RET_ERROR);

eprosima::fastdds::dds::SampleInfo sinfo;

rmw_fastrtps_shared_cpp::SerializedData data;

data.is_cdr_buffer = false;
data.data = ros_message;
data.impl = info->type_support_impl_;

while (0 < info->data_reader_->get_unread_count()) {
if (info->data_reader_->take_next_sample(&data, &sinfo) == ReturnCode_t::RETCODE_OK) {
if (subscription->options.ignore_local_publications) {
auto sample_writer_guid =
eprosima::fastrtps::rtps::iHandle2GUID(sinfo.publication_handle);
eprosima::fastdds::dds::StackAllocatedSequence<void *, 1> data_values;
const_cast<void **>(data_values.buffer())[0] = &data;
eprosima::fastdds::dds::SampleInfoSeq info_seq{1};

if (sample_writer_guid.guidPrefix == info->data_reader_->guid().guidPrefix) {
// This is a local publication. Ignore it
continue;
}
while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(data_values, info_seq, 1)) {
class ReturnLoan
{
public:
ReturnLoan(std::function<void()> functor)
: functor_(functor) {}

~ReturnLoan() {functor_();}

private:
std::function<void()> functor_;
} return_loan(
[&]()
{
info->data_reader_->return_loan(data_values, info_seq);
data_values.length(0);
info_seq.length(0);
});

if (subscription->options.ignore_local_publications) {
auto sample_writer_guid =
eprosima::fastrtps::rtps::iHandle2GUID(info_seq[0].publication_handle);

if (sample_writer_guid.guidPrefix == info->data_reader_->guid().guidPrefix) {
// This is a local publication. Ignore it
continue;
}
}

if (sinfo.valid_data) {
if (message_info) {
_assign_message_info(identifier, message_info, &sinfo);
}
*taken = true;
break;
if (info_seq[0].valid_data) {
if (message_info) {
_assign_message_info(identifier, message_info, &info_seq[0]);
}
*taken = true;
break;
}
}

Expand Down Expand Up @@ -144,14 +162,6 @@ _take_sequence(
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "custom subscriber info is null", return RMW_RET_ERROR);

// Limit the upper bound of reads to the number unread at the beginning.
// This prevents any samples that are added after the beginning of the
// _take_sequence call from being read.
auto unread_count = info->data_reader_->get_unread_count();
if (unread_count < count) {
count = unread_count;
}

for (size_t ii = 0; ii < count; ++ii) {
taken_flag = false;
ret = _take(
Expand Down Expand Up @@ -315,8 +325,30 @@ _take_serialized_message(
data.data = &buffer;
data.impl = nullptr; // not used when is_cdr_buffer is true

if (info->data_reader_->take_next_sample(&data, &sinfo) == ReturnCode_t::RETCODE_OK) {
if (sinfo.valid_data) {
eprosima::fastdds::dds::StackAllocatedSequence<void *, 1> data_values;
const_cast<void **>(data_values.buffer())[0] = &data;
eprosima::fastdds::dds::SampleInfoSeq info_seq{1};

while (ReturnCode_t::RETCODE_OK == info->data_reader_->take(data_values, info_seq, 1)) {
class ReturnLoan
{
public:
ReturnLoan(std::function<void()> functor)
: functor_(functor) {}

~ReturnLoan() {functor_();}

private:
std::function<void()> functor_;
} return_loan(
[&]()
{
info->data_reader_->return_loan(data_values, info_seq);
data_values.length(0);
info_seq.length(0);
});

if (info_seq[0].valid_data) {
auto buffer_size = static_cast<size_t>(buffer.getBufferSize());
if (serialized_message->buffer_capacity < buffer_size) {
auto ret = rmw_serialized_message_resize(serialized_message, buffer_size);
Expand All @@ -328,7 +360,7 @@ _take_serialized_message(
memcpy(serialized_message->buffer, buffer.getBuffer(), serialized_message->buffer_length);

if (message_info) {
_assign_message_info(identifier, message_info, &sinfo);
_assign_message_info(identifier, message_info, &info_seq[0]);
}
*taken = true;
}
Expand Down

0 comments on commit 3f796a2

Please sign in to comment.