Skip to content
This repository has been archived by the owner on Oct 7, 2021. It is now read-only.

support wait_for_service #137

Merged
merged 6 commits into from
May 28, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rmw_opensplice_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ add_library(rmw_opensplice_cpp SHARED
src/rmw_request.cpp
src/rmw_response.cpp
src/rmw_service.cpp
src/rmw_service_server_is_available.cpp
src/rmw_subscription.cpp
src/rmw_take.cpp
src/rmw_topic_names_and_types.cpp
Expand Down
46 changes: 44 additions & 2 deletions rmw_opensplice_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ rmw_create_node(const char * name, size_t domain_id)

rmw_node_t * node = nullptr;
OpenSpliceStaticNodeInfo * node_info = nullptr;
rmw_guard_condition_t * graph_guard_condition = nullptr;
CustomPublisherListener * publisher_listener = nullptr;
CustomSubscriberListener * subscriber_listener = nullptr;
void * buf = nullptr;
Expand All @@ -133,6 +134,12 @@ rmw_create_node(const char * name, size_t domain_id)
goto fail;
}

graph_guard_condition = rmw_create_guard_condition();
if (!graph_guard_condition) {
// error message already set
goto fail;
}

// setup publisher listener
data_reader = builtin_subscriber->lookup_datareader("DCPSPublication");
builtin_publication_datareader =
Expand All @@ -147,7 +154,8 @@ rmw_create_node(const char * name, size_t domain_id)
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
RMW_TRY_PLACEMENT_NEW(publisher_listener, buf, goto fail, CustomPublisherListener)
RMW_TRY_PLACEMENT_NEW(
publisher_listener, buf, goto fail, CustomPublisherListener, graph_guard_condition)
buf = nullptr;
builtin_publication_datareader->set_listener(publisher_listener, DDS::DATA_AVAILABLE_STATUS);

Expand All @@ -165,7 +173,8 @@ rmw_create_node(const char * name, size_t domain_id)
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
RMW_TRY_PLACEMENT_NEW(subscriber_listener, buf, goto fail, CustomSubscriberListener)
RMW_TRY_PLACEMENT_NEW(
subscriber_listener, buf, goto fail, CustomSubscriberListener, graph_guard_condition)
buf = nullptr;
builtin_subscription_datareader->set_listener(subscriber_listener, DDS::DATA_AVAILABLE_STATUS);

Expand All @@ -190,6 +199,7 @@ rmw_create_node(const char * name, size_t domain_id)
RMW_TRY_PLACEMENT_NEW(node_info, buf, goto fail, OpenSpliceStaticNodeInfo)
buf = nullptr;
node_info->participant = participant;
node_info->graph_guard_condition = graph_guard_condition;
node_info->publisher_listener = publisher_listener;
node_info->subscriber_listener = subscriber_listener;

Expand All @@ -216,6 +226,12 @@ rmw_create_node(const char * name, size_t domain_id)
subscriber_listener->~CustomSubscriberListener(), CustomSubscriberListener)
rmw_free(subscriber_listener);
}
if (graph_guard_condition) {
rmw_ret_t ret = rmw_destroy_guard_condition(graph_guard_condition);
if (ret != RMW_RET_OK) {
fprintf(stderr, "failed to destroy guard condition: %s\n", rmw_get_error_string_safe());
}
}
if (node_info) {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
node_info->~OpenSpliceStaticNodeInfo(), OpenSpliceStaticNodeInfo)
Expand Down Expand Up @@ -287,11 +303,37 @@ rmw_destroy_node(rmw_node_t * node)
node_info->subscriber_listener = nullptr;
}

if (node_info->graph_guard_condition) {
rmw_ret_t ret = rmw_destroy_guard_condition(node_info->graph_guard_condition);
if (ret != RMW_RET_OK) {
fprintf(stderr, "failed to destroy guard condition: %s\n", rmw_get_error_string_safe());
}
}

rmw_free(node_info);
node->data = nullptr;
rmw_free(const_cast<char *>(node->name));
node->name = nullptr;
rmw_node_free(node);
return result;
}

const rmw_guard_condition_t *
rmw_node_get_graph_guard_condition(const rmw_node_t * node)
{
if (!node) {
RMW_SET_ERROR_MSG("received null pointer");
return NULL;
}
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
node handle,
node->implementation_identifier, opensplice_cpp_identifier,
return NULL)
auto node_info = static_cast<OpenSpliceStaticNodeInfo *>(node->data);
if (!node_info) {
RMW_SET_ERROR_MSG("node info handle is null");
return NULL;
}
return node_info->graph_guard_condition;
}
} // extern "C"
74 changes: 74 additions & 0 deletions rmw_opensplice_cpp/src/rmw_service_server_is_available.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2016 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <ccpp_dds_dcps.h>
#include <dds_dcps.h>

