Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
Added read_condition for services and clients
Browse files Browse the repository at this point in the history
  • Loading branch information
esteve authored and Jackie Kay committed Oct 15, 2015
1 parent beaf78d commit 8e8f40c
Show file tree
Hide file tree
Showing 3 changed files with 155 additions and 58 deletions.
103 changes: 86 additions & 17 deletions rmw_connext_cpp/src/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,15 @@ struct ConnextStaticClientInfo
{
void * requester_;
DDSDataReader * response_datareader_;
DDSReadCondition * read_condition_;
const service_type_support_callbacks_t * callbacks_;
};

struct ConnextStaticServiceInfo
{
void * replier_;
DDSDataReader * request_datareader_;
DDSReadCondition * read_condition_;
const service_type_support_callbacks_t * callbacks_;
};

Expand Down Expand Up @@ -622,6 +624,7 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
return RMW_RET_ERROR;
}
// TODO(wjwwood): need to figure out when to unregister types with the participant.
auto result = RMW_RET_OK;
ConnextStaticSubscriberInfo * subscriber_info =
(ConnextStaticSubscriberInfo *)subscription->data;
if (subscriber_info) {
Expand All @@ -633,37 +636,37 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
if (read_condition) {
if (topic_reader->delete_readcondition(read_condition) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete readcondition");
return RMW_RET_ERROR;
result = RMW_RET_ERROR;
}
subscriber_info->read_condition_ = nullptr;
}
if (dds_subscriber->delete_datareader(topic_reader) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete datareader");
return RMW_RET_ERROR;
result = RMW_RET_ERROR;
}
subscriber_info->topic_reader_ = nullptr;
} else if (subscriber_info->read_condition_) {
RMW_SET_ERROR_MSG("cannot delete readcondition because the datareader is null");
return RMW_RET_ERROR;
result = RMW_RET_ERROR;
}
if (participant->delete_subscriber(dds_subscriber) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete subscriber");
return RMW_RET_ERROR;
result = RMW_RET_ERROR;
}
subscriber_info->dds_subscriber_ = nullptr;
} else if (subscriber_info->topic_reader_) {
RMW_SET_ERROR_MSG("cannot delete datareader because the subscriber is null");
return RMW_RET_ERROR;
result = RMW_RET_ERROR;
}
RMW_TRY_DESTRUCTOR(
subscriber_info->~ConnextStaticSubscriberInfo(),
ConnextStaticSubscriberInfo, return RMW_RET_ERROR)
ConnextStaticSubscriberInfo, result = RMW_RET_ERROR)
rmw_free(subscriber_info);
subscription->data = nullptr;
}
rmw_subscription_free(subscription);

return RMW_RET_OK;
return result;
}

