Skip to content

Commit

Permalink
Merge pull request #560 from leapmotion/bug-multirecursive
Browse files Browse the repository at this point in the history
Fix multidecorate filter unsatisfiability case
  • Loading branch information
gtremper committed Jun 12, 2015
2 parents 693bfd8 + 92bc770 commit 52d0369
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 81 deletions.
8 changes: 4 additions & 4 deletions autowiring/AutoPacket.h
Expand Up @@ -105,7 +105,7 @@ class AutoPacket:
/// This method results in a call to the AutoFilter method on any subscribers which are
/// satisfied by this decoration. This method must be called with m_lock held.
/// </remarks>
void UpdateSatisfactionUnsafe(std::unique_lock<std::mutex>&& lk, const DecorationDisposition& disposition);
void UpdateSatisfactionUnsafe(std::unique_lock<std::mutex> lk, const DecorationDisposition& disposition);

/// <summary>
/// Performs a "satisfaction pulse", which will avoid notifying any deferred filters
Expand Down Expand Up @@ -177,7 +177,7 @@ class AutoPacket:
t_decorationMap GetDecorations(void) const;

/// <returns>
/// True if this packet posesses a decoration of the specified type
/// True if this packet posesses one or more instances of a decoration of the specified type
/// </returns>
/// <remarks>
/// Although "AutoPacket &" and "const AutoPacket&" argument types will be
Expand Down Expand Up @@ -283,7 +283,7 @@ class AutoPacket:
bool Get(std::shared_ptr<const T>& out, int tshift = 0) const {
std::lock_guard<std::mutex> lk(m_lock);
auto deco = m_decorations.find(DecorationKey(auto_id<T>::key(), true, tshift));
if(deco != m_decorations.end() && deco->second.m_state == DispositionState::Satisfied) {
if(deco != m_decorations.end() && deco->second.m_state == DispositionState::Complete) {
auto& disposition = deco->second;
if(disposition.m_decorations.size() == 1) {
out = disposition.m_decorations[0]->as_unsafe<T>();
Expand Down Expand Up @@ -444,7 +444,7 @@ class AutoPacket:
// IMPORTANT: isCheckedOut = true prevents subsequent decorations of this type
for(DecorationDisposition* pEntry : pTypeSubs) {
pEntry->m_pImmediate = nullptr;
pEntry->m_state = DispositionState::Unsatisfiable;
pEntry->m_state = DispositionState::Complete;
}

// Now trigger a rescan to hit any deferred, unsatisfiable entries:
Expand Down
47 changes: 30 additions & 17 deletions autowiring/DecorationDisposition.h
Expand Up @@ -50,12 +50,8 @@ enum class DispositionState {
// Some decorations present, but not all of them. Cannot proceed.
PartlySatisfied,

// Everything attached, ready to go
Satisfied,

// This decoration will never be satisfied. Calls are generated with a null
// shared pointer passed as the value.
Unsatisfiable
// All publishers on this decoration have been run or cannot be executed
Complete
};

/// <remarks>
Expand All @@ -73,7 +69,11 @@ struct DecorationDisposition
m_state(source.m_state)
{}

// The decoration proper--potentially, this decoration might be from a prior execution of this
// The number of producers of this decoration type which have concluded. This number may be larger
// than the number of attached decorations if some producers could not run.
size_t m_nProducersRun = 0;

// The decorations proper--potentially, these decorations might be from a prior execution of this
// packet. In the case of immediate decorations, this value will be invalid.
// Valid if and only if is_shared is false.
std::vector<AnySharedPointer> m_decorations;
Expand All @@ -89,17 +89,32 @@ struct DecorationDisposition

// Satisfaction counters
struct Subscriber {
Subscriber(void) {}
enum class Type {
// Ordinary, reference-in, non-optional subscriber
Normal,

// Optional. If this flag is set, it indicates that the referenced AutoFilter could still be
// called even if the decoration is not attached to the packet--IE, the AutoFilter accepts a
// shared pointer or an array
Optional,

Subscriber(bool is_optional, SatCounter* satCounter):
is_optional{is_optional},
// Abides by all of the rules of Optional, but additionally can be called when multiple
// instances of a decoration are present.
Multi
};

Subscriber(void) {}
Subscriber(bool is_shared, Type type, SatCounter* satCounter) :
is_shared{is_shared},
type{type},
satCounter{satCounter}
{}

// Optional flag. If this flag is set, it indicates that the referenced AutoFilter could still
// be called even if the decoration is not attached to the packet--IE, the AutoFilter accepts a
// shared pointer or an array
bool is_optional = false;
// True if a shared pointer will be taken, false otherwise
bool is_shared = false;

// The relationship between this subscriber and the provided decoration
Type type = Type::Normal;

// The satisfaction counter itself
SatCounter* satCounter = nullptr;
Expand All @@ -118,9 +133,7 @@ struct DecorationDisposition
/// and at least one decoration has been attached to this disposition.
/// </remarks>
bool IsPublicationComplete(void) const {
return
!m_decorations.empty() &&
m_decorations.size() >= m_publishers.size();
return !m_decorations.empty() && m_nProducersRun >= m_publishers.size();
}

void Reset(void) {
Expand Down
146 changes: 98 additions & 48 deletions src/autowiring/AutoPacket.cpp
Expand Up @@ -29,7 +29,7 @@ AutoPacket::~AutoPacket(void) {
// Mark decorations of successor packets that use decorations
// originating from this packet as unsatisfiable
for (auto& pair : m_decorations)
if (!pair.first.tshift && pair.second.m_state != DispositionState::Satisfied)
if (!pair.first.tshift && pair.second.m_state != DispositionState::Complete)
MarkSuccessorsUnsatisfiable(DecorationKey(*pair.first.ti, 0));

// Needed for the AutoPacketGraph
Expand Down Expand Up @@ -70,7 +70,7 @@ DecorationDisposition& AutoPacket::DecorateImmediateUnsafe(const DecorationKey&
}

// Mark the entry as appropriate:
dec.m_state = DispositionState::Satisfied;
dec.m_state = DispositionState::Complete;
dec.m_pImmediate = pvImmed;
return dec;
}
Expand All @@ -88,10 +88,17 @@ void AutoPacket::AddSatCounterUnsafe(SatCounter& satCounter) {
throw std::runtime_error(ss.str());
}

entry.m_subscribers.push_back({pCur->is_shared, &satCounter});
entry.m_subscribers.push_back({
pCur->is_shared,
pCur->is_multi ?
DecorationDisposition::Subscriber::Type::Multi :
pCur->is_shared ?
DecorationDisposition::Subscriber::Type::Optional :
DecorationDisposition::Subscriber::Type::Normal,
&satCounter
});
switch (entry.m_state) {
case DispositionState::Satisfied:
case DispositionState::Unsatisfiable:
case DispositionState::Complete:
satCounter.Decrement();
break;
default:
Expand All @@ -101,12 +108,13 @@ void AutoPacket::AddSatCounterUnsafe(SatCounter& satCounter) {
if (pCur->is_output) {
if(!entry.m_publishers.empty())
for (const auto& subscriber : entry.m_subscribers)
for(auto pOther = subscriber.satCounter->GetAutoFilterArguments(); *pOther; pOther++)
if (!pOther->is_multi) {
for (auto pOther = subscriber.satCounter->GetAutoFilterArguments(); *pOther; pOther++) {
if (*pOther->ti == *pCur->ti && !pOther->is_multi) {
std::stringstream ss;
ss << "Added identical data broadcasts of type " << autowiring::demangle(pCur->ti) << " with existing subscriber.";
throw std::runtime_error(ss.str());
}
}

entry.m_publishers.push_back(&satCounter);
}
Expand All @@ -122,11 +130,10 @@ void AutoPacket::MarkUnsatisfiable(const DecorationKey& key) {
std::unique_lock<std::mutex> lk(m_lock);
auto& entry = m_decorations[key];

if (entry.m_state == DispositionState::PartlySatisfied || entry.m_state == DispositionState::Satisfied)
throw std::runtime_error("Cannot mark a decoration as unsatisfiable when that decoration is already present on this packet");

// Mark the entry as permanently unsatisfiable:
entry.m_state = DispositionState::Unsatisfiable;
// Clear all decorations and pointers attached here
entry.m_state = DispositionState::Complete;
entry.m_decorations.clear();
entry.m_pImmediate = nullptr;

// Notify all consumers
UpdateSatisfactionUnsafe(std::move(lk), entry);
Expand All @@ -148,33 +155,78 @@ void AutoPacket::MarkSuccessorsUnsatisfiable(DecorationKey key) {
}
}

void AutoPacket::UpdateSatisfactionUnsafe(std::unique_lock<std::mutex>&& lk, const DecorationDisposition& disposition) {
void AutoPacket::UpdateSatisfactionUnsafe(std::unique_lock<std::mutex> lk, const DecorationDisposition& disposition) {
// Update satisfaction inside of lock
if (disposition.m_state != DispositionState::Complete)
// Nothing to do yet
return;

// Any filter that who can take this decoration as an optional input should be called
std::vector<SatCounter*> callQueue;

// Update satisfaction inside of lock
switch (disposition.m_state) {
case DispositionState::Unsatisfiable:
// Call only optional subscribers
for (const auto& subscriber : disposition.m_subscribers)
if (subscriber.is_optional && subscriber.satCounter->Decrement())
callQueue.push_back(subscriber.satCounter);
// Recursively mark unsatisfiable any single-output arguments on these subscribers:
std::vector<const AutoFilterArgument*> unsatOutputArgs;

switch (disposition.m_decorations.size()) {
case 0:
// No decorations here whatsoever.
// Subscribers that cannot be invoked should have their outputs recursively marked unsatisfiable.
// Subscribers that can be invoked should be.
for (auto subscriber : disposition.m_subscribers) {
auto* satCounter = subscriber.satCounter;
switch (subscriber.type) {
case DecorationDisposition::Subscriber::Type::Multi:
case DecorationDisposition::Subscriber::Type::Optional:
// Optional, we will just generate a call to this subscriber, if possible:
if (satCounter->Decrement())
callQueue.push_back(satCounter);
break;
case DecorationDisposition::Subscriber::Type::Normal:
{
// Non-optional, consider outputs and recursively invalidate
const auto* args = satCounter->GetAutoFilterArguments();
for (size_t i = satCounter->GetArity(); i--;)
// Only consider output arguments:
if (args[i].is_output)
// This output is transitively unsatisfiable, include it for later removal
unsatOutputArgs.push_back(&args[i]);
}
break;
}
}
break;
case DispositionState::Satisfied:
// Identical to the unsatisfiable case, except we don't need to test subscribers to see
// if their inputs are considered optional
for (const auto& subscriber : disposition.m_subscribers)
if (subscriber.satCounter->Decrement())
callQueue.push_back(subscriber.satCounter);
case 1:
// One unique decoration available. We should be able to call everyone.
for (auto subscriber : disposition.m_subscribers) {
auto* satCounter = subscriber.satCounter;
if (satCounter->Decrement())
callQueue.push_back(satCounter);
}
break;
default:
// Nothing to do
return;
// Multiple decorations. Single-input types should never be encountered, but if they are,
// we can't call them. Always call multi-input entries.
for (auto subscriber : disposition.m_subscribers) {
if (subscriber.type == DecorationDisposition::Subscriber::Type::Multi)
callQueue.push_back(subscriber.satCounter);
}
break;
}

// Make calls outside of lock, to avoid deadlock from decorations in methods
lk.unlock();

// Generate all calls
for (SatCounter* call : callQueue)
call->GetCall()(call->GetAutoFilter(), *this);

// Mark all unsatisfiable output types
for (auto unsatOutputArg : unsatOutputArgs) {
// One more producer run, even though we couldn't attach any new decorations
auto& entry = m_decorations[DecorationKey{*unsatOutputArg->ti, 0}];
entry.m_nProducersRun++;

// Now recurse on this entry
UpdateSatisfactionUnsafe(std::unique_lock<std::mutex>{m_lock}, entry);
}
}

void AutoPacket::PulseSatisfactionUnsafe(std::unique_lock<std::mutex> lk, DecorationDisposition* pTypeSubs[], size_t nInfos) {
Expand All @@ -193,8 +245,8 @@ void AutoPacket::PulseSatisfactionUnsafe(std::unique_lock<std::mutex> lk, Decora
for (const auto& cur : pTypeSubs[i]->m_subscribers) {
SatCounter* satCounter = cur.satCounter;
if (
// Shared pointer inputs can't receive a non-null input:
!cur.is_optional &&
// Require that this counter not need a shared pointer, because we can't provide one
!cur.is_shared &&

// We only care about sat counters that aren't deferred--skip everyone else
// Deferred calls will be too late.
Expand Down Expand Up @@ -234,7 +286,7 @@ bool AutoPacket::HasUnsafe(const DecorationKey& key) const {
auto q = m_decorations.find(key);
if(q == m_decorations.end())
return false;
return q->second.m_state == DispositionState::Satisfied;
return !q->second.m_decorations.empty();
}

void AutoPacket::DecorateNoPriors(const AnySharedPointer& ptr, DecorationKey key) {
Expand All @@ -244,19 +296,16 @@ void AutoPacket::DecorateNoPriors(const AnySharedPointer& ptr, DecorationKey key

disposition = &m_decorations[key];
switch (disposition->m_state) {
case DispositionState::Satisfied:
{
std::stringstream ss;
ss << "Cannot decorate this packet with type " << autowiring::demangle(ptr)
<< ", the requested decoration is already satisfied";
throw std::runtime_error(ss.str());
}
break;
case DispositionState::Unsatisfiable:
case DispositionState::Complete:
{
std::stringstream ss;
ss << "Cannot check out decoration of type " << autowiring::demangle(ptr)
<< ", it has been marked unsatisfiable";
if (disposition->m_decorations.empty())
// Completed with no decorations, unsatisfiable
ss << "Cannot check out decoration of type " << autowiring::demangle(ptr)
<< ", it has been marked unsatisfiable";
else
ss << "Cannot decorate this packet with type " << autowiring::demangle(ptr)
<< ", the requested decoration is already satisfied";
throw std::runtime_error(ss.str());
}
break;
Expand All @@ -266,6 +315,7 @@ void AutoPacket::DecorateNoPriors(const AnySharedPointer& ptr, DecorationKey key

// Decoration attaches here
disposition->m_decorations.push_back(ptr);
disposition->m_nProducersRun++;
}

// Uniformly advance state:
Expand All @@ -274,7 +324,7 @@ void AutoPacket::DecorateNoPriors(const AnySharedPointer& ptr, DecorationKey key
case DispositionState::PartlySatisfied:
// Permit a transition to another state
if (disposition->IsPublicationComplete()) {
disposition->m_state = DispositionState::Satisfied;
disposition->m_state = DispositionState::Complete;
UpdateSatisfactionUnsafe(std::unique_lock<std::mutex>(m_lock), *disposition);
}
else
Expand Down Expand Up @@ -308,7 +358,7 @@ const DecorationDisposition* AutoPacket::GetDisposition(const DecorationKey& key
std::lock_guard<std::mutex> lk(m_lock);

auto q = m_decorations.find(key);
if (q != m_decorations.end() && q->second.m_state == DispositionState::Satisfied)
if (q != m_decorations.end() && q->second.m_state == DispositionState::Complete)
return &q->second;

return nullptr;
Expand Down Expand Up @@ -359,8 +409,8 @@ void AutoPacket::ForwardAll(std::shared_ptr<AutoPacket> recipient) const {
{
std::lock_guard<std::mutex> lk(m_lock);
for (const auto& decoration : m_decorations)
// Only fully satisfied decorations are considered for propagation
if (decoration.second.m_state == DispositionState::Satisfied)
// Only fully complete decorations are considered for propagation
if (decoration.second.m_state == DispositionState::Complete)
dd.push_back(decoration);
}

Expand Down

0 comments on commit 52d0369

Please sign in to comment.