Skip to content

Commit be83ddb

Browse files
authored
Merge pull request #4085 from jrw972/internal-dds-improvements
Internal DDS improvements
2 parents 6e26024 + e1f3bad commit be83ddb

19 files changed

+2247
-485
lines changed

dds/DCPS/InternalDataReader.h

Lines changed: 408 additions & 91 deletions
Large diffs are not rendered by default.

dds/DCPS/InternalDataWriter.h

Lines changed: 71 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,34 @@ OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
2222
namespace OpenDDS {
2323
namespace DCPS {
2424

25+
/*
26+
The InternalDataWriter supports the following QoS:
27+
28+
DurabilityQosPolicy durability; => VOLATILE, TRANSIENT_LOCAL
29+
DurabilityServiceQosPolicy durability_service; => None
30+
DeadlineQosPolicy deadline; => None
31+
LatencyBudgetQosPolicy latency_budget => None
32+
LivelinessQosPolicy liveliness => None
33+
ReliabilityQosPolicy reliability => RELIABLE
34+
DestinationOrderQosPolicy destination_order; => None
35+
HistoryQosPolicy history; => KEEP_LAST_HISTORY, KEEP_ALL_HISTORY
36+
ResourceLimitsQosPolicy resource_limits; => None
37+
TransportPriorityQosPolicy transport_priority; => None
38+
LifespanQosPolicy lifespan; => None
39+
UserDataQosPolicy user_data; => None
40+
OwnershipQosPolicy ownership; => None
41+
OwnershipStrengthQosPolicy ownership_strength; => None
42+
WriterDataLifecycleQosPolicy writer_data_lifecycle; => Yes
43+
*/
44+
2545
template <typename T>
2646
class InternalDataWriter : public InternalEntity {
2747
public:
2848
typedef RcHandle<InternalDataReader<T> > InternalDataReader_rch;
2949
typedef WeakRcHandle<InternalDataReader<T> > InternalDataReader_wrch;
3050

31-
explicit InternalDataWriter(bool durable)
32-
: durable_(durable)
51+
explicit InternalDataWriter(const DDS::DataWriterQos& qos)
52+
: qos_(qos)
3353
{}
3454

3555
/// @name InternalTopic Interface
@@ -39,14 +59,10 @@ class InternalDataWriter : public InternalEntity {
3959
ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
4060
readers_.insert(reader);
4161

42-
if (durable_ && reader->durable()) {
43-
for (typename InstanceMap::const_iterator pos = instance_map_.begin(), limit = instance_map_.end();
62+
if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS && reader->durable()) {
63+
for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end();
4464
pos != limit; ++pos) {
45-
if (pos->second.valid_data) {
46-
reader->write(static_rchandle_cast<InternalEntity>(rchandle_from(this)), pos->second.sample);
47-
} else {
48-
reader->register_instance(static_rchandle_cast<InternalEntity>(rchandle_from(this)), pos->first);
49-
}
65+
pos->second.add_reader(reader, static_rchandle_cast<InternalEntity>(rchandle_from(this)));
5066
}
5167
}
5268
}
@@ -55,7 +71,7 @@ class InternalDataWriter : public InternalEntity {
5571
{
5672
ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
5773
if (readers_.erase(reader)) {
58-
reader->remove_publication(static_rchandle_cast<InternalEntity>(rchandle_from(this)));
74+
reader->remove_publication(static_rchandle_cast<InternalEntity>(rchandle_from(this)), qos_.writer_data_lifecycle.autodispose_unregistered_instances);
5975
}
6076
}
6177

@@ -73,30 +89,13 @@ class InternalDataWriter : public InternalEntity {
7389

7490
/// @name User Interface
7591
/// @{
76-
void register_instance(const T& sample)
77-
{
78-
ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
79-
80-
if (durable_) {
81-
instance_map_.insert(std::make_pair(sample, SampleHolder()));
82-
}
83-
84-
for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
85-
InternalDataReader_rch reader = pos->lock();
86-
if (reader) {
87-
reader->register_instance(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
88-
}
89-
}
90-
}
91-
9292
void write(const T& sample)
9393
{
9494
ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
9595

96-
if (durable_) {
96+
if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS) {
9797
const std::pair<typename InstanceMap::iterator, bool> p = instance_map_.insert(std::make_pair(sample, SampleHolder()));
98-
p.first->second.sample = sample;
99-
p.first->second.valid_data = true;
98+
p.first->second.write(sample, qos_);
10099
}
101100

102101
for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
@@ -107,52 +106,81 @@ class InternalDataWriter : public InternalEntity {
107106
}
108107
}
109108

110-
void unregister_instance(const T& sample)
109+
void dispose(const T& sample)
111110
{
112111
ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
113112

114-
if (durable_) {
115-
instance_map_.erase(sample);
113+
if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS) {
114+
typename InstanceMap::iterator pos = instance_map_.find(sample);
115+
if (pos != instance_map_.end()) {
116+
pos->second.dispose();
117+
}
116118
}
117119

118120
for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
119121
InternalDataReader_rch reader = pos->lock();
120122
if (reader) {
121-
reader->unregister_instance(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
123+
reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
122124
}
123125
}
124126
}
125127

126-
void dispose(const T& sample)
128+
void unregister_instance(const T& sample)
127129
{
128130
ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
129131

130-
if (durable_) {
132+
if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS) {
131133
instance_map_.erase(sample);
132134
}
133135

134136
for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
135137
InternalDataReader_rch reader = pos->lock();
136138
if (reader) {
137-
reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
139+
if (qos_.writer_data_lifecycle.autodispose_unregistered_instances) {
140+
reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
141+
}
142+
reader->unregister_instance(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
138143
}
139144
}
140145
}
141146
/// @}
142147

143148
private:
144-
const bool durable_;
149+
const DDS::DataWriterQos qos_;
145150

146151
typedef OPENDDS_SET(InternalDataReader_wrch) ReaderSet;
147152
ReaderSet readers_;
148153

149-
struct SampleHolder {
150-
T sample;
151-
bool valid_data;
154+
class SampleHolder {
155+
public:
156+
bool empty() const { return samples_.empty(); }
157+
158+
void add_reader(InternalDataReader_rch reader, RcHandle<InternalEntity> writer)
159+
{
160+
for (typename SampleList::const_iterator pos = samples_.begin(), limit = samples_.end(); pos != limit; ++pos) {
161+
reader->write(writer, *pos);
162+
}
163+
}
164+
165+
void write(const T& sample,
166+
const DDS::DataWriterQos& qos)
167+
{
168+
samples_.push_back(sample);
169+
if (qos.history.kind == DDS::KEEP_LAST_HISTORY_QOS) {
170+
while (samples_.size() > static_cast<std::size_t>(qos.history.depth)) {
171+
samples_.pop_front();
172+
}
173+
}
174+
}
175+
176+
void dispose()
177+
{
178+
samples_.clear();
179+
}
152180

153-
SampleHolder()
154-
: valid_data(false)
155-
{}
181+
private:
182+
typedef OPENDDS_LIST(T) SampleList;
183+
SampleList samples_;
156184
};
157185

158186
typedef OPENDDS_MAP_T(T, SampleHolder) InstanceMap;

0 commit comments

Comments
 (0)