Skip to content

Commit

Permalink
Vote cache processor
Browse files Browse the repository at this point in the history
  • Loading branch information
pwojcikdev committed May 20, 2024
1 parent 8255d97 commit 4607051
Show file tree
Hide file tree
Showing 11 changed files with 199 additions and 43 deletions.
2 changes: 2 additions & 0 deletions nano/lib/stats_enums.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ enum class type
telemetry,
vote_generator,
vote_cache,
vote_cache_processor,
hinting,
blockprocessor,
blockprocessor_source,
Expand Down Expand Up @@ -112,6 +113,7 @@ enum class detail
cache,
rebroadcast,
queue_overflow,
triggered,

// processing queue
queue,
Expand Down
3 changes: 3 additions & 0 deletions nano/lib/thread_roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::vote_processing:
thread_role_name_string = "Vote processing";
break;
case nano::thread_role::name::vote_cache_processing:
thread_role_name_string = "Vote cache proc";
break;
case nano::thread_role::name::block_processing:
thread_role_name_string = "Blck processing";
break;
Expand Down
1 change: 1 addition & 0 deletions nano/lib/thread_roles.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ enum class name
work,
message_processing,
vote_processing,
vote_cache_processing,
block_processing,
request_loop,
wallet_actions,
Expand Down
4 changes: 2 additions & 2 deletions nano/node/active_elections.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ nano::election_insertion_result nano::active_elections::insert (std::shared_ptr<
{
debug_assert (result.election);

node.vote_router.trigger_vote_cache (hash);
node.vote_cache_processor.trigger (hash);
node.observers.active_started.notify (hash);
vacancy_update ();
}
Expand Down Expand Up @@ -507,7 +507,7 @@ bool nano::active_elections::publish (std::shared_ptr<nano::block> const & block
node.vote_router.connect (block_a->hash (), election);
lock.unlock ();

node.vote_router.trigger_vote_cache (block_a->hash ());
node.vote_cache_processor.trigger (block_a->hash ());

node.stats.inc (nano::stat::type::active, nano::stat::detail::election_block_conflict);
}
Expand Down
10 changes: 10 additions & 0 deletions nano/node/fwd.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,20 @@ class ledger;
class local_vote_history;
class logger;
class network;
class network_params;
class node;
class node_config;
class node_flags;
class node_observers;
class online_reps;
class rep_crawler;
class rep_tiers;
class stats;
class vote_cache;
class vote_generator;
class vote_processor;
class vote_router;
class wallets;

enum class vote_code;
}
5 changes: 5 additions & 0 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ nano::node::node (std::shared_ptr<boost::asio::io_context> io_ctx_a, std::filesy
vote_router{ *vote_router_impl },
vote_processor_impl{ std::make_unique<nano::vote_processor> (config.vote_processor, vote_router, observers, stats, flags, logger, online_reps, rep_crawler, ledger, network_params, rep_tiers) },
vote_processor{ *vote_processor_impl },
vote_cache_processor_impl{ std::make_unique<nano::vote_cache_processor> (config.vote_processor, vote_router, vote_cache, stats, logger) },
vote_cache_processor{ *vote_cache_processor_impl },
generator_impl{ std::make_unique<nano::vote_generator> (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false) },
generator{ *generator_impl },
final_generator_impl{ std::make_unique<nano::vote_generator> (config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true) },
Expand Down Expand Up @@ -595,6 +597,7 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (no
composite->add_component (collect_container_info (node.observers, "observers"));
composite->add_component (collect_container_info (node.wallets, "wallets"));
composite->add_component (node.vote_processor.collect_container_info ("vote_processor"));
composite->add_component (node.vote_cache_processor.collect_container_info ("vote_cache_processor"));
composite->add_component (node.rep_crawler.collect_container_info ("rep_crawler"));
composite->add_component (node.block_processor.collect_container_info ("block_processor"));
composite->add_component (collect_container_info (node.online_reps, "online_reps"));
Expand Down Expand Up @@ -713,6 +716,7 @@ void nano::node::start ()
wallets.start ();
rep_tiers.start ();
vote_processor.start ();
vote_cache_processor.start ();
block_processor.start ();
active.start ();
generator.start ();
Expand Down Expand Up @@ -757,6 +761,7 @@ void nano::node::stop ()
unchecked.stop ();
block_processor.stop ();
aggregator.stop ();
vote_cache_processor.stop ();
vote_processor.stop ();
rep_tiers.stop ();
scheduler.stop ();
Expand Down
3 changes: 3 additions & 0 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class confirming_set;
class message_processor;
class node;
class vote_processor;
class vote_cache_processor;
class vote_router;
class work_pool;
class peer_history;
Expand Down Expand Up @@ -194,6 +195,8 @@ class node final : public std::enable_shared_from_this<node>
nano::vote_router & vote_router;
std::unique_ptr<nano::vote_processor> vote_processor_impl;
nano::vote_processor & vote_processor;
std::unique_ptr<nano::vote_cache_processor> vote_cache_processor_impl;
nano::vote_cache_processor & vote_cache_processor;
std::unique_ptr<nano::vote_generator> generator_impl;
nano::vote_generator & generator;
std::unique_ptr<nano::vote_generator> final_generator_impl;
Expand Down
131 changes: 129 additions & 2 deletions nano/node/vote_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@

using namespace std::chrono_literals;

/*
* vote_processor
*/

nano::vote_processor::vote_processor (vote_processor_config const & config_a, nano::vote_router & vote_router, nano::node_observers & observers_a, nano::stats & stats_a, nano::node_flags & flags_a, nano::logger & logger_a, nano::online_reps & online_reps_a, nano::rep_crawler & rep_crawler_a, nano::ledger & ledger_a, nano::network_params & network_params_a, nano::rep_tiers & rep_tiers_a) :
config{ config_a },
vote_router{ vote_router },
Expand Down Expand Up @@ -70,7 +74,7 @@ void nano::vote_processor::start ()
for (int n = 0; n < config.threads; ++n)
{
threads.emplace_back ([this] () {
nano::thread_role::set (nano::thread_role::name::vote_processing);
nano::thread_role::set (nano::thread_role::name::vote_cache_processing);
run ();
});
}
Expand Down Expand Up @@ -218,6 +222,129 @@ std::unique_ptr<nano::container_info_component> nano::vote_processor::collect_co
return composite;
}

/*
* vote_cache_processor
*/

nano::vote_cache_processor::vote_cache_processor (vote_processor_config const & config_a, nano::vote_router & vote_router_a, nano::vote_cache & vote_cache_a, nano::stats & stats_a, nano::logger & logger_a) :
config{ config_a },
vote_router{ vote_router_a },
vote_cache{ vote_cache_a },
stats{ stats_a },
logger{ logger_a }
{
}

nano::vote_cache_processor::~vote_cache_processor ()
{
debug_assert (!thread.joinable ());
}

void nano::vote_cache_processor::start ()
{
debug_assert (!thread.joinable ());

thread = std::thread ([this] () {
nano::thread_role::set (nano::thread_role::name::vote_processing);
run ();
});
}

void nano::vote_cache_processor::stop ()
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
stopped = true;
}
condition.notify_all ();
if (thread.joinable ())
{
thread.join ();
}
}

