From 48ed5c01c76b7b3ae6eb758370da4e94904cfabf Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Tue, 18 Feb 2020 13:04:23 +0000 Subject: [PATCH 1/8] Parallelize state block signature verification with batch block processing --- nano/lib/threading.cpp | 3 + nano/lib/threading.hpp | 3 +- nano/node/CMakeLists.txt | 10 +- nano/node/blockprocessor.cpp | 173 ++++++++---------- nano/node/blockprocessor.hpp | 6 +- nano/node/node.cpp | 24 --- nano/node/node.hpp | 1 - nano/node/signatures.cpp | 41 +---- nano/node/signatures.hpp | 3 +- .../state_block_signature_verification.cpp | 159 ++++++++++++++++ .../state_block_signature_verification.hpp | 47 +++++ 11 files changed, 302 insertions(+), 168 deletions(-) create mode 100644 nano/node/state_block_signature_verification.cpp create mode 100644 nano/node/state_block_signature_verification.hpp diff --git a/nano/lib/threading.cpp b/nano/lib/threading.cpp index a996d35a5e..8ff74e427c 100644 --- a/nano/lib/threading.cpp +++ b/nano/lib/threading.cpp @@ -72,6 +72,9 @@ std::string nano::thread_role::get_string (nano::thread_role::name role) case nano::thread_role::name::request_aggregator: thread_role_name_string = "Req aggregator"; break; + case nano::thread_role::name::state_block_signature_verification: + thread_role_name_string = "State block sig"; + break; } /* diff --git a/nano/lib/threading.hpp b/nano/lib/threading.hpp index 56f7f53181..a1f43063a0 100644 --- a/nano/lib/threading.hpp +++ b/nano/lib/threading.hpp @@ -32,7 +32,8 @@ namespace thread_role work_watcher, confirmation_height_processing, worker, - request_aggregator + request_aggregator, + state_block_signature_verification }; /* * Get/Set the identifier for the current thread diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 4c1e035a75..8f8c847b98 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -102,6 +102,12 @@ add_library (node repcrawler.cpp request_aggregator.hpp request_aggregator.cpp + signatures.hpp + signatures.cpp + socket.hpp + socket.cpp + state_block_signature_verification.hpp + state_block_signature_verification.cpp testing.hpp testing.cpp transport/tcp.hpp @@ -110,10 +116,6 @@ add_library (node transport/transport.cpp transport/udp.hpp transport/udp.cpp - signatures.hpp - signatures.cpp - socket.hpp - socket.cpp vote_processor.hpp vote_processor.cpp voting.hpp diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 79078028bf..7a6e9331d6 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -15,8 +16,12 @@ stopped (false), active (false), next_log (std::chrono::steady_clock::now ()), node (node_a), -write_database_queue (write_database_queue_a) +write_database_queue (write_database_queue_a), +state_block_signature_verification (node.checker, node.ledger.network_params.ledger.epochs, node.config, node.logger, node.flags.block_processor_verification_size) { + state_block_signature_verification.blocks_verified_callback = [this](std::deque & items, std::vector const & verifications, std::vector const & hashes, std::vector const & blocks_signatures) { + this->process_verified_state_blocks (items, verifications, hashes, blocks_signatures); + }; } nano::block_processor::~block_processor () @@ -32,6 +37,7 @@ void nano::block_processor::stop () stopped = true; } condition.notify_all (); + state_block_signature_verification.stop (); } void nano::block_processor::flush () @@ -48,7 +54,7 @@ void nano::block_processor::flush () size_t nano::block_processor::size () { nano::unique_lock lock (mutex); - return (blocks.size () + state_blocks.size () + forced.size ()); + return (blocks.size () + state_block_signature_verification.size () + forced.size ()); } bool nano::block_processor::full () @@ -71,6 +77,11 @@ void nano::block_processor::add (nano::unchecked_info const & info_a) { if (!nano::work_validate (info_a.block->root (), info_a.block->block_work ())) { + struct + { + bool condition{ false }; + bool state_block_condition{ false }; + } should_notify; { auto hash (info_a.block->hash ()); auto filter_hash (filter_item (hash, info_a.block->block_signature ())); @@ -79,7 +90,7 @@ void nano::block_processor::add (nano::unchecked_info const & info_a) { if (info_a.verified == nano::signature_verification::unknown && (info_a.block->type () == nano::block_type::state || info_a.block->type () == nano::block_type::open || !info_a.account.is_zero ())) { - state_blocks.push_back (info_a); + state_block_signature_verification.add (info_a); } else { @@ -87,8 +98,25 @@ void nano::block_processor::add (nano::unchecked_info const & info_a) } blocks_filter.insert (filter_hash); } + + if (!blocks.empty ()) + { + should_notify.condition = true; + } + if (state_block_signature_verification.size () != 0) + { + should_notify.state_block_condition = true; + } + } + + if (should_notify.condition) + { + condition.notify_all (); + } + if (should_notify.state_block_condition) + { + state_block_signature_verification.notify (); } - condition.notify_all (); } else { @@ -117,7 +145,7 @@ void nano::block_processor::process_blocks () nano::unique_lock lock (mutex); while (!stopped) { - if (have_blocks ()) + if (!blocks.empty () || !forced.empty ()) { active = true; lock.unlock (); @@ -127,7 +155,15 @@ void nano::block_processor::process_blocks () } else { - condition.notify_all (); + if (state_block_signature_verification.size () != 0) + { + state_block_signature_verification.notify (); + } + else + { + condition.notify_one (); + } + condition.wait (lock); } } @@ -148,71 +184,14 @@ bool nano::block_processor::should_log (bool first_time) bool nano::block_processor::have_blocks () { assert (!mutex.try_lock ()); - return !blocks.empty () || !forced.empty () || !state_blocks.empty (); + return !blocks.empty () || !forced.empty () || state_block_signature_verification.size () != 0; } -void nano::block_processor::verify_state_blocks (nano::unique_lock & lock_a, size_t max_count) +void nano::block_processor::process_verified_state_blocks (std::deque & items, std::vector const & verifications, std::vector const & hashes, std::vector const & blocks_signatures) { - assert (!mutex.try_lock ()); - nano::timer timer_l (nano::timer_state::started); - std::deque items; - if (state_blocks.size () <= max_count) - { - items.swap (state_blocks); - } - else - { - for (auto i (0); i < max_count; ++i) - { - items.push_back (state_blocks.front ()); - state_blocks.pop_front (); - } - assert (!state_blocks.empty ()); - } - lock_a.unlock (); - if (!items.empty ()) { - auto size (items.size ()); - std::vector hashes; - hashes.reserve (size); - std::vector messages; - messages.reserve (size); - std::vector lengths; - lengths.reserve (size); - std::vector accounts; - accounts.reserve (size); - std::vector pub_keys; - pub_keys.reserve (size); - std::vector blocks_signatures; - blocks_signatures.reserve (size); - std::vector signatures; - signatures.reserve (size); - std::vector verifications; - verifications.resize (size, 0); - for (auto i (0); i < size; ++i) - { - auto & item (items[i]); - hashes.push_back (item.block->hash ()); - messages.push_back (hashes.back ().bytes.data ()); - lengths.push_back (sizeof (decltype (hashes)::value_type)); - nano::account account (item.block->account ()); - if (!item.block->link ().is_zero () && node.ledger.is_epoch_link (item.block->link ())) - { - account = node.ledger.epoch_signer (item.block->link ()); - } - else if (!item.account.is_zero ()) - { - account = item.account; - } - accounts.push_back (account); - pub_keys.push_back (accounts.back ().bytes.data ()); - blocks_signatures.push_back (item.block->block_signature ()); - signatures.push_back (blocks_signatures.back ().bytes.data ()); - } - nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () }; - node.checker.verify (check); - lock_a.lock (); - for (auto i (0); i < size; ++i) + nano::unique_lock lk (mutex); + for (auto i (0); i < verifications.size (); ++i) { assert (verifications[i] == 1 || verifications[i] == 0); auto & item (items.front ()); @@ -244,39 +223,17 @@ void nano::block_processor::verify_state_blocks (nano::unique_lock & } items.pop_front (); } - if (node.config.logging.timing_logging ()) - { - node.logger.try_log (boost::str (boost::format ("Batch verified %1% state blocks in %2% %3%") % size % timer_l.stop ().count () % timer_l.unit ())); - } - } - else - { - lock_a.lock (); } + condition.notify_all (); } void nano::block_processor::process_batch (nano::unique_lock & lock_a) { - nano::timer timer_l; - lock_a.lock (); - timer_l.start (); - // Limit state blocks verification time - - { - if (!state_blocks.empty ()) - { - size_t max_verification_batch (node.flags.block_processor_verification_size != 0 ? node.flags.block_processor_verification_size : 2048 * (node.config.signature_checker_threads + 1)); - while (!state_blocks.empty () && timer_l.before_deadline (std::chrono::seconds (2))) - { - verify_state_blocks (lock_a, max_verification_batch); - } - } - } - lock_a.unlock (); auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch); auto transaction (node.store.tx_begin_write ({ nano::tables::accounts, nano::tables::cached_counts, nano::tables::change_blocks, nano::tables::frontiers, nano::tables::open_blocks, nano::tables::pending, nano::tables::receive_blocks, nano::tables::representation, nano::tables::send_blocks, nano::tables::state_blocks, nano::tables::unchecked }, { nano::tables::confirmation_height })); - timer_l.restart (); + nano::timer timer_l; lock_a.lock (); + timer_l.start (); // Processing blocks auto first_time (true); unsigned number_of_blocks_processed (0), number_of_forced_processed (0); @@ -292,7 +249,7 @@ void nano::block_processor::process_batch (nano::unique_lock & lock_ } else { - if (((blocks.size () + state_blocks.size () + forced.size ()) > 64 && should_log (false))) + if (((blocks.size () + state_block_signature_verification.size () + forced.size ()) > 64 && should_log (false))) { log_this_record = true; } @@ -301,7 +258,7 @@ void nano::block_processor::process_batch (nano::unique_lock & lock_ if (log_this_record) { first_time = false; - node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_blocks.size () % forced.size ())); + node.logger.always_log (boost::str (boost::format ("%1% blocks (+ %2% state blocks) (+ %3% forced) in processing queue") % blocks.size () % state_block_signature_verification.size () % forced.size ())); } nano::unchecked_info info; nano::block_hash hash (0); @@ -354,12 +311,6 @@ void nano::block_processor::process_batch (nano::unique_lock & lock_ number_of_blocks_processed++; process_one (transaction, info); lock_a.lock (); - /* Verify more state blocks if blocks deque is empty - Because verification is long process, avoid large deque verification inside of write transaction */ - if (blocks.empty () && !state_blocks.empty ()) - { - verify_state_blocks (lock_a, 256 * (node.config.signature_checker_threads + 1)); - } } awaiting_write = false; lock_a.unlock (); @@ -587,3 +538,25 @@ void nano::block_processor::requeue_invalid (nano::block_hash const & hash_a, na attempt->lazy_requeue (hash_a, info_a.block->previous (), info_a.confirmed); } } + +std::unique_ptr nano::collect_container_info (block_processor & block_processor, const std::string & name) +{ + size_t blocks_count; + size_t blocks_filter_count; + size_t forced_count; + + { + nano::lock_guard guard (block_processor.mutex); + blocks_count = block_processor.blocks.size (); + blocks_filter_count = block_processor.blocks_filter.size (); + forced_count = block_processor.forced.size (); + } + + auto composite = std::make_unique (name); + composite->add_component (collect_container_info (block_processor.state_block_signature_verification, "state_block_signature_verification")); + composite->add_component (std::make_unique (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) })); + composite->add_component (std::make_unique (container_info{ "blocks_filter", blocks_filter_count, sizeof (decltype (block_processor.blocks_filter)::value_type) })); + composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) })); + composite->add_component (collect_container_info (block_processor.generator, "generator")); + return composite; +} \ No newline at end of file diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 15614d42dd..27c2ae4823 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -1,6 +1,7 @@ #pragma once #include +#include #include #include @@ -50,15 +51,14 @@ class block_processor final private: void queue_unchecked (nano::write_transaction const &, nano::block_hash const &); - void verify_state_blocks (nano::unique_lock &, size_t = std::numeric_limits::max ()); void process_batch (nano::unique_lock &); void process_live (nano::block_hash const &, std::shared_ptr, const bool = false); void requeue_invalid (nano::block_hash const &, nano::unchecked_info const &); + void process_verified_state_blocks (std::deque &, std::vector const &, std::vector const &, std::vector const &); bool stopped; bool active; bool awaiting_write{ false }; std::chrono::steady_clock::time_point next_log; - std::deque state_blocks; std::deque blocks; std::deque> forced; nano::block_hash filter_item (nano::block_hash const &, nano::signature const &); @@ -67,7 +67,9 @@ class block_processor final nano::node & node; nano::write_database_queue & write_database_queue; std::mutex mutex; + nano::state_block_signature_verification state_block_signature_verification; friend std::unique_ptr collect_container_info (block_processor & block_processor, const std::string & name); }; +std::unique_ptr collect_container_info (block_processor & block_processor, const std::string & name); } diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 2c674dc7b1..8f8ecbe646 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -79,30 +79,6 @@ std::unique_ptr nano::collect_container_info (re return composite; } -std::unique_ptr nano::collect_container_info (block_processor & block_processor, const std::string & name) -{ - size_t state_blocks_count; - size_t blocks_count; - size_t blocks_filter_count; - size_t forced_count; - - { - nano::lock_guard guard (block_processor.mutex); - state_blocks_count = block_processor.state_blocks.size (); - blocks_count = block_processor.blocks.size (); - blocks_filter_count = block_processor.blocks_filter.size (); - forced_count = block_processor.forced.size (); - } - - auto composite = std::make_unique (name); - composite->add_component (std::make_unique (container_info{ "state_blocks", state_blocks_count, sizeof (decltype (block_processor.state_blocks)::value_type) })); - composite->add_component (std::make_unique (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) })); - composite->add_component (std::make_unique (container_info{ "blocks_filter", blocks_filter_count, sizeof (decltype (block_processor.blocks_filter)::value_type) })); - composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) })); - composite->add_component (collect_container_info (block_processor.generator, "generator")); - return composite; -} - nano::node::node (boost::asio::io_context & io_ctx_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, nano::alarm & alarm_a, nano::logging const & logging_a, nano::work_pool & work_a, nano::node_flags flags_a) : node (io_ctx_a, application_path_a, alarm_a, nano::node_config (peering_port_a, logging_a), work_a, flags_a) { diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 0938cb3b05..b8cefe5fd1 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -82,7 +82,6 @@ class block_arrival final std::unique_ptr collect_container_info (block_arrival & block_arrival, const std::string & name); std::unique_ptr collect_container_info (rep_crawler & rep_crawler, const std::string & name); -std::unique_ptr collect_container_info (block_processor & block_processor, const std::string & name); class node final : public std::enable_shared_from_this { diff --git a/nano/node/signatures.cpp b/nano/node/signatures.cpp index 502d250302..699e46bc33 100644 --- a/nano/node/signatures.cpp +++ b/nano/node/signatures.cpp @@ -22,13 +22,10 @@ nano::signature_checker::~signature_checker () void nano::signature_checker::verify (nano::signature_check_set & check_a) { + // Don't process anything else if we have stopped + if (stopped) { - // Don't process anything else if we have stopped - nano::lock_guard guard (mutex); - if (stopped) - { - return; - } + return; } if (check_a.size < multithreaded_cutoff || single_threaded) @@ -76,32 +73,26 @@ void nano::signature_checker::verify (nano::signature_check_set & check_a) void nano::signature_checker::stop () { - nano::lock_guard guard (mutex); - if (!stopped) + if (!stopped.exchange (true)) { - stopped = true; thread_pool.join (); } } void nano::signature_checker::flush () { - nano::lock_guard guard (mutex); while (!stopped && tasks_remaining != 0) ; } bool nano::signature_checker::verify_batch (const nano::signature_check_set & check_a, size_t start_index, size_t size) { - /* Returns false if there are at least 1 invalid signature */ - auto code (nano::validate_message_batch (check_a.messages + start_index, check_a.message_lengths + start_index, check_a.pub_keys + start_index, check_a.signatures + start_index, size, check_a.verifications + start_index)); - (void)code; - + nano::validate_message_batch (check_a.messages + start_index, check_a.message_lengths + start_index, check_a.pub_keys + start_index, check_a.signatures + start_index, size, check_a.verifications + start_index); return std::all_of (check_a.verifications + start_index, check_a.verifications + start_index + size, [](int verification) { return verification == 0 || verification == 1; }); } /* This operates on a number of signatures of size (num_batches * batch_size) from the beginning of the check_a pointers. - * Caller should check the value of the promise which indicateswhen the work has been completed. + * Caller should check the value of the promise which indicates when the work has been completed. */ void nano::signature_checker::verify_async (nano::signature_check_set & check_a, size_t num_batches, std::promise & promise) { @@ -129,10 +120,6 @@ void nano::signature_checker::verify_async (nano::signature_check_set & check_a, // Set the names of all the threads in the thread pool for easier identification void nano::signature_checker::set_thread_names (unsigned num_threads) { - auto ready = false; - auto pending = num_threads; - nano::condition_variable cv; - std::vector> promises (num_threads); std::vector> futures; futures.reserve (num_threads); @@ -142,21 +129,8 @@ void nano::signature_checker::set_thread_names (unsigned num_threads) for (auto i = 0u; i < num_threads; ++i) { - boost::asio::post (thread_pool, [&cv, &ready, &pending, &mutex = mutex, &promise = promises[i]]() { - nano::unique_lock lk (mutex); + boost::asio::post (thread_pool, [& promise = promises[i]]() { nano::thread_role::set (nano::thread_role::name::signature_checking); - if (--pending == 0) - { - // All threads have been reached - ready = true; - lk.unlock (); - cv.notify_all (); - } - else - { - // We need to wait until the other threads are finished - cv.wait (lk, [&ready]() { return ready; }); - } promise.set_value (); }); } @@ -166,5 +140,4 @@ void nano::signature_checker::set_thread_names (unsigned num_threads) { future.wait (); } - assert (pending == 0); } diff --git a/nano/node/signatures.hpp b/nano/node/signatures.hpp index 61a6bb7a97..56419b5805 100644 --- a/nano/node/signatures.hpp +++ b/nano/node/signatures.hpp @@ -60,7 +60,6 @@ class signature_checker final static constexpr size_t batch_size = 256; const bool single_threaded; unsigned num_threads; - std::mutex mutex; - bool stopped{ false }; + std::atomic stopped{ false }; }; } diff --git a/nano/node/state_block_signature_verification.cpp b/nano/node/state_block_signature_verification.cpp new file mode 100644 index 0000000000..d8e6ad733d --- /dev/null +++ b/nano/node/state_block_signature_verification.cpp @@ -0,0 +1,159 @@ +#include +#include +#include +#include +#include +#include +#include + +#include + +nano::state_block_signature_verification::state_block_signature_verification (nano::signature_checker & signature_checker, nano::epochs & epochs, nano::node_config & node_config, nano::logger_mt & logger, uint64_t state_block_signature_verification_size) : +signature_checker (signature_checker), +epochs (epochs), +node_config (node_config), +logger (logger), +thread ([this, state_block_signature_verification_size]() { + nano::thread_role::set (nano::thread_role::name::state_block_signature_verification); + this->run (state_block_signature_verification_size); +}) +{ +} + +nano::state_block_signature_verification::~state_block_signature_verification () +{ + stop (); +} + +void nano::state_block_signature_verification::stop () +{ + { + nano::lock_guard guard (mutex); + stopped = true; + } + + if (thread.joinable ()) + { + condition.notify_one (); + thread.join (); + } +} + +void nano::state_block_signature_verification::run (uint64_t state_block_signature_verification_size) +{ + nano::unique_lock lk (mutex); + while (!stopped) + { + if (!state_blocks.empty ()) + { + size_t const max_verification_batch (state_block_signature_verification_size != 0 ? state_block_signature_verification_size : 256 * (node_config.signature_checker_threads + 1)); + while (!state_blocks.empty () && !stopped) + { + auto items = setup_items (max_verification_batch); + lk.unlock (); + verify_state_blocks (items); + lk.lock (); + } + } + else + { + condition.wait (lk); + } + } +} + +void nano::state_block_signature_verification::notify () +{ + nano::lock_guard guard (mutex); + condition.notify_one (); +} + +void nano::state_block_signature_verification::add (nano::unchecked_info const & info_a) +{ + nano::lock_guard guard (mutex); + state_blocks.push_back (info_a); +} + +size_t nano::state_block_signature_verification::size () +{ + nano::lock_guard guard (mutex); + return state_blocks.size (); +} + +std::deque nano::state_block_signature_verification::setup_items (size_t max_count) +{ + std::deque items; + if (state_blocks.size () <= max_count) + { + items.swap (state_blocks); + } + else + { + for (auto i (0); i < max_count; ++i) + { + items.push_back (state_blocks.front ()); + state_blocks.pop_front (); + } + assert (!state_blocks.empty ()); + } + return items; +} + +void nano::state_block_signature_verification::verify_state_blocks (std::deque & items) +{ + if (!items.empty ()) + { + nano::timer<> timer_l; + timer_l.start (); + auto size (items.size ()); + std::vector hashes; + hashes.reserve (size); + std::vector messages; + messages.reserve (size); + std::vector lengths; + lengths.reserve (size); + std::vector accounts; + accounts.reserve (size); + std::vector pub_keys; + pub_keys.reserve (size); + std::vector blocks_signatures; + blocks_signatures.reserve (size); + std::vector signatures; + signatures.reserve (size); + std::vector verifications; + verifications.resize (size, 0); + for (auto & item : items) + { + hashes.push_back (item.block->hash ()); + messages.push_back (hashes.back ().bytes.data ()); + lengths.push_back (sizeof (decltype (hashes)::value_type)); + nano::account account (item.block->account ()); + if (!item.block->link ().is_zero () && epochs.is_epoch_link (item.block->link ())) + { + account = epochs.signer (epochs.epoch (item.block->link ())); + } + else if (!item.account.is_zero ()) + { + account = item.account; + } + accounts.push_back (account); + pub_keys.push_back (accounts.back ().bytes.data ()); + blocks_signatures.push_back (item.block->block_signature ()); + signatures.push_back (blocks_signatures.back ().bytes.data ()); + } + nano::signature_check_set check = { size, messages.data (), lengths.data (), pub_keys.data (), signatures.data (), verifications.data () }; + signature_checker.verify (check); + if (node_config.logging.timing_logging ()) + { + logger.try_log (boost::str (boost::format ("Batch verified %1% state blocks in %2% %3%") % size % timer_l.stop ().count () % timer_l.unit ())); + } + blocks_verified_callback (items, verifications, hashes, blocks_signatures); + } +} + +std::unique_ptr nano::collect_container_info (state_block_signature_verification & state_block_signature_verification, const std::string & name) +{ + auto composite = std::make_unique (name); + composite->add_component (std::make_unique (container_info{ "state_blocks", state_block_signature_verification.size (), sizeof (nano::unchecked_info) })); + return composite; +} diff --git a/nano/node/state_block_signature_verification.hpp b/nano/node/state_block_signature_verification.hpp new file mode 100644 index 0000000000..017ee68138 --- /dev/null +++ b/nano/node/state_block_signature_verification.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include +#include + +#include +#include +#include + +namespace nano +{ +class epochs; +class logger_mt; +class node_config; +class signature_checker; + +class state_block_signature_verification +{ +public: + state_block_signature_verification (nano::signature_checker &, nano::epochs &, nano::node_config &, nano::logger_mt &, uint64_t); + ~state_block_signature_verification (); + void add (nano::unchecked_info const & info_a); + size_t size (); + void stop (); + void notify (); + + std::function &, std::vector const &, std::vector const &, std::vector const &)> blocks_verified_callback; + +private: + nano::signature_checker & signature_checker; + nano::epochs & epochs; + nano::node_config & node_config; + nano::logger_mt & logger; + + std::mutex mutex; + bool stopped{ false }; + std::deque state_blocks; + nano::condition_variable condition; + std::thread thread; + + void run (uint64_t block_processor_verification_size); + std::deque setup_items (size_t); + void verify_state_blocks (std::deque &); +}; + +std::unique_ptr collect_container_info (state_block_signature_verification & state_block_signature_verification, const std::string & name); +} From b66219fa69a3069a42a266b57f796a9372ade567 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Tue, 18 Feb 2020 14:26:14 +0000 Subject: [PATCH 2/8] Make sure all state blocks are flushed too --- nano/node/blockprocessor.cpp | 2 +- nano/node/state_block_signature_verification.cpp | 8 ++++++++ nano/node/state_block_signature_verification.hpp | 2 ++ 3 files changed, 11 insertions(+), 1 deletion(-) diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 7a6e9331d6..c611138e31 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -44,7 +44,7 @@ void nano::block_processor::flush () { node.checker.flush (); nano::unique_lock lock (mutex); - while (!stopped && (have_blocks () || active)) + while (!stopped && (have_blocks () || active || state_block_signature_verification.is_active ())) { condition.wait (lock); } diff --git a/nano/node/state_block_signature_verification.cpp b/nano/node/state_block_signature_verification.cpp index d8e6ad733d..29602160a1 100644 --- a/nano/node/state_block_signature_verification.cpp +++ b/nano/node/state_block_signature_verification.cpp @@ -47,6 +47,7 @@ void nano::state_block_signature_verification::run (uint64_t state_block_signatu if (!state_blocks.empty ()) { size_t const max_verification_batch (state_block_signature_verification_size != 0 ? state_block_signature_verification_size : 256 * (node_config.signature_checker_threads + 1)); + active = true; while (!state_blocks.empty () && !stopped) { auto items = setup_items (max_verification_batch); @@ -54,6 +55,7 @@ void nano::state_block_signature_verification::run (uint64_t state_block_signatu verify_state_blocks (items); lk.lock (); } + active = false; } else { @@ -62,6 +64,12 @@ void nano::state_block_signature_verification::run (uint64_t state_block_signatu } } +bool nano::state_block_signature_verification::is_active () +{ + nano::lock_guard guard (mutex); + return active; +} + void nano::state_block_signature_verification::notify () { nano::lock_guard guard (mutex); diff --git a/nano/node/state_block_signature_verification.hpp b/nano/node/state_block_signature_verification.hpp index 017ee68138..43b0257013 100644 --- a/nano/node/state_block_signature_verification.hpp +++ b/nano/node/state_block_signature_verification.hpp @@ -23,6 +23,7 @@ class state_block_signature_verification size_t size (); void stop (); void notify (); + bool is_active (); std::function &, std::vector const &, std::vector const &, std::vector const &)> blocks_verified_callback; @@ -34,6 +35,7 @@ class state_block_signature_verification std::mutex mutex; bool stopped{ false }; + bool active{ false }; std::deque state_blocks; nano::condition_variable condition; std::thread thread; From b7400377fa54284d5a6e752a397d6f9649b84903 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Wed, 19 Feb 2020 20:17:50 +0000 Subject: [PATCH 3/8] Serg review comment about notification cleanup --- nano/node/blockprocessor.cpp | 32 +++---------------- .../state_block_signature_verification.cpp | 13 +++----- .../state_block_signature_verification.hpp | 1 - 3 files changed, 9 insertions(+), 37 deletions(-) diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 32b7ed662c..24cb9798a6 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -77,11 +77,7 @@ void nano::block_processor::add (nano::unchecked_info const & info_a) { if (!nano::work_validate (nano::work_version::work_1, info_a.block->root (), info_a.block->block_work ())) { - struct - { - bool condition{ false }; - bool state_block_condition{ false }; - } should_notify; + bool should_notify{ false }; { auto hash (info_a.block->hash ()); auto filter_hash (filter_item (hash, info_a.block->block_signature ())); @@ -95,28 +91,16 @@ void nano::block_processor::add (nano::unchecked_info const & info_a) else { blocks.push_back (info_a); + should_notify = true; } blocks_filter.insert (filter_hash); } - - if (!blocks.empty ()) - { - should_notify.condition = true; - } - if (state_block_signature_verification.size () != 0) - { - should_notify.state_block_condition = true; - } } - if (should_notify.condition) + if (should_notify) { condition.notify_all (); } - if (should_notify.state_block_condition) - { - state_block_signature_verification.notify (); - } } else { @@ -155,15 +139,7 @@ void nano::block_processor::process_blocks () } else { - if (state_block_signature_verification.size () != 0) - { - state_block_signature_verification.notify (); - } - else - { - condition.notify_one (); - } - + condition.notify_one (); condition.wait (lock); } } diff --git a/nano/node/state_block_signature_verification.cpp b/nano/node/state_block_signature_verification.cpp index 29602160a1..c69814f483 100644 --- a/nano/node/state_block_signature_verification.cpp +++ b/nano/node/state_block_signature_verification.cpp @@ -70,16 +70,13 @@ bool nano::state_block_signature_verification::is_active () return active; } -void nano::state_block_signature_verification::notify () -{ - nano::lock_guard guard (mutex); - condition.notify_one (); -} - void nano::state_block_signature_verification::add (nano::unchecked_info const & info_a) { - nano::lock_guard guard (mutex); - state_blocks.push_back (info_a); + { + nano::lock_guard guard (mutex); + state_blocks.push_back (info_a); + } + condition.notify_one (); } size_t nano::state_block_signature_verification::size () diff --git a/nano/node/state_block_signature_verification.hpp b/nano/node/state_block_signature_verification.hpp index 43b0257013..7af89bfd8f 100644 --- a/nano/node/state_block_signature_verification.hpp +++ b/nano/node/state_block_signature_verification.hpp @@ -22,7 +22,6 @@ class state_block_signature_verification void add (nano::unchecked_info const & info_a); size_t size (); void stop (); - void notify (); bool is_active (); std::function &, std::vector const &, std::vector const &, std::vector const &)> blocks_verified_callback; From 6b102d6f4e75cf04b25d87e455f8b6a0be054380 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Thu, 20 Feb 2020 09:53:57 +0000 Subject: [PATCH 4/8] Use half the amount of threads for the signature checker --- nano/node/nodeconfig.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index ef01a5d362..5e1f57b7c9 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -59,7 +59,7 @@ class node_config unsigned io_threads{ std::max (4, std::thread::hardware_concurrency ()) }; unsigned network_threads{ std::max (4, std::thread::hardware_concurrency ()) }; unsigned work_threads{ std::max (4, std::thread::hardware_concurrency ()) }; - unsigned signature_checker_threads{ (std::thread::hardware_concurrency () != 0) ? std::thread::hardware_concurrency () - 1 : 0 }; /* The calling thread does checks as well so remove it from the number of threads used */ + unsigned signature_checker_threads{ (std::thread::hardware_concurrency () / 2 > 1) ? std::thread::hardware_concurrency () / 2 - 1 : 0 }; /* The calling thread does checks as well so remove it from the number of threads used */ bool enable_voting{ false }; unsigned bootstrap_connections{ 4 }; unsigned bootstrap_connections_max{ 64 }; From 7750570eaab1d8caa2cc9cbc5ae35b862e2fcb40 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Wed, 11 Mar 2020 17:31:55 +0000 Subject: [PATCH 5/8] Remove multithreaded cutoff --- nano/node/signatures.cpp | 2 +- nano/node/signatures.hpp | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/nano/node/signatures.cpp b/nano/node/signatures.cpp index 699e46bc33..ff4d2dea19 100644 --- a/nano/node/signatures.cpp +++ b/nano/node/signatures.cpp @@ -28,7 +28,7 @@ void nano::signature_checker::verify (nano::signature_check_set & check_a) return; } - if (check_a.size < multithreaded_cutoff || single_threaded) + if (check_a.size <= batch_size || single_threaded) { // Not dealing with many so just use the calling thread for checking signatures auto result = verify_batch (check_a, 0, check_a.size); diff --git a/nano/node/signatures.hpp b/nano/node/signatures.hpp index 56419b5805..72f26b9fc2 100644 --- a/nano/node/signatures.hpp +++ b/nano/node/signatures.hpp @@ -55,8 +55,6 @@ class signature_checker final void set_thread_names (unsigned num_threads); boost::asio::thread_pool thread_pool; std::atomic tasks_remaining{ 0 }; - /** minimum signature_check_set size eligible to be multithreaded */ - static constexpr size_t multithreaded_cutoff = 513; static constexpr size_t batch_size = 256; const bool single_threaded; unsigned num_threads; From 406434acc621ba5181f823f6d1082e0754b05b86 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Thu, 12 Mar 2020 08:46:03 +0000 Subject: [PATCH 6/8] Use n/2 extra threads. Handles odd number of CPU threads too. --- nano/node/nodeconfig.cpp | 2 +- nano/node/nodeconfig.hpp | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/nano/node/nodeconfig.cpp b/nano/node/nodeconfig.cpp index 74ff6542c3..a00fea859c 100644 --- a/nano/node/nodeconfig.cpp +++ b/nano/node/nodeconfig.cpp @@ -75,7 +75,7 @@ nano::error nano::node_config::serialize_toml (nano::tomlconfig & toml) const toml.put ("io_threads", io_threads, "Number of threads dedicated to I/O opeations. Defaults to the number of CPU threads, and at least 4.\ntype:uint64"); toml.put ("network_threads", network_threads, "Number of threads dedicated to processing network messages. Defaults to the number of CPU threads, and at least 4.\ntype:uint64"); toml.put ("work_threads", work_threads, "Number of threads dedicated to CPU generated work. Defaults to all available CPU threads.\ntype:uint64"); - toml.put ("signature_checker_threads", signature_checker_threads, "Number of additional threads dedicated to signature verification. Defaults to the number of CPU threads minus 1.\ntype:uint64"); + toml.put ("signature_checker_threads", signature_checker_threads, "Number of additional threads dedicated to signature verification. Defaults to number of CPU threads / 2.\ntype:uint64"); toml.put ("enable_voting", enable_voting, "Enable or disable voting. Enabling this option requires additional system resources, namely increased CPU, bandwidth and disk usage.\ntype:bool"); toml.put ("bootstrap_connections", bootstrap_connections, "Number of outbound bootstrap connections. Must be a power of 2. Defaults to 4.\nWarning: a larger amount of connections may use substantially more system memory.\ntype:uint64"); toml.put ("bootstrap_connections_max", bootstrap_connections_max, "Maximum number of inbound bootstrap connections. Defaults to 64.\nWarning: a larger amount of connections may use additional system memory.\ntype:uint64"); diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 0e776dff4a..0a7bee9acb 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -60,7 +60,8 @@ class node_config unsigned io_threads{ std::max (4, std::thread::hardware_concurrency ()) }; unsigned network_threads{ std::max (4, std::thread::hardware_concurrency ()) }; unsigned work_threads{ std::max (4, std::thread::hardware_concurrency ()) }; - unsigned signature_checker_threads{ (std::thread::hardware_concurrency () / 2 > 1) ? std::thread::hardware_concurrency () / 2 - 1 : 0 }; /* The calling thread does checks as well so remove it from the number of threads used */ + /* Use half available threads on the system for signature checking. The calling thread does checks as well, so these are extra worker threads */ + unsigned signature_checker_threads{ (std::thread::hardware_concurrency () > 1) ? (std::thread::hardware_concurrency () / 2 ) : 0 }; bool enable_voting{ false }; unsigned bootstrap_connections{ 4 }; unsigned bootstrap_connections_max{ 64 }; From 25a2cf4740c0b3133dabbcdf8587a2044c1549b3 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Thu, 12 Mar 2020 09:09:21 +0000 Subject: [PATCH 7/8] Formatting --- nano/node/nodeconfig.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 0a7bee9acb..58136fd684 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -61,7 +61,7 @@ class node_config unsigned network_threads{ std::max (4, std::thread::hardware_concurrency ()) }; unsigned work_threads{ std::max (4, std::thread::hardware_concurrency ()) }; /* Use half available threads on the system for signature checking. The calling thread does checks as well, so these are extra worker threads */ - unsigned signature_checker_threads{ (std::thread::hardware_concurrency () > 1) ? (std::thread::hardware_concurrency () / 2 ) : 0 }; + unsigned signature_checker_threads{ (std::thread::hardware_concurrency () > 1) ? (std::thread::hardware_concurrency () / 2) : 0 }; bool enable_voting{ false }; unsigned bootstrap_connections{ 4 }; unsigned bootstrap_connections_max{ 64 }; From 6ef69792790f42665df53effcf46de7bc6ac5448 Mon Sep 17 00:00:00 2001 From: Wesley Shillingford Date: Thu, 12 Mar 2020 09:11:15 +0000 Subject: [PATCH 8/8] Simplify expression (Gui comment) --- nano/node/nodeconfig.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 58136fd684..86d9e8de7f 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -61,7 +61,7 @@ class node_config unsigned network_threads{ std::max (4, std::thread::hardware_concurrency ()) }; unsigned work_threads{ std::max (4, std::thread::hardware_concurrency ()) }; /* Use half available threads on the system for signature checking. The calling thread does checks as well, so these are extra worker threads */ - unsigned signature_checker_threads{ (std::thread::hardware_concurrency () > 1) ? (std::thread::hardware_concurrency () / 2) : 0 }; + unsigned signature_checker_threads{ std::thread::hardware_concurrency () / 2 }; bool enable_voting{ false }; unsigned bootstrap_connections{ 4 }; unsigned bootstrap_connections_max{ 64 };