Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add populate_backlog rpc command #3860

Merged
merged 15 commits into from
Jul 20, 2022
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions nano/lib/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,16 +90,19 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::unchecked:
thread_role_name_string = "Unchecked";
break;
case nano::thread_role::name::backlog_population:
thread_role_name_string = "Backlog";
break;
default:
debug_assert (false && "nano::thread_role::get_string unhandled thread role");
}

/*
* We want to constrain the thread names to 15
* characters, since this is the smallest maximum
* length supported by the platforms we support
* (specifically, Linux)
*/
* We want to constrain the thread names to 15
* characters, since this is the smallest maximum
* length supported by the platforms we support
* (specifically, Linux)
*/
debug_assert (thread_role_name_string.size () < 16);
return (thread_role_name_string);
}
Expand All @@ -121,7 +124,7 @@ void nano::thread_role::set (nano::thread_role::name role)
void nano::thread_attributes::set (boost::thread::attributes & attrs)
{
auto attrs_l (&attrs);
attrs_l->set_stack_size (8000000); //8MB
attrs_l->set_stack_size (8000000); // 8MB
}

nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned service_threads_a) :
Expand Down
1 change: 1 addition & 0 deletions nano/lib/threading.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace thread_role
db_parallel_traversal,
election_scheduler,
unchecked,
backlog_population
};

/*
Expand Down
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ add_library(
${platform_sources}
active_transactions.hpp
active_transactions.cpp
backlog_population.hpp
backlog_population.cpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/bootstrap_attempt.hpp
Expand Down
96 changes: 96 additions & 0 deletions nano/node/backlog_population.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#include <nano/lib/threading.hpp>
#include <nano/node/backlog_population.hpp>
#include <nano/node/election_scheduler.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/secure/store.hpp>

nano::backlog_population::backlog_population (const config & config_a, nano::store & store_a, nano::election_scheduler & scheduler_a) :
config_m{ config_a },
store_m{ store_a },
scheduler{ scheduler_a }
{
}

nano::backlog_population::~backlog_population ()
{
stop ();
if (thread.joinable ())
dsiganos marked this conversation as resolved.
Show resolved Hide resolved
{
thread.join ();
}
}

void nano::backlog_population::start ()
{
if (!thread.joinable ())
{
thread = std::thread{ [this] () { run (); } };
}
}

void nano::backlog_population::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
stopped = true;
notify ();
}

void nano::backlog_population::trigger ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
triggered = true;
notify ();
}

void nano::backlog_population::notify ()
{
condition.notify_all ();
}

bool nano::backlog_population::predicate () const
{
return triggered;
}

void nano::backlog_population::run ()
{
nano::thread_role::set (nano::thread_role::name::backlog_population);
const auto delay = std::chrono::seconds{ config_m.delay_between_runs_in_seconds };
nano::unique_lock<nano::mutex> lock{ mutex };
while (!stopped)
{
if (predicate () || config_m.ongoing_backlog_population_enabled)
{
triggered = false;
lock.unlock ();
populate_backlog ();
lock.lock ();
}

condition.wait_for (lock, delay, [this] () {
return stopped || predicate ();
});
}
}

void nano::backlog_population::populate_backlog ()
{
auto done = false;
uint64_t const chunk_size = 65536;
nano::account next = 0;
uint64_t total = 0;
while (!stopped && !done)
{
auto transaction = store_m.tx_begin_read ();
auto count = 0;
auto i = store_m.account.begin (transaction, next);
const auto end = store_m.account.end ();
for (; !stopped && i != end && count < chunk_size; ++i, ++count, ++total)
{
auto const & account = i->first;
scheduler.activate (account, transaction);
next = account.number () + 1;
}
done = store_m.account.begin (transaction, next) == end;
}
}
58 changes: 58 additions & 0 deletions nano/node/backlog_population.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#pragma once

#include <nano/lib/locks.hpp>

#include <atomic>
#include <condition_variable>
#include <thread>

namespace nano
{
class store;
class election_scheduler;

class backlog_population final
{
public:
struct config
{
bool ongoing_backlog_population_enabled;
unsigned int delay_between_runs_in_seconds;
};

explicit backlog_population (const config & config_a, store & store, election_scheduler & scheduler);
~backlog_population ();

void start ();
void stop ();
void trigger ();

/** Other components call this to notify us about external changes, so we can check our predicate. */
void notify ();

private:
void run ();
bool predicate () const;

void populate_backlog ();

/** This is a manual trigger, the ongoing backlog population does not use this.
* It can be triggered even when backlog population (frontiers confirmation) is disabled. */
bool triggered{ false };

std::atomic<bool> stopped{ false };

nano::condition_variable condition;
mutable nano::mutex mutex;

/** Thread that runs the backlog implementation logic. The thread always runs, even if
* backlog population is disabled, so that it can service a manual trigger (e.g. via RPC). */
std::thread thread;

config config_m;

private: // Dependencies
store & store_m;
election_scheduler & scheduler;
};
}
8 changes: 8 additions & 0 deletions nano/node/json_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5224,6 +5224,13 @@ void nano::json_handler::work_peers_clear ()
response_errors ();
}

void nano::json_handler::populate_backlog ()
{
node.backlog.trigger ();
response_l.put ("success", "");
response_errors ();
}