void nano::vote_cache_processor::trigger (nano::block_hash const & hash)
{
{
nano::lock_guard<nano::mutex> guard{ mutex };
if (triggered.size () >= config.max_triggered)
{
triggered.pop_front ();
stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::overfill);
}
triggered.push_back (hash);
}
condition.notify_all ();
stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::triggered);
}

void nano::vote_cache_processor::run ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
stats.inc (nano::stat::type::vote_cache_processor, nano::stat::detail::loop);

if (!triggered.empty ())
{
run_batch (lock);
debug_assert (!lock.owns_lock ());
lock.lock ();
}
else
{
condition.wait (lock, [&] { return stopped || !triggered.empty (); });
}
}
}

void nano::vote_cache_processor::run_batch (nano::unique_lock<nano::mutex> & lock)
{
debug_assert (lock.owns_lock ());
debug_assert (!mutex.try_lock ());
debug_assert (!triggered.empty ());

// Swap and deduplicate
decltype (triggered) triggered_l;
swap (triggered_l, triggered);

lock.unlock ();

std::unordered_set<nano::block_hash> hashes;
hashes.reserve (triggered_l.size ());
hashes.insert (triggered_l.begin (), triggered_l.end ());

stats.add (nano::stat::type::vote_cache_processor, nano::stat::detail::processed, hashes.size ());

for (auto const & hash : hashes)
{
auto cached = vote_cache.find (hash);
for (auto const & cached_vote : cached)
{
vote_router.vote (cached_vote, nano::vote_source::cache, hash);
}
}
}

