diff --git a/rmw_cyclonedds_cpp/src/rmw_node.cpp b/rmw_cyclonedds_cpp/src/rmw_node.cpp index acfc33d4..36cf47ea 100644 --- a/rmw_cyclonedds_cpp/src/rmw_node.cpp +++ b/rmw_cyclonedds_cpp/src/rmw_node.cpp @@ -511,11 +511,57 @@ static void dds_listener_callback(dds_entity_t entity, void * arg) MAKE_DDS_EVENT_CALLBACK_FN(requested_deadline_missed, REQUESTED_DEADLINE_MISSED) MAKE_DDS_EVENT_CALLBACK_FN(liveliness_lost, LIVELINESS_LOST) MAKE_DDS_EVENT_CALLBACK_FN(offered_deadline_missed, OFFERED_DEADLINE_MISSED) -MAKE_DDS_EVENT_CALLBACK_FN(requested_incompatible_qos, REQUESTED_INCOMPATIBLE_QOS) MAKE_DDS_EVENT_CALLBACK_FN(sample_lost, SAMPLE_LOST) -MAKE_DDS_EVENT_CALLBACK_FN(offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS) MAKE_DDS_EVENT_CALLBACK_FN(liveliness_changed, LIVELINESS_CHANGED) +/** + * Because the inconsistent topic is not raised by CycloneDDS when a reader/writer fail to match because of differing type definitions + * this callback is signalled via the incompatible qos handlers + */ +// MAKE_DDS_EVENT_CALLBACK_FN(inconsistent_topic, INCONSISTENT_TOPIC) + +static void on_requested_incompatible_qos_fn( + dds_entity_t entity, + const dds_requested_incompatible_qos_status_t status, + void * arg) +{ + (void)entity; + auto data = static_cast(arg); + std::lock_guard guard(data->mutex); + uint32_t type = DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID; + if (DDS_TYPE_CONSISTENCY_ENFORCEMENT_QOS_POLICY_ID == status.last_policy_id) { + /* incompatible types */ + type = DDS_INCONSISTENT_TOPIC_STATUS_ID; + } + auto cb = data->event_callback[type]; + if (cb) { + cb(data->event_data[type], 1); + } else { + data->event_unread_count[type]++; + } +} + +static void on_offered_incompatible_qos_fn( + dds_entity_t entity, + const dds_offered_incompatible_qos_status_t status, + void * arg) +{ + (void)entity; + auto data = static_cast(arg); + std::lock_guard guard(data->mutex); + uint32_t type = DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID; + if (DDS_TYPE_CONSISTENCY_ENFORCEMENT_QOS_POLICY_ID == status.last_policy_id) { + /* incompatible types */ + type = DDS_INCONSISTENT_TOPIC_STATUS_ID; + } + auto cb = data->event_callback[type]; + if (cb) { + cb(data->event_data[type], 1); + } else { + data->event_unread_count[type]++; + } +} + static void listener_set_event_callbacks(dds_listener_t * l, void * arg) { dds_lset_requested_deadline_missed_arg(l, on_requested_deadline_missed_fn, arg, false); @@ -716,6 +762,24 @@ extern "C" rmw_ret_t rmw_event_set_callback( break; } + case RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE: + { + auto pub_event = static_cast(rmw_event->data); + event_set_callback( + pub_event, DDS_INCONSISTENT_TOPIC_STATUS_ID, + callback, user_data); + break; + } + + case RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE: + { + auto sub_event = static_cast(rmw_event->data); + event_set_callback( + sub_event, DDS_INCONSISTENT_TOPIC_STATUS_ID, + callback, user_data); + break; + } + case RMW_EVENT_INVALID: { return RMW_RET_INVALID_ARGUMENT; @@ -3627,6 +3691,8 @@ static const std::unordered_map mask_map{ {RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS}, {RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS}, {RMW_EVENT_MESSAGE_LOST, DDS_SAMPLE_LOST_STATUS}, + {RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE, DDS_INCONSISTENT_TOPIC_STATUS}, + {RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE, DDS_INCONSISTENT_TOPIC_STATUS}, }; static bool is_event_supported(const rmw_event_type_t event_t) @@ -3799,6 +3865,40 @@ extern "C" rmw_ret_t rmw_take_event( } } + case RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE: { + auto it = static_cast(event_info); + auto pub = static_cast(event_handle->data); + + const dds_entity_t topic = dds_get_topic(pub->enth); + dds_inconsistent_topic_status_t st; + if (dds_get_inconsistent_topic_status(topic, &st) < 0) { + *taken = false; + return RMW_RET_ERROR; + } else { + it->total_count = static_cast(st.total_count); + it->total_count_change = st.total_count_change; + *taken = true; + return RMW_RET_OK; + } + } + + case RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE: { + auto it = static_cast(event_info); + auto sub = static_cast(event_handle->data); + + const dds_entity_t topic = dds_get_topic(sub->enth); + dds_inconsistent_topic_status_t st; + if (dds_get_inconsistent_topic_status(topic, &st) < 0) { + *taken = false; + return RMW_RET_ERROR; + } else { + it->total_count = static_cast(st.total_count); + it->total_count_change = st.total_count_change; + *taken = true; + return RMW_RET_OK; + } + } + case RMW_EVENT_INVALID: { break; }