From 30f2de0439746e213876a57294b6256abfb9d674 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20W=C3=B3jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Fri, 24 May 2024 10:08:34 +0200 Subject: [PATCH] Vote cache processor (#4631) --- nano/lib/stats_enums.hpp | 2 + nano/lib/thread_roles.cpp | 3 + nano/lib/thread_roles.hpp | 1 + nano/node/active_elections.cpp | 4 +- nano/node/fwd.hpp | 10 +++ nano/node/node.cpp | 5 ++ nano/node/node.hpp | 3 + nano/node/vote_processor.cpp | 129 ++++++++++++++++++++++++++++++++- nano/node/vote_processor.hpp | 72 +++++++++++------- nano/node/vote_router.cpp | 10 --- nano/node/vote_router.hpp | 1 - 11 files changed, 198 insertions(+), 42 deletions(-) diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index c5b828927c..27eb16bce4 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -48,6 +48,7 @@ enum class type telemetry, vote_generator, vote_cache, + vote_cache_processor, hinting, blockprocessor, blockprocessor_source, @@ -112,6 +113,7 @@ enum class detail cache, rebroadcast, queue_overflow, + triggered, // processing queue queue, diff --git a/nano/lib/thread_roles.cpp b/nano/lib/thread_roles.cpp index 810514a14b..76211682e4 100644 --- a/nano/lib/thread_roles.cpp +++ b/nano/lib/thread_roles.cpp @@ -28,6 +28,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::vote_processing: thread_role_name_string = "Vote processing"; break; + case nano::thread_role::name::vote_cache_processing: + thread_role_name_string = "Vote cache proc"; + break; case nano::thread_role::name::block_processing: thread_role_name_string = "Blck processing"; break; diff --git a/nano/lib/thread_roles.hpp b/nano/lib/thread_roles.hpp index 1b8d50b639..d6d78b694f 100644 --- a/nano/lib/thread_roles.hpp +++ b/nano/lib/thread_roles.hpp @@ -14,6 +14,7 @@ enum class name work, message_processing, vote_processing, + vote_cache_processing, block_processing, request_loop, wallet_actions, diff --git a/nano/node/active_elections.cpp b/nano/node/active_elections.cpp index 85b3a99726..780b22d39c 100644 --- a/nano/node/active_elections.cpp +++ b/nano/node/active_elections.cpp @@ -440,7 +440,7 @@ nano::election_insertion_result nano::active_elections::insert (std::shared_ptr< { debug_assert (result.election); - node.vote_router.trigger_vote_cache (hash); + node.vote_cache_processor.trigger (hash); node.observers.active_started.notify (hash); vacancy_update (); } @@ -523,7 +523,7 @@ bool nano::active_elections::publish (std::shared_ptr const & block node.vote_router.connect (block_a->hash (), election); lock.unlock (); - node.vote_router.trigger_vote_cache (block_a->hash ()); + node.vote_cache_processor.trigger (block_a->hash ()); node.stats.inc (nano::stat::type::active, nano::stat::detail::election_block_conflict); } diff --git a/nano/node/fwd.hpp b/nano/node/fwd.hpp index 83233179c9..0da465ae22 100644 --- a/nano/node/fwd.hpp +++ b/nano/node/fwd.hpp @@ -11,10 +11,20 @@ class ledger; class local_vote_history; class logger; class network; +class network_params; class node; class node_config; +class node_flags; +class node_observers; +class online_reps; +class rep_crawler; +class rep_tiers; class stats; +class vote_cache; class vote_generator; +class vote_processor; class vote_router; class wallets; + +enum class vote_code; } \ No newline at end of file diff --git a/nano/node/node.cpp b/nano/node/node.cpp index da948aad53..2391274812 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -200,6 +200,8 @@ nano::node::node (std::shared_ptr io_ctx_a, std::filesy vote_router{ *vote_router_impl }, vote_processor_impl{ std::make_unique (config.vote_processor, vote_router, observers, stats, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers) }, vote_processor{ *vote_processor_impl }, + vote_cache_processor_impl{ std::make_unique (config.vote_processor, vote_router, vote_cache, stats, logger) }, + vote_cache_processor{ *vote_cache_processor_impl }, generator_impl{ std::make_unique (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false) }, generator{ *generator_impl }, final_generator_impl{ std::make_unique (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true) }, @@ -572,6 +574,7 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.observers, "observers")); composite->add_component (collect_container_info (node.wallets, "wallets")); composite->add_component (node.vote_processor.collect_container_info ("vote_processor")); + composite->add_component (node.vote_cache_processor.collect_container_info ("vote_cache_processor")); composite->add_component (node.rep_crawler.collect_container_info ("rep_crawler")); composite->add_component (node.block_processor.collect_container_info ("block_processor")); composite->add_component (collect_container_info (node.online_reps, "online_reps")); @@ -690,6 +693,7 @@ void nano::node::start () wallets.start (); rep_tiers.start (); vote_processor.start (); + vote_cache_processor.start (); block_processor.start (); active.start (); generator.start (); @@ -734,6 +738,7 @@ void nano::node::stop () unchecked.stop (); block_processor.stop (); aggregator.stop (); + vote_cache_processor.stop (); vote_processor.stop (); rep_tiers.stop (); scheduler.stop (); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 6e95a84a86..f63ba5ec47 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -48,6 +48,7 @@ class confirming_set; class message_processor; class node; class vote_processor; +class vote_cache_processor; class vote_router; class work_pool; class peer_history; @@ -194,6 +195,8 @@ class node final : public std::enable_shared_from_this nano::vote_router & vote_router; std::unique_ptr vote_processor_impl; nano::vote_processor & vote_processor; + std::unique_ptr vote_cache_processor_impl; + nano::vote_cache_processor & vote_cache_processor; std::unique_ptr generator_impl; nano::vote_generator & generator; std::unique_ptr final_generator_impl; diff --git a/nano/node/vote_processor.cpp b/nano/node/vote_processor.cpp index ebf96af0b9..d2d22f45b4 100644 --- a/nano/node/vote_processor.cpp +++ b/nano/node/vote_processor.cpp @@ -15,6 +15,10 @@ using namespace std::chrono_literals; +/* + * vote_processor + */ + nano::vote_processor::vote_processor (vote_processor_config const & config_a, nano::vote_router & vote_router, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a, nano::rep_tiers & rep_tiers_a) : config{ config_a }, vote_router{ vote_router }, @@ -218,6 +222,129 @@ std::unique_ptr nano::vote_processor::collect_co return composite; } +/* + * vote_cache_processor + */ + +nano::vote_cache_processor::vote_cache_processor (vote_processor_config const & config_a, nano::vote_router & vote_router_a, nano::vote_cache & vote_cache_a, nano::stats & stats_a, nano::logger & logger_a) : + config{ config_a }, + vote_router{ vote_router_a }, + vote_cache{ vote_cache_a }, + stats{ stats_a }, + logger{ logger_a } +{ +} + +nano::vote_cache_processor::~vote_cache_processor () +{ + debug_assert (!thread.joinable ()); +} + +void nano::vote_cache_processor::start () +{ + debug_assert (!thread.joinable ()); + + thread = std::thread ([this] () { + nano::thread_role::set (nano::thread_role::name::vote_cache_processing); + run (); + }); +} + +void nano::vote_cache_processor::stop () +{ + { + nano::lock_guard guard{ mutex }; + stopped = true; + } + condition.notify_all (); + if (thread.joinable ()) + { + thread.join (); + } +} + +void nano::vote_cache_processor::trigger (nano::block_hash const & hash) +{ + { + nano::lock_guard guard{ mutex }; + if (triggered.size () >= config.max_triggered) + { + triggered.pop_front (); + stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::overfill); + } + triggered.push_back (hash); + } + condition.notify_all (); + stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::triggered); +} + +void nano::vote_cache_processor::run () +{ + nano::unique_lock lock{ mutex }; + while (!stopped) + { + stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::loop); + + if (!triggered.empty ()) + { + run_batch (lock); + debug_assert (!lock.owns_lock ()); + lock.lock (); + } + else + { + condition.wait (lock, [&] { return stopped || !triggered.empty (); }); + } + } +} + +void nano::vote_cache_processor::run_batch (nano::unique_lock & lock) +{ + debug_assert (lock.owns_lock ()); + debug_assert (!mutex.try_lock ()); + debug_assert (!triggered.empty ()); + + // Swap and deduplicate + decltype (triggered) triggered_l; + swap (triggered_l, triggered); + + lock.unlock (); + + std::unordered_set hashes; + hashes.reserve (triggered_l.size ()); + hashes.insert (triggered_l.begin (), triggered_l.end ()); + + stats.add (nano::stat::type::vote_cache_processor, nano::stat::detail::processed, hashes.size ()); + + for (auto const & hash : hashes) + { + auto cached = vote_cache.find (hash); + for (auto const & cached_vote : cached) + { + vote_router.vote (cached_vote, nano::vote_source::cache, hash); + } + } +} + +std::size_t nano::vote_cache_processor::size () const +{ + nano::lock_guard guard{ mutex }; + return triggered.size (); +} + +bool nano::vote_cache_processor::empty () const +{ + return size () == 0; +} + +std::unique_ptr nano::vote_cache_processor::collect_container_info (std::string const & name) const +{ + nano::lock_guard guard{ mutex }; + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "triggered", triggered.size (), sizeof (decltype (triggered)::value_type) })); + return composite; +} + /* * vote_processor_config */ @@ -242,4 +369,4 @@ nano::error nano::vote_processor_config::deserialize (nano::tomlconfig & toml) toml.get ("batch_size", batch_size); return toml.get_error (); -} +} \ No newline at end of file diff --git a/nano/node/vote_processor.hpp b/nano/node/vote_processor.hpp index 592afff297..2a22f65eea 100644 --- a/nano/node/vote_processor.hpp +++ b/nano/node/vote_processor.hpp @@ -4,6 +4,7 @@ #include #include #include +#include #include #include #include @@ -13,32 +14,6 @@ #include #include -namespace nano -{ -namespace store -{ - class component; -} -class node_observers; -class stats; -class node_config; -class logger; -class online_reps; -class rep_crawler; -class ledger; -class network_params; -class node_flags; -class stats; -class rep_tiers; -enum class vote_code; -class vote_router; - -namespace transport -{ - class channel; -} -} - namespace nano { class vote_processor_config final @@ -53,21 +28,25 @@ class vote_processor_config final size_t pr_priority{ 3 }; size_t threads{ std::clamp (nano::hardware_concurrency () / 2, 1u, 4u) }; size_t batch_size{ 1024 }; + size_t max_triggered{ 16384 }; }; class vote_processor final { public: - vote_processor (vote_processor_config const &, nano::vote_router & vote_router, nano::node_observers &, nano::stats &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &); + vote_processor (vote_processor_config const &, nano::vote_router &, nano::node_observers &, nano::stats &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &); ~vote_processor (); void start (); void stop (); - /** @returns true if the vote was queued for processing */ + /** Queue vote for processing. @returns true if the vote was queued */ bool vote (std::shared_ptr const &, std::shared_ptr const &, nano::vote_source = nano::vote_source::live); nano::vote_code vote_blocking (std::shared_ptr const &, std::shared_ptr const &, nano::vote_source = nano::vote_source::live); + /** Queue hash for vote cache lookup and processing. */ + void trigger (nano::block_hash const & hash); + std::size_t size () const; bool empty () const; @@ -101,4 +80,41 @@ class vote_processor final mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) }; std::vector threads; }; + +class vote_cache_processor final +{ +public: + vote_cache_processor (vote_processor_config const &, nano::vote_router &, nano::vote_cache &, nano::stats &, nano::logger &); + ~vote_cache_processor (); + + void start (); + void stop (); + + /** Queue hash for vote cache lookup and processing. */ + void trigger (nano::block_hash const & hash); + + std::size_t size () const; + bool empty () const; + + std::unique_ptr collect_container_info (std::string const & name) const; + +private: + void run (); + void run_batch (nano::unique_lock &); + +private: // Dependencies + vote_processor_config const & config; + nano::vote_router & vote_router; + nano::vote_cache & vote_cache; + nano::stats & stats; + nano::logger & logger; + +private: + std::deque triggered; + + bool stopped{ false }; + nano::condition_variable condition; + mutable nano::mutex mutex; + std::thread thread; +}; } diff --git a/nano/node/vote_router.cpp b/nano/node/vote_router.cpp index b74fea4b1b..426dc69a45 100644 --- a/nano/node/vote_router.cpp +++ b/nano/node/vote_router.cpp @@ -123,16 +123,6 @@ std::unordered_map nano::vote_router::vote (s return results; } -bool nano::vote_router::trigger_vote_cache (nano::block_hash const & hash) -{ - auto cached = cache.find (hash); - for (auto const & cached_vote : cached) - { - vote (cached_vote, nano::vote_source::cache, hash); - } - return !cached.empty (); -} - bool nano::vote_router::active (nano::block_hash const & hash) const { std::shared_lock lock{ mutex }; diff --git a/nano/node/vote_router.hpp b/nano/node/vote_router.hpp index d0a5bd5289..71bb42ed52 100644 --- a/nano/node/vote_router.hpp +++ b/nano/node/vote_router.hpp @@ -59,7 +59,6 @@ class vote_router final // If 'filter' parameter is non-zero, only elections for the specified hash are notified. // This eliminates duplicate processing when triggering votes from the vote_cache as the result of a specific election being created. std::unordered_map vote (std::shared_ptr const &, nano::vote_source = nano::vote_source::live, nano::block_hash filter = { 0 }); - bool trigger_vote_cache (nano::block_hash const & hash); bool active (nano::block_hash const & hash) const; std::shared_ptr election (nano::block_hash const & hash) const;