Skip to content

Commit

Permalink
(SW-8901) send_modifieds without holding context lock (#123)
Browse files Browse the repository at this point in the history
* Avoid keeping context lock in send_modifieds during send operation

* Move send_modifieds calls outside of context locks

* Fix race condition
  • Loading branch information
dskyle committed Oct 12, 2018
1 parent 5feef55 commit 2590756
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 61 deletions.
140 changes: 88 additions & 52 deletions include/madara/knowledge/KnowledgeBaseImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ KnowledgeRecord KnowledgeBaseImpl::wait(
"KnowledgeBaseImpl::wait:"
" completed first eval to get %s\n",
last_value.to_string().c_str());

send_modifieds("KnowledgeBaseImpl:wait", settings);
}

send_modifieds("KnowledgeBaseImpl:wait", settings);

// wait for expression to be true
while (!last_value.to_integer() &&
(settings.max_wait_time < 0 || !enforcer.is_done()))
Expand Down Expand Up @@ -297,9 +297,9 @@ KnowledgeRecord KnowledgeBaseImpl::wait(
"KnowledgeBaseImpl::wait:"
" completed eval to get %s\n",
last_value.to_string().c_str());

send_modifieds("KnowledgeBaseImpl:wait", settings);
}

send_modifieds("KnowledgeBaseImpl:wait", settings);
map_.signal();

} // end while (!last)
Expand Down Expand Up @@ -337,11 +337,13 @@ KnowledgeRecord KnowledgeBaseImpl::evaluate(

// lock the context from being updated by any ongoing threads
{
MADARA_GUARD_TYPE guard(map_.mutex_);
{
MADARA_GUARD_TYPE guard(map_.mutex_);

// interpret the current expression and then evaluate it
// tree = interpreter_.interpret (map_, expression);
last_value = ce.expression.evaluate(settings);
// interpret the current expression and then evaluate it
// tree = interpreter_.interpret (map_, expression);
last_value = ce.expression.evaluate(settings);
}

send_modifieds("KnowledgeBaseImpl:evaluate", settings);

Expand Down Expand Up @@ -371,11 +373,13 @@ KnowledgeRecord KnowledgeBaseImpl::evaluate(

// lock the context from being updated by any ongoing threads
{
MADARA_GUARD_TYPE guard(map_.mutex_);
{
MADARA_GUARD_TYPE guard(map_.mutex_);

// interpret the current expression and then evaluate it
// tree = interpreter_.interpret (map_, expression);
last_value = map_.evaluate(root, settings);
// interpret the current expression and then evaluate it
// tree = interpreter_.interpret (map_, expression);
last_value = map_.evaluate(root, settings);
}

send_modifieds("KnowledgeBaseImpl:evaluate", settings);

Expand All @@ -392,60 +396,92 @@ KnowledgeRecord KnowledgeBaseImpl::evaluate(
int KnowledgeBaseImpl::send_modifieds(
const std::string& prefix, const EvalSettings& settings)
{
int result = 0;
if (settings.delay_sending_modifieds)
{
madara_logger_log(map_.get_logger(), logger::LOG_DETAILED,
"%s: user requested to not send modifieds\n", prefix.c_str());

MADARA_GUARD_TYPE map_guard(map_.mutex_);
MADARA_GUARD_TYPE transport_guard(transport_mutex_);
return -3;
}

if (transports_.size() > 0 && !settings.delay_sending_modifieds)
{
KnowledgeMap modified;

// get the modifieds and reset those that will be sent, atomically
{
modified = map_.get_modifieds_current(settings.send_list, true);
}
MADARA_GUARD_TYPE done_sending_guard(done_sending_mutex_);
done_sending_ = false;
}

if (modified.size() > 0)
// Loop until threads stop asking us to repeat, which will occur if they
// try to send while we're sending.
for (;;)
{
// Limit scope of send_guard
{
// send across each transport
for (auto& transport : transports_)
std::unique_lock<MADARA_LOCK_TYPE> send_guard;

// Limit scope of done_sending_guard
{
transport->send_data(modified);
MADARA_GUARD_TYPE done_sending_guard(done_sending_mutex_);

std::unique_lock<MADARA_LOCK_TYPE>
send_guard_tmp(send_mutex_, std::try_to_lock);

if (!send_guard_tmp.owns_lock())
{
// Some other thread is currently doing send_modifieds. Signal it to
// repeat in case there are new updates to send.
done_sending_ = false;
return 0;
}

// If flag is already set, stop looping. Other threads will clear while
// we're in this loop if they fail to take send_mutex_.
if (done_sending_)
{
return 0;
}

send_guard = std::move(send_guard_tmp);

done_sending_ = true;
}
// release done_sending_mutex_

map_.inc_clock(settings);
// We hold send_mutex_ here

if (settings.signal_changes)
map_.signal(false);
}
else
{
madara_logger_log(map_.get_logger(), logger::LOG_DETAILED,
"%s: no modifications to send\n", prefix.c_str());
auto transports = get_transports();

result = -1;
}
}
else
{
if (transports_.size() == 0)
{
madara_logger_log(map_.get_logger(), logger::LOG_DETAILED,
"%s: no transport configured\n", prefix.c_str());
if (transports.size() == 0)
{
madara_logger_log(map_.get_logger(), logger::LOG_DETAILED,
"%s: no transport configured\n", prefix.c_str());

result = -2;
}
else if (settings.delay_sending_modifieds)
{
madara_logger_log(map_.get_logger(), logger::LOG_DETAILED,
"%s: user requested to not send modifieds\n", prefix.c_str());
return -2;
}

// get the modifieds and reset those that will be sent, atomically
auto modified = map_.get_modifieds_current(settings.send_list, true);

if (modified.size() == 0)
{
madara_logger_log(map_.get_logger(), logger::LOG_DETAILED,
"%s: no modifications to send\n", prefix.c_str());

result = -3;
return -1;
}

// send across each transport
for (auto& transport : transports)
{
transport->send_data(modified);
}
}
}
// Released send_mutex_

return result;
map_.inc_clock(settings);

if (settings.signal_changes)
map_.signal(false);
}
}

}
}
16 changes: 15 additions & 1 deletion include/madara/knowledge/KnowledgeBaseImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include <ostream>
#include <vector>
#include <atomic>

#include "madara/knowledge/CompiledExpression.h"
#include "madara/knowledge/WaitSettings.h"
Expand Down Expand Up @@ -1133,8 +1134,21 @@ class KnowledgeBaseImpl
transport::QoSTransportSettings settings_;

mutable MADARA_LOCK_TYPE transport_mutex_;
mutable MADARA_LOCK_TYPE send_mutex_;
mutable MADARA_LOCK_TYPE done_sending_mutex_;

std::vector<std::unique_ptr<transport::Base>> transports_;
std::vector<std::shared_ptr<transport::Base>> transports_;

/**
* Atomically retrieve the set of transports
**/
std::vector<std::shared_ptr<transport::Base>> get_transports()
{
MADARA_GUARD_TYPE transport_guard(transport_mutex_);
return transports_;
}

bool done_sending_ = false;
};
}
}
Expand Down
10 changes: 2 additions & 8 deletions include/madara/knowledge/KnowledgeBaseImpl.inl
Original file line number Diff line number Diff line change
Expand Up @@ -136,17 +136,11 @@ inline bool KnowledgeBaseImpl::exists(const VariableReference& variable,
**/
inline int KnowledgeBaseImpl::apply_modified(const EvalSettings& settings)
{
// lock the context and apply modified flags and current clock to
// all global variables
MADARA_GUARD_TYPE guard(map_.mutex_);

map_.apply_modified();

int ret = 0;

send_modifieds("KnowledgeBaseImpl:apply_modified", settings);

return ret;
return 0;
}

inline void KnowledgeBaseImpl::mark_modified(
Expand Down Expand Up @@ -407,7 +401,7 @@ inline size_t KnowledgeBaseImpl::get_num_transports(void)

inline size_t KnowledgeBaseImpl::remove_transport(size_t index)
{
std::unique_ptr<transport::Base> transport;
std::shared_ptr<transport::Base> transport;
size_t size = 0;
{
MADARA_GUARD_TYPE guard(transport_mutex_);
Expand Down

0 comments on commit 2590756

Please sign in to comment.