Skip to content

Commit

Permalink
fix issues uncovered by unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Miaofei <miaofei@amazon.com>
  • Loading branch information
mm318 committed May 10, 2019
1 parent 9c3a8c7 commit 8044d51
Show file tree
Hide file tree
Showing 7 changed files with 63 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ class EventListenerInterface
class ConditionalScopedLock
{
public:
ConditionalScopedLock(std::mutex * mutex, std::condition_variable * condition_variable)
: mutex_(mutex), cv_(condition_variable)
ConditionalScopedLock(
std::mutex * mutex,
std::condition_variable * condition_variable = nullptr)
: mutex_(mutex), cv_(condition_variable)
{
if (nullptr != mutex_) {
mutex_->lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_

#include <atomic>
#include <mutex>
#include <condition_variable>
#include <set>
Expand Down Expand Up @@ -48,6 +49,10 @@ class PubListener : public EventListenerInterface, public eprosima::fastrtps::Pu
{
public:
explicit PubListener(CustomPublisherInfo * info)
: deadline_changes_(false),
liveliness_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr)
{
(void) info;
}
Expand Down Expand Up @@ -77,10 +82,10 @@ class PubListener : public EventListenerInterface, public eprosima::fastrtps::Pu

// EventListenerInterface implementation
bool
hasEvent(rmw_event_type_t /* event_type */) const final;
hasEvent(rmw_event_type_t event_type) const final;

bool
takeNextEvent(rmw_event_type_t /* event_type */, void * /* event_info */) final;
takeNextEvent(rmw_event_type_t event_type, void * event_info) final;

// PubListener API
size_t subscriptionCount()
Expand All @@ -106,13 +111,19 @@ class PubListener : public EventListenerInterface, public eprosima::fastrtps::Pu
}

private:
std::mutex internalMutex_;
mutable std::mutex internalMutex_;

std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool deadline_changes_;
eprosima::fastrtps::OfferedDeadlineMissedStatus offered_deadline_missed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool liveliness_changes_;
eprosima::fastrtps::LivelinessLostStatus liveliness_lost_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su
public:
explicit SubListener(CustomSubscriberInfo * info)
: data_(0),
conditionMutex_(nullptr), conditionVariable_(nullptr)
deadline_changes_(false),
liveliness_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr)
{
// Field is not used right now
(void)info;
Expand All @@ -60,10 +63,8 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su
// SubscriberListener implementation
void
onSubscriptionMatched(
eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info) final
eprosima::fastrtps::Subscriber * /*sub*/, eprosima::fastrtps::rtps::MatchingInfo & info) final
{
(void)sub;

std::lock_guard<std::mutex> lock(internalMutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) {
publishers_.insert(info.remoteEndpointGuid);
Expand All @@ -75,19 +76,13 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su
void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub) final
{
(void)sub;
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
// the change to data_ needs to be mutually exclusive with rmw_wait()
// which checks hasData() and decides if wait() needs to be called
data_ = sub->getUnreadCount();
clock.unlock();
conditionVariable_->notify_one();
} else {
data_ = sub->getUnreadCount();
}
// the change to liveliness_lost_count_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

data_.store(sub->getUnreadCount(), std::memory_order_relaxed);
}

void on_requested_deadline_missed(
Expand Down Expand Up @@ -125,20 +120,15 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su
bool
hasData() const
{
return data_ > 0;
return data_.load(std::memory_order_relaxed) > 0;
}

void
data_taken(eprosima::fastrtps::Subscriber * sub)
{
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
data_ = sub->getUnreadCount();
} else {
data_ = sub->getUnreadCount();
}
ConditionalScopedLock clock(conditionMutex_);
data_.store(sub->getUnreadCount(), std::memory_order_relaxed);
}

size_t publisherCount()
Expand All @@ -148,13 +138,18 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su
}

private:
std::mutex internalMutex_;
mutable std::mutex internalMutex_;

std::atomic_size_t data_;
eprosima::fastrtps::LivelinessChangedStatus liveliness_changed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool deadline_changes_;
eprosima::fastrtps::RequestedDeadlineMissedStatus requested_deadline_missed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool liveliness_changes_;
eprosima::fastrtps::LivelinessChangedStatus liveliness_changed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

Expand Down
13 changes: 8 additions & 5 deletions rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ PubListener::on_offered_deadline_missed(
offered_deadline_missed_status_.total_count = status.total_count;
// Accumulate deltas
offered_deadline_missed_status_.total_count_change += status.total_count_change;

deadline_changes_.store(true, std::memory_order_relaxed);
}

