Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offload vote cache lookup to a separate thread #4631

Merged
merged 1 commit into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ enum class type
telemetry,
vote_generator,
vote_cache,
vote_cache_processor,
hinting,
blockprocessor,
blockprocessor_source,
Expand Down Expand Up @@ -112,6 +113,7 @@ enum class detail
cache,
rebroadcast,
queue_overflow,
triggered,

// processing queue
queue,
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::vote_processing:
thread_role_name_string = "Vote processing";
break;
case nano::thread_role::name::vote_cache_processing:
thread_role_name_string = "Vote cache proc";
break;
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ enum class name
work,
message_processing,
vote_processing,
vote_cache_processing,
block_processing,
request_loop,
wallet_actions,
Expand Down
4 changes: 2 additions & 2 deletions nano/node/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ nano::election_insertion_result nano::active_elections::insert (std::shared_ptr<
{
debug_assert (result.election);

node.vote_router.trigger_vote_cache (hash);
node.vote_cache_processor.trigger (hash);
node.observers.active_started.notify (hash);
vacancy_update ();
}
Expand Down Expand Up @@ -507,7 +507,7 @@ bool nano::active_elections::publish (std::shared_ptr<nano::block> const & block
node.vote_router.connect (block_a->hash (), election);
lock.unlock ();

node.vote_router.trigger_vote_cache (block_a->hash ());
node.vote_cache_processor.trigger (block_a->hash ());

node.stats.inc (nano::stat::type::active, nano::stat::detail::election_block_conflict);
}
Expand Down
10 changes: 10 additions & 0 deletions nano/node/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,20 @@ class ledger;
class local_vote_history;
class logger;
class network;
class network_params;
class node;
class node_config;
class node_flags;
class node_observers;
class online_reps;
class rep_crawler;
class rep_tiers;
class stats;
class vote_cache;
class vote_generator;
class vote_processor;
class vote_router;
class wallets;

enum class vote_code;
}
5 changes: 5 additions & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
vote_router{ *vote_router_impl },
vote_processor_impl{ std::make_unique<nano::vote_processor> (config.vote_processor, vote_router, observers, stats, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers) },
vote_processor{ *vote_processor_impl },
vote_cache_processor_impl{ std::make_unique<nano::vote_cache_processor> (config.vote_processor, vote_router, vote_cache, stats, logger) },
vote_cache_processor{ *vote_cache_processor_impl },
generator_impl{ std::make_unique<nano::vote_generator> (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false) },
generator{ *generator_impl },
final_generator_impl{ std::make_unique<nano::vote_generator> (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true) },
Expand Down Expand Up @@ -595,6 +597,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.observers, "observers"));
composite->add_component (collect_container_info (node.wallets, "wallets"));
composite->add_component (node.vote_processor.collect_container_info ("vote_processor"));
composite->add_component (node.vote_cache_processor.collect_container_info ("vote_cache_processor"));
composite->add_component (node.rep_crawler.collect_container_info ("rep_crawler"));
composite->add_component (node.block_processor.collect_container_info ("block_processor"));
composite->add_component (collect_container_info (node.online_reps, "online_reps"));
Expand Down Expand Up @@ -713,6 +716,7 @@ void nano::node::start ()
wallets.start ();
rep_tiers.start ();
vote_processor.start ();
vote_cache_processor.start ();
block_processor.start ();
active.start ();
generator.start ();
Expand Down Expand Up @@ -757,6 +761,7 @@ void nano::node::stop ()
unchecked.stop ();
block_processor.stop ();
aggregator.stop ();
vote_cache_processor.stop ();
vote_processor.stop ();
rep_tiers.stop ();
scheduler.stop ();
Expand Down
3 changes: 3 additions & 0 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class confirming_set;
class message_processor;
class node;
class vote_processor;
class vote_cache_processor;
class vote_router;
class work_pool;
class peer_history;
Expand Down Expand Up @@ -194,6 +195,8 @@ class node final : public std::enable_shared_from_this<node>
nano::vote_router & vote_router;
std::unique_ptr<nano::vote_processor> vote_processor_impl;
nano::vote_processor & vote_processor;
std::unique_ptr<nano::vote_cache_processor> vote_cache_processor_impl;
nano::vote_cache_processor & vote_cache_processor;
std::unique_ptr<nano::vote_generator> generator_impl;
nano::vote_generator & generator;
std::unique_ptr<nano::vote_generator> final_generator_impl;
Expand Down
129 changes: 128 additions & 1 deletion nano/node/vote_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

using namespace std::chrono_literals;

/*
* vote_processor
*/

nano::vote_processor::vote_processor (vote_processor_config const & config_a, nano::vote_router & vote_router, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a, nano::rep_tiers & rep_tiers_a) :
config{ config_a },
vote_router{ vote_router },
Expand Down Expand Up @@ -218,6 +222,129 @@ std::unique_ptr<nano::container_info_component> nano::vote_processor::collect_co
return composite;
}

/*
* vote_cache_processor
*/

nano::vote_cache_processor::vote_cache_processor (vote_processor_config const & config_a, nano::vote_router & vote_router_a, nano::vote_cache & vote_cache_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ config_a },
vote_router{ vote_router_a },
vote_cache{ vote_cache_a },
stats{ stats_a },
logger{ logger_a }
{
}