#include "rmw/allocators.h"
#include "rmw/error_handling.h"
#include "rmw/impl/cpp/macros.hpp"
#include "rmw/rmw.h"

#include "identifier.hpp"
#include "types.hpp"

// The extern "C" here enforces that overloading is not used.
extern "C"
{
rmw_ret_t
rmw_service_server_is_available(
const rmw_node_t * node,
const rmw_client_t * client,
bool * is_available)
{
if (!node) {
RMW_SET_ERROR_MSG("node handle is null");
return RMW_RET_ERROR;
}
if (!client) {
RMW_SET_ERROR_MSG("client handle is null");
return RMW_RET_ERROR;
}
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
client handle,
client->implementation_identifier, opensplice_cpp_identifier,
return RMW_RET_ERROR)
if (!is_available) {
RMW_SET_ERROR_MSG("is_available is null");
return RMW_RET_ERROR;
}

OpenSpliceStaticClientInfo * client_info =
static_cast<OpenSpliceStaticClientInfo *>(client->data);
if (!client_info) {
RMW_SET_ERROR_MSG("client info handle is null");
return RMW_RET_ERROR;
}

const service_type_support_callbacks_t * callbacks = client_info->callbacks_;
if (!callbacks) {
RMW_SET_ERROR_MSG("callbacks handle is null");
return RMW_RET_ERROR;
}

const char * err = callbacks->server_is_available(
client_info->requester_, node, is_available, &rmw_count_publishers, &rmw_count_subscribers);
if (err) {
RMW_SET_ERROR_MSG(err);
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}
} // extern "C"
20 changes: 20 additions & 0 deletions rmw_opensplice_cpp/src/rmw_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,26 @@ rmw_wait(
RMW_SET_ERROR_MSG("Failed to get detach condition from waitset");
}
}

DDS::ConditionSeq * active_conditions =
static_cast<DDS::ConditionSeq *>(waitset_info->active_conditions);
if (!active_conditions) {
RMW_SET_ERROR_MSG("DDS condition sequence handle is null");
return;
}

// Disassociate conditions left in active_conditions so that when the
// waitset (and therefore the active_conditions sequence) are destroyed
// they do not try to free the entities to which they point.
// These entities are already being deleted (are owned) by other entities
// like rmw_guard_conditions_t and rmw_node_t.
// Without this step, sporadic bad memory accesses could occur when the
// items added to the waitset were destroyed before the waitset.
// The items in active_conditions are not being leaked though because
// other entities own them and are responsible for removing them.
for (uint32_t i = 0; i < active_conditions->length(); ++i) {
active_conditions->get_buffer()[i] = nullptr;
}
}
rmw_waitset_t * waitset = NULL;
} atexit;
Expand Down
5 changes: 3 additions & 2 deletions rmw_opensplice_cpp/src/rmw_waitset.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,14 +141,15 @@ rmw_destroy_waitset(rmw_waitset_t * waitset)

// Explicitly call destructor since the "placement new" was used

