Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize state block signature verification with block processor #2570

Merged
merged 14 commits into from
Mar 12, 2020
Merged
3 changes: 3 additions & 0 deletions nano/lib/threading.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
case nano::thread_role::name::request_aggregator:
thread_role_name_string = "Req aggregator";
break;
case nano::thread_role::name::state_block_signature_verification:
thread_role_name_string = "State block sig";
break;
}

/*
Expand Down
3 changes: 2 additions & 1 deletion nano/lib/threading.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ namespace thread_role
work_watcher,
confirmation_height_processing,
worker,
request_aggregator
request_aggregator,
state_block_signature_verification
};
/*
* Get/Set the identifier for the current thread
Expand Down
10 changes: 6 additions & 4 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ add_library (node
repcrawler.cpp
request_aggregator.hpp
request_aggregator.cpp
signatures.hpp
signatures.cpp
socket.hpp
socket.cpp
state_block_signature_verification.hpp
state_block_signature_verification.cpp
testing.hpp
testing.cpp
transport/tcp.hpp
Expand All @@ -113,10 +119,6 @@ add_library (node
transport/transport.cpp
transport/udp.hpp
transport/udp.cpp
signatures.hpp
signatures.cpp
socket.hpp
socket.cpp
vote_processor.hpp
vote_processor.cpp
voting.hpp
Expand Down
151 changes: 50 additions & 101 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include <nano/lib/threading.hpp>
#include <nano/lib/timer.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/node.hpp>
Expand All @@ -15,8 +16,12 @@ stopped (false),
active (false),
next_log (std::chrono::steady_clock::now ()),
node (node_a),
write_database_queue (write_database_queue_a)
write_database_queue (write_database_queue_a),
state_block_signature_verification (node.checker, node.ledger.network_params.ledger.epochs, node.config, node.logger, node.flags.block_processor_verification_size)
{
state_block_signature_verification.blocks_verified_callback = [this](std::deque<nano::unchecked_info> & items, std::vector<int> const & verifications, std::vector<nano::block_hash> const & hashes, std::vector<nano::signature> const & blocks_signatures) {
this->process_verified_state_blocks (items, verifications, hashes, blocks_signatures);
};
}

nano::block_processor::~block_processor ()
Expand All @@ -32,13 +37,14 @@ void nano::block_processor::stop ()
stopped = true;
}
condition.notify_all ();
state_block_signature_verification.stop ();
}

void nano::block_processor::flush ()
{
node.checker.flush ();
nano::unique_lock<std::mutex> lock (mutex);
while (!stopped && (have_blocks () || active))
while (!stopped && (have_blocks () || active || state_block_signature_verification.is_active ()))
{
condition.wait (lock);
}
Expand All @@ -48,7 +54,7 @@ void nano::block_processor::flush ()
size_t nano::block_processor::size ()
{
nano::unique_lock<std::mutex> lock (mutex);
return (blocks.size () + state_blocks.size () + forced.size ());
return (blocks.size () + state_block_signature_verification.size () + forced.size ());
}

bool nano::block_processor::full ()
Expand All @@ -71,6 +77,7 @@ void nano::block_processor::add (nano::unchecked_info const & info_a)
{
if (!nano::work_validate (nano::work_version::work_1, info_a.block->root (), info_a.block->block_work ()))
{
bool should_notify{ false };
{
auto hash (info_a.block->hash ());
auto filter_hash (filter_item (hash, info_a.block->block_signature ()));
Expand All @@ -79,16 +86,21 @@ void nano::block_processor::add (nano::unchecked_info const & info_a)
{
if (info_a.verified == nano::signature_verification::unknown && (info_a.block->type () == nano::block_type::state || info_a.block->type () == nano::block_type::open || !info_a.account.is_zero ()))
{
state_blocks.push_back (info_a);
state_block_signature_verification.add (info_a);
}
else
{
blocks.push_back (info_a);
should_notify = true;
}
blocks_filter.insert (filter_hash);
}
}
condition.notify_all ();

if (should_notify)
{
condition.notify_all ();
}
}
else
{
Expand Down Expand Up @@ -117,7 +129,7 @@ void nano::block_processor::process_blocks ()
nano::unique_lock<std::mutex> lock (mutex);
while (!stopped)
{
if (have_blocks ())
if (!blocks.empty () || !forced.empty ())
{
active = true;
lock.unlock ();
Expand All @@ -127,7 +139,7 @@ void nano::block_processor::process_blocks ()
}
else
{
condition.notify_all ();
condition.notify_one ();
condition.wait (lock);
}
}
Expand All @@ -148,71 +160,14 @@ bool nano::block_processor::should_log (bool first_time)
bool nano::block_processor::have_blocks ()
{
assert (!mutex.try_lock ());
return !blocks.empty () || !forced.empty () || !state_blocks.empty ();
return !blocks.empty () || !forced.empty () || state_block_signature_verification.size () != 0;
}

void nano::block_processor::verify_state_blocks (nano::unique_lock<std::mutex> & lock_a, size_t max_count)
void nano::block_processor::process_verified_state_blocks (std::deque<nano::unchecked_info> & items, std::vector<int> const & verifications, std::vector<nano::block_hash> const & hashes, std::vector<nano::signature> const & blocks_signatures)
{
assert (!mutex.try_lock ());
nano::timer<std::chrono::milliseconds> timer_l (nano::timer_state::started);
std::deque<nano::unchecked_info> items;
if (state_blocks.size () <= max_count)
{
items.swap (state_blocks);
}
else
{
for (auto i (0); i < max_count; ++i)
{
items.push_back (state_blocks.front ());
state_blocks.pop_front ();
}
assert (!state_blocks.empty ());
}
lock_a.unlock ();
if (!items.empty ())
{
auto size (items.size ());
std::vector<nano::block_hash> hashes;
hashes.reserve (size);
std::vector<unsigned char const *> messages;
messages.reserve (size);
std::vector<size_t> lengths;
lengths.reserve (size);
std::vector<nano::account> accounts;
accounts.reserve (size);
std::vector<unsigned char const *> pub_keys;
pub_keys.reserve (size);
std::vector<nano::signature> blocks_signatures;
blocks_signatures.reserve (size);
std::vector<unsigned char const *> signatures;
signatures.reserve (size);
std::vector<int> verifications;
verifications.resize (size, 0);
for (auto i (0); i < size; ++i)
{
auto & item (items[i]);
hashes.push_back (item.block->hash ());
messages.push_back (hashes.back ().bytes.data ());
lengths.push_back (sizeof (decltype (hashes)::value_type));
nano::account account (item.block->account ());
if (!item.block->link ().is_zero () && node.ledger.is_epoch_link (item.block->link ()))
{
account = node.ledger.epoch_signer (item.block->link ());
}
else if (!item.account.is_zero ())
{
account = item.account;
}
accounts.push_back (account);
pub_keys.push_back (accounts.back ().bytes.data ());
blocks_signatures.push_back (item.block->block_signature ());
signatures.push_back (blocks_signatures.back ().bytes.data ());
}
nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () };
node.checker.verify (check);
lock_a.lock ();
for (auto i (0); i < size; ++i)
nano::unique_lock<std::mutex> lk (mutex);
for (auto i (0); i < verifications.size (); ++i)
{
assert (verifications[i] == 1 || verifications[i] == 0);
auto & item (items.front ());
Expand Down Expand Up @@ -244,39 +199,17 @@ void nano::block_processor::verify_state_blocks (nano::unique_lock<std::mutex> &
}
items.pop_front ();
}
if (node.config.logging.timing_logging ())
{
node.logger.try_log (boost::str (boost::format ("Batch verified %1% state blocks in %2% %3%") % size % timer_l.stop ().count () % timer_l.unit ()));
}
}
else
{
lock_a.lock ();
}
condition.notify_all ();
}

void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_a)
{
nano::timer<std::chrono::milliseconds> timer_l;
lock_a.lock ();
timer_l.start ();
// Limit state blocks verification time

{
if (!state_blocks.empty ())
{
size_t max_verification_batch (node.flags.block_processor_verification_size != 0 ? node.flags.block_processor_verification_size : 2048 * (node.config.signature_checker_threads + 1));
while (!state_blocks.empty () && timer_l.before_deadline (std::chrono::seconds (2)))
{
verify_state_blocks (lock_a, max_verification_batch);
}
}
}
lock_a.unlock ();
auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
auto transaction (node.store.tx_begin_write ({ tables::accounts, nano::tables::cached_counts, nano::tables::change_blocks, tables::frontiers, tables::open_blocks, tables::pending, tables::receive_blocks, tables::representation, tables::send_blocks, tables::state_blocks, tables::unchecked }, { tables::confirmation_height }));
timer_l.restart ();
nano::timer<std::chrono::milliseconds> timer_l;
lock_a.lock ();
timer_l.start ();
// Processing blocks
auto first_time (true);
unsigned number_of_blocks_processed (0), number_of_forced_processed (0);
Expand All @@ -292,7 +225,7 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
}
else
{
if (((blocks.size () + state_blocks.size () + forced.size ()) > 64 && should_log (false)))
if (((blocks.size () + state_block_signature_verification.size () + forced.size ()) > 64 && should_log (false)))
{
log_this_record = true;
}
Expand All @@ -301,7 +234,7 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
if (log_this_record)
{
first_time = false;
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_blocks.size () % forced.size ()));
node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size ()));
}
nano::unchecked_info info;
nano::block_hash hash (0);
Expand Down Expand Up @@ -354,12 +287,6 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
number_of_blocks_processed++;
process_one (transaction, info);
lock_a.lock ();
/* Verify more state blocks if blocks deque is empty
Because verification is long process, avoid large deque verification inside of write transaction */
if (blocks.empty () && !state_blocks.empty ())
{
verify_state_blocks (lock_a, 256 * (node.config.signature_checker_threads + 1));
}
}
awaiting_write = false;
lock_a.unlock ();
Expand Down Expand Up @@ -587,3 +514,25 @@ void nano::block_processor::requeue_invalid (nano::block_hash const & hash_a, na
attempt->lazy_requeue (hash_a, info_a.block->previous (), info_a.confirmed);
}
}