void nano::inprocess_rpc_handler::process_request (std::string const &, std::string const & body_a, std::function<void (std::string const &)> response_a)
{
// Note that if the rpc action is async, the shared_ptr<json_handler> lifetime will be extended by the action handler
Expand Down Expand Up @@ -5388,6 +5395,7 @@ ipc_json_handler_no_arg_func_map create_ipc_json_handler_no_arg_func_map ()
no_arg_funcs.emplace ("work_peer_add", &nano::json_handler::work_peer_add);
no_arg_funcs.emplace ("work_peers", &nano::json_handler::work_peers);
no_arg_funcs.emplace ("work_peers_clear", &nano::json_handler::work_peers_clear);
no_arg_funcs.emplace ("populate_backlog", &nano::json_handler::populate_backlog);
return no_arg_funcs;
}

Expand Down
1 change: 1 addition & 0 deletions nano/node/json_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ class json_handler : public std::enable_shared_from_this<nano::json_handler>
void pending_exists ();
void receivable ();
void receivable_exists ();
void populate_backlog ();
void process ();
void pruned_exists ();
void receive ();
Expand Down
45 changes: 10 additions & 35 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ extern unsigned char nano_bootstrap_weights_beta[];
extern std::size_t nano_bootstrap_weights_beta_size;
}

nano::backlog_population::config nodeconfig_to_backlog_population_config (const nano::node_config & config)
{
return nano::backlog_population::config{
.ongoing_backlog_population_enabled = config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled,
.delay_between_runs_in_seconds = config.network_params.network.is_dev_network () ? 1u : 300u
};
}

void nano::node::keepalive (std::string const & address_a, uint16_t port_a)
{
auto node_l (shared_from_this ());
Expand Down Expand Up @@ -155,6 +163,7 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
scheduler{ *this },
aggregator (config, stats, active.generator, active.final_generator, history, ledger, wallets, active),
wallets (wallets_store.init_error (), *this),
backlog{ nodeconfig_to_backlog_population_config (config), store, scheduler },
startup_time (std::chrono::steady_clock::now ()),
node_seq (seq)
{
Expand Down Expand Up @@ -702,12 +711,7 @@ void nano::node::start ()
port_mapping.start ();
}
wallets.start ();
if (config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled)
{
workers.push_task ([this_l = shared ()] () {
this_l->ongoing_backlog_population ();
});
}
backlog.start ();
}

void nano::node::stop ()
Expand Down Expand Up @@ -1022,15 +1026,6 @@ void nano::node::ongoing_unchecked_cleanup ()
});
}

void nano::node::ongoing_backlog_population ()
{
populate_backlog ();
auto delay = config.network_params.network.is_dev_network () ? std::chrono::seconds{ 1 } : std::chrono::duration_cast<std::chrono::seconds> (std::chrono::minutes{ 5 });
workers.add_timed_task (std::chrono::steady_clock::now () + delay, [this_l = shared ()] () {
this_l->ongoing_backlog_population ();
});
}

bool nano::node::collect_ledger_pruning_targets (std::deque<nano::block_hash> & pruning_targets_a, nano::account & last_account_a, uint64_t const batch_read_size_a, uint64_t const max_depth_a, uint64_t const cutoff_time_a)
{
uint64_t read_operations (0);
Expand Down Expand Up @@ -1794,26 +1789,6 @@ std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> nano::node::get_
return { max_blocks, weights };
}

void nano::node::populate_backlog ()
{
auto done = false;
uint64_t const chunk_size = 65536;
nano::account next = 0;
uint64_t total = 0;
while (!stopped && !done)
{
auto transaction = store.tx_begin_read ();
auto count = 0;
for (auto i = store.account.begin (transaction, next), n = store.account.end (); !stopped && i != n && count < chunk_size; ++i, ++count, ++total)
{
auto const & account = i->first;
scheduler.activate (account, transaction);
next = account.number () + 1;
}
done = store.account.begin (transaction, next) == store.account.end ();
}
}

/** Convenience function to easily return the confirmation height of an account. */
uint64_t nano::node::get_confirmation_height (nano::transaction const & transaction_a, nano::account & account_a)
{
Expand Down
5 changes: 3 additions & 2 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <nano/lib/stats.hpp>
#include <nano/lib/work.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/backlog_population.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_attempt.hpp>
Expand Down Expand Up @@ -123,7 +124,6 @@ class node final : public std::enable_shared_from_this<nano::node>
void ongoing_bootstrap ();
void ongoing_peer_store ();
void ongoing_unchecked_cleanup ();
void ongoing_backlog_population ();
void backup_wallet ();
void search_receivable_all ();
void bootstrap_wallet ();
Expand Down Expand Up @@ -154,7 +154,6 @@ class node final : public std::enable_shared_from_this<nano::node>
bool epoch_upgrader (nano::raw_key const &, nano::epoch, uint64_t, uint64_t);
void set_bandwidth_params (std::size_t limit, double ratio);
std::pair<uint64_t, decltype (nano::ledger::bootstrap_weights)> get_bootstrap_weights () const;
void populate_backlog ();
uint64_t get_confirmation_height (nano::transaction const &, nano::account &);
nano::write_database_queue write_database_queue;
boost::asio::io_context & io_ctx;
Expand Down Expand Up @@ -198,6 +197,8 @@ class node final : public std::enable_shared_from_this<nano::node>
nano::election_scheduler scheduler;
nano::request_aggregator aggregator;
nano::wallets wallets;
nano::backlog_population backlog;

std::chrono::steady_clock::time_point const startup_time;
std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week
std::atomic<bool> unresponsive_work_peers{ false };
Expand Down
1 change: 1 addition & 0 deletions nano/rpc/rpc_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ std::unordered_set<std::string> create_rpc_control_impls ()
set.emplace ("ledger");
set.emplace ("node_id");
set.emplace ("password_change");
set.emplace ("populate_backlog");
set.emplace ("receive");
set.emplace ("receive_minimum");
set.emplace ("receive_minimum_set");
Expand Down
Loading