Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix freeing of loaned chunks #365

Merged
merged 7 commits into from
Feb 16, 2022
15 changes: 14 additions & 1 deletion rmw_cyclonedds_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,16 @@ find_package(tracetools REQUIRED)

#find_package(cyclonedds_cmake_module REQUIRED)
find_package(CycloneDDS QUIET CONFIG)
if(NOT CycloneDDS_FOUND)
if(CycloneDDS_FOUND)
# Support for shared memory in RMW depends on support for shared memory being compiled in
# Cyclone DDS and iceoryx_binding_c being available
get_target_property(_cyclonedds_has_shm CycloneDDS::ddsc SHM_SUPPORT_IS_AVAILABLE)
if(_cyclonedds_has_shm)
find_package(iceoryx_binding_c REQUIRED)
clalancette marked this conversation as resolved.
Show resolved Hide resolved
else()
message(STATUS "Cyclone DDS is NOT compiled with support for shared memory")
endif()
else()
message(WARNING "Could not find Eclipse Cyclone DDS - skipping '${PROJECT_NAME}'")
ament_package()
return()
Expand Down Expand Up @@ -69,6 +78,10 @@ add_library(rmw_cyclonedds_cpp
target_link_libraries(rmw_cyclonedds_cpp PRIVATE
CycloneDDS::ddsc)

if(_cyclonedds_has_shm)
target_link_libraries(rmw_cyclonedds_cpp PRIVATE iceoryx_binding_c::iceoryx_binding_c)
endif()

target_link_libraries(rmw_cyclonedds_cpp PUBLIC
rmw::rmw)
target_link_libraries(rmw_cyclonedds_cpp PRIVATE
Expand Down
1 change: 1 addition & 0 deletions rmw_cyclonedds_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
<buildtool_depend>ament_cmake_ros</buildtool_depend>

<depend>cyclonedds</depend>
<depend>iceoryx_binding_c</depend>
<depend>rcutils</depend>
<depend>rcpputils</depend>
<depend>rmw</depend>
Expand Down
27 changes: 19 additions & 8 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,8 @@ extern "C" rmw_ret_t rmw_publish_serialized_message(
RET_NULL_X(sample_ptr, return RMW_RET_ERROR);
if (rmw_deserialize(serialized_message, &pub->type_supports, sample_ptr) != RMW_RET_OK) {
RMW_SET_ERROR_MSG("Failed to deserialize sample into loaned memory");
fini_and_free_sample(pub, sample_ptr);
ddsi_serdata_unref(d);
return RMW_RET_ERROR;
}
d->iox_chunk = SHIFT_BACK_TO_ICEORYX_HEADER(sample_ptr);
Expand Down Expand Up @@ -1619,12 +1621,14 @@ static rmw_ret_t publish_loaned_int(

// if the publisher allow loaning
if (cdds_publisher->is_loaning_available) {
auto d = std::make_unique<serdata_rmw>(cdds_publisher->sertype, ddsi_serdata_kind::SDK_DATA);
auto d = new serdata_rmw(cdds_publisher->sertype, ddsi_serdata_kind::SDK_DATA);
d->iox_chunk = SHIFT_BACK_TO_ICEORYX_HEADER(ros_message);
if (dds_writecdr(cdds_publisher->enth, d.release()) >= 0) {
if (dds_writecdr(cdds_publisher->enth, d) >= 0) {
return RMW_RET_OK;
} else {
RMW_SET_ERROR_MSG("Failed to publish data");
fini_and_free_sample(cdds_publisher, ros_message);
ddsi_serdata_unref(d);
return RMW_RET_ERROR;
}
} else {
Expand Down Expand Up @@ -3007,13 +3011,11 @@ static rmw_ret_t rmw_take_ser_int(
SHIFT_PAST_ICEORYX_HEADER(d->iox_chunk), &sub->type_supports,
serialized_message) != RMW_RET_OK)
{
RMW_SET_ERROR_MSG("Failed to srialize sample from loaned memory");
RMW_SET_ERROR_MSG("Failed to serialize sample from loaned memory");
ddsi_serdata_unref(d);
return RMW_RET_ERROR;
}
// free the loaned memory
dds_data_allocator_init(sub->enth, &sub->data_allocator);
dds_data_allocator_free(&sub->data_allocator, d->iox_chunk);
dds_data_allocator_fini(&sub->data_allocator);
ddsi_serdata_unref(d);
*taken = true;
return RMW_RET_OK;
} else // NOLINT
Expand Down Expand Up @@ -3080,23 +3082,32 @@ static rmw_ret_t rmw_take_loan_int(
if (d->iox_chunk != nullptr) {
*loaned_message = SHIFT_PAST_ICEORYX_HEADER(d->iox_chunk);
*taken = true;
// doesn't allocate, but initialise the allocator to free the chunk later
// doesn't allocate, but initialise the allocator to free the chunk later when the loan
// is returned
dds_data_allocator_init(
cdds_subscription->enth, &cdds_subscription->data_allocator);
// set the loaned chunk to null, so that the loaned chunk is not release in
// rmw_serdata_free(), but will be released when
// `rmw_return_loaned_message_from_subscription()` is called
d->iox_chunk = nullptr;
ddsi_serdata_unref(d);
return RMW_RET_OK;
} else if (d->type->iox_size > 0U) {
auto sample_ptr = init_and_alloc_sample(cdds_subscription, d->type->iox_size, true);
RET_NULL_X(sample_ptr, return RMW_RET_ERROR);
ddsi_serdata_to_sample(d, sample_ptr, nullptr, nullptr);
*loaned_message = sample_ptr;
ddsi_serdata_unref(d);
*taken = true;
return RMW_RET_OK;
} else {
RMW_SET_ERROR_MSG("Data nor loan is available to take");
ddsi_serdata_unref(d);
*taken = false;
return RMW_RET_ERROR;
}
}
ddsi_serdata_unref(d);
}
*taken = false;
return RMW_RET_OK;
Expand Down
88 changes: 55 additions & 33 deletions rmw_cyclonedds_cpp/src/serdata.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,14 @@ static uint32_t serdata_rmw_size(const struct ddsi_serdata * dcmn)

static void serdata_rmw_free(struct ddsi_serdata * dcmn)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unhelpful parameter name, can we change this here (even just data or serdata would be less confusing ...). I suppose it is for something like common but this does also not really match with later conversions.

{
auto * d = static_cast<const serdata_rmw *>(dcmn);
auto * d = static_cast<serdata_rmw *>(dcmn);

#ifdef DDS_HAS_SHM
if (d->iox_chunk && d->iox_subscriber) {
iox_sub_release_chunk(*static_cast<iox_sub_t *>(d->iox_subscriber), d->iox_chunk);
d->iox_chunk = nullptr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless the destructor tries to access d->iox_chunk (in the delete d call) it this should also not be needed.

}
#endif
delete d;
}

Expand All @@ -173,28 +180,33 @@ static struct ddsi_serdata * serdata_rmw_from_ser(
enum ddsi_serdata_kind kind,
const struct nn_rdata * fragchain, size_t size)
{
auto d = std::make_unique<serdata_rmw>(type, kind);
uint32_t off = 0;
assert(fragchain->min == 0);
assert(fragchain->maxp1 >= off); /* CDR header must be in first fragment */
d->resize(size);

auto cursor = d->data();
while (fragchain) {
if (fragchain->maxp1 > off) {
/* only copy if this fragment adds data */
const unsigned char * payload =
NN_RMSG_PAYLOADOFF(fragchain->rmsg, NN_RDATA_PAYLOAD_OFF(fragchain));
auto src = payload + off - fragchain->min;
auto n_bytes = fragchain->maxp1 - off;
memcpy(cursor, src, n_bytes);
cursor = byte_offset(cursor, n_bytes);
off = fragchain->maxp1;
assert(off <= size);
try {
auto d = std::make_unique<serdata_rmw>(type, kind);
uint32_t off = 0;
assert(fragchain->min == 0);
assert(fragchain->maxp1 >= off); /* CDR header must be in first fragment */
d->resize(size);

auto cursor = d->data();
while (fragchain) {
if (fragchain->maxp1 > off) {
/* only copy if this fragment adds data */
const unsigned char * payload =
NN_RMSG_PAYLOADOFF(fragchain->rmsg, NN_RDATA_PAYLOAD_OFF(fragchain));
auto src = payload + off - fragchain->min;
auto n_bytes = fragchain->maxp1 - off;
memcpy(cursor, src, n_bytes);
cursor = byte_offset(cursor, n_bytes);
off = fragchain->maxp1;
assert(off <= size);
}
fragchain = fragchain->nextfrag;
}
fragchain = fragchain->nextfrag;
return d.release();
} catch (std::exception & e) {
RMW_SET_ERROR_MSG(e.what());
return nullptr;
}
return d.release();
}

static struct ddsi_serdata * serdata_rmw_from_ser_iov(
Expand All @@ -203,15 +215,20 @@ static struct ddsi_serdata * serdata_rmw_from_ser_iov(
ddsrt_msg_iovlen_t niov, const ddsrt_iovec_t * iov,
size_t size)
{
auto d = std::make_unique<serdata_rmw>(type, kind);
d->resize(size);
try {
auto d = std::make_unique<serdata_rmw>(type, kind);
d->resize(size);

auto cursor = d->data();
for (ddsrt_msg_iovlen_t i = 0; i < niov; i++) {
memcpy(cursor, iov[i].iov_base, iov[i].iov_len);
cursor = byte_offset(cursor, iov[i].iov_len);
auto cursor = d->data();
for (ddsrt_msg_iovlen_t i = 0; i < niov; i++) {
memcpy(cursor, iov[i].iov_base, iov[i].iov_len);
cursor = byte_offset(cursor, iov[i].iov_len);
}
return d.release();
} catch (std::exception & e) {
RMW_SET_ERROR_MSG(e.what());
return nullptr;
}
return d.release();
}

static struct ddsi_serdata * serdata_rmw_from_keyhash(
Expand Down Expand Up @@ -244,11 +261,16 @@ static struct ddsi_serdata * serdata_rmw_from_iox(
const struct ddsi_sertype * typecmn,
enum ddsi_serdata_kind kind, void * sub, void * iox_buffer)
{
static_cast<void>(sub); // unused
const struct sertype_rmw * type = static_cast<const struct sertype_rmw *>(typecmn);
auto d = std::make_unique<serdata_rmw>(type, kind);
d->iox_chunk = iox_buffer;
return d.release();
try {
const struct sertype_rmw * type = static_cast<const struct sertype_rmw *>(typecmn);
auto d = std::make_unique<serdata_rmw>(type, kind);
d->iox_chunk = iox_buffer;
d->iox_subscriber = sub;
return d.release();
} catch (std::exception & e) {
RMW_SET_ERROR_MSG(e.what());
return nullptr;
}
}
#endif // DDS_HAS_SHM

Expand Down
3 changes: 3 additions & 0 deletions rmw_cyclonedds_cpp/src/serdata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
#include "dds/ddsi/ddsi_serdata.h"
#ifdef DDS_HAS_SHM
#include "dds/ddsi/q_xmsg.h"
extern "C" {
#include "dds/ddsi/shm_sync.h"
}
#endif // DDS_HAS_SHM

#if !DDS_HAS_DDSI_SERTYPE
Expand Down