std::unique_ptr<nano::container_info_component> nano::collect_container_info (block_processor & block_processor, const std::string & name)
{
size_t blocks_count;
size_t blocks_filter_count;
size_t forced_count;

{
nano::lock_guard<std::mutex> guard (block_processor.mutex);
blocks_count = block_processor.blocks.size ();
blocks_filter_count = block_processor.blocks_filter.size ();
forced_count = block_processor.forced.size ();
}

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (collect_container_info (block_processor.state_block_signature_verification, "state_block_signature_verification"));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks_filter", blocks_filter_count, sizeof (decltype (block_processor.blocks_filter)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) }));
composite->add_component (collect_container_info (block_processor.generator, "generator"));
return composite;
}
6 changes: 4 additions & 2 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <nano/lib/blocks.hpp>
#include <nano/node/state_block_signature_verification.hpp>
#include <nano/node/voting.hpp>
#include <nano/secure/common.hpp>

Expand Down Expand Up @@ -50,15 +51,14 @@ class block_processor final

private:
void queue_unchecked (nano::write_transaction const &, nano::block_hash const &);
void verify_state_blocks (nano::unique_lock<std::mutex> &, size_t = std::numeric_limits<size_t>::max ());
void process_batch (nano::unique_lock<std::mutex> &);
void process_live (nano::block_hash const &, std::shared_ptr<nano::block>, const bool = false);
void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &);
void process_verified_state_blocks (std::deque<nano::unchecked_info> &, std::vector<int> const &, std::vector<nano::block_hash> const &, std::vector<nano::signature> const &);
bool stopped;
bool active;
bool awaiting_write{ false };
std::chrono::steady_clock::time_point next_log;
std::deque<nano::unchecked_info> state_blocks;
std::deque<nano::unchecked_info> blocks;
std::deque<std::shared_ptr<nano::block>> forced;
nano::block_hash filter_item (nano::block_hash const &, nano::signature const &);
Expand All @@ -67,7 +67,9 @@ class block_processor final
nano::node & node;
nano::write_database_queue & write_database_queue;
std::mutex mutex;
nano::state_block_signature_verification state_block_signature_verification;

