Skip to content

Commit

Permalink
Parallelize state block signature verification with block processor (#…
Browse files Browse the repository at this point in the history
…2570)

* Parallelize state block signature verification with batch block processing

* Make sure all state blocks are flushed too

* Serg review comment about notification cleanup

* Use half the amount of threads for the signature checker

* Remove multithreaded cutoff

* Use n/2 extra threads. Handles odd number of CPU threads too.

* Formatting

* Simplify expression (Gui comment)
  • Loading branch information
wezrule committed Mar 12, 2020
1 parent 6aea75e commit c1808b6
Show file tree
Hide file tree
Showing 13 changed files with 285 additions and 173 deletions.
3 changes: 3 additions & 0 deletions nano/lib/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::request_aggregator:
thread_role_name_string = "Req aggregator";
break;
case nano::thread_role::name::state_block_signature_verification:
thread_role_name_string = "State block sig";
break;
}

/*
Expand Down
3 changes: 2 additions & 1 deletion nano/lib/threading.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ namespace thread_role
work_watcher,
confirmation_height_processing,
worker,
request_aggregator
request_aggregator,
state_block_signature_verification
};
/*
* Get/Set the identifier for the current thread
Expand Down
10 changes: 6 additions & 4 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ add_library (node
repcrawler.cpp
request_aggregator.hpp
request_aggregator.cpp
signatures.hpp
signatures.cpp
socket.hpp
socket.cpp
state_block_signature_verification.hpp
state_block_signature_verification.cpp
testing.hpp
testing.cpp
transport/tcp.hpp
Expand All @@ -129,10 +135,6 @@ add_library (node
transport/transport.cpp
transport/udp.hpp
transport/udp.cpp
signatures.hpp
signatures.cpp
socket.hpp
socket.cpp
vote_processor.hpp
vote_processor.cpp
voting.hpp
Expand Down
149 changes: 46 additions & 103 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/election.hpp>
Expand All @@ -14,8 +15,12 @@ stopped (false),
active (false),
next_log (std::chrono::steady_clock::now ()),
node (node_a),
write_database_queue (write_database_queue_a)
write_database_queue (write_database_queue_a),
state_block_signature_verification (node.checker, node.ledger.network_params.ledger.epochs, node.config, node.logger, node.flags.block_processor_verification_size)
{
state_block_signature_verification.blocks_verified_callback = [this](std::deque<nano::unchecked_info> & items, std::vector<int> const & verifications, std::vector<nano::block_hash> const & hashes, std::vector<nano::signature> const & blocks_signatures) {
this->process_verified_state_blocks (items, verifications, hashes, blocks_signatures);
};
}

nano::block_processor::~block_processor ()
Expand All @@ -31,14 +36,15 @@ void nano::block_processor::stop ()
stopped = true;
}
condition.notify_all ();
state_block_signature_verification.stop ();
}

void nano::block_processor::flush ()
{
node.checker.flush ();
flushing = true;
nano::unique_lock<std::mutex> lock (mutex);
while (!stopped && (have_blocks () || active))
while (!stopped && (have_blocks () || active || state_block_signature_verification.is_active ()))
{
condition.wait (lock);
}
Expand All @@ -48,7 +54,7 @@ void nano::block_processor::flush ()
size_t nano::block_processor::size ()
{
nano::unique_lock<std::mutex> lock (mutex);
return (blocks.size () + state_blocks.size () + forced.size ());
return (blocks.size () + state_block_signature_verification.size () + forced.size ());
}

bool nano::block_processor::full ()
Expand All @@ -69,19 +75,23 @@ void nano::block_processor::add (std::shared_ptr<nano::block> block_a, uint64_t

void nano::block_processor::add (nano::unchecked_info const & info_a)
{
debug_assert (!nano::work_validate (*info_a.block));
bool should_notify{ false };
{
nano::lock_guard<std::mutex> lock (mutex);
if (info_a.verified == nano::signature_verification::unknown && (info_a.block->type () == nano::block_type::state || info_a.block->type () == nano::block_type::open || !info_a.account.is_zero ()))
{
state_blocks.push_back (info_a);
state_block_signature_verification.add (info_a);
}
else
{
should_notify = true;
blocks.push_back (info_a);
}
}
condition.notify_all ();
if (should_notify)
{
condition.notify_all ();
}
}

void nano::block_processor::force (std::shared_ptr<nano::block> block_a)
Expand All @@ -104,7 +114,7 @@ void nano::block_processor::process_blocks ()
nano::unique_lock<std::mutex> lock (mutex);
while (!stopped)
{
if (have_blocks ())
if (!blocks.empty () || !forced.empty ())
{
active = true;
lock.unlock ();
Expand All @@ -114,7 +124,7 @@ void nano::block_processor::process_blocks ()
}
else
{
condition.notify_all ();
condition.notify_one ();
condition.wait (lock);
}
}
Expand All @@ -135,71 +145,14 @@ bool nano::block_processor::should_log ()
bool nano::block_processor::have_blocks ()
{
debug_assert (!mutex.try_lock ());
return !blocks.empty () || !forced.empty () || !state_blocks.empty ();
return !blocks.empty () || !forced.empty () || state_block_signature_verification.size () != 0;
}

void nano::block_processor::verify_state_blocks (nano::unique_lock<std::mutex> & lock_a, size_t max_count)
void nano::block_processor::process_verified_state_blocks (std::deque<nano::unchecked_info> & items, std::vector<int> const & verifications, std::vector<nano::block_hash> const & hashes, std::vector<nano::signature> const & blocks_signatures)
{
debug_assert (!mutex.try_lock ());
nano::timer<std::chrono::milliseconds> timer_l (nano::timer_state::started);
std::deque<nano::unchecked_info> items;
if (state_blocks.size () <= max_count)
{
items.swap (state_blocks);
}
else
{
for (auto i (0); i < max_count; ++i)
{
items.push_back (state_blocks.front ());
state_blocks.pop_front ();
}
debug_assert (!state_blocks.empty ());
}
lock_a.unlock ();
if (!items.empty ())
{
auto size (items.size ());
std::vector<nano::block_hash> hashes;
hashes.reserve (size);
std::vector<unsigned char const *> messages;
messages.reserve (size);
std::vector<size_t> lengths;
lengths.reserve (size);
std::vector<nano::account> accounts;
accounts.reserve (size);
std::vector<unsigned char const *> pub_keys;
pub_keys.reserve (size);
std::vector<nano::signature> blocks_signatures;
blocks_signatures.reserve (size);
std::vector<unsigned char const *> signatures;
signatures.reserve (size);
std::vector<int> verifications;
verifications.resize (size, 0);
for (auto i (0); i < size; ++i)
{
auto & item (items[i]);
hashes.push_back (item.block->hash ());
messages.push_back (hashes.back ().bytes.data ());
lengths.push_back (sizeof (decltype (hashes)::value_type));
nano::account account (item.block->account ());
if (!item.block->link ().is_zero () && node.ledger.is_epoch_link (item.block->link ()))
{
account = node.ledger.epoch_signer (item.block->link ());
}
else if (!item.account.is_zero ())
{
account = item.account;
}
accounts.push_back (account);
pub_keys.push_back (accounts.back ().bytes.data ());
blocks_signatures.push_back (item.block->block_signature ());
signatures.push_back (blocks_signatures.back ().bytes.data ());
}
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
node.checker.verify (check);
lock_a.lock ();
for (auto i (0); i < size; ++i)
nano::unique_lock<std::mutex> lk (mutex);
for (auto i (0); i < verifications.size (); ++i)
{
debug_assert (verifications[i] == 1 || verifications[i] == 0);
auto & item (items.front ());
Expand Down Expand Up @@ -230,47 +183,24 @@ void nano::block_processor::verify_state_blocks (nano::unique_lock<std::mutex> &
}
items.pop_front ();
}
if (node.config.logging.timing_logging () && timer_l.stop () > std::chrono::milliseconds (10))
{
node.logger.try_log (boost::str (boost::format ("Batch verified %1% state blocks in %2% %3%") % size % timer_l.value ().count () % timer_l.unit ()));
}
}
else
{
lock_a.lock ();
}
condition.notify_all ();
}

void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_a)
{
nano::timer<std::chrono::milliseconds> timer_l;
lock_a.lock ();
timer_l.start ();
// Limit state blocks verification time

{
if (!state_blocks.empty ())
{
size_t max_verification_batch (node.flags.block_processor_verification_size != 0 ? node.flags.block_processor_verification_size : 2048 * (node.config.signature_checker_threads + 1));
while (!state_blocks.empty () && timer_l.before_deadline (std::chrono::seconds (2)))
{
verify_state_blocks (lock_a, max_verification_batch);
}
}
}
lock_a.unlock ();
auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
auto transaction (node.store.tx_begin_write ({ tables::accounts, nano::tables::cached_counts, nano::tables::change_blocks, tables::frontiers, tables::open_blocks, tables::pending, tables::receive_blocks, tables::representation, tables::send_blocks, tables::state_blocks, tables::unchecked }, { tables::confirmation_height }));
timer_l.restart ();
nano::timer<std::chrono::milliseconds> timer_l;
lock_a.lock ();
timer_l.start ();
// Processing blocks
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
while ((!blocks.empty () || !forced.empty ()) && (timer_l.before_deadline (node.config.block_processor_batch_max_time) || (number_of_blocks_processed < node.flags.block_processor_batch_size)) && !awaiting_write)
{
bool log_this_record = (blocks.size () + state_blocks.size () + forced.size () > 64) && should_log ();
if (log_this_record)
if ((blocks.size () + state_block_signature_verification.size () + forced.size () > 64) && should_log ())
{
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_blocks.size () % forced.size ()));
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size ()));
}
nano::unchecked_info info;
nano::block_hash hash (0);
Expand Down Expand Up @@ -322,12 +252,6 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
number_of_blocks_processed++;
process_one (transaction, info);
lock_a.lock ();
/* Verify more state blocks if blocks deque is empty
Because verification is long process, avoid large deque verification inside of write transaction */
if (blocks.empty () && !state_blocks.empty ())
{
verify_state_blocks (lock_a, 256 * (node.config.signature_checker_threads + 1));
}
}
awaiting_write = false;
lock_a.unlock ();
Expand Down Expand Up @@ -547,3 +471,22 @@ void nano::block_processor::requeue_invalid (nano::block_hash const & hash_a, na
debug_assert (hash_a == info_a.block->hash ());
node.bootstrap_initiator.lazy_requeue (hash_a, info_a.block->previous (), info_a.confirmed);
}

