Skip to content

Commit

Permalink
Ensure compliant subscription API. (#419)
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>
  • Loading branch information
hidmic authored and ahcorde committed Oct 15, 2020
1 parent a8b2154 commit 285b3d4
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 181 deletions.
51 changes: 39 additions & 12 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,12 @@ rmw_create_subscription(
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options)
{
if (!node) {
RMW_SET_ERROR_MSG("node handle is null");
return nullptr;
}

if (node->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("node handle not from this implementation");
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return nullptr);

auto participant_info =
static_cast<CustomParticipantInfo *>(node->context->impl->participant_info);
Expand Down Expand Up @@ -99,8 +96,18 @@ rmw_create_subscription(
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
rmw_error_state_t error_state = *rmw_get_error_state();
rmw_reset_error();
static_cast<void>(common_context->graph_cache.dissociate_writer(
info->subscription_gid_, common_context->gid, node->name, node->namespace_));
rmw_ret = rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
eprosima_fastrtps_identifier, node, subscription);
if (RMW_RET_OK != rmw_ret) {
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}
}
Expand All @@ -121,13 +128,33 @@ rmw_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(
subscription, qos);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);

return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(subscription, qos);
}

rmw_ret_t
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

return rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
eprosima_fastrtps_identifier, node, subscription);
}
Expand Down
102 changes: 62 additions & 40 deletions rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "rmw/allocators.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/validate_full_topic_name.h"

#include "rcpputils/scope_exit.hpp"