std::size_t nano::vote_cache_processor::size () const
{
nano::lock_guard<nano::mutex> guard{ mutex };
return triggered.size ();
}

bool nano::vote_cache_processor::empty () const
{
return size () == 0;
}

std::unique_ptr<nano::container_info_component> nano::vote_cache_processor::collect_container_info (std::string const & name) const
{
nano::lock_guard<nano::mutex> guard{ mutex };
auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "triggered", triggered.size (), sizeof (decltype (triggered)::value_type) }));
return composite;
}

/*
* vote_processor_config
*/
Expand All @@ -242,4 +369,4 @@ nano::error nano::vote_processor_config::deserialize (nano::tomlconfig & toml)
toml.get ("batch_size", batch_size);

return toml.get_error ();
}
}
72 changes: 44 additions & 28 deletions nano/node/vote_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/fair_queue.hpp>
#include <nano/node/fwd.hpp>
#include <nano/node/rep_tiers.hpp>
#include <nano/node/vote_router.hpp>
#include <nano/secure/common.hpp>
Expand All @@ -13,32 +14,6 @@
#include <thread>
#include <unordered_set>

namespace nano
{
namespace store
{
class component;
}
class node_observers;
class stats;
class node_config;
class logger;
class online_reps;
class rep_crawler;
class ledger;
class network_params;
class node_flags;
class stats;
class rep_tiers;
enum class vote_code;
class vote_router;

namespace transport
{
class channel;
}
}

namespace nano
{
class vote_processor_config final
Expand All @@ -53,21 +28,25 @@ class vote_processor_config final
size_t pr_priority{ 3 };
size_t threads{ std::clamp (nano::hardware_concurrency () / 2, 1u, 4u) };
size_t batch_size{ 1024 };
size_t max_triggered{ 16384 };
};

class vote_processor final
{
public:
vote_processor (vote_processor_config const &, nano::vote_router & vote_router, nano::node_observers &, nano::stats &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &);
vote_processor (vote_processor_config const &, nano::vote_router &, nano::node_observers &, nano::stats &, nano::node_flags &, nano::logger &, nano::online_reps &, nano::rep_crawler &, nano::ledger &, nano::network_params &, nano::rep_tiers &);
~vote_processor ();

void start ();
void stop ();

/** @returns true if the vote was queued for processing */
/** Queue vote for processing. @returns true if the vote was queued */
bool vote (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &, nano::vote_source = nano::vote_source::live);
nano::vote_code vote_blocking (std::shared_ptr<nano::vote> const &, std::shared_ptr<nano::transport::channel> const &, nano::vote_source = nano::vote_source::live);

/** Queue hash for vote cache lookup and processing. */
void trigger (nano::block_hash const & hash);

std::size_t size () const;
bool empty () const;

Expand Down Expand Up @@ -101,4 +80,41 @@ class vote_processor final
mutable nano::mutex mutex{ mutex_identifier (mutexes::vote_processor) };
std::vector<std::thread> threads;
};

class vote_cache_processor final
{
public:
vote_cache_processor (vote_processor_config const &, nano::vote_router &, nano::vote_cache &, nano::stats &, nano::logger &);
~vote_cache_processor ();

void start ();
void stop ();

/** Queue hash for vote cache lookup and processing. */
void trigger (nano::block_hash const & hash);

std::size_t size () const;
bool empty () const;

std::unique_ptr<container_info_component> collect_container_info (std::string const & name) const;

private:
void run ();
void run_batch (nano::unique_lock<nano::mutex> &);

private: // Dependencies
vote_processor_config const & config;
nano::vote_router & vote_router;
nano::vote_cache & vote_cache;
nano::stats & stats;
nano::logger & logger;

private:
std::deque<nano::block_hash> triggered;

bool stopped{ false };
nano::condition_variable condition;
mutable nano::mutex mutex;
std::thread thread;
};
}
Loading

0 comments on commit 4607051

Please sign in to comment.