friend std::unique_ptr<container_info_component> collect_container_info (block_processor & block_processor, const std::string & name);
};
std::unique_ptr<nano::container_info_component> collect_container_info (block_processor & block_processor, const std::string & name);
}
24 changes: 0 additions & 24 deletions nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,30 +79,6 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (re
return composite;
}

std::unique_ptr<nano::container_info_component> nano::collect_container_info (block_processor & block_processor, const std::string & name)
{
size_t state_blocks_count;
size_t blocks_count;
size_t blocks_filter_count;
size_t forced_count;

{
nano::lock_guard<std::mutex> guard (block_processor.mutex);
state_blocks_count = block_processor.state_blocks.size ();
blocks_count = block_processor.blocks.size ();
blocks_filter_count = block_processor.blocks_filter.size ();
forced_count = block_processor.forced.size ();
}

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "state_blocks", state_blocks_count, sizeof (decltype (block_processor.state_blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks_filter", blocks_filter_count, sizeof (decltype (block_processor.blocks_filter)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) }));
composite->add_component (collect_container_info (block_processor.generator, "generator"));
return composite;
}

nano::node::node (boost::asio::io_context & io_ctx_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, nano::alarm & alarm_a, nano::logging const & logging_a, nano::work_pool & work_a, nano::node_flags flags_a) :
node (io_ctx_a, application_path_a, alarm_a, nano::node_config (peering_port_a, logging_a), work_a, flags_a)
{
Expand Down
1 change: 0 additions & 1 deletion nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ class block_arrival final
std::unique_ptr<container_info_component> collect_container_info (block_arrival & block_arrival, const std::string & name);

std::unique_ptr<container_info_component> collect_container_info (rep_crawler & rep_crawler, const std::string & name);
std::unique_ptr<container_info_component> collect_container_info (block_processor & block_processor, const std::string & name);

class node final : public std::enable_shared_from_this<nano::node>
{
Expand Down
2 changes: 1 addition & 1 deletion nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class node_config
unsigned io_threads{ std::max<unsigned> (4, std::thread::hardware_concurrency ()) };
unsigned network_threads{ std::max<unsigned> (4, std::thread::hardware_concurrency ()) };
unsigned work_threads{ std::max<unsigned> (4, std::thread::hardware_concurrency ()) };
unsigned signature_checker_threads{ (std::thread::hardware_concurrency () != 0) ? std::thread::hardware_concurrency () - 1 : 0 }; /* The calling thread does checks as well so remove it from the number of threads used */
unsigned signature_checker_threads{ (std::thread::hardware_concurrency () / 2 > 1) ? std::thread::hardware_concurrency () / 2 - 1 : 0 }; /* The calling thread does checks as well so remove it from the number of threads used */
bool enable_voting{ false };
unsigned bootstrap_connections{ 4 };
unsigned bootstrap_connections_max{ 64 };
Expand Down
Loading