Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect incompatible types from incompatible QoS callbacks #436

Open
wants to merge 4 commits into
base: rolling
Choose a base branch
from
Open
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
104 changes: 102 additions & 2 deletions rmw_cyclonedds_cpp/src/rmw_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -511,11 +511,57 @@ static void dds_listener_callback(dds_entity_t entity, void * arg)
MAKE_DDS_EVENT_CALLBACK_FN(requested_deadline_missed, REQUESTED_DEADLINE_MISSED)
MAKE_DDS_EVENT_CALLBACK_FN(liveliness_lost, LIVELINESS_LOST)
MAKE_DDS_EVENT_CALLBACK_FN(offered_deadline_missed, OFFERED_DEADLINE_MISSED)
MAKE_DDS_EVENT_CALLBACK_FN(requested_incompatible_qos, REQUESTED_INCOMPATIBLE_QOS)
MAKE_DDS_EVENT_CALLBACK_FN(sample_lost, SAMPLE_LOST)
MAKE_DDS_EVENT_CALLBACK_FN(offered_incompatible_qos, OFFERED_INCOMPATIBLE_QOS)
MAKE_DDS_EVENT_CALLBACK_FN(liveliness_changed, LIVELINESS_CHANGED)

/**
* Because the inconsistent topic is not handled correctly in CycloneDDS,
allenh1 marked this conversation as resolved.
Show resolved Hide resolved
* this callback is signalled via the incompatible qos handlers
*/
// MAKE_DDS_EVENT_CALLBACK_FN(inconsistent_topic, INCONSISTENT_TOPIC)

static void on_requested_incompatible_qos_fn(
dds_entity_t entity,
const dds_requested_incompatible_qos_status_t status,
void * arg)
{
(void)entity;
auto data = static_cast<user_callback_data_t *>(arg);
std::lock_guard<std::mutex> guard(data->mutex);
uint32_t type = DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS_ID;
if (DDS_TYPE_CONSISTENCY_ENFORCEMENT_QOS_POLICY_ID == status.last_policy_id) {
/* incompatible types */
type = DDS_INCONSISTENT_TOPIC_STATUS_ID;
}
auto cb = data->event_callback[type];
if (cb) {
cb(data->event_data[type], 1);
} else {
data->event_unread_count[type]++;
}
}

static void on_offered_incompatible_qos_fn(
dds_entity_t entity,
const dds_offered_incompatible_qos_status_t status,
void * arg)
{
(void)entity;
auto data = static_cast<user_callback_data_t *>(arg);
std::lock_guard<std::mutex> guard(data->mutex);
uint32_t type = DDS_OFFERED_INCOMPATIBLE_QOS_STATUS_ID;
if (DDS_TYPE_CONSISTENCY_ENFORCEMENT_QOS_POLICY_ID == status.last_policy_id) {
/* incompatible types */
type = DDS_INCONSISTENT_TOPIC_STATUS_ID;
}
auto cb = data->event_callback[type];
if (cb) {
cb(data->event_data[type], 1);
} else {
data->event_unread_count[type]++;
}
}

static void listener_set_event_callbacks(dds_listener_t * l, void * arg)
{
dds_lset_requested_deadline_missed_arg(l, on_requested_deadline_missed_fn, arg, false);
Expand Down Expand Up @@ -716,6 +762,24 @@ extern "C" rmw_ret_t rmw_event_set_callback(
break;
}

case RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE:
{
auto pub_event = static_cast<CddsPublisher *>(rmw_event->data);
event_set_callback(
pub_event, DDS_INCONSISTENT_TOPIC_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE:
{
auto sub_event = static_cast<CddsSubscription *>(rmw_event->data);
event_set_callback(
sub_event, DDS_INCONSISTENT_TOPIC_STATUS_ID,
callback, user_data);
break;
}

case RMW_EVENT_INVALID:
{
return RMW_RET_INVALID_ARGUMENT;
Expand Down Expand Up @@ -3627,6 +3691,8 @@ static const std::unordered_map<rmw_event_type_t, uint32_t> mask_map{
{RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE, DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS},
{RMW_EVENT_OFFERED_QOS_INCOMPATIBLE, DDS_OFFERED_INCOMPATIBLE_QOS_STATUS},
{RMW_EVENT_MESSAGE_LOST, DDS_SAMPLE_LOST_STATUS},
{RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE, DDS_INCONSISTENT_TOPIC_STATUS},
{RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE, DDS_INCONSISTENT_TOPIC_STATUS},
};

static bool is_event_supported(const rmw_event_type_t event_t)
Expand Down Expand Up @@ -3799,6 +3865,40 @@ extern "C" rmw_ret_t rmw_take_event(
}
}

case RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE: {
auto it = static_cast<rmw_incompatible_type_status_t *>(event_info);
auto pub = static_cast<CddsPublisher *>(event_handle->data);

const dds_entity_t topic = dds_get_topic(pub->enth);
dds_inconsistent_topic_status_t st;
if (dds_get_inconsistent_topic_status(topic, &st) < 0) {
*taken = false;
return RMW_RET_ERROR;
} else {
it->total_count = static_cast<int32_t>(st.total_count);
it->total_count_change = st.total_count_change;
*taken = true;
return RMW_RET_OK;
}
}

case RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE: {
auto it = static_cast<rmw_incompatible_type_status_t *>(event_info);
auto sub = static_cast<CddsSubscription *>(event_handle->data);

const dds_entity_t topic = dds_get_topic(sub->enth);
dds_inconsistent_topic_status_t st;
if (dds_get_inconsistent_topic_status(topic, &st) < 0) {
*taken = false;
return RMW_RET_ERROR;
} else {
it->total_count = static_cast<int32_t>(st.total_count);
it->total_count_change = st.total_count_change;
*taken = true;
return RMW_RET_OK;
}
}

case RMW_EVENT_INVALID: {
break;
}
Expand Down