nano::vote_cache_processor::~vote_cache_processor ()
{
debug_assert (!thread.joinable ());
}

void nano::vote_cache_processor::start ()
{
debug_assert (!thread.joinable ());

thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::vote_cache_processing);
run ();
});
}

void nano::vote_cache_processor::stop ()
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}

void nano::vote_cache_processor::trigger (nano::block_hash const & hash)
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
if (triggered.size () >= config.max_triggered)
{
triggered.pop_front ();
stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::overfill);
}
triggered.push_back (hash);
}
condition.notify_all ();
stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::triggered);
}

void nano::vote_cache_processor::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::loop);

if (!triggered.empty ())
{
run_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();
}
else
{
condition.wait (lock, [&] { return stopped || !triggered.empty (); });
}
}
}

void nano::vote_cache_processor::run_batch (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
debug_assert (!triggered.empty ());

// Swap and deduplicate
decltype (triggered) triggered_l;
swap (triggered_l, triggered);

lock.unlock ();

std::unordered_set<nano::block_hash> hashes;
hashes.reserve (triggered_l.size ());
hashes.insert (triggered_l.begin (), triggered_l.end ());

stats.add (nano::stat::type::vote_cache_processor, nano::stat::detail::processed, hashes.size ());

for (auto const & hash : hashes)
{
auto cached = vote_cache.find (hash);
for (auto const & cached_vote : cached)
{
vote_router.vote (cached_vote, nano::vote_source::cache, hash);
}
}
}

std::size_t nano::vote_cache_processor::size () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return triggered.size ();
}

bool nano::vote_cache_processor::empty () const
{
return size () == 0;
}

std::unique_ptr<nano::container_info_component> nano::vote_cache_processor::collect_container_info (std::string const & name) const
{
nano::lock_guard<nano::mutex> guard{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "triggered", triggered.size (), sizeof (decltype (triggered)::value_type) }));
return composite;
}

/*
* vote_processor_config
*/
Expand All @@ -242,4 +369,4 @@ nano::error nano::vote_processor_config::deserialize (nano::tomlconfig & toml)
toml.get ("batch_size", batch_size);

return toml.get_error ();
}
}
72 changes: 44 additions & 28 deletions nano/node/vote_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/node/fwd.hpp>
#include <nano/node/rep_tiers.hpp>
#include <nano/node/vote_router.hpp>
#include <nano/secure/common.hpp>
Expand All @@ -13,32 +14,6 @@
#include <thread>
#include <unordered_set>

namespace nano
{
namespace store
{
class component;
}
class node_observers;
class stats;
class node_config;
class logger;
class online_reps;
class rep_crawler;
class ledger;
class network_params;
class node_flags;
class stats;
class rep_tiers;
enum class vote_code;
class vote_router;

namespace transport
{
class channel;
}
}

namespace nano
{
class vote_processor_config final
Expand All @@ -53,21 +28,25 @@ class vote_processor_config final
size_t pr_priority{ 3 };
size_t threads{ std::clamp (nano::hardware_concurrency () / 2, 1u, 4u) };
size_t batch_size{ 1024 };
size_t max_triggered{ 16384 };
};

class vote_processor final
{
public:
vote_processor (vote_processor_config const &, nano::vote_router & vote_router, nano::node_observers &, nano::stats &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &);
vote_processor (vote_processor_config const &, nano::vote_router &, nano::node_observers &, nano::stats &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &);
~vote_processor ();

void start ();
void stop ();

/** @returns true if the vote was queued for processing */
/** Queue vote for processing. @returns true if the vote was queued */
bool vote (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &, nano::vote_source = nano::vote_source::live);
nano::vote_code vote_blocking (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &, nano::vote_source = nano::vote_source::live);

/** Queue hash for vote cache lookup and processing. */
void trigger (nano::block_hash const & hash);

std::size_t size () const;
bool empty () const;

Expand Down Expand Up @@ -101,4 +80,41 @@ class vote_processor final
mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) };
std::vector<std::thread> threads;
};

class vote_cache_processor final
{
public:
vote_cache_processor (vote_processor_config const &, nano::vote_router &, nano::vote_cache &, nano::stats &, nano::logger &);
~vote_cache_processor ();

void start ();
void stop ();

/** Queue hash for vote cache lookup and processing. */
void trigger (nano::block_hash const & hash);

std::size_t size () const;
bool empty () const;

std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const;

private:
void run ();
void run_batch (nano::unique_lock<nano::mutex> &);

private: // Dependencies
vote_processor_config const & config;
nano::vote_router & vote_router;
nano::vote_cache & vote_cache;
nano::stats & stats;
nano::logger & logger;

private:
std::deque<nano::block_hash> triggered;

bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
};
}
10 changes: 0 additions & 10 deletions nano/node/vote_router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,16 +123,6 @@ std::unordered_map<nano::block_hash, nano::vote_code> nano::vote_router::vote (s
return results;
}

bool nano::vote_router::trigger_vote_cache (nano::block_hash const & hash)
{
auto cached = cache.find (hash);
for (auto const & cached_vote : cached)
{
vote (cached_vote, nano::vote_source::cache, hash);
}
return !cached.empty ();
}

bool nano::vote_router::active (nano::block_hash const & hash) const
{
std::shared_lock lock{ mutex };
Expand Down
Loading
Loading