Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
Added read_condition for services and clients
Browse files Browse the repository at this point in the history
  • Loading branch information
esteve committed Aug 18, 2015
1 parent b922e8c commit 6943f23
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 104 deletions.
161 changes: 103 additions & 58 deletions rmw_connext_cpp/src/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,15 @@ struct ConnextStaticClientInfo
{
void * requester_;
DDSDataReader * response_datareader_;
DDSReadCondition * read_condition_;
const service_type_support_callbacks_t * callbacks_;
};

struct ConnextStaticServiceInfo
{
void * replier_;
DDSDataReader * request_datareader_;
DDSReadCondition * read_condition_;
const service_type_support_callbacks_t * callbacks_;
};

Expand Down Expand Up @@ -1080,6 +1082,7 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
return RMW_RET_ERROR;
}
// TODO(wjwwood): need to figure out when to unregister types with the participant.
auto result = RMW_RET_OK;
ConnextStaticSubscriberInfo * subscriber_info =
(ConnextStaticSubscriberInfo *)subscription->data;
if (subscriber_info) {
Expand All @@ -1091,37 +1094,37 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
if (read_condition) {
if (topic_reader->delete_readcondition(read_condition) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete readcondition");
return RMW_RET_ERROR;
result = RMW_RET_ERROR;
}
subscriber_info->read_condition_ = nullptr;
}
if (dds_subscriber->delete_datareader(topic_reader) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete datareader");
return RMW_RET_ERROR;
result = RMW_RET_ERROR;
}
subscriber_info->topic_reader_ = nullptr;
} else if (subscriber_info->read_condition_) {
RMW_SET_ERROR_MSG("cannot delete readcondition because the datareader is null");
return RMW_RET_ERROR;
result = RMW_RET_ERROR;
}
if (participant->delete_subscriber(dds_subscriber) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete subscriber");
return RMW_RET_ERROR;
result = RMW_RET_ERROR;
}
subscriber_info->dds_subscriber_ = nullptr;
} else if (subscriber_info->topic_reader_) {
RMW_SET_ERROR_MSG("cannot delete datareader because the subscriber is null");
return RMW_RET_ERROR;
result = RMW_RET_ERROR;
}
RMW_TRY_DESTRUCTOR(
subscriber_info->~ConnextStaticSubscriberInfo(),
ConnextStaticSubscriberInfo, return RMW_RET_ERROR)
ConnextStaticSubscriberInfo, result = RMW_RET_ERROR)
rmw_free(subscriber_info);
subscription->data = nullptr;
}
rmw_subscription_free(subscription);

return RMW_RET_OK;
return result;
}