std::unique_ptr<nano::container_info_component> nano::collect_container_info (block_processor & block_processor, const std::string & name)
{
size_t blocks_count;
size_t forced_count;

{
nano::lock_guard<std::mutex> guard (block_processor.mutex);
blocks_count = block_processor.blocks.size ();
forced_count = block_processor.forced.size ();
}

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (collect_container_info (block_processor.state_block_signature_verification, "state_block_signature_verification"));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) }));
composite->add_component (collect_container_info (block_processor.generator, "generator"));
return composite;
}
6 changes: 4 additions & 2 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <nano/lib/blocks.hpp>
#include <nano/node/state_block_signature_verification.hpp>
#include <nano/node/voting.hpp>
#include <nano/secure/common.hpp>

Expand Down Expand Up @@ -51,22 +52,23 @@ class block_processor final

private:
void queue_unchecked (nano::write_transaction const &, nano::block_hash const &);
void verify_state_blocks (nano::unique_lock<std::mutex> &, size_t = std::numeric_limits<size_t>::max ());
void process_batch (nano::unique_lock<std::mutex> &);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>, const bool = false, const bool = false);
void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &);
void process_verified_state_blocks (std::deque<nano::unchecked_info> &, std::vector<int> const &, std::vector<nano::block_hash> const &, std::vector<nano::signature> const &);
bool stopped;
bool active;
bool awaiting_write{ false };
std::chrono::steady_clock::time_point next_log;
std::deque<nano::unchecked_info> state_blocks;
std::deque<nano::unchecked_info> blocks;
std::deque<std::shared_ptr<nano::block>> forced;
nano::condition_variable condition;
nano::node & node;
nano::write_database_queue & write_database_queue;
std::mutex mutex;
nano::state_block_signature_verification state_block_signature_verification;

