Skip to content

Commit

Permalink
Pessimisation fix for condition_variable notify_* (#1461)
Browse files Browse the repository at this point in the history
* Pessimisation fix done for wallet

* Trivial pessimisation fixes done for bootstrap

* All pessimisation done for voting

* Pessimisation done for work

* Trivial pessimisation fixes done for node

* Finished pessimisation for node

* Pessimisation for processor_service tests

* Fixed indentation for CI code linter
  • Loading branch information
termoose authored and clemahieu committed Dec 15, 2018
1 parent ec8b44e commit 1a3ce38
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 69 deletions.
12 changes: 8 additions & 4 deletions rai/core_test/processor_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ TEST (alarm, one)
std::mutex mutex;
std::condition_variable condition;
alarm.add (std::chrono::steady_clock::now (), [&]() {
std::lock_guard<std::mutex> lock (mutex);
done = true;
{
std::lock_guard<std::mutex> lock (mutex);
done = true;
}
condition.notify_one ();
});
boost::asio::io_service::work work (service);
Expand All @@ -76,8 +78,10 @@ TEST (alarm, many)
for (auto i (0); i < 50; ++i)
{
alarm.add (std::chrono::steady_clock::now (), [&]() {
std::lock_guard<std::mutex> lock (mutex);
count += 1;
{
std::lock_guard<std::mutex> lock (mutex);
count += 1;
}
condition.notify_one ();
});
}
Expand Down
12 changes: 8 additions & 4 deletions rai/lib/work.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,10 @@ void rai::work_pool::cancel (rai::uint256_union const & root_a)

void rai::work_pool::stop ()
{
std::lock_guard<std::mutex> lock (mutex);
done = true;
{
std::lock_guard<std::mutex> lock (mutex);
done = true;
}
producer_condition.notify_all ();
}

Expand All @@ -169,8 +171,10 @@ void rai::work_pool::generate (rai::uint256_union const & root_a, std::function<
}
if (!result)
{
std::lock_guard<std::mutex> lock (mutex);
pending.push_back ({ root_a, callback_a, difficulty_a });
{
std::lock_guard<std::mutex> lock (mutex);
pending.push_back ({ root_a, callback_a, difficulty_a });
}
producer_condition.notify_all ();
}
else
Expand Down
64 changes: 38 additions & 26 deletions rai/node/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,10 @@ rai::bulk_pull_client::~bulk_pull_client ()
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Bulk pull end block is not expected %1% for account %2%") % pull.end.to_string () % pull.account.to_account ());
}
}
std::lock_guard<std::mutex> mutex (connection->attempt->mutex);
--connection->attempt->pulling;
{
std::lock_guard<std::mutex> mutex (connection->attempt->mutex);
--connection->attempt->pulling;
}
connection->attempt->condition.notify_all ();
}

Expand Down Expand Up @@ -1108,8 +1110,10 @@ void rai::bootstrap_attempt::add_connection (rai::endpoint const & endpoint_a)

void rai::bootstrap_attempt::pool_connection (std::shared_ptr<rai::bootstrap_client> client_a)
{
std::lock_guard<std::mutex> lock (mutex);
idle.push_front (client_a);
{
std::lock_guard<std::mutex> lock (mutex);
idle.push_front (client_a);
}
condition.notify_all ();
}

Expand Down Expand Up @@ -1149,8 +1153,10 @@ void rai::bootstrap_attempt::stop ()

void rai::bootstrap_attempt::add_pull (rai::pull_info const & pull)
{
std::lock_guard<std::mutex> lock (mutex);
pulls.push_back (pull);
{
std::lock_guard<std::mutex> lock (mutex);
pulls.push_back (pull);
}
condition.notify_all ();
}