#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp"
Expand Down Expand Up @@ -55,27 +58,30 @@ create_subscription(
bool keyed,
bool create_subscription_listener)
{
if (!topic_name || strlen(topic_name) == 0) {
RMW_SET_ERROR_MSG("subscription topic is null or empty string");
return nullptr;
}
if (!qos_policies) {
RMW_SET_ERROR_MSG("qos_policies is null");
return nullptr;
}
if (!subscription_options) {
RMW_SET_ERROR_MSG("subscription_options is null");
RMW_CHECK_ARGUMENT_FOR_NULL(participant_info, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, nullptr);
if (0 == strlen(topic_name)) {
RMW_SET_ERROR_MSG("topic_name argument is an empty string");
return nullptr;
}
if (!participant_info) {
RMW_SET_ERROR_MSG("participant_info is null");
return nullptr;
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
if (!qos_policies->avoid_ros_namespace_conventions) {
int validation_result = RMW_TOPIC_VALID;
rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr);
if (RMW_RET_OK != ret) {
return nullptr;
}
if (RMW_TOPIC_VALID != validation_result) {
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("invalid topic_name argument: %s", reason);
return nullptr;
}
}
RMW_CHECK_ARGUMENT_FOR_NULL(subscription_options, nullptr);
Participant * participant = participant_info->participant;
if (!participant) {
RMW_SET_ERROR_MSG("participant handle is null");
return nullptr;
}
RMW_CHECK_FOR_NULL_WITH_MSG(participant, "participant handle is null", return nullptr);

const rosidl_message_type_support_t * type_support = get_message_typesupport_handle(
type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_C);
if (!type_support) {
Expand All @@ -89,17 +95,31 @@ create_subscription(
if (!is_valid_qos(*qos_policies)) {
return nullptr;
}
CustomSubscriberInfo * info = nullptr;
rmw_subscription_t * rmw_subscription = nullptr;
eprosima::fastrtps::SubscriberAttributes subscriberParam;

// Load default XML profile.
eprosima::fastrtps::SubscriberAttributes subscriberParam;
Domain::getDefaultSubscriberAttributes(subscriberParam);
info = new (std::nothrow) CustomSubscriberInfo();

CustomSubscriberInfo * info = new (std::nothrow) CustomSubscriberInfo();
if (!info) {
RMW_SET_ERROR_MSG("failed to allocate CustomSubscriberInfo");
return nullptr;
}
auto cleanup_info = rcpputils::make_scope_exit(
[info, participant]() {
if (info->type_support_) {
_unregister_type(participant, info->type_support_);
}
if (info->subscriber_) {
if (!Domain::removeSubscriber(info->subscriber_)) {
RMW_SAFE_FWRITE_TO_STDERR(
"Failed to remove subscriber after '"
RCUTILS_STRINGIFY(__function__) "' failed.\n");
}
}
delete info->listener_;
delete info;
});
info->typesupport_identifier_ = type_support->typesupport_identifier;
info->type_support_impl_ = type_support->data;

Expand All @@ -113,7 +133,7 @@ create_subscription(
info->type_support_ = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
if (!info->type_support_) {
RMW_SET_ERROR_MSG("failed to allocate MessageTypeSupport_cpp");
goto fail;
return nullptr;
}
_register_type(participant, info->type_support_);
}
Expand All @@ -128,47 +148,49 @@ create_subscription(
subscriberParam.topic.topicName = _create_topic_name(qos_policies, ros_topic_prefix, topic_name);

if (!get_datareader_qos(*qos_policies, subscriberParam)) {
RMW_SET_ERROR_MSG("failed to get datareader qos");
goto fail;
return nullptr;
}
info->listener_ = nullptr;

if (create_subscription_listener) {
info->listener_ = new (std::nothrow) SubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber listener");
goto fail;
return nullptr;
}
}

info->subscriber_ = Domain::createSubscriber(participant, subscriberParam, info->listener_);
if (!info->subscriber_) {
RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber");
goto fail;
return nullptr;
}
info->subscription_gid_ = rmw_fastrtps_shared_cpp::create_rmw_gid(
eprosima_fastrtps_identifier, info->subscriber_->getGuid());
rmw_subscription = rmw_subscription_allocate();

rmw_subscription_t * rmw_subscription = rmw_subscription_allocate();
if (!rmw_subscription) {
RMW_SET_ERROR_MSG("failed to allocate subscription");
goto fail;
return nullptr;
}
auto cleanup_subscription = rcpputils::make_scope_exit(
[rmw_subscription]() {
rmw_free(const_cast<char *>(rmw_subscription->topic_name));
rmw_subscription_free(rmw_subscription);
});

rmw_subscription->implementation_identifier = eprosima_fastrtps_identifier;
rmw_subscription->data = info;

rmw_subscription->topic_name = rcutils_strdup(topic_name, rcutils_get_default_allocator());
if (!rmw_subscription->topic_name) {
RMW_SET_ERROR_MSG("failed to allocate memory for subscription topic name");
goto fail;
return nullptr;
}

rmw_subscription->options = *subscription_options;
return rmw_subscription;
rmw_subscription->can_loan_messages = false;

fail:
if (info != nullptr) {
delete info->type_support_;
delete info->listener_;
delete info;
}
rmw_subscription_free(rmw_subscription);
return nullptr;
cleanup_subscription.cancel();
cleanup_info.cancel();
return rmw_subscription;
}
} // namespace rmw_fastrtps_cpp
55 changes: 39 additions & 16 deletions rmw_fastrtps_dynamic_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,12 @@ rmw_create_subscription(
const rmw_qos_profile_t * qos_policies,
const rmw_subscription_options_t * subscription_options)
{
if (!node) {
RMW_SET_ERROR_MSG("node handle is null");
return nullptr;
}

if (node->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("node handle not from this implementation");
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return nullptr);

auto participant_info =
static_cast<CustomParticipantInfo *>(node->context->impl->participant_info);
Expand Down Expand Up @@ -102,8 +99,18 @@ rmw_create_subscription(
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
rmw_error_state_t error_state = *rmw_get_error_state();
rmw_reset_error();
static_cast<void>(common_context->graph_cache.dissociate_writer(
info->subscription_gid_, common_context->gid, node->name, node->namespace_));
rmw_ret = rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
eprosima_fastrtps_identifier, node, subscription);
if (RMW_RET_OK != rmw_ret) {
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}
}
Expand All @@ -124,21 +131,37 @@ rmw_subscription_get_actual_qos(
const rmw_subscription_t * subscription,
rmw_qos_profile_t * qos)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(
subscription, qos);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);

return rmw_fastrtps_shared_cpp::__rmw_subscription_get_actual_qos(subscription, qos);
}

using BaseTypeSupport = rmw_fastrtps_dynamic_cpp::BaseTypeSupport;

rmw_ret_t
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
{
auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(info, "subscription info pointer is null", return RMW_RET_ERROR);
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(subscription, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
subscription,
subscription->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

auto info = static_cast<CustomSubscriberInfo *>(subscription->data);
auto impl = static_cast<const BaseTypeSupport *>(info->type_support_impl_);
RCUTILS_CHECK_FOR_NULL_WITH_MSG(impl, "publisher type support is null", return RMW_RET_ERROR);

auto ros_type_support = static_cast<const rosidl_message_type_support_t *>(
impl->ros_type_support());

Expand Down
Loading

0 comments on commit 285b3d4

Please sign in to comment.