rmw_ret_t
Expand Down Expand Up @@ -1297,23 +1300,13 @@ rmw_wait(rmw_subscriptions_t * subscriptions,
RMW_SET_ERROR_MSG("service info handle is null");
return RMW_RET_ERROR;
}
DDSDataReader * request_datareader = service_info->request_datareader_;
if (!request_datareader) {
RMW_SET_ERROR_MSG("request datareader handle is null");
return RMW_RET_ERROR;
}
DDSStatusCondition * condition = request_datareader->get_statuscondition();
if (!condition) {
RMW_SET_ERROR_MSG("condition handle is null");
return RMW_RET_ERROR;
}
DDS_ReturnCode_t status = condition->set_enabled_statuses(DDS_DATA_AVAILABLE_STATUS);
if (status != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to set enabled statuses");
DDSReadCondition * read_condition = service_info->read_condition_;
if (!read_condition) {
RMW_SET_ERROR_MSG("read condition handle is null");
return RMW_RET_ERROR;
}
rmw_ret_t rmw_status = check_attach_condition_error(
waitset.attach_condition(condition));
waitset.attach_condition(read_condition));
if (rmw_status != RMW_RET_OK) {
return rmw_status;
}
Expand All @@ -1332,18 +1325,14 @@ rmw_wait(rmw_subscriptions_t * subscriptions,
RMW_SET_ERROR_MSG("response datareader handle is null");
return RMW_RET_ERROR;
}
DDSStatusCondition * condition = response_datareader->get_statuscondition();
if (!condition) {
RMW_SET_ERROR_MSG("condition handle is null");
return RMW_RET_ERROR;
}
DDS_ReturnCode_t status = condition->set_enabled_statuses(DDS_DATA_AVAILABLE_STATUS);
if (status != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to set enabled statuses");

DDSReadCondition * read_condition = client_info->read_condition_;
if (!read_condition) {
RMW_SET_ERROR_MSG("read condition handle is null");
return RMW_RET_ERROR;
}
rmw_ret_t rmw_status = check_attach_condition_error(
waitset.attach_condition(condition));
waitset.attach_condition(read_condition));
if (rmw_status != RMW_RET_OK) {
return rmw_status;
}
Expand Down Expand Up @@ -1434,21 +1423,16 @@ rmw_wait(rmw_subscriptions_t * subscriptions,
RMW_SET_ERROR_MSG("service info handle is null");
return RMW_RET_ERROR;
}
DDSDataReader * request_datareader = service_info->request_datareader_;
if (!request_datareader) {
RMW_SET_ERROR_MSG("request datareader handle is null");
return RMW_RET_ERROR;
}
DDSStatusCondition * condition = request_datareader->get_statuscondition();
if (!condition) {
RMW_SET_ERROR_MSG("condition handle is null");
DDSReadCondition * read_condition = service_info->read_condition_;
if (!read_condition) {
RMW_SET_ERROR_MSG("read condition handle is null");
return RMW_RET_ERROR;
}

// search for service condition in active set
DDS_Long j = 0;
for (; j < active_conditions.length(); ++j) {
if (active_conditions[j] == condition) {
if (active_conditions[j] == read_condition) {
break;
}
}
Expand All @@ -1467,21 +1451,16 @@ rmw_wait(rmw_subscriptions_t * subscriptions,
RMW_SET_ERROR_MSG("client info handle is null");
return RMW_RET_ERROR;
}
DDSDataReader * response_datareader = client_info->response_datareader_;
if (!response_datareader) {
RMW_SET_ERROR_MSG("response datareader handle is null");
return RMW_RET_ERROR;
}
DDSStatusCondition * condition = response_datareader->get_statuscondition();
if (!condition) {
RMW_SET_ERROR_MSG("condition handle is null");
DDSReadCondition * read_condition = client_info->read_condition_;
if (!read_condition) {
RMW_SET_ERROR_MSG("read condition handle is null");
return RMW_RET_ERROR;
}

// search for service condition in active set
DDS_Long j = 0;
for (; j < active_conditions.length(); ++j) {
if (active_conditions[j] == condition) {
if (active_conditions[j] == read_condition) {
break;
}
}
Expand Down Expand Up @@ -1541,6 +1520,7 @@ rmw_create_client(
DDS_DataReaderQos datareader_qos;
DDS_DataWriterQos datawriter_qos;
DDSDataReader * response_datareader = nullptr;
DDSReadCondition * read_condition = nullptr;
void * requester = nullptr;
void * buf = nullptr;
ConnextStaticClientInfo * client_info = nullptr;
Expand Down Expand Up @@ -1572,6 +1552,13 @@ rmw_create_client(
goto fail;
}

read_condition = response_datareader->create_readcondition(
DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);
if (!read_condition) {
RMW_SET_ERROR_MSG("failed to create read condition");
goto fail;
}

buf = rmw_allocate(sizeof(ConnextStaticClientInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
Expand All @@ -1583,6 +1570,7 @@ rmw_create_client(
client_info->requester_ = requester;
client_info->callbacks_ = callbacks;
client_info->response_datareader_ = response_datareader;
client_info->read_condition_ = read_condition;

client->implementation_identifier = rti_connext_identifier;
client->data = client_info;
Expand Down Expand Up @@ -1624,12 +1612,32 @@ rmw_destroy_client(rmw_client_t * client)
return RMW_RET_ERROR)

auto result = RMW_RET_OK;
// TODO(esteve): de-allocate Requester and response DataReader
RMW_TRY_DESTRUCTOR(
static_cast<ConnextStaticClientInfo *>(client->data)->~ConnextStaticClientInfo(),
ConnextStaticClientInfo,
result = RMW_RET_ERROR)
ConnextStaticClientInfo * client_info = static_cast<ConnextStaticClientInfo *>(client->data);

if (client_info) {
auto response_datareader = client_info->response_datareader_;
if (response_datareader) {
auto read_condition = client_info->read_condition_;
if (read_condition) {
if (response_datareader->delete_readcondition(read_condition) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete readcondition");
result = RMW_RET_ERROR;
}
client_info->read_condition_ = nullptr;
}
} else if (client_info->read_condition_) {
RMW_SET_ERROR_MSG("cannot delete readcondition because the datareader is null");
result = RMW_RET_ERROR;
}

RMW_TRY_DESTRUCTOR(
client_info->~ConnextStaticClientInfo(),
ConnextStaticClientInfo, result = RMW_RET_ERROR)
rmw_free(client_info);
client->data = nullptr;
}
rmw_client_free(client);

return result;
}

Expand Down Expand Up @@ -1717,6 +1725,7 @@ rmw_create_service(
}
// Past this point, a failure results in unrolling code in the goto fail block.
DDSDataReader * request_datareader = nullptr;
DDSReadCondition * read_condition = nullptr;
DDS_DataReaderQos datareader_qos;
DDS_DataWriterQos datawriter_qos;
void * replier = nullptr;
Expand Down Expand Up @@ -1751,6 +1760,13 @@ rmw_create_service(
goto fail;
}

read_condition = request_datareader->create_readcondition(
DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);
if (!read_condition) {
RMW_SET_ERROR_MSG("failed to create read condition");
goto fail;
}

buf = rmw_allocate(sizeof(ConnextStaticServiceInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
Expand All @@ -1762,6 +1778,7 @@ rmw_create_service(
service_info->replier_ = replier;
service_info->callbacks_ = callbacks;
service_info->request_datareader_ = request_datareader;
service_info->read_condition_ = read_condition;

service->implementation_identifier = rti_connext_identifier;
service->data = service_info;
Expand All @@ -1771,6 +1788,14 @@ rmw_create_service(
rmw_service_free(service);
}
if (request_datareader) {
if (read_condition) {
if (request_datareader->delete_readcondition(read_condition) != DDS_RETCODE_OK) {
std::stringstream ss;
ss << "leaking readcondition while handling failure at " <<
__FILE__ << ":" << __LINE__ << '\n';
(std::cerr << ss.str()).flush();
}
}
if (participant->delete_datareader(request_datareader) != RMW_RET_OK) {
std::stringstream ss;
ss << "leaking datareader while handling failure at " <<
Expand Down Expand Up @@ -1802,12 +1827,32 @@ rmw_destroy_service(rmw_service_t * service)
return RMW_RET_ERROR)

auto result = RMW_RET_OK;
// TODO(esteve): de-allocate Replier and request DataReader
RMW_TRY_DESTRUCTOR(
static_cast<ConnextStaticServiceInfo *>(service->data)->~ConnextStaticServiceInfo(),
ConnextStaticServiceInfo,
result = RMW_RET_ERROR)
ConnextStaticServiceInfo * service_info = static_cast<ConnextStaticServiceInfo *>(service->data);

if (service_info) {
auto request_datareader = service_info->request_datareader_;
if (request_datareader) {
auto read_condition = service_info->read_condition_;
if (read_condition) {
if (request_datareader->delete_readcondition(read_condition) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete readcondition");
result = RMW_RET_ERROR;
}
service_info->read_condition_ = nullptr;
}
} else if (service_info->read_condition_) {
RMW_SET_ERROR_MSG("cannot delete readcondition because the datareader is null");
result = RMW_RET_ERROR;
}

RMW_TRY_DESTRUCTOR(
service_info->~ConnextStaticServiceInfo(),
ConnextStaticServiceInfo, result = RMW_RET_ERROR)
rmw_free(service_info);
service->data = nullptr;
}
rmw_service_free(service);

return result;
}

Expand Down
Loading

0 comments on commit 6943f23

Please sign in to comment.