Skip to content

Commit

Permalink
Fastrtps18 event callbacks policies (#275)
Browse files Browse the repository at this point in the history
* WIP support Deadline, Lifespan, Liveliness QoS Policy types and event callbacks in Fast-RTPS 1.8

Signed-off-by: Emerson Knapp <eknapp@amazon.com>

* address some PR comments

Signed-off-by: Miaofei <miaofei@amazon.com>

* fix issues uncovered by unit tests

Signed-off-by: Miaofei <miaofei@amazon.com>

* disable manual liveliness support

Signed-off-by: Miaofei <miaofei@amazon.com>

* fix windows build

Signed-off-by: Miaofei <miaofei@amazon.com>

* fix windows compiler warnings

Signed-off-by: Miaofei <miaofei@amazon.com>
  • Loading branch information
emersonknapp authored and dirk-thomas committed May 16, 2019
1 parent f629b2f commit d2e8a8d
Show file tree
Hide file tree
Showing 10 changed files with 567 additions and 164 deletions.
2 changes: 2 additions & 0 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ find_package(rmw REQUIRED)
include_directories(include)

add_library(rmw_fastrtps_shared_cpp
src/custom_publisher_info.cpp
src/custom_subscriber_info.cpp
src/demangle.cpp
src/namespace_prefix.cpp
src/qos.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_

#include <atomic>
#include <condition_variable>
#include <list>
#include <memory>
#include <mutex>
#include <utility>

#include "fastcdr/FastBuffer.h"

#include "fastrtps/subscriber/SampleInfo.h"
#include "fastrtps/subscriber/Subscriber.h"
#include "fastrtps/subscriber/SubscriberListener.h"
#include "fastrtps/participant/Participant.h"
#include "fastrtps/publisher/Publisher.h"
#include "fastrtps/publisher/PublisherListener.h"

#include "rmw/event.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"


class EventListenerInterface
{
protected:
class ConditionalScopedLock;

public:
/// Connect a condition variable so a waiter can be notified of new data.
virtual void attachCondition(
std::mutex * conditionMutex,
std::condition_variable * conditionVariable) = 0;

/// Unset the information from attachCondition.
virtual void detachCondition() = 0;

/// Check if there is new data available for a specific event type.
/**
* \param event_type The event type to check on.
* \return `true` if new data is available.
*/
virtual bool hasEvent(rmw_event_type_t event_type) const = 0;

/// Take ready data for an event type.
/**
* \param event_type The event type to get data for.
* \param event_info A preallocated event information (from rmw/types.h) to fill with data
* \return `true` if data was successfully taken.
* \return `false` if data was not available, in this case nothing was written to event_info.
*/
virtual bool takeNextEvent(rmw_event_type_t event_type, void * event_info) = 0;
};

class EventListenerInterface::ConditionalScopedLock
{
public:
ConditionalScopedLock(
std::mutex * mutex,
std::condition_variable * condition_variable = nullptr)
: mutex_(mutex), cv_(condition_variable)
{
if (nullptr != mutex_) {
mutex_->lock();
}
}

~ConditionalScopedLock()
{
if (nullptr != mutex_) {
mutex_->unlock();
if (nullptr != cv_) {
cv_->notify_all();
}
}
}

private:
std::mutex * mutex_;
std::condition_variable * cv_;
};

struct CustomEventInfo
{
virtual EventListenerInterface * getListener() const = 0;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_EVENT_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
#ifndef RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_

#include <atomic>
#include <mutex>
#include <condition_variable>
#include <set>

#include "fastrtps/publisher/Publisher.h"
Expand All @@ -25,29 +27,43 @@
#include "rmw/rmw.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"


class PubListener;

typedef struct CustomPublisherInfo
typedef struct CustomPublisherInfo : public CustomEventInfo
{
virtual ~CustomPublisherInfo() = default;

eprosima::fastrtps::Publisher * publisher_;
PubListener * listener_;
rmw_fastrtps_shared_cpp::TypeSupport * type_support_;
rmw_gid_t publisher_gid;
const char * typesupport_identifier_;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
EventListenerInterface *
getListener() const final;
} CustomPublisherInfo;

class PubListener : public eprosima::fastrtps::PublisherListener
class PubListener : public EventListenerInterface, public eprosima::fastrtps::PublisherListener
{
public:
explicit PubListener(CustomPublisherInfo * info)
: deadline_changes_(false),
liveliness_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr)
{
(void) info;
}

// PublisherListener implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
onPublicationMatched(
eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info)
eprosima::fastrtps::Publisher * pub, eprosima::fastrtps::rtps::MatchingInfo & info) final
{
(void) pub;
std::lock_guard<std::mutex> lock(internalMutex_);
Expand All @@ -58,16 +74,67 @@ class PubListener : public eprosima::fastrtps::PublisherListener
}
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_offered_deadline_missed(
eprosima::fastrtps::Publisher * publisher,
const eprosima::fastrtps::OfferedDeadlineMissedStatus & status) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_liveliness_lost(
eprosima::fastrtps::Publisher * publisher,
const eprosima::fastrtps::LivelinessLostStatus & status) final;


// EventListenerInterface implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
takeNextEvent(rmw_event_type_t event_type, void * event_info) final;

// PubListener API
size_t subscriptionCount()
{
std::lock_guard<std::mutex> lock(internalMutex_);
return subscriptions_.size();
}

void
attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
{
std::lock_guard<std::mutex> lock(internalMutex_);
conditionMutex_ = conditionMutex;
conditionVariable_ = conditionVariable;
}

void
detachCondition()
{
std::lock_guard<std::mutex> lock(internalMutex_);
conditionMutex_ = nullptr;
conditionVariable_ = nullptr;
}

private:
std::mutex internalMutex_;
std::set<eprosima::fastrtps::rtps::GUID_t>
subscriptions_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
mutable std::mutex internalMutex_;

std::set<eprosima::fastrtps::rtps::GUID_t> subscriptions_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool deadline_changes_;
eprosima::fastrtps::OfferedDeadlineMissedStatus offered_deadline_missed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool liveliness_changes_;
eprosima::fastrtps::LivelinessLostStatus liveliness_lost_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PUBLISHER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -26,35 +26,47 @@

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/impl/cpp/macros.hpp"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"


class SubListener;

typedef struct CustomSubscriberInfo
typedef struct CustomSubscriberInfo : public CustomEventInfo
{
virtual ~CustomSubscriberInfo() = default;

eprosima::fastrtps::Subscriber * subscriber_;
SubListener * listener_;
rmw_fastrtps_shared_cpp::TypeSupport * type_support_;
const char * typesupport_identifier_;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
EventListenerInterface *
getListener() const final;
} CustomSubscriberInfo;

class SubListener : public eprosima::fastrtps::SubscriberListener
class SubListener : public EventListenerInterface, public eprosima::fastrtps::SubscriberListener
{
public:
explicit SubListener(CustomSubscriberInfo * info)
: data_(0),
conditionMutex_(nullptr), conditionVariable_(nullptr)
deadline_changes_(false),
liveliness_changes_(false),
conditionMutex_(nullptr),
conditionVariable_(nullptr)
{
// Field is not used right now
(void)info;
}

// SubscriberListener implementation
void
onSubscriptionMatched(
eprosima::fastrtps::Subscriber * sub, eprosima::fastrtps::rtps::MatchingInfo & info)
eprosima::fastrtps::Subscriber * /*sub*/, eprosima::fastrtps::rtps::MatchingInfo & info) final
{
(void)sub;

std::lock_guard<std::mutex> lock(internalMutex_);
if (eprosima::fastrtps::rtps::MATCHED_MATCHING == info.status) {
publishers_.insert(info.remoteEndpointGuid);
Expand All @@ -64,23 +76,39 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
}

void
onNewDataMessage(eprosima::fastrtps::Subscriber * sub)
onNewDataMessage(eprosima::fastrtps::Subscriber * sub) final
{
(void)sub;
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
// the change to data_ needs to be mutually exclusive with rmw_wait()
// which checks hasData() and decides if wait() needs to be called
data_ = sub->getUnreadCount();
clock.unlock();
conditionVariable_->notify_one();
} else {
data_ = sub->getUnreadCount();
}
// the change to liveliness_lost_count_ needs to be mutually exclusive with
// rmw_wait() which checks hasEvent() and decides if wait() needs to be called
ConditionalScopedLock clock(conditionMutex_, conditionVariable_);

data_.store(sub->getUnreadCount(), std::memory_order_relaxed);
}

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_requested_deadline_missed(
eprosima::fastrtps::Subscriber *,
const eprosima::fastrtps::RequestedDeadlineMissedStatus &) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void
on_liveliness_changed(
eprosima::fastrtps::Subscriber *,
const eprosima::fastrtps::LivelinessChangedStatus &) final;

// EventListenerInterface implementation
RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
takeNextEvent(rmw_event_type_t event_type, void * event_info) final;

// SubListener API
void
attachCondition(std::mutex * conditionMutex, std::condition_variable * conditionVariable)
{
Expand All @@ -98,22 +126,17 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
}

bool
hasData()
hasData() const
{
return data_ > 0;
return data_.load(std::memory_order_relaxed) > 0;
}

void
data_taken(eprosima::fastrtps::Subscriber * sub)
{
std::lock_guard<std::mutex> lock(internalMutex_);

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
data_ = sub->getUnreadCount();
} else {
data_ = sub->getUnreadCount();
}
ConditionalScopedLock clock(conditionMutex_);
data_.store(sub->getUnreadCount(), std::memory_order_relaxed);
}

size_t publisherCount()
Expand All @@ -123,8 +146,18 @@ class SubListener : public eprosima::fastrtps::SubscriberListener
}

private:
std::mutex internalMutex_;
mutable std::mutex internalMutex_;

std::atomic_size_t data_;

std::atomic_bool deadline_changes_;
eprosima::fastrtps::RequestedDeadlineMissedStatus requested_deadline_missed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::atomic_bool liveliness_changes_;
eprosima::fastrtps::LivelinessChangedStatus liveliness_changed_status_
RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

Expand Down
Loading

0 comments on commit d2e8a8d

Please sign in to comment.