rmw_ret_t
Expand Down Expand Up @@ -822,6 +825,7 @@ rmw_create_client(
DDS_DataReaderQos datareader_qos;
DDS_DataWriterQos datawriter_qos;
DDSDataReader * response_datareader = nullptr;
DDSReadCondition * read_condition = nullptr;
void * requester = nullptr;
void * buf = nullptr;
ConnextStaticClientInfo * client_info = nullptr;
Expand Down Expand Up @@ -855,6 +859,13 @@ rmw_create_client(
goto fail;
}

read_condition = response_datareader->create_readcondition(
DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);
if (!read_condition) {
RMW_SET_ERROR_MSG("failed to create read condition");
goto fail;
}

buf = rmw_allocate(sizeof(ConnextStaticClientInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
Expand All @@ -866,6 +877,7 @@ rmw_create_client(
client_info->requester_ = requester;
client_info->callbacks_ = callbacks;
client_info->response_datareader_ = response_datareader;
client_info->read_condition_ = read_condition;

client->implementation_identifier = rti_connext_identifier;
client->data = client_info;
Expand Down Expand Up @@ -907,12 +919,32 @@ rmw_destroy_client(rmw_client_t * client)
return RMW_RET_ERROR)

auto result = RMW_RET_OK;
// TODO(esteve): de-allocate Requester and response DataReader
RMW_TRY_DESTRUCTOR(
static_cast<ConnextStaticClientInfo *>(client->data)->~ConnextStaticClientInfo(),
ConnextStaticClientInfo,
result = RMW_RET_ERROR)
ConnextStaticClientInfo * client_info = static_cast<ConnextStaticClientInfo *>(client->data);

if (client_info) {
auto response_datareader = client_info->response_datareader_;
if (response_datareader) {
auto read_condition = client_info->read_condition_;
if (read_condition) {
if (response_datareader->delete_readcondition(read_condition) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete readcondition");
result = RMW_RET_ERROR;
}
client_info->read_condition_ = nullptr;
}
} else if (client_info->read_condition_) {
RMW_SET_ERROR_MSG("cannot delete readcondition because the datareader is null");
result = RMW_RET_ERROR;
}

RMW_TRY_DESTRUCTOR(
client_info->~ConnextStaticClientInfo(),
ConnextStaticClientInfo, result = RMW_RET_ERROR)
rmw_free(client_info);
client->data = nullptr;
}
rmw_client_free(client);

return result;
}

Expand Down Expand Up @@ -1002,6 +1034,7 @@ rmw_create_service(

// Past this point, a failure results in unrolling code in the goto fail block.
DDSDataReader * request_datareader = nullptr;
DDSReadCondition * read_condition = nullptr;
DDS_DataReaderQos datareader_qos;
DDS_DataWriterQos datawriter_qos;
void * replier = nullptr;
Expand Down Expand Up @@ -1037,6 +1070,13 @@ rmw_create_service(
goto fail;
}

read_condition = request_datareader->create_readcondition(
DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);
if (!read_condition) {
RMW_SET_ERROR_MSG("failed to create read condition");
goto fail;
}

buf = rmw_allocate(sizeof(ConnextStaticServiceInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
Expand All @@ -1048,6 +1088,7 @@ rmw_create_service(
service_info->replier_ = replier;
service_info->callbacks_ = callbacks;
service_info->request_datareader_ = request_datareader;
service_info->read_condition_ = read_condition;

service->implementation_identifier = rti_connext_identifier;
service->data = service_info;
Expand All @@ -1057,6 +1098,14 @@ rmw_create_service(
rmw_service_free(service);
}
if (request_datareader) {
if (read_condition) {
if (request_datareader->delete_readcondition(read_condition) != DDS_RETCODE_OK) {
std::stringstream ss;
ss << "leaking readcondition while handling failure at " <<
__FILE__ << ":" << __LINE__ << '\n';
(std::cerr << ss.str()).flush();
}
}
if (participant->delete_datareader(request_datareader) != RMW_RET_OK) {
std::stringstream ss;
ss << "leaking datareader while handling failure at " <<
Expand Down Expand Up @@ -1088,12 +1137,32 @@ rmw_destroy_service(rmw_service_t * service)
return RMW_RET_ERROR)

auto result = RMW_RET_OK;
// TODO(esteve): de-allocate Replier and request DataReader
RMW_TRY_DESTRUCTOR(
static_cast<ConnextStaticServiceInfo *>(service->data)->~ConnextStaticServiceInfo(),
ConnextStaticServiceInfo,
result = RMW_RET_ERROR)
ConnextStaticServiceInfo * service_info = static_cast<ConnextStaticServiceInfo *>(service->data);

if (service_info) {
auto request_datareader = service_info->request_datareader_;
if (request_datareader) {
auto read_condition = service_info->read_condition_;
if (read_condition) {
if (request_datareader->delete_readcondition(read_condition) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete readcondition");
result = RMW_RET_ERROR;
}
service_info->read_condition_ = nullptr;
}
} else if (service_info->read_condition_) {
RMW_SET_ERROR_MSG("cannot delete readcondition because the datareader is null");
result = RMW_RET_ERROR;
}

RMW_TRY_DESTRUCTOR(
service_info->~ConnextStaticServiceInfo(),
ConnextStaticServiceInfo, result = RMW_RET_ERROR)
rmw_free(service_info);
service->data = nullptr;
}
rmw_service_free(service);

return result;
}

Expand Down
52 changes: 52 additions & 0 deletions rmw_connext_dynamic_cpp/src/functions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ struct ConnextDynamicServiceInfo
{
connext::Replier<DDS_DynamicData, DDS_DynamicData> * replier_;
DDSDataReader * request_datareader_;
DDSReadCondition * read_condition_;
DDS::DynamicDataTypeSupport * request_type_support_;
DDS::DynamicDataTypeSupport * response_type_support_;
DDS_TypeCode * response_type_code_;
Expand All @@ -118,6 +119,7 @@ struct ConnextDynamicClientInfo
{
connext::Requester<DDS_DynamicData, DDS_DynamicData> * requester_;
DDSDataReader * response_datareader_;
DDSReadCondition * read_condition_;
DDS::DynamicDataTypeSupport * request_type_support_;
DDS::DynamicDataTypeSupport * response_type_support_;
DDS_TypeCode * response_type_code_;
Expand Down Expand Up @@ -2125,6 +2127,7 @@ rmw_create_client(
DDS_DataWriterQos datawriter_qos;
connext::Requester<DDS_DynamicData, DDS_DynamicData> * requester = nullptr;
DDSDataReader * response_datareader = nullptr;
DDSReadCondition * read_condition = nullptr;
ConnextDynamicClientInfo * client_info = nullptr;
// Begin initializing elements
client = rmw_client_allocate();
Expand Down Expand Up @@ -2208,6 +2211,13 @@ rmw_create_client(
goto fail;
}

read_condition = response_datareader->create_readcondition(
DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);
if (!read_condition) {
RMW_SET_ERROR_MSG("failed to create read condition");
goto fail;
}

// Allocate memory for the ConnextDynamicClientInfo object.
buf = rmw_allocate(sizeof(ConnextDynamicClientInfo));
if (!buf) {
Expand All @@ -2219,6 +2229,7 @@ rmw_create_client(
buf = nullptr; // Only free the casted pointer; don't need the buf pointer anymore.
client_info->requester_ = requester;
client_info->response_datareader_ = response_datareader;
client_info->read_condition_ = read_condition;
client_info->request_type_support_ = request_type_support;
client_info->response_type_support_ = response_type_support;
client_info->response_type_code_ = response_type_code;
Expand All @@ -2230,6 +2241,13 @@ rmw_create_client(
client->data = client_info;
return client;
fail:
if (response_datareader) {
if (read_condition) {
if (response_datareader->delete_readcondition(read_condition) != DDS::RETCODE_OK) {
fprintf(stderr, "leaking readcondition while handling failure\n");
}
}
}
if (client) {
rmw_client_free(client);
}
Expand Down Expand Up @@ -2296,6 +2314,15 @@ rmw_destroy_client(rmw_client_t * client)
auto result = RMW_RET_OK;
ConnextDynamicClientInfo * client_info = static_cast<ConnextDynamicClientInfo *>(client->data);
if (client_info) {
auto response_datareader = client_info->response_datareader_;
if (response_datareader) {
auto read_condition = client_info->read_condition_;
if (response_datareader->delete_readcondition(read_condition) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete readcondition");
return RMW_RET_ERROR;
}
client_info->read_condition_ = nullptr;
}
if (client_info->request_type_code_) {
if (destroy_type_code(client_info->request_type_code_) != RMW_RET_OK) {
RMW_SET_ERROR_MSG("failed to destroy type code");
Expand Down Expand Up @@ -2483,6 +2510,7 @@ rmw_create_service(
DDS_DataWriterQos datawriter_qos;
connext::Replier<DDS_DynamicData, DDS_DynamicData> * replier = nullptr;
DDSDataReader * request_datareader = nullptr;
DDSReadCondition * read_condition = nullptr;
ConnextDynamicServiceInfo * server_info = nullptr;
// Begin initializing elements
service = rmw_service_allocate();
Expand Down Expand Up @@ -2563,6 +2591,13 @@ rmw_create_service(
goto fail;
}

read_condition = request_datareader->create_readcondition(
DDS_ANY_SAMPLE_STATE, DDS_ANY_VIEW_STATE, DDS_ANY_INSTANCE_STATE);
if (!read_condition) {
RMW_SET_ERROR_MSG("failed to create read condition");
goto fail;
}

// Allocate memory for the ConnextDynamicServiceInfo object.
buf = rmw_allocate(sizeof(ConnextDynamicServiceInfo));
if (!buf) {
Expand All @@ -2574,6 +2609,7 @@ rmw_create_service(
buf = nullptr; // Only free the casted pointer; don't need the buf pointer anymore.
server_info->replier_ = replier;
server_info->request_datareader_ = request_datareader;
server_info->read_condition_ = read_condition;
server_info->response_type_support_ = response_type_support;
server_info->request_members_ = request_members;
server_info->response_members_ = response_members;
Expand All @@ -2582,6 +2618,13 @@ rmw_create_service(
service->data = server_info;
return service;
fail:
if (request_datareader) {
if (read_condition) {
if (request_datareader->delete_readcondition(read_condition) != DDS::RETCODE_OK) {
fprintf(stderr, "leaking readcondition while handling failure\n");
}
}
}
if (service) {
rmw_service_free(service);
}
Expand Down Expand Up @@ -2648,6 +2691,15 @@ rmw_destroy_service(rmw_service_t * service)
auto result = RMW_RET_OK;
auto service_info = static_cast<ConnextDynamicServiceInfo *>(service->data);
if (service_info) {
auto request_datareader = service_info->request_datareader_;
if (request_datareader) {
auto read_condition = service_info->read_condition_;
if (request_datareader->delete_readcondition(read_condition) != DDS_RETCODE_OK) {
RMW_SET_ERROR_MSG("failed to delete readcondition");
return RMW_RET_ERROR;
}
service_info->read_condition_ = nullptr;
}
if (service_info->request_type_code_) {
if (destroy_type_code(service_info->request_type_code_) != RMW_RET_OK) {
RMW_SET_ERROR_MSG("failed to destroy type code");
Expand Down
Loading

0 comments on commit 8e8f40c

Please sign in to comment.