Skip to content

Commit

Permalink
Ensure compliant publisher API. (#414)
Browse files Browse the repository at this point in the history
Signed-off-by: Michel Hidalgo <michel@ekumenlabs.com>
  • Loading branch information
hidmic committed Jul 28, 2020
1 parent ece181f commit 8fc9ca3
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 157 deletions.
88 changes: 51 additions & 37 deletions rmw_fastrtps_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "rmw/allocators.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/validate_full_topic_name.h"

#include "rcpputils/scope_exit.hpp"

#include "rmw_fastrtps_shared_cpp/create_rmw_gid.hpp"
#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
Expand Down Expand Up @@ -49,22 +52,30 @@ rmw_fastrtps_cpp::create_publisher(
bool keyed,
bool create_publisher_listener)
{
if (!participant_info) {
RMW_SET_ERROR_MSG("participant_info is null");
return nullptr;
}
if (!topic_name || strlen(topic_name) == 0) {
RMW_SET_ERROR_MSG("publisher topic is null or empty string");
RMW_CHECK_ARGUMENT_FOR_NULL(participant_info, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, nullptr);
if (0 == strlen(topic_name)) {
RMW_SET_ERROR_MSG("topic_name argument is an empty string");
return nullptr;
}
if (!qos_policies) {
RMW_SET_ERROR_MSG("qos_policies is null");
return nullptr;
}
if (!publisher_options) {
RMW_SET_ERROR_MSG("publisher_options is null");
return nullptr;
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
if (!qos_policies->avoid_ros_namespace_conventions) {
int validation_result = RMW_TOPIC_VALID;
rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr);
if (RMW_RET_OK != ret) {
return nullptr;
}
if (RMW_TOPIC_VALID != validation_result) {
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("invalid topic name: %s", reason);
return nullptr;
}
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);

Participant * participant = participant_info->participant;
RMW_CHECK_ARGUMENT_FOR_NULL(participant, nullptr);

const rosidl_message_type_support_t * type_support = get_message_typesupport_handle(
type_supports, RMW_FASTRTPS_CPP_TYPESUPPORT_C);
Expand Down Expand Up @@ -93,23 +104,30 @@ rmw_fastrtps_cpp::create_publisher(
RMW_SET_ERROR_MSG("failed to allocate CustomPublisherInfo");
return nullptr;
}

auto cleanup_info = rcpputils::make_scope_exit(
[info, participant]() {
if (info->type_support_) {
_unregister_type(participant, info->type_support_);
}
delete info->listener_;
delete info;
});
info->typesupport_identifier_ = type_support->typesupport_identifier;
info->type_support_impl_ = type_support->data;

auto callbacks = static_cast<const message_type_support_callbacks_t *>(type_support->data);
std::string type_name = _create_type_name(callbacks);
if (
!Domain::getRegisteredType(
participant_info->participant, type_name.c_str(),
participant, type_name.c_str(),
reinterpret_cast<TopicDataType **>(&info->type_support_)))
{
info->type_support_ = new (std::nothrow) MessageTypeSupport_cpp(callbacks);
if (!info->type_support_) {
RMW_SET_ERROR_MSG("Failed to allocate MessageTypeSupport");
goto fail;
RMW_SET_ERROR_MSG("failed to allocate MessageTypeSupport");
return nullptr;
}
_register_type(participant_info->participant, info->type_support_);
_register_type(participant, info->type_support_);
}

if (!participant_info->leave_middleware_default_qos) {
Expand All @@ -124,26 +142,25 @@ rmw_fastrtps_cpp::create_publisher(
publisherParam.topic.topicName = _create_topic_name(qos_policies, ros_topic_prefix, topic_name);

if (!get_datawriter_qos(*qos_policies, publisherParam)) {
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
return nullptr;
}

info->listener_ = nullptr;
if (create_publisher_listener) {
info->listener_ = new (std::nothrow) PubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher listener");
goto fail;
return nullptr;
}
}

info->publisher_ = Domain::createPublisher(
participant_info->participant,
participant,
publisherParam,
info->listener_);
if (!info->publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
goto fail;
return nullptr;
}

info->publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
Expand All @@ -152,30 +169,27 @@ rmw_fastrtps_cpp::create_publisher(
rmw_publisher = rmw_publisher_allocate();
if (!rmw_publisher) {
RMW_SET_ERROR_MSG("failed to allocate publisher");
goto fail;
return nullptr;
}
auto cleanup_publisher = rcpputils::make_scope_exit(
[rmw_publisher]() {
rmw_free(const_cast<char *>(rmw_publisher->topic_name));
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;
rmw_publisher->topic_name = static_cast<char *>(rmw_allocate(strlen(topic_name) + 1));

rmw_publisher->topic_name = static_cast<char *>(rmw_allocate(strlen(topic_name) + 1));
if (!rmw_publisher->topic_name) {
RMW_SET_ERROR_MSG("failed to allocate memory for publisher topic name");
goto fail;
return nullptr;
}

memcpy(const_cast<char *>(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1);

rmw_publisher->options = *publisher_options;

cleanup_publisher.cancel();
cleanup_info.cancel();
return rmw_publisher;

fail:
if (info) {
delete info->type_support_;
delete info->listener_;
delete info;
}
rmw_publisher_free(rmw_publisher);

return nullptr;
}
42 changes: 32 additions & 10 deletions rmw_fastrtps_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "rmw/error_handling.h"
#include "rmw/rmw.h"

#include "rmw/impl/cpp/macros.hpp"

#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
Expand Down Expand Up @@ -63,15 +65,12 @@ rmw_create_publisher(
const rmw_qos_profile_t * qos_policies,
const rmw_publisher_options_t * publisher_options)
{
if (!node) {
RMW_SET_ERROR_MSG("node handle is null");
return nullptr;
}

if (node->implementation_identifier != eprosima_fastrtps_identifier) {
RMW_SET_ERROR_MSG("node handle not from this implementation");
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(node, nullptr);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return nullptr);

rmw_publisher_t * publisher = rmw_fastrtps_cpp::create_publisher(
static_cast<CustomParticipantInfo *>(node->context->impl->participant_info),
Expand Down Expand Up @@ -101,8 +100,18 @@ rmw_create_publisher(
static_cast<void *>(&msg),
nullptr);
if (RMW_RET_OK != rmw_ret) {
rmw_fastrtps_shared_cpp::__rmw_destroy_publisher(
rmw_error_state_t error_state = *rmw_get_error_state();
rmw_reset_error();
static_cast<void>(common_context->graph_cache.dissociate_writer(
info->publisher_gid, common_context->gid, node->name, node->namespace_));
rmw_ret = rmw_fastrtps_shared_cpp::__rmw_destroy_publisher(
eprosima_fastrtps_identifier, node, publisher);
if (RMW_RET_OK != rmw_ret) {
RMW_SAFE_FWRITE_TO_STDERR(rmw_get_error_string().str);
RMW_SAFE_FWRITE_TO_STDERR(" during '" RCUTILS_STRINGIFY(__function__) "' cleanup\n");
rmw_reset_error();
}
rmw_set_error_state(error_state.message, error_state.file, error_state.line_number);
return nullptr;
}
}
Expand Down Expand Up @@ -164,6 +173,19 @@ rmw_return_loaned_message_from_publisher(
rmw_ret_t
rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
{
RMW_CHECK_ARGUMENT_FOR_NULL(node, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_ARGUMENT_FOR_NULL(publisher, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node,
node->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
publisher,
publisher->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);

return rmw_fastrtps_shared_cpp::__rmw_destroy_publisher(
eprosima_fastrtps_identifier, node, publisher);
}
Expand Down
91 changes: 50 additions & 41 deletions rmw_fastrtps_dynamic_cpp/src/publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "rmw/allocators.h"
#include "rmw/error_handling.h"
#include "rmw/rmw.h"
#include "rmw/validate_full_topic_name.h"

#include "rcpputils/scope_exit.hpp"

#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_shared_cpp/custom_publisher_info.hpp"
Expand Down Expand Up @@ -51,31 +54,30 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
(void)keyed;
(void)create_publisher_listener;

if (!participant_info) {
RMW_SET_ERROR_MSG("participant_info is null");
return nullptr;
}

if (!topic_name || strlen(topic_name) == 0) {
RMW_SET_ERROR_MSG("publisher topic is null or empty string");
return nullptr;
}

if (!qos_policies) {
RMW_SET_ERROR_MSG("qos_policies is null");
RMW_CHECK_ARGUMENT_FOR_NULL(participant_info, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(type_supports, nullptr);
RMW_CHECK_ARGUMENT_FOR_NULL(topic_name, nullptr);
if (0 == strlen(topic_name)) {
RMW_SET_ERROR_MSG("topic_name argument is an empty string");
return nullptr;
}

if (!publisher_options) {
RMW_SET_ERROR_MSG("publisher_options is null");
return nullptr;
RMW_CHECK_ARGUMENT_FOR_NULL(qos_policies, nullptr);
if (!qos_policies->avoid_ros_namespace_conventions) {
int validation_result = RMW_TOPIC_VALID;
rmw_ret_t ret = rmw_validate_full_topic_name(topic_name, &validation_result, nullptr);
if (RMW_RET_OK != ret) {
return nullptr;
}
if (RMW_TOPIC_VALID != validation_result) {
const char * reason = rmw_full_topic_name_validation_result_string(validation_result);
RMW_SET_ERROR_MSG_WITH_FORMAT_STRING("invalid topic name: %s", reason);
return nullptr;
}
}
RMW_CHECK_ARGUMENT_FOR_NULL(publisher_options, nullptr);

Participant * participant = participant_info->participant;
if (!participant) {
RMW_SET_ERROR_MSG("participant handle is null");
return nullptr;
}
RMW_CHECK_ARGUMENT_FOR_NULL(participant, nullptr);

const rosidl_message_type_support_t * type_support = get_message_typesupport_handle(
type_supports, rosidl_typesupport_introspection_c__identifier);
Expand Down Expand Up @@ -104,14 +106,25 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
RMW_SET_ERROR_MSG("failed to allocate CustomPublisherInfo");
return nullptr;
}
auto cleanup_info = rcpputils::make_scope_exit(
[info, participant]() {
if (info->type_support_) {
_unregister_type(participant, info->type_support_);
}
delete info->listener_;
delete info;
});

TypeSupportRegistry & type_registry = TypeSupportRegistry::get_instance();
auto type_impl = type_registry.get_message_type_support(type_support);
if (!type_impl) {
delete info;
RMW_SET_ERROR_MSG("failed to allocate type support");
return nullptr;
}
auto return_type_support = rcpputils::make_scope_exit(
[&type_registry, type_support]() {
type_registry.return_message_type_support(type_support);
});

info->typesupport_identifier_ = type_support->typesupport_identifier;
info->type_support_impl_ = type_impl;
Expand All @@ -126,7 +139,7 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
info->type_support_ = new (std::nothrow) TypeSupportProxy(type_impl);
if (!info->type_support_) {
RMW_SET_ERROR_MSG("failed to allocate TypeSupportProxy");
goto fail;
return nullptr;
}
_register_type(participant, info->type_support_);
}
Expand All @@ -150,20 +163,19 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
// publisherParam.throughputController = throughputController;

if (!get_datawriter_qos(*qos_policies, publisherParam)) {
RMW_SET_ERROR_MSG("failed to get datawriter qos");
goto fail;
return nullptr;
}

info->listener_ = new (std::nothrow) PubListener(info);
if (!info->listener_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher listener");
goto fail;
return nullptr;
}

info->publisher_ = Domain::createPublisher(participant, publisherParam, info->listener_);
if (!info->publisher_) {
RMW_SET_ERROR_MSG("create_publisher() could not create publisher");
goto fail;
return nullptr;
}

info->publisher_gid = rmw_fastrtps_shared_cpp::create_rmw_gid(
Expand All @@ -172,32 +184,29 @@ rmw_fastrtps_dynamic_cpp::create_publisher(
rmw_publisher = rmw_publisher_allocate();
if (!rmw_publisher) {
RMW_SET_ERROR_MSG("failed to allocate publisher");
goto fail;
return nullptr;
}
auto cleanup_publisher = rcpputils::make_scope_exit(
[rmw_publisher]() {
rmw_free(const_cast<char *>(rmw_publisher->topic_name));
rmw_publisher_free(rmw_publisher);
});

rmw_publisher->can_loan_messages = false;
rmw_publisher->implementation_identifier = eprosima_fastrtps_identifier;
rmw_publisher->data = info;
rmw_publisher->topic_name = static_cast<char *>(rmw_allocate(strlen(topic_name) + 1));

rmw_publisher->topic_name = static_cast<char *>(rmw_allocate(strlen(topic_name) + 1));
if (!rmw_publisher->topic_name) {
RMW_SET_ERROR_MSG("failed to allocate memory for publisher topic name");
goto fail;
return nullptr;
}

memcpy(const_cast<char *>(rmw_publisher->topic_name), topic_name, strlen(topic_name) + 1);

rmw_publisher->options = *publisher_options;

cleanup_publisher.cancel();
cleanup_info.cancel();
return_type_support.cancel();
return rmw_publisher;

fail:
if (info) {
delete info->type_support_;
delete info->listener_;
delete info;
}
type_registry.return_message_type_support(type_support);
rmw_publisher_free(rmw_publisher);

return nullptr;
}
Loading

0 comments on commit 8fc9ca3

Please sign in to comment.