friend std::unique_ptr<container_info_component> collect_container_info (block_processor & block_processor, const std::string & name);
};
std::unique_ptr<nano::container_info_component> collect_container_info (block_processor & block_processor, const std::string & name);
}
21 changes: 0 additions & 21 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,27 +82,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (re
return composite;
}

std::unique_ptr<nano::container_info_component> nano::collect_container_info (block_processor & block_processor, const std::string & name)
{
size_t state_blocks_count;
size_t blocks_count;
size_t forced_count;

{
nano::lock_guard<std::mutex> guard (block_processor.mutex);
state_blocks_count = block_processor.state_blocks.size ();
blocks_count = block_processor.blocks.size ();
forced_count = block_processor.forced.size ();
}

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "state_blocks", state_blocks_count, sizeof (decltype (block_processor.state_blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) }));
composite->add_component (collect_container_info (block_processor.generator, "generator"));
return composite;
}

nano::node::node (boost::asio::io_context & io_ctx_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, nano::alarm & alarm_a, nano::logging const & logging_a, nano::work_pool & work_a, nano::node_flags flags_a) :
node (io_ctx_a, application_path_a, alarm_a, nano::node_config (peering_port_a, logging_a), work_a, flags_a)
{
Expand Down
1 change: 0 additions & 1 deletion nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ class block_arrival final
std::unique_ptr<container_info_component> collect_container_info (block_arrival & block_arrival, const std::string & name);

std::unique_ptr<container_info_component> collect_container_info (rep_crawler & rep_crawler, const std::string & name);
std::unique_ptr<container_info_component> collect_container_info (block_processor & block_processor, const std::string & name);

class node final : public std::enable_shared_from_this<nano::node>
{
Expand Down
2 changes: 1 addition & 1 deletion nano/node/nodeconfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const
toml.put ("io_threads", io_threads, "Number of threads dedicated to I/O opeations. Defaults to the number of CPU threads, and at least 4.\ntype:uint64");
toml.put ("network_threads", network_threads, "Number of threads dedicated to processing network messages. Defaults to the number of CPU threads, and at least 4.\ntype:uint64");
toml.put ("work_threads", work_threads, "Number of threads dedicated to CPU generated work. Defaults to all available CPU threads.\ntype:uint64");
toml.put ("signature_checker_threads", signature_checker_threads, "Number of additional threads dedicated to signature verification. Defaults to the number of CPU threads minus 1.\ntype:uint64");
toml.put ("signature_checker_threads", signature_checker_threads, "Number of additional threads dedicated to signature verification. Defaults to number of CPU threads / 2.\ntype:uint64");
toml.put ("enable_voting", enable_voting, "Enable or disable voting. Enabling this option requires additional system resources, namely increased CPU, bandwidth and disk usage.\ntype:bool");
toml.put ("bootstrap_connections", bootstrap_connections, "Number of outbound bootstrap connections. Must be a power of 2. Defaults to 4.\nWarning: a larger amount of connections may use substantially more system memory.\ntype:uint64");
toml.put ("bootstrap_connections_max", bootstrap_connections_max, "Maximum number of inbound bootstrap connections. Defaults to 64.\nWarning: a larger amount of connections may use additional system memory.\ntype:uint64");
Expand Down
3 changes: 2 additions & 1 deletion nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ class node_config
unsigned io_threads{ std::max<unsigned> (4, std::thread::hardware_concurrency ()) };
unsigned network_threads{ std::max<unsigned> (4, std::thread::hardware_concurrency ()) };
unsigned work_threads{ std::max<unsigned> (4, std::thread::hardware_concurrency ()) };
unsigned signature_checker_threads{ (std::thread::hardware_concurrency () != 0) ? std::thread::hardware_concurrency () - 1 : 0 }; /* The calling thread does checks as well so remove it from the number of threads used */
/* Use half available threads on the system for signature checking. The calling thread does checks as well, so these are extra worker threads */
unsigned signature_checker_threads{ std::thread::hardware_concurrency () / 2 };
bool enable_voting{ false };
unsigned bootstrap_connections{ 4 };
unsigned bootstrap_connections_max{ 64 };
Expand Down
Loading

0 comments on commit c1808b6

Please sign in to comment.