Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

detect and report local graph changes #194

Merged
merged 8 commits into from
Oct 29, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
261 changes: 244 additions & 17 deletions rmw_connext_cpp/src/functions.cpp

Large diffs are not rendered by default.

32 changes: 30 additions & 2 deletions rmw_connext_dynamic_cpp/src/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,10 @@ rmw_create_publisher(
}
memcpy(const_cast<char *>(publisher->topic_name), topic_name, strlen(topic_name) + 1);

node_info->publisher_listener->add_information(
dds_publisher->get_instance_handle(), topic_name, type_name, EntityType::Publisher);
node_info->publisher_listener->trigger_graph_guard_condition();

return publisher;
fail:
// Something went wrong, unwind anything that's already been done.
Expand Down Expand Up @@ -564,6 +568,9 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)

auto custom_publisher_info = static_cast<CustomPublisherInfo *>(publisher->data);
if (custom_publisher_info) {
node_info->publisher_listener->remove_information(
custom_publisher_info->dds_publisher_->get_instance_handle(), EntityType::Publisher);
node_info->publisher_listener->trigger_graph_guard_condition();
DDSDynamicDataTypeSupport * ddts = custom_publisher_info->dynamic_data_type_support_;
if (ddts) {
if (custom_publisher_info->dynamic_data) {
Expand Down Expand Up @@ -889,6 +896,11 @@ rmw_create_subscription(
goto fail;
}
memcpy(const_cast<char *>(subscription->topic_name), topic_name, strlen(topic_name) + 1);

node_info->subscriber_listener->add_information(
dds_subscriber->get_instance_handle(), topic_name, type_name, EntityType::Subscriber);
node_info->subscriber_listener->trigger_graph_guard_condition();

return subscription;
fail:
// Something has gone wrong, unroll what has been done.
Expand Down Expand Up @@ -1021,6 +1033,9 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)

auto custom_subscription_info = static_cast<CustomSubscriberInfo *>(subscription->data);
if (custom_subscription_info) {
node_info->subscriber_listener->remove_information(
custom_subscription_info->dds_subscriber_->get_instance_handle(), EntityType::Subscriber);
node_info->subscriber_listener->trigger_graph_guard_condition();
DDSDynamicDataTypeSupport * ddts = custom_subscription_info->dynamic_data_type_support_;
if (ddts) {
if (custom_subscription_info->dynamic_data) {
Expand Down Expand Up @@ -1540,8 +1555,9 @@ rmw_create_client(
}

rmw_ret_t
rmw_destroy_client(rmw_client_t * client)
rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
{
(void)node;
if (!client) {
RMW_SET_ERROR_MSG("client handle is null");
return RMW_RET_ERROR;
Expand Down Expand Up @@ -1923,8 +1939,9 @@ rmw_create_service(
}

rmw_ret_t
rmw_destroy_service(rmw_service_t * service)
rmw_destroy_service(rmw_node_t * node, rmw_service_t * service)
{
(void)node;
if (!service) {
RMW_SET_ERROR_MSG("service handle is null");
return RMW_RET_ERROR;
Expand Down Expand Up @@ -2376,6 +2393,12 @@ rmw_service_server_is_available(
// error string already set
return ret;
}
#ifdef DISCOVERY_DEBUG_LOGGING
printf("Checking for service server:\n");
printf(" - %s: %zu\n",
client_info->requester_->get_request_datawriter()->get_topic()->get_name(),
number_of_request_subscribers);
#endif
if (number_of_request_subscribers == 0) {
// not ready
return RMW_RET_OK;
Expand All @@ -2390,6 +2413,11 @@ rmw_service_server_is_available(
// error string already set
return ret;
}
#ifdef DISCOVERY_DEBUG_LOGGING
printf(" - %s: %zu\n",
client_info->response_datareader_->get_topicdescription()->get_name(),
number_of_response_publishers);
#endif
if (number_of_response_publishers == 0) {
// not ready
return RMW_RET_OK;
Expand Down
43 changes: 25 additions & 18 deletions rmw_connext_shared_cpp/include/rmw_connext_shared_cpp/types.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <limits>
#include <list>
#include <map>
#include <mutex>
#include <set>
#include <sstream>
#include <stdexcept>
Expand All @@ -43,62 +44,68 @@

#include "rmw/rmw.h"

enum EntityType {Publisher, Subscriber};

class CustomDataReaderListener
: public DDSDataReaderListener
{
public:
explicit
CustomDataReaderListener(
const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition)
: graph_guard_condition_(graph_guard_condition),
implementation_identifier_(implementation_identifier)
{}

std::map<std::string, std::multiset<std::string>> topic_names_and_types;

protected:
virtual void add_information(
const DDS_SampleInfo & sample_info,
const DDS_InstanceHandle_t & instance_handle,
const std::string & topic_name,
const std::string & type_name);
const std::string & type_name,
EntityType entity_type);

virtual void remove_information(const DDS_SampleInfo & sample_info);
virtual void remove_information(
const DDS_InstanceHandle_t & instance_handle,
EntityType entity_type);

virtual void trigger_graph_guard_condition();

private:
mutable std::mutex topic_descriptor_mutex_;
struct TopicDescriptor
{
DDS_InstanceHandle_t instance_handle;
std::string name;
std::string type;
};
std::list<TopicDescriptor> topic_descriptors;
rmw_guard_condition_t * graph_guard_condition_;
const char * implementation_identifier_;
};

class CustomPublisherListener
: public CustomDataReaderListener
{
public:
explicit CustomPublisherListener(
CustomPublisherListener(
const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition)
: implementation_identifier_(implementation_identifier),
graph_guard_condition_(graph_guard_condition)
: CustomDataReaderListener(implementation_identifier, graph_guard_condition)
{}

virtual void on_data_available(DDSDataReader * reader);

private:
const char * implementation_identifier_;
rmw_guard_condition_t * graph_guard_condition_;
};

class CustomSubscriberListener
: public CustomDataReaderListener
{
public:
explicit CustomSubscriberListener(
CustomSubscriberListener(
const char * implementation_identifier, rmw_guard_condition_t * graph_guard_condition)
: implementation_identifier_(implementation_identifier),
graph_guard_condition_(graph_guard_condition)
: CustomDataReaderListener(implementation_identifier, graph_guard_condition)
{}

virtual void on_data_available(DDSDataReader * reader);

private:
const char * implementation_identifier_;
rmw_guard_condition_t * graph_guard_condition_;
};

struct ConnextNodeInfo
Expand Down
69 changes: 52 additions & 17 deletions rmw_connext_shared_cpp/src/shared_functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <map>
#include <mutex>
#include <set>
#include <string>

Expand All @@ -21,29 +22,50 @@

#include "rmw_connext_shared_cpp/shared_functions.hpp"

// Uncomment this to get extra console output about discovery.
// #define DISCOVERY_DEBUG_LOGGING 1

void CustomDataReaderListener::add_information(
const DDS_SampleInfo & sample_info,
const DDS_InstanceHandle_t & instance_handle,
const std::string & topic_name,
const std::string & type_name)
const std::string & type_name,
EntityType entity_type)
{
(void)entity_type;
std::lock_guard<std::mutex> topic_descriptor_lock(topic_descriptor_mutex_);
// store topic name and type name
auto & topic_types = topic_names_and_types[topic_name];
topic_types.insert(type_name);
// store mapping to instance handle
TopicDescriptor topic_descriptor;
topic_descriptor.instance_handle = sample_info.instance_handle;
topic_descriptor.instance_handle = instance_handle;
topic_descriptor.name = topic_name;
topic_descriptor.type = type_name;
topic_descriptors.push_back(topic_descriptor);
#ifdef DISCOVERY_DEBUG_LOGGING
printf("+%s %s <%s>\n",
entity_type == EntityType::Publisher ? "P" : "S",
topic_name.c_str(),
type_name.c_str());
#endif
}

void CustomDataReaderListener::remove_information(
const DDS_SampleInfo & sample_info)
const DDS_InstanceHandle_t & instance_handle,
EntityType entity_type)
{
(void)entity_type;
std::lock_guard<std::mutex> topic_descriptor_lock(topic_descriptor_mutex_);
// find entry by instance handle
for (auto it = topic_descriptors.begin(); it != topic_descriptors.end(); ++it) {
if (DDS_InstanceHandle_equals(&it->instance_handle, &sample_info.instance_handle)) {
if (DDS_InstanceHandle_equals(&it->instance_handle, &instance_handle)) {
// remove entries
#ifdef DISCOVERY_DEBUG_LOGGING
printf("-%s %s <%s>\n",
entity_type == EntityType::Publisher ? "P" : "S",
it->name.c_str(),
it->type.c_str());
#endif
auto & topic_types = topic_names_and_types[it->name];
topic_types.erase(topic_types.find(it->type));
if (topic_types.empty()) {
Expand All @@ -55,6 +77,17 @@ void CustomDataReaderListener::remove_information(
}
}

void CustomDataReaderListener::trigger_graph_guard_condition()
{
#ifdef DISCOVERY_DEBUG_LOGGING
printf("graph guard condition triggered...\n");
#endif
rmw_ret_t ret = trigger_guard_condition(implementation_identifier_, graph_guard_condition_);
if (ret != RMW_RET_OK) {
fprintf(stderr, "failed to trigger graph guard condition: %s\n", rmw_get_error_string_safe());
}
}

void CustomPublisherListener::on_data_available(DDSDataReader * reader)
{
DDSPublicationBuiltinTopicDataDataReader * builtin_reader =
Expand All @@ -76,17 +109,18 @@ void CustomPublisherListener::on_data_available(DDSDataReader * reader)

for (auto i = 0; i < data_seq.length(); ++i) {
if (info_seq[i].valid_data) {
add_information(info_seq[i], data_seq[i].topic_name, data_seq[i].type_name);
add_information(
info_seq[i].instance_handle,
data_seq[i].topic_name,
data_seq[i].type_name,
EntityType::Publisher);
} else {
remove_information(info_seq[i]);
remove_information(info_seq[i].instance_handle, EntityType::Publisher);
}
}

if (data_seq.length() > 0) {
rmw_ret_t ret = trigger_guard_condition(implementation_identifier_, graph_guard_condition_);
if (ret != RMW_RET_OK) {
fprintf(stderr, "failed to trigger graph guard condition: %s\n", rmw_get_error_string_safe());
}
this->trigger_graph_guard_condition();
}

builtin_reader->return_loan(data_seq, info_seq);
Expand All @@ -113,17 +147,18 @@ void CustomSubscriberListener::on_data_available(DDSDataReader * reader)

for (auto i = 0; i < data_seq.length(); ++i) {
if (info_seq[i].valid_data) {
add_information(info_seq[i], data_seq[i].topic_name, data_seq[i].type_name);
add_information(
info_seq[i].instance_handle,
data_seq[i].topic_name,
data_seq[i].type_name,
EntityType::Subscriber);
} else {
remove_information(info_seq[i]);
remove_information(info_seq[i].instance_handle, EntityType::Subscriber);
}
}

if (data_seq.length() > 0) {
rmw_ret_t ret = trigger_guard_condition(implementation_identifier_, graph_guard_condition_);
if (ret != RMW_RET_OK) {
fprintf(stderr, "failed to trigger graph guard condition: %s\n", rmw_get_error_string_safe());
}
this->trigger_graph_guard_condition();
}

builtin_reader->return_loan(data_seq, info_seq);
Expand Down
32 changes: 28 additions & 4 deletions rosidl_typesupport_connext_c/resource/srv__type_support_c.cpp.em
Original file line number Diff line number Diff line change
Expand Up @@ -253,13 +253,34 @@ bool send_response__@(spec.srv_name)(
return true;
}

const char *
get_request_topic_name__@(spec.srv_name)(void * untyped_requester)
void *
get_request_datawriter__@(spec.srv_name)(void * untyped_requester)
{
return @(spec.pkg_name)::srv::typesupport_connext_cpp::get_request_topic_name__@(spec.srv_name)(
return @(spec.pkg_name)::srv::typesupport_connext_cpp::get_request_datawriter__@(spec.srv_name)(
untyped_requester);
}

void *
get_reply_datareader__@(spec.srv_name)(void * untyped_requester)
{
return @(spec.pkg_name)::srv::typesupport_connext_cpp::get_reply_datareader__@(spec.srv_name)(
untyped_requester);
}

void *
get_request_datareader__@(spec.srv_name)(void * untyped_replier)
{
return @(spec.pkg_name)::srv::typesupport_connext_cpp::get_request_datareader__@(spec.srv_name)(
untyped_replier);
}

void *
get_reply_datawriter__@(spec.srv_name)(void * untyped_replier)
{
return @(spec.pkg_name)::srv::typesupport_connext_cpp::get_reply_datawriter__@(spec.srv_name)(
untyped_replier);
}

static service_type_support_callbacks_t __callbacks = {
"@(spec.pkg_name)",
"@(spec.srv_name)",
Expand All @@ -271,7 +292,10 @@ static service_type_support_callbacks_t __callbacks = {
&take_request__@(spec.srv_name),
&send_response__@(spec.srv_name),
&take_response__@(spec.srv_name),
&get_request_topic_name__@(spec.srv_name),
&get_request_datawriter__@(spec.srv_name),
&get_reply_datareader__@(spec.srv_name),
&get_request_datareader__@(spec.srv_name),
&get_reply_datawriter__@(spec.srv_name),
};

static rosidl_service_type_support_t __type_support = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,18 @@ typedef struct service_type_support_callbacks_t
const void * ros_response);
// Function to read a ROS response from the wire
bool (* take_response)(void * requester, rmw_request_id_t * request_header, void * ros_response);
// Function to get the topic name from an untyped requester
const char *
(*get_request_topic_name)(void * untyped_requester);
// Function to get the type erased dds request datawriter for the requester
void *
(*get_request_datawriter)(void * untyped_requester);
// Function to get the type erased dds reply datawriter for the requester
void *
(*get_reply_datareader)(void * untyped_requester);
// Function to get the type erased dds request datawriter for the replier
void *
(*get_request_datareader)(void * untyped_replier);
// Function to get the type erased dds reply datawriter for the replier
void *
(*get_reply_datawriter)(void * untyped_replier);
} service_type_support_callbacks_t;

#endif // ROSIDL_TYPESUPPORT_CONNEXT_CPP__SERVICE_TYPE_SUPPORT_H_
Loading