void PubListener::on_liveliness_lost(
Expand All @@ -51,15 +53,17 @@ void PubListener::on_liveliness_lost(
liveliness_lost_status_.total_count = status.total_count;
// Accumulate deltas
liveliness_lost_status_.total_count_change += status.total_count_change;

liveliness_changes_.store(true, std::memory_order_relaxed);
}

bool PubListener::hasEvent(rmw_event_type_t event_type) const
{
switch (event_type) {
case RMW_EVENT_LIVELINESS_LOST:
return liveliness_lost_status_.total_count_change != 0;
return liveliness_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_OFFERED_DEADLINE_MISSED:
return offered_deadline_missed_status_.total_count_change != 0;
return deadline_changes_.load(std::memory_order_relaxed);
default:
break;
}
Expand All @@ -68,9 +72,6 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const

bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
{
if (!hasEvent(event_type)) {
return false;
}
std::lock_guard<std::mutex> lock(internalMutex_);
switch (event_type) {
case RMW_EVENT_LIVELINESS_LOST: {
Expand All @@ -79,13 +80,15 @@ bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
rmw_data->total_count = liveliness_lost_status_.total_count;
rmw_data->total_count_change = liveliness_lost_status_.total_count_change;
liveliness_lost_status_.total_count_change = 0;
liveliness_changes_.store(false, std::memory_order_relaxed);
} break;
case RMW_EVENT_OFFERED_DEADLINE_MISSED: {
rmw_offered_deadline_missed_status_t * rmw_data =
static_cast<rmw_offered_deadline_missed_status_t *>(event_info);
rmw_data->total_count = offered_deadline_missed_status_.total_count;
rmw_data->total_count_change = offered_deadline_missed_status_.total_count_change;
offered_deadline_missed_status_.total_count_change = 0;
deadline_changes_.store(false, std::memory_order_relaxed);
} break;
default:
return false;
Expand Down
14 changes: 8 additions & 6 deletions rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ SubListener::on_requested_deadline_missed(
requested_deadline_missed_status_.total_count = status.total_count;
// Accumulate deltas
requested_deadline_missed_status_.total_count_change += status.total_count_change;

deadline_changes_.store(true, std::memory_order_relaxed);
}

void SubListener::on_liveliness_changed(
Expand All @@ -53,16 +55,17 @@ void SubListener::on_liveliness_changed(
// Accumulate deltas
liveliness_changed_status_.alive_count_change += status.alive_count_change;
liveliness_changed_status_.not_alive_count_change += status.not_alive_count_change;

liveliness_changes_.store(true, std::memory_order_relaxed);
}

bool SubListener::hasEvent(rmw_event_type_t event_type) const
{
switch (event_type) {
case RMW_EVENT_LIVELINESS_CHANGED:
return liveliness_changed_status_.alive_count_change +
liveliness_changed_status_.not_alive_count_change != 0;
return liveliness_changes_.load(std::memory_order_relaxed);
case RMW_EVENT_REQUESTED_DEADLINE_MISSED:
return requested_deadline_missed_status_.total_count_change != 0;
return deadline_changes_.load(std::memory_order_relaxed);
default:
break;
}
Expand All @@ -71,9 +74,6 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const

bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
{
if (!hasEvent(event_type)) {
return false;
}
std::lock_guard<std::mutex> lock(internalMutex_);
switch (event_type) {
case RMW_EVENT_LIVELINESS_CHANGED: {
Expand All @@ -85,13 +85,15 @@ bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
rmw_data->not_alive_count_change = liveliness_changed_status_.not_alive_count_change;
liveliness_changed_status_.alive_count_change = 0;
liveliness_changed_status_.not_alive_count_change = 0;
liveliness_changes_.store(false, std::memory_order_relaxed);
} break;
case RMW_EVENT_REQUESTED_DEADLINE_MISSED: {
rmw_requested_deadline_missed_status_t * rmw_data =
static_cast<rmw_requested_deadline_missed_status_t *>(event_info);
rmw_data->total_count = requested_deadline_missed_status_.total_count;
rmw_data->total_count_change = requested_deadline_missed_status_.total_count_change;
requested_deadline_missed_status_.total_count_change = 0;
deadline_changes_.store(false, std::memory_order_relaxed);
} break;
default:
return false;
Expand Down
15 changes: 7 additions & 8 deletions rmw_fastrtps_shared_cpp/src/qos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,14 +157,13 @@ get_datawriter_qos(
}

bool
is_valid_qos(
const rmw_qos_profile_t & qos_policies)
is_valid_qos(const rmw_qos_profile_t & /*qos_policies*/)
{
if (qos_policies.liveliness == RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE ||
qos_policies.liveliness == RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC)
{
RMW_SET_ERROR_MSG("Manual liveliness unsupported for fastrtps");
return false;
}
// if (qos_policies.liveliness == RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_NODE ||
// qos_policies.liveliness == RMW_QOS_POLICY_LIVELINESS_MANUAL_BY_TOPIC)
// {
// RMW_SET_ERROR_MSG("Manual liveliness unsupported for fastrtps");
// return false;
// }
return true;
}
2 changes: 1 addition & 1 deletion rmw_fastrtps_shared_cpp/src/rmw_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ check_wait_set_for_data(
for (size_t i = 0; i < events->event_count; ++i) {
auto event = static_cast<rmw_event_t *>(events->events[i]);
auto custom_event_info = static_cast<CustomEventInfo *>(event->data);
if (!custom_event_info->getListener()->hasEvent(event->event_type)) {
if (custom_event_info->getListener()->hasEvent(event->event_type)) {
return true;
}
}
Expand Down

0 comments on commit 8044d51

Please sign in to comment.