using DDS::ConditionSeq;
if (waitset_info->active_conditions) {
RMW_TRY_DESTRUCTOR(
waitset_info->active_conditions->~DDS_DCPSUObjSeq(), ConditionSeq, result = RMW_RET_ERROR)
waitset_info->active_conditions->~ConditionSeq(), ConditionSeq, result = RMW_RET_ERROR)
rmw_free(waitset_info->active_conditions);
}
if (waitset_info->attached_conditions) {
RMW_TRY_DESTRUCTOR(
waitset_info->attached_conditions->~DDS_DCPSUObjSeq(), ConditionSeq, result = RMW_RET_ERROR)
waitset_info->attached_conditions->~ConditionSeq(), ConditionSeq, result = RMW_RET_ERROR)
rmw_free(waitset_info->attached_conditions);
}
if (waitset_info->waitset) {
Expand Down
38 changes: 36 additions & 2 deletions rmw_opensplice_cpp/src/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include <string>

#include "rmw/error_handling.h"

std::string
create_type_name(
const message_type_support_callbacks_t * callbacks,
Expand Down Expand Up @@ -60,6 +62,11 @@ CustomDataReaderListener::remove_information(const DDS::SampleInfo & sample_info
}
}

CustomPublisherListener::CustomPublisherListener(rmw_guard_condition_t * graph_guard_condition)
: graph_guard_condition_(graph_guard_condition)
{
}

void
CustomPublisherListener::on_data_available(DDS::DataReader * reader)
{
Expand All @@ -82,15 +89,31 @@ CustomPublisherListener::on_data_available(DDS::DataReader * reader)

for (DDS::ULong i = 0; i < data_seq.length(); ++i) {
if (info_seq[i].valid_data) {
add_information(info_seq[i], data_seq[i].topic_name.in(), data_seq[i].type_name.in());
if (info_seq[i].instance_state == DDS::ALIVE_INSTANCE_STATE) {
add_information(info_seq[i], data_seq[i].topic_name.in(), data_seq[i].type_name.in());
} else {
remove_information(info_seq[i]);
}
} else {
remove_information(info_seq[i]);
}
}

if (data_seq.length() > 0) {
rmw_ret_t ret = rmw_trigger_guard_condition(graph_guard_condition_);
if (ret != RMW_RET_OK) {
fprintf(stderr, "failed to trigger graph guard condition: %s\n", rmw_get_error_string_safe());
}
}

builtin_reader->return_loan(data_seq, info_seq);
}

CustomSubscriberListener::CustomSubscriberListener(rmw_guard_condition_t * graph_guard_condition)
: graph_guard_condition_(graph_guard_condition)
{
}

void
CustomSubscriberListener::on_data_available(DDS::DataReader * reader)
{
Expand All @@ -113,11 +136,22 @@ CustomSubscriberListener::on_data_available(DDS::DataReader * reader)

for (DDS::ULong i = 0; i < data_seq.length(); ++i) {
if (info_seq[i].valid_data) {
add_information(info_seq[i], data_seq[i].topic_name.in(), data_seq[i].type_name.in());
if (info_seq[i].instance_state == DDS::ALIVE_INSTANCE_STATE) {
add_information(info_seq[i], data_seq[i].topic_name.in(), data_seq[i].type_name.in());
} else {
remove_information(info_seq[i]);
}
} else {
remove_information(info_seq[i]);
}
}

if (data_seq.length() > 0) {
rmw_ret_t ret = rmw_trigger_guard_condition(graph_guard_condition_);
if (ret != RMW_RET_OK) {
fprintf(stderr, "failed to trigger graph guard condition: %s\n", rmw_get_error_string_safe());
}
}

builtin_reader->return_loan(data_seq, info_seq);
}
9 changes: 9 additions & 0 deletions rmw_opensplice_cpp/src/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,28 @@ class CustomPublisherListener
: public CustomDataReaderListener
{
public:
explicit CustomPublisherListener(rmw_guard_condition_t * graph_guard_condition);
virtual void on_data_available(DDS::DataReader * reader);

private:
rmw_guard_condition_t * graph_guard_condition_;
};

class CustomSubscriberListener
: public CustomDataReaderListener
{
public:
explicit CustomSubscriberListener(rmw_guard_condition_t * graph_guard_condition);
virtual void on_data_available(DDS::DataReader * reader);

private:
rmw_guard_condition_t * graph_guard_condition_;
};

struct OpenSpliceStaticNodeInfo
{
DDS::DomainParticipant * participant;
rmw_guard_condition_t * graph_guard_condition;
CustomPublisherListener * publisher_listener;
CustomSubscriberListener * subscriber_listener;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,16 @@ destroy_responder__@(spec.srv_name)(void * untyped_responder, void (* deallocato
untyped_responder, deallocator);
}

const char *
server_is_available__@(spec.srv_name)(
void * requester, const rmw_node_t * node, bool * is_available,
rmw_ret_t (*count_publishers)(const rmw_node_t *, const char *, size_t *),
rmw_ret_t (*count_subscribers)(const rmw_node_t *, const char *, size_t *))
{
return @(spec.pkg_name)::srv::typesupport_opensplice_cpp::server_is_available__@(spec.srv_name)(
requester, node, is_available, count_publishers, count_subscribers);
}

static service_type_support_callbacks_t __callbacks = {
"@(spec.pkg_name)",
"@(spec.srv_name)",
Expand All @@ -146,6 +156,7 @@ static service_type_support_callbacks_t __callbacks = {
&take_request__@(spec.srv_name),
&send_response__@(spec.srv_name),
&take_response__@(spec.srv_name),
&server_is_available__@(spec.srv_name),
};

static rosidl_service_type_support_t __type_support = {
Expand Down
Loading