Expand All @@ -1165,10 +1171,12 @@ void rai::bootstrap_attempt::requeue_pull (rai::pull_info const & pull_a)
}
else if (lazy_mode)
{
// Retry for lazy pulls (not weak state block link assumptions)
std::lock_guard<std::mutex> lock (mutex);
pull.attempts++;
pulls.push_back (pull);
{
// Retry for lazy pulls (not weak state block link assumptions)
std::lock_guard<std::mutex> lock (mutex);
pull.attempts++;
pulls.push_back (pull);
}
condition.notify_all ();
}
else
Expand Down Expand Up @@ -1521,22 +1529,24 @@ void rai::bootstrap_initiator::bootstrap (rai::endpoint const & endpoint_a, bool

void rai::bootstrap_initiator::bootstrap_lazy (rai::block_hash const & hash_a, bool force)
{
std::unique_lock<std::mutex> lock (mutex);
if (force)
{
while (attempt != nullptr)
std::unique_lock<std::mutex> lock (mutex);
if (force)
{
attempt->stop ();
condition.wait (lock);
while (attempt != nullptr)
{
attempt->stop ();
condition.wait (lock);
}
}
node.stats.inc (rai::stat::type::bootstrap, rai::stat::detail::initiate_lazy, rai::stat::dir::out);
if (attempt == nullptr)
{
attempt = std::make_shared<rai::bootstrap_attempt> (node.shared ());
attempt->lazy_mode = true;
}
attempt->lazy_start (hash_a);
}
node.stats.inc (rai::stat::type::bootstrap, rai::stat::detail::initiate_lazy, rai::stat::dir::out);
if (attempt == nullptr)
{
attempt = std::make_shared<rai::bootstrap_attempt> (node.shared ());
attempt->lazy_mode = true;
}
attempt->lazy_start (hash_a);
condition.notify_all ();
}

Expand Down Expand Up @@ -1586,11 +1596,13 @@ std::shared_ptr<rai::bootstrap_attempt> rai::bootstrap_initiator::current_attemp

void rai::bootstrap_initiator::stop ()
{
std::unique_lock<std::mutex> lock (mutex);
stopped = true;
if (attempt != nullptr)
{
attempt->stop ();
std::unique_lock<std::mutex> lock (mutex);
stopped = true;
if (attempt != nullptr)
{
attempt->stop ();
}
}
condition.notify_all ();
}
Expand Down
98 changes: 68 additions & 30 deletions rai/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -755,8 +755,10 @@ void rai::alarm::run ()

void rai::alarm::add (std::chrono::steady_clock::time_point const & wakeup_a, std::function<void()> const & operation)
{
std::lock_guard<std::mutex> lock (mutex);
operations.push (rai::operation ({ wakeup_a, operation }));
{
std::lock_guard<std::mutex> lock (mutex);
operations.push (rai::operation ({ wakeup_a, operation }));
}
condition.notify_all ();
}

Expand Down Expand Up @@ -798,7 +800,11 @@ void rai::vote_processor::process_loop ()

std::unique_lock<std::mutex> lock (mutex);
started = true;

lock.unlock ();
condition.notify_all ();
lock.lock ();

while (!stopped)
{
if (!votes.empty ())
Expand Down Expand Up @@ -837,7 +843,10 @@ void rai::vote_processor::process_loop ()
}
lock.lock ();
active = false;

lock.unlock ();
condition.notify_all ();
lock.lock ();

if (log_this_iteration)
{
Expand Down Expand Up @@ -867,7 +876,7 @@ void rai::vote_processor::process_loop ()
void rai::vote_processor::vote (std::shared_ptr<rai::vote> vote_a, rai::endpoint endpoint_a)
{
assert (endpoint_a.address ().is_v6 ());
std::lock_guard<std::mutex> lock (mutex);
std::unique_lock<std::mutex> lock (mutex);
if (!stopped)
{
bool process (false);
Expand Down Expand Up @@ -905,7 +914,10 @@ void rai::vote_processor::vote (std::shared_ptr<rai::vote> vote_a, rai::endpoint
if (process)
{
votes.push_back (std::make_pair (vote_a, endpoint_a));

lock.unlock ();
condition.notify_all ();
lock.lock ();
}
else
{
Expand Down Expand Up @@ -1018,8 +1030,8 @@ void rai::vote_processor::stop ()
{
std::lock_guard<std::mutex> lock (mutex);
stopped = true;
condition.notify_all ();
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
Expand Down Expand Up @@ -1102,8 +1114,10 @@ rai::signature_checker::~signature_checker ()

void rai::signature_checker::add (rai::signature_check_set & check_a)
{
std::lock_guard<std::mutex> lock (mutex);
checks.push_back (check_a);
{
std::lock_guard<std::mutex> lock (mutex);
checks.push_back (check_a);
}
condition.notify_all ();
}

Expand Down Expand Up @@ -1143,7 +1157,11 @@ void rai::signature_checker::run ()
rai::thread_role::set (rai::thread_role::name::signature_checking);
std::unique_lock<std::mutex> lock (mutex);
started = true;

lock.unlock ();
condition.notify_all ();
lock.lock ();

while (!stopped)
{
if (!checks.empty ())
Expand All @@ -1152,8 +1170,8 @@ void rai::signature_checker::run ()
checks.pop_front ();
lock.unlock ();
verify (check);
lock.lock ();
condition.notify_all ();
lock.lock ();
}
else
{
Expand All @@ -1179,8 +1197,10 @@ rai::block_processor::~block_processor ()
void rai::block_processor::stop ()
{
generator.stop ();
std::lock_guard<std::mutex> lock (mutex);
stopped = true;
{
std::lock_guard<std::mutex> lock (mutex);
stopped = true;
}
condition.notify_all ();
}

Expand All @@ -1204,16 +1224,18 @@ void rai::block_processor::add (std::shared_ptr<rai::block> block_a, std::chrono
{
if (!rai::work_validate (block_a->root (), block_a->block_work ()))
{
std::lock_guard<std::mutex> lock (mutex);
if (blocks_hashes.find (block_a->hash ()) == blocks_hashes.end ())
{
if (block_a->type () == rai::block_type::state && !node.ledger.is_epoch_link (block_a->link ()))
{
state_blocks.push_back (std::make_pair (block_a, origination));
}
else
std::lock_guard<std::mutex> lock (mutex);
if (blocks_hashes.find (block_a->hash ()) == blocks_hashes.end ())
{
blocks.push_back (std::make_pair (block_a, origination));
if (block_a->type () == rai::block_type::state && !node.ledger.is_epoch_link (block_a->link ()))
{
state_blocks.push_back (std::make_pair (block_a, origination));
}
else
{
blocks.push_back (std::make_pair (block_a, origination));
}
}
condition.notify_all ();
}
Expand All @@ -1227,8 +1249,10 @@ void rai::block_processor::add (std::shared_ptr<rai::block> block_a, std::chrono

void rai::block_processor::force (std::shared_ptr<rai::block> block_a)
{
std::lock_guard<std::mutex> lock (mutex);
forced.push_back (block_a);
{
std::lock_guard<std::mutex> lock (mutex);
forced.push_back (block_a);
}
condition.notify_all ();
}

Expand All @@ -1247,7 +1271,10 @@ void rai::block_processor::process_blocks ()
}
else
{
lock.unlock ();
condition.notify_all ();
lock.lock ();

condition.wait (lock);
}
}
Expand Down Expand Up @@ -3298,7 +3325,11 @@ void rai::active_transactions::announce_loop ()
{
std::unique_lock<std::mutex> lock (mutex);
started = true;

lock.unlock ();
condition.notify_all ();
lock.lock ();

while (!stopped)
{
announce_votes (lock);
Expand All @@ -3309,14 +3340,15 @@ void rai::active_transactions::announce_loop ()

void rai::active_transactions::stop ()
{
std::unique_lock<std::mutex> lock (mutex);
while (!started)
{
condition.wait (lock);
std::unique_lock<std::mutex> lock (mutex);
while (!started)
{
condition.wait (lock);
}
stopped = true;
}
stopped = true;
condition.notify_all ();
lock.unlock ();
if (thread.joinable ())
{
thread.join ();
Expand Down Expand Up @@ -3594,8 +3626,10 @@ rai::udp_data * rai::udp_buffer::allocate ()
void rai::udp_buffer::enqueue (rai::udp_data * data_a)
{
assert (data_a != nullptr);
std::lock_guard<std::mutex> lock (mutex);
full.push_back (data_a);
{
std::lock_guard<std::mutex> lock (mutex);
full.push_back (data_a);
}
condition.notify_one ();
}
rai::udp_data * rai::udp_buffer::dequeue ()
Expand All @@ -3616,13 +3650,17 @@ rai::udp_data * rai::udp_buffer::dequeue ()
void rai::udp_buffer::release (rai::udp_data * data_a)
{
assert (data_a != nullptr);
std::lock_guard<std::mutex> lock (mutex);
free.push_back (data_a);
{
std::lock_guard<std::mutex> lock (mutex);
free.push_back (data_a);
}
condition.notify_one ();
}
void rai::udp_buffer::stop ()
{
std::lock_guard<std::mutex> lock (mutex);
stopped = true;
{
std::lock_guard<std::mutex> lock (mutex);
stopped = true;
}
condition.notify_all ();
}
Loading

0 comments on commit 1a3ce38

Please sign in to comment.