diff --git a/autowiring/AutoPacket.h b/autowiring/AutoPacket.h index 0ae999f37..b0fc4695c 100644 --- a/autowiring/AutoPacket.h +++ b/autowiring/AutoPacket.h @@ -114,7 +114,7 @@ class AutoPacket: /// A satisfaction pulse will call any AutoFilter instances which are satisfied by the /// decoration of the passed decoration types. /// - void PulseSatisfaction(DecorationDisposition* pTypeSubs[], size_t nInfos); + void PulseSatisfactionUnsafe(std::unique_lock lk, DecorationDisposition* pTypeSubs [], size_t nInfos); /// Unsynchronized runtime counterpart to Has bool HasUnsafe(const DecorationKey& key) const; @@ -186,7 +186,7 @@ class AutoPacket: template bool Has(int tshift=0) const { std::lock_guard lk(m_lock); - return HasUnsafe(DecorationKey(auto_id::key(), true, tshift)); + return HasUnsafe(DecorationKey(auto_id::key(), tshift)); } /// @@ -198,7 +198,7 @@ class AutoPacket: const T* retVal; if (!Get(retVal, tshift)) - ThrowNotDecoratedException(DecorationKey(auto_id::key(), false, tshift)); + ThrowNotDecoratedException(DecorationKey(auto_id::key(), tshift)); return *retVal; } @@ -211,7 +211,7 @@ class AutoPacket: /// template bool Get(const T*& out, int tshift=0) const { - DecorationKey key(auto_id::key(), false, tshift); + DecorationKey key(auto_id::key(), tshift); const DecorationDisposition* pDisposition = GetDisposition(key); if (pDisposition) { switch (pDisposition->m_decorations.size()) { @@ -251,7 +251,7 @@ class AutoPacket: template bool Get(const std::shared_ptr*& out, int tshift=0) const { // Decoration must be present and the shared pointer itself must also be present - DecorationKey key(auto_id::key(), true, tshift); + DecorationKey key(auto_id::key(), tshift); const DecorationDisposition* pDisposition = GetDisposition(key); if (!pDisposition) { out = nullptr; @@ -314,7 +314,7 @@ class AutoPacket: template const T** GetAll(int tshift = 0) const { std::lock_guard lk(m_lock); - auto q = m_decorations.find(DecorationKey(auto_id::key(), true, tshift)); + auto q = m_decorations.find(DecorationKey(auto_id::key(), tshift)); // If decoration doesn't exist, return empty null-terminated buffer if (q == m_decorations.end()) { @@ -351,8 +351,7 @@ class AutoPacket: /// template void Unsatisfiable(void) { - MarkUnsatisfiable(DecorationKey(auto_id::key(), false, 0)); - MarkUnsatisfiable(DecorationKey(auto_id::key(), true, 0)); + MarkUnsatisfiable(DecorationKey(auto_id::key(), 0)); } /// @@ -369,7 +368,7 @@ class AutoPacket: auto ptr = std::make_shared(std::forward(t)); Decorate( AnySharedPointer(ptr), - DecorationKey(auto_id::key(), true, 0) + DecorationKey(auto_id::key(), 0) ); return *ptr; } @@ -385,7 +384,7 @@ class AutoPacket: /// template void Decorate(std::shared_ptr ptr) { - DecorationKey key(auto_id::key(), true, 0); + DecorationKey key(auto_id::key(), 0); // We don't want to see this overload used on a const T static_assert(!std::is_const::value, "Cannot decorate a shared pointer to const T with this overload"); @@ -435,11 +434,9 @@ class AutoPacket: // Perform standard decoration with a short initialization: std::unique_lock lk(m_lock); DecorationDisposition* pTypeSubs[1 + sizeof...(Ts)] = { - &DecorateImmediateUnsafe(DecorationKey(auto_id::key(), false, 0), &immed), - &DecorateImmediateUnsafe(DecorationKey(auto_id::key(), false, 0), &immeds)... + &DecorateImmediateUnsafe(DecorationKey(auto_id::key(), 0), &immed), + &DecorateImmediateUnsafe(DecorationKey(auto_id::key(), 0), &immeds)... }; - lk.unlock(); - // Pulse satisfaction: MakeAtExit([this, &pTypeSubs] { @@ -451,12 +448,10 @@ class AutoPacket: } // Now trigger a rescan to hit any deferred, unsatisfiable entries: - for (const std::type_info* ti : {&auto_id::key(), &auto_id::key()...}) { - MarkUnsatisfiable(DecorationKey(*ti, true, 0)); - MarkUnsatisfiable(DecorationKey(*ti, false, 0)); - } + for (const std::type_info* ti : {&auto_id::key(), &auto_id::key()...}) + MarkUnsatisfiable(DecorationKey(*ti, 0)); }), - PulseSatisfaction(pTypeSubs, 1 + sizeof...(Ts)); + PulseSatisfactionUnsafe(std::move(lk), pTypeSubs, 1 + sizeof...(Ts)); } /// @@ -509,9 +504,7 @@ class AutoPacket: /// True if the indicated type has been requested for use by some consumer template bool HasSubscribers(void) const { - return - HasSubscribers(DecorationKey(auto_id::key(), false, 0)) || - HasSubscribers(DecorationKey(auto_id::key(), true, 0)); + return HasSubscribers(DecorationKey(auto_id::key(), 0)); } struct SignalStub { diff --git a/autowiring/DecorationDisposition.h b/autowiring/DecorationDisposition.h index 116c68fbd..f82c40fac 100644 --- a/autowiring/DecorationDisposition.h +++ b/autowiring/DecorationDisposition.h @@ -13,28 +13,23 @@ struct DecorationKey { DecorationKey(const DecorationKey& rhs) : ti(rhs.ti), - is_shared(rhs.is_shared), tshift(rhs.tshift) {} - explicit DecorationKey(const std::type_info& ti, bool is_shared, int tshift) : + explicit DecorationKey(const std::type_info& ti, int tshift) : ti(&ti), - is_shared(is_shared), tshift(tshift) {} // The type index const std::type_info* ti = nullptr; - // True if this decoration can be used with AutoFilters that accept a shared_ptr input type - bool is_shared = false; - // Zero refers to a decoration created on this packet, a positive number [tshift] indicates // a decoration attached [tshift] packets ago. int tshift = -1; bool operator==(const DecorationKey& rhs) const { - return ti == rhs.ti && is_shared == rhs.is_shared && tshift == rhs.tshift; + return ti == rhs.ti && tshift == rhs.tshift; } }; @@ -42,10 +37,7 @@ namespace std { template<> struct hash { size_t operator()(const DecorationKey& key) const { - return - key.tshift + - (key.is_shared ? 0x80000 : 0x70000) + - key.ti->hash_code(); + return key.tshift + key.ti->hash_code(); } }; } @@ -61,10 +53,6 @@ enum class DispositionState { // Everything attached, ready to go Satisfied, - // Unsatisfiable, and the callers on this decoration cannot accept a non-null - // entry--IE, they accept const references as inputs. - UnsatisfiableNoCall, - // This decoration will never be satisfied. Calls are generated with a null // shared pointer passed as the value. Unsatisfiable @@ -100,7 +88,24 @@ struct DecorationDisposition std::vector m_publishers; // Satisfaction counters - std::vector m_subscribers; + struct Subscriber { + Subscriber(void) {} + + Subscriber(bool is_optional, SatCounter* satCounter): + is_optional{is_optional}, + satCounter{satCounter} + {} + + // Optional flag. If this flag is set, it indicates that the referenced AutoFilter could still + // be called even if the decoration is not attached to the packet--IE, the AutoFilter accepts a + // shared pointer or an array + bool is_optional = false; + + // The satisfaction counter itself + SatCounter* satCounter = nullptr; + }; + + std::vector m_subscribers; // The current state of this disposition DispositionState m_state = DispositionState::Unsatisfied; diff --git a/src/autowiring/AutoPacket.cpp b/src/autowiring/AutoPacket.cpp index b7ee0de88..1e1f69904 100644 --- a/src/autowiring/AutoPacket.cpp +++ b/src/autowiring/AutoPacket.cpp @@ -30,7 +30,7 @@ AutoPacket::~AutoPacket(void) { // originating from this packet as unsatisfiable for (auto& pair : m_decorations) if (!pair.first.tshift && pair.second.m_state != DispositionState::Satisfied) - MarkSuccessorsUnsatisfiable(DecorationKey(*pair.first.ti, pair.first.is_shared, 0)); + MarkSuccessorsUnsatisfiable(DecorationKey(*pair.first.ti, 0)); // Needed for the AutoPacketGraph NotifyTeardownListeners(); @@ -77,7 +77,7 @@ DecorationDisposition& AutoPacket::DecorateImmediateUnsafe(const DecorationKey& void AutoPacket::AddSatCounterUnsafe(SatCounter& satCounter) { for(auto pCur = satCounter.GetAutoFilterArguments(); *pCur; pCur++) { - DecorationKey key(*pCur->ti, pCur->is_shared, pCur->tshift); + DecorationKey key(*pCur->ti, pCur->tshift); DecorationDisposition& entry = m_decorations[key]; // Decide what to do with this entry: @@ -88,7 +88,7 @@ void AutoPacket::AddSatCounterUnsafe(SatCounter& satCounter) { throw std::runtime_error(ss.str()); } - entry.m_subscribers.push_back(&satCounter); + entry.m_subscribers.push_back({pCur->is_shared, &satCounter}); switch (entry.m_state) { case DispositionState::Satisfied: case DispositionState::Unsatisfiable: @@ -100,8 +100,8 @@ void AutoPacket::AddSatCounterUnsafe(SatCounter& satCounter) { } if (pCur->is_output) { if(!entry.m_publishers.empty()) - for (SatCounter* subscriber : entry.m_subscribers) - for(auto pOther = subscriber->GetAutoFilterArguments(); *pOther; pOther++) + for (const auto& subscriber : entry.m_subscribers) + for(auto pOther = subscriber.satCounter->GetAutoFilterArguments(); *pOther; pOther++) if (!pOther->is_multi) { std::stringstream ss; ss << "Added identical data broadcasts of type " << autowiring::demangle(pCur->ti) << " with existing subscriber."; @@ -113,7 +113,7 @@ void AutoPacket::AddSatCounterUnsafe(SatCounter& satCounter) { // Make sure decorations exist for timeshifts less that key's timeshift for (int tshift = 0; tshift < key.tshift; ++tshift) - m_decorations[DecorationKey(*key.ti, key.is_shared, tshift)]; + m_decorations[DecorationKey(*key.ti, tshift)]; } } @@ -126,10 +126,7 @@ void AutoPacket::MarkUnsatisfiable(const DecorationKey& key) { throw std::runtime_error("Cannot mark a decoration as unsatisfiable when that decoration is already present on this packet"); // Mark the entry as permanently unsatisfiable: - entry.m_state = - key.is_shared ? - DispositionState::Unsatisfiable : - DispositionState::UnsatisfiableNoCall; + entry.m_state = DispositionState::Unsatisfiable; // Notify all consumers UpdateSatisfactionUnsafe(std::move(lk), entry); @@ -157,10 +154,17 @@ void AutoPacket::UpdateSatisfactionUnsafe(std::unique_lock&& lk, con // Update satisfaction inside of lock switch (disposition.m_state) { case DispositionState::Unsatisfiable: + // Call only optional subscribers + for (const auto& subscriber : disposition.m_subscribers) + if (subscriber.is_optional && subscriber.satCounter->Decrement()) + callQueue.push_back(subscriber.satCounter); + break; case DispositionState::Satisfied: - for (SatCounter* satCounter : disposition.m_subscribers) - if (satCounter->Decrement()) - callQueue.push_back(satCounter); + // Identical to the unsatisfiable case, except we don't need to test subscribers to see + // if their inputs are considered optional + for (const auto& subscriber : disposition.m_subscribers) + if (subscriber.satCounter->Decrement()) + callQueue.push_back(subscriber.satCounter); break; default: // Nothing to do @@ -173,39 +177,57 @@ void AutoPacket::UpdateSatisfactionUnsafe(std::unique_lock&& lk, con call->GetCall()(call->GetAutoFilter(), *this); } -void AutoPacket::PulseSatisfaction(DecorationDisposition* pTypeSubs[], size_t nInfos) { +void AutoPacket::PulseSatisfactionUnsafe(std::unique_lock lk, DecorationDisposition* pTypeSubs[], size_t nInfos) { std::vector callQueue; + std::vector reincrement; - // First pass, decrement what we can: + // We will need to loop as long as each pass results in something more to be executed + do { - std::lock_guard lk(m_lock); - for(size_t i = nInfos; i--;) - for(SatCounter* cur : pTypeSubs[i]->m_subscribers) { - if( + // Empty everything first + callQueue.clear(); + reincrement.clear(); + + // First pass, decrement what we can: + for (size_t i = nInfos; i--;) + for (const auto& cur : pTypeSubs[i]->m_subscribers) { + SatCounter* satCounter = cur.satCounter; + if ( + // Shared pointer inputs can't receive a non-null input: + !cur.is_optional && + // We only care about sat counters that aren't deferred--skip everyone else // Deferred calls will be too late. - !cur->IsDeferred() && - - // Now do the decrementation and proceed even if optional > 0, - // since this is the only opportunity to fulfill the arguments - cur->Decrement() + !satCounter->IsDeferred() ) - // Finally, queue a call for this type - callQueue.push_back(cur); + { + if (satCounter->Decrement()) + // This one is satisfied, we will need to call it + callQueue.push_back(satCounter); + else + // Reincrement. We need to retain total control over this immediate-mode decoration. + // If a decrementation happens in another thread context due to a simultaneous call + // to DecorateImmediate, we could wind up with a race condition. Multiple decorations + // passed to DecorateImmediate may satisfy simultaneously, however, so we want to + // delay reincrementation until after the loop concludes + reincrement.push_back(satCounter); + } } - } - // Make calls outside of lock, to avoid deadlock from decorations in methods - for (SatCounter* call : callQueue) - call->GetCall()(call->GetAutoFilter(), *this); - - // Reset all counters, data in this call will not be available on return - { - std::lock_guard lk(m_lock); - for(size_t i = nInfos; i--;) - for(SatCounter* cur : pTypeSubs[i]->m_subscribers) - cur->Increment(); - } + // Reincrement anything that didn't zero out. This is safe to do even if some of these + // entries are present in the callQueue; if there's a call in the callQueue, we will assign + // the saturation counter to zero anyway. + for (auto& cur : reincrement) + cur->Increment(); + + // Run through calls while unsynchronized: + lk.unlock(); + for (SatCounter* call : callQueue) { + call->GetCall()(call->GetAutoFilter(), *this); + call->remaining = 0; + } + lk.lock(); + } while (!callQueue.empty()); } bool AutoPacket::HasUnsafe(const DecorationKey& key) const { @@ -216,59 +238,47 @@ bool AutoPacket::HasUnsafe(const DecorationKey& key) const { } void AutoPacket::DecorateNoPriors(const AnySharedPointer& ptr, DecorationKey key) { - DecorationDisposition* dispositionA; - DecorationDisposition* dispositionB; + DecorationDisposition* disposition; { std::lock_guard lk(m_lock); - auto transition = [&](DecorationKey& key){ - DecorationDisposition& disposition = m_decorations[key]; - switch (disposition.m_state) { - case DispositionState::Satisfied: - { - std::stringstream ss; - ss << "Cannot decorate this packet with type " << autowiring::demangle(ptr) - << ", the requested decoration is already satisfied"; - throw std::runtime_error(ss.str()); - } - break; - case DispositionState::Unsatisfiable: - case DispositionState::UnsatisfiableNoCall: - { - std::stringstream ss; - ss << "Cannot check out decoration of type " << autowiring::demangle(ptr) - << ", it has been marked unsatisfiable"; - throw std::runtime_error(ss.str()); - } - break; - default: - break; + disposition = &m_decorations[key]; + switch (disposition->m_state) { + case DispositionState::Satisfied: + { + std::stringstream ss; + ss << "Cannot decorate this packet with type " << autowiring::demangle(ptr) + << ", the requested decoration is already satisfied"; + throw std::runtime_error(ss.str()); } + break; + case DispositionState::Unsatisfiable: + { + std::stringstream ss; + ss << "Cannot check out decoration of type " << autowiring::demangle(ptr) + << ", it has been marked unsatisfiable"; + throw std::runtime_error(ss.str()); + } + break; + default: + break; + } - // Decoration attaches here - disposition.m_decorations.push_back(ptr); - return &disposition; - }; - - key.is_shared = false; - dispositionA = transition(key); - key.is_shared = true; - dispositionB = transition(key); + // Decoration attaches here + disposition->m_decorations.push_back(ptr); } // Uniformly advance state: - switch (dispositionA->m_state) { + switch (disposition->m_state) { case DispositionState::Unsatisfied: case DispositionState::PartlySatisfied: // Permit a transition to another state - if (dispositionA->IsPublicationComplete() && dispositionB->IsPublicationComplete()) { - dispositionA->m_state = dispositionB->m_state = DispositionState::Satisfied; - - UpdateSatisfactionUnsafe(std::unique_lock(m_lock), *dispositionA); - UpdateSatisfactionUnsafe(std::unique_lock(m_lock), *dispositionB); + if (disposition->IsPublicationComplete()) { + disposition->m_state = DispositionState::Satisfied; + UpdateSatisfactionUnsafe(std::unique_lock(m_lock), *disposition); } else - dispositionA->m_state = dispositionB->m_state = DispositionState::PartlySatisfied; + disposition->m_state = DispositionState::PartlySatisfied; break; default: // Do nothing, no advancing to any states from here @@ -349,11 +359,8 @@ void AutoPacket::ForwardAll(std::shared_ptr recipient) const { { std::lock_guard lk(m_lock); for (const auto& decoration : m_decorations) - // Only fully satisfied shared decorations are considered for propagation - if ( - decoration.first.is_shared && - decoration.second.m_state == DispositionState::Satisfied - ) + // Only fully satisfied decorations are considered for propagation + if (decoration.second.m_state == DispositionState::Satisfied) dd.push_back(decoration); } diff --git a/src/autowiring/AutoPacketGraph.cpp b/src/autowiring/AutoPacketGraph.cpp index 9d43c4630..99593892d 100644 --- a/src/autowiring/AutoPacketGraph.cpp +++ b/src/autowiring/AutoPacketGraph.cpp @@ -84,22 +84,18 @@ void AutoPacketGraph::AutoFilter(AutoPacket& packet) { auto& decoration = cur.second; auto type = cur.first.ti; - for (auto& publisher : decoration.m_publishers) { - if (!publisher->remaining) { + for (auto& publisher : decoration.m_publishers) + if (!publisher->remaining) RecordDelivery(type, *publisher, false); - } - } for (auto& subscriber : decoration.m_subscribers) { // Skip the AutoPacketGraph - const std::type_info& descType = m_factory->GetContext()->GetAutoTypeId(subscriber->GetAutoFilter()); - if (descType == typeid(AutoPacketGraph)) { + const std::type_info& descType = m_factory->GetContext()->GetAutoTypeId(subscriber.satCounter->GetAutoFilter()); + if (descType == typeid(AutoPacketGraph)) continue; - } - if (subscriber->remaining) { - RecordDelivery(type, *subscriber, true); - } + if (subscriber.satCounter->remaining) + RecordDelivery(type, *subscriber.satCounter, true); } } }); diff --git a/src/autowiring/AutoPacketInternal.cpp b/src/autowiring/AutoPacketInternal.cpp index fd8df444b..2094fa07d 100644 --- a/src/autowiring/AutoPacketInternal.cpp +++ b/src/autowiring/AutoPacketInternal.cpp @@ -47,17 +47,15 @@ void AutoPacketInternal::Initialize(bool isFirstPacket) { call->GetCall()(call->GetAutoFilter(), *this); // First-call indicated by argumument type AutoPacket&: - for (bool is_shared : {false, true}) { - std::unique_lock lk(m_lock); + std::unique_lock lk(m_lock); - // Don't modify the decorations set if nobody expects an AutoPacket input - auto q = m_decorations.find(DecorationKey(typeid(auto_arg::id_type), is_shared, 0)); - if (q == m_decorations.end()) - continue; + // Don't modify the decorations set if nobody expects an AutoPacket input + auto q = m_decorations.find(DecorationKey(typeid(auto_arg::id_type), 0)); + if (q == m_decorations.end()) + return; - q->second.m_state = DispositionState::Satisfied; - UpdateSatisfactionUnsafe(std::move(lk), q->second); - } + q->second.m_state = DispositionState::Satisfied; + UpdateSatisfactionUnsafe(std::move(lk), q->second); } std::shared_ptr AutoPacketInternal::SuccessorInternal(void) { diff --git a/src/autowiring/test/ArgumentTypeTest.cpp b/src/autowiring/test/ArgumentTypeTest.cpp index 020881872..4b19fbd37 100644 --- a/src/autowiring/test/ArgumentTypeTest.cpp +++ b/src/autowiring/test/ArgumentTypeTest.cpp @@ -87,7 +87,7 @@ TEST_F(ArgumentTypeTest, TestAutoIn) { // Deduced Type const auto& arg = t_argShared::arg(*packet); - ASSERT_EQ(2UL, arg.use_count()) << "AutoPacket should store exactly two shared pointer references"; + ASSERT_EQ(1UL, arg.use_count()) << "AutoPacket should store exactly one shared pointer reference to a decorated entry"; } TEST_F(ArgumentTypeTest, TestAutoOut) { diff --git a/src/autowiring/test/AutoFilterDiagnosticsTest.cpp b/src/autowiring/test/AutoFilterDiagnosticsTest.cpp index 315010d61..1bfbd1dc5 100644 --- a/src/autowiring/test/AutoFilterDiagnosticsTest.cpp +++ b/src/autowiring/test/AutoFilterDiagnosticsTest.cpp @@ -28,7 +28,7 @@ TEST_F(AutoFilterDiagnosticsTest, CanGetExpectedTrueType) { auto& disposition = decorations.begin()->second; ASSERT_EQ(1UL, disposition.m_subscribers.size()) << "Expected exactly one subscriber for the sole present type"; - const SatCounter* descriptor = disposition.m_subscribers.front(); + const SatCounter* descriptor = disposition.m_subscribers.front().satCounter; AnySharedPointer asp(descriptor->GetAutoFilter()); // Get more information about this object from the enclosing context: