Skip to content

Commit

Permalink
Simplify request aggregator mutex lock behavior
Browse files Browse the repository at this point in the history
  • Loading branch information
guilhermelawless committed Mar 2, 2020
1 parent 5dac531 commit 9c32d70
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 24 deletions.
32 changes: 11 additions & 21 deletions nano/node/request_aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ nano::request_aggregator::request_aggregator (nano::network_constants const & ne
max_delay (network_constants_a.is_test_network () ? 50 : 300),
small_delay (network_constants_a.is_test_network () ? 10 : 50),
max_channel_requests (config_a.max_queued_requests),
max_consecutive_requests (network_constants_a.is_test_network () ? 1 : 10),
stats (stats_a),
votes_cache (cache_a),
store (store_a),
Expand Down Expand Up @@ -68,7 +67,6 @@ void nano::request_aggregator::run ()
lock.unlock ();
condition.notify_all ();
lock.lock ();
unsigned consecutive_requests = 0;
while (!stopped)
{
if (!requests.empty ())
Expand All @@ -85,31 +83,28 @@ void nano::request_aggregator::run ()
auto channel = pool.channel;
// Safely erase this pool
requests_by_deadline.erase (front);
if (!remaining.empty ())
lock.unlock ();
// Send cached votes
for (auto const & vote : remaining.first)
{
lock.unlock ();
// Generate votes for the remaining hashes
generate (transaction, std::move (remaining), channel);
consecutive_requests = 0;
lock.lock ();
nano::confirm_ack confirm (vote);
channel->send (confirm);
}
else if (++consecutive_requests == max_consecutive_requests)
if (!remaining.second.empty ())
{
lock.unlock ();
consecutive_requests = 0;
lock.lock ();
// Generate votes for the remaining hashes
generate (transaction, std::move (remaining.second), channel);
}
lock.lock ();
}
else
{
consecutive_requests = 0;
auto deadline = front->deadline;
condition.wait_until (lock, deadline, [this, &deadline]() { return this->stopped || deadline < std::chrono::steady_clock::now (); });
}
}
else
{
consecutive_requests = 0;
condition.wait_for (lock, small_delay, [this]() { return this->stopped || !this->requests.empty (); });
}
}
Expand Down Expand Up @@ -139,7 +134,7 @@ bool nano::request_aggregator::empty ()
return size () == 0;
}

std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, channel_pool & pool_a) const
std::pair<std::vector<std::shared_ptr<nano::vote>>, std::vector<nano::block_hash>> nano::request_aggregator::aggregate (nano::transaction const & transaction_a, channel_pool & pool_a) const
{
// Unique hashes
using pair = decltype (pool_a.hashes_roots)::value_type;
Expand Down Expand Up @@ -205,14 +200,9 @@ std::vector<nano::block_hash> nano::request_aggregator::aggregate (nano::transac
// Unique votes
std::sort (cached_votes.begin (), cached_votes.end ());
cached_votes.erase (std::unique (cached_votes.begin (), cached_votes.end ()), cached_votes.end ());
for (auto const & vote : cached_votes)
{
nano::confirm_ack confirm (vote);
pool_a.channel->send (confirm);
}
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_hashes, stat::dir::in, cached_hashes);
stats.add (nano::stat::type::requests, nano::stat::detail::requests_cached_votes, stat::dir::in, cached_votes.size ());
return to_generate;
return { cached_votes, to_generate };
}

void nano::request_aggregator::generate (nano::transaction const & transaction_a, std::vector<nano::block_hash> hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const
Expand Down
4 changes: 1 addition & 3 deletions nano/node/request_aggregator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,10 @@ class request_aggregator final
private:
void run ();
/** Aggregate and send cached votes for \p pool_a, returning the leftovers that were not found in cached votes **/
std::vector<nano::block_hash> aggregate (nano::transaction const &, channel_pool & pool_a) const;
std::pair<std::vector<std::shared_ptr<nano::vote>>, std::vector<nano::block_hash>> aggregate (nano::transaction const &, channel_pool & pool_a) const;
/** Generate and send votes from \p hashes_a to \p channel_a, does not need a lock on the mutex **/
void generate (nano::transaction const &, std::vector<nano::block_hash> hashes_a, std::shared_ptr<nano::transport::channel> & channel_a) const;

unsigned const max_consecutive_requests;

nano::stat & stats;
nano::votes_cache & votes_cache;
nano::block_store & store;
Expand Down

0 comments on commit 9c32d70

Please sign in to comment.