From ab43d93438aa711e43421f155be6390dbcb2446c Mon Sep 17 00:00:00 2001 From: Guilherme Lawless Date: Fri, 6 Mar 2020 16:13:56 +0000 Subject: [PATCH] Filter duplicate publish messages before deserializing When a message is unique, the digest is saved and passed around to network processing, which may drop it if the block processor is full. Cleaning up a long unchecked block erases its digest from the publish filter. The blocks_filter has been removed due to redundancy. The size of this filter is 256k, which uses about 4MB. --- nano/core_test/active_transactions.cpp | 51 ++++++++++++++++++++ nano/core_test/message_parser.cpp | 15 ++++-- nano/core_test/network.cpp | 61 ++++++++++++++++++++++++ nano/core_test/node.cpp | 14 +++++- nano/lib/numbers.cpp | 7 ++- nano/lib/stats.cpp | 6 +++ nano/lib/stats.hpp | 8 +++- nano/node/active_transactions.cpp | 13 ++--- nano/node/active_transactions.hpp | 1 + nano/node/blockprocessor.cpp | 38 ++++----------- nano/node/blockprocessor.hpp | 2 - nano/node/bootstrap/bootstrap_server.cpp | 21 +++++--- nano/node/common.cpp | 26 +++++++--- nano/node/common.hpp | 12 +++-- nano/node/election.cpp | 16 ++++++- nano/node/election.hpp | 3 +- nano/node/network.cpp | 25 ++++++---- nano/node/network.hpp | 3 +- nano/node/node.cpp | 10 ++-- nano/node/transport/udp.cpp | 17 ++++--- nano/secure/network_filter.cpp | 34 ++++++++++--- nano/secure/network_filter.hpp | 13 ++++- 22 files changed, 298 insertions(+), 98 deletions(-) diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index 5be7c22de3..85b8eeb337 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -772,3 +772,54 @@ TEST (active_transactions, activate_dependencies) ASSERT_TRUE (node1->ledger.block_confirmed (node1->store.tx_begin_read (), block2->hash ())); ASSERT_TRUE (node2->ledger.block_confirmed (node2->store.tx_begin_read (), block2->hash ())); } + +namespace nano +{ +// Tests that blocks are correctly cleared from the duplicate filter for unconfirmed elections +TEST (active_transactions, dropped_cleanup) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto & node (*system.add_node (node_config)); + + nano::genesis genesis; + auto block = genesis.open; + + // Add to network filter to ensure proper cleanup after the election is dropped + std::vector block_bytes; + { + nano::vectorstream stream (block_bytes); + block->serialize (stream); + } + ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); + ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); + + auto election (node.active.insert (block).first); + ASSERT_NE (nullptr, election); + + // Not yet removed + ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); + + // Now simulate dropping the election, which performs a cleanup in the background using the node worker + ASSERT_FALSE (election->confirmed ()); + { + nano::lock_guard guard (node.active.mutex); + election->cleanup (); + } + + // Push a worker task to ensure the cleanup is already performed + std::atomic flag{ false }; + node.worker.push_task ([&flag]() { + flag = true; + }); + system.deadline_set (5s); + while (!flag) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // The filter must have been cleared + ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ())); +} +} diff --git a/nano/core_test/message_parser.cpp b/nano/core_test/message_parser.cpp index f67b2fd6e7..e13540a053 100644 --- a/nano/core_test/message_parser.cpp +++ b/nano/core_test/message_parser.cpp @@ -63,9 +63,10 @@ TEST (message_parser, exact_confirm_ack_size) { nano::system system (1); test_visitor visitor; + nano::network_filter filter (1); nano::block_uniquer block_uniquer; nano::vote_uniquer vote_uniquer (block_uniquer); - nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work); + nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work); auto block (std::make_shared (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1)))); auto vote (std::make_shared (0, nano::keypair ().prv, 0, std::move (block))); nano::confirm_ack message (vote); @@ -96,9 +97,10 @@ TEST (message_parser, exact_confirm_req_size) { nano::system system (1); test_visitor visitor; + nano::network_filter filter (1); nano::block_uniquer block_uniquer; nano::vote_uniquer vote_uniquer (block_uniquer); - nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work); + nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work); auto block (std::make_shared (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1)))); nano::confirm_req message (std::move (block)); std::vector bytes; @@ -128,9 +130,10 @@ TEST (message_parser, exact_confirm_req_hash_size) { nano::system system (1); test_visitor visitor; + nano::network_filter filter (1); nano::block_uniquer block_uniquer; nano::vote_uniquer vote_uniquer (block_uniquer); - nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work); + nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work); nano::send_block block (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))); nano::confirm_req message (block.hash (), block.root ()); std::vector bytes; @@ -160,9 +163,10 @@ TEST (message_parser, exact_publish_size) { nano::system system (1); test_visitor visitor; + nano::network_filter filter (1); nano::block_uniquer block_uniquer; nano::vote_uniquer vote_uniquer (block_uniquer); - nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work); + nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work); auto block (std::make_shared (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1)))); nano::publish message (std::move (block)); std::vector bytes; @@ -192,9 +196,10 @@ TEST (message_parser, exact_keepalive_size) { nano::system system (1); test_visitor visitor; + nano::network_filter filter (1); nano::block_uniquer block_uniquer; nano::vote_uniquer vote_uniquer (block_uniquer); - nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work); + nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work); nano::keepalive message; std::vector bytes; { diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 99b9b9f121..c773b7cfb9 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -927,6 +927,67 @@ TEST (network, replace_port) node1->stop (); } +TEST (network, duplicate_detection) +{ + nano::system system; + nano::node_flags node_flags; + node_flags.disable_udp = false; + auto & node0 (*system.add_node (node_flags)); + auto & node1 (*system.add_node (node_flags)); + auto udp_channel (std::make_shared (node0.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version)); + nano::genesis genesis; + nano::publish publish (genesis.open); + + // Publish duplicate detection through UDP + ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish)); + udp_channel->send (publish); + udp_channel->send (publish); + system.deadline_set (2s); + while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 1) + { + ASSERT_NO_ERROR (system.poll ()); + } + + // Publish duplicate detection through TCP + auto tcp_channel (node0.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node1.network.endpoint ()))); + ASSERT_EQ (1, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish)); + tcp_channel->send (publish); + system.deadline_set (2s); + while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 2) + { + ASSERT_NO_ERROR (system.poll ()); + } +} + +TEST (network, duplicate_revert_publish) +{ + nano::system system; + nano::node_flags node_flags; + node_flags.block_processor_full_size = 0; + auto & node (*system.add_node (node_flags)); + ASSERT_TRUE (node.block_processor.full ()); + nano::genesis genesis; + nano::publish publish (genesis.open); + std::vector bytes; + { + nano::vectorstream stream (bytes); + publish.block->serialize (stream); + } + // Add to the blocks filter + // Should be cleared when dropping due to a full block processor, as long as the message has the optional digest attached + // Test network.duplicate_detection ensures that the digest is attached when deserializing messages + nano::uint128_t digest; + ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size (), &digest)); + ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); + auto channel (std::make_shared (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version)); + ASSERT_EQ (0, publish.digest); + node.network.process_message (publish, channel); + ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); + publish.digest = digest; + node.network.process_message (publish, channel); + ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); +} + // The test must be completed in less than 1 second TEST (bandwidth_limiter, validate) { diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 8c76236ad9..ee9d35b177 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -3217,7 +3217,7 @@ TEST (node, block_processor_full) { nano::system system; nano::node_flags node_flags; - node_flags.block_processor_full_size = 2; + node_flags.block_processor_full_size = 3; auto & node = *system.add_node (nano::node_config (nano::get_available_port (), system.logging), node_flags); nano::genesis genesis; auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0)); @@ -3245,7 +3245,7 @@ TEST (node, block_processor_half_full) { nano::system system; nano::node_flags node_flags; - node_flags.block_processor_full_size = 4; + node_flags.block_processor_full_size = 6; auto & node = *system.add_node (nano::node_config (nano::get_available_port (), system.logging), node_flags); nano::genesis genesis; auto send1 (std::make_shared (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0)); @@ -3421,6 +3421,14 @@ TEST (node, unchecked_cleanup) nano::keypair key; auto & node (*system.nodes[0]); auto open (std::make_shared (key.pub, 0, key.pub, 1, key.pub, key.prv, key.pub, *system.work.generate (key.pub))); + std::vector bytes; + { + nano::vectorstream stream (bytes); + open->serialize (stream); + } + // Add to the blocks filter + // Should be cleared after unchecked cleanup + ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); node.process_active (open); node.block_processor.flush (); node.config.unchecked_cutoff_time = std::chrono::seconds (2); @@ -3432,6 +3440,7 @@ TEST (node, unchecked_cleanup) } std::this_thread::sleep_for (std::chrono::seconds (1)); node.unchecked_cleanup (); + ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); { auto transaction (node.store.tx_begin_read ()); auto unchecked_count (node.store.unchecked_count (transaction)); @@ -3440,6 +3449,7 @@ TEST (node, unchecked_cleanup) } std::this_thread::sleep_for (std::chrono::seconds (2)); node.unchecked_cleanup (); + ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ())); { auto transaction (node.store.tx_begin_read ()); auto unchecked_count (node.store.unchecked_count (transaction)); diff --git a/nano/lib/numbers.cpp b/nano/lib/numbers.cpp index fba48e109a..542f4805f3 100644 --- a/nano/lib/numbers.cpp +++ b/nano/lib/numbers.cpp @@ -430,8 +430,11 @@ bool nano::validate_message (nano::public_key const & public_key, nano::uint256_ bool nano::validate_message_batch (const unsigned char ** m, size_t * mlen, const unsigned char ** pk, const unsigned char ** RS, size_t num, int * valid) { - bool result (0 == ed25519_sign_open_batch (m, mlen, pk, RS, num, valid)); - return result; + for (size_t i{ 0 }; i < num; ++i) + { + valid[i] = (0 == ed25519_sign_open (m[i], mlen[i], pk[i], RS[i])); + } + return true; } nano::uint128_union::uint128_union (std::string const & string_a) diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index dccbfeda17..4d2cab087b 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -444,6 +444,9 @@ std::string nano::stat::type_to_string (uint32_t key) case nano::stat::type::requests: res = "requests"; break; + case nano::stat::type::filter: + res = "filter"; + break; } return res; } @@ -694,6 +697,9 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::requests_unknown: res = "requests_unknown"; break; + case nano::stat::detail::duplicate_publish: + res = "duplicate_publish"; + break; } return res; } diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index 26f78afa5f..c559db493f 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -199,7 +199,8 @@ class stat final confirmation_height, drop, aggregator, - requests + requests, + filter, }; /** Optional detail type */ @@ -314,7 +315,10 @@ class stat final requests_generated_hashes, requests_cached_votes, requests_generated_votes, - requests_unknown + requests_unknown, + + // duplicate + duplicate_publish }; /** Direction of the stat. If the direction is irrelevant, use in */ diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 6161501d49..267074986d 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -249,7 +249,7 @@ void nano::active_transactions::request_confirm (nano::unique_lock & bool const overflow_l (count_l >= node.config.active_elections_size && election_l->election_start < election_ttl_cutoff_l && !node.wallets.watcher->is_watched (i->root)); if (overflow_l || election_l->transition_time (solicitor, saturated_l)) { - election_l->clear_blocks (); + election_l->cleanup (); i = sorted_roots_l.erase (i); } else @@ -791,7 +791,7 @@ void nano::active_transactions::erase (nano::block const & block_a) auto root_it (roots.get ().find (block_a.qualified_root ())); if (root_it != roots.get ().end ()) { - root_it->election->clear_blocks (); + root_it->election->cleanup (); root_it->election->adjust_dependent_difficulty (); roots.get ().erase (root_it); node.logger.try_log (boost::str (boost::format ("Election erased for block block %1% root %2%") % block_a.hash ().to_string () % block_a.root ().to_string ())); @@ -937,18 +937,13 @@ nano::inactive_cache_information nano::active_transactions::find_inactive_votes_ } else { - return nano::inactive_cache_information{ std::chrono::steady_clock::time_point{}, 0, std::vector{} }; + return nano::inactive_cache_information{}; } } void nano::active_transactions::erase_inactive_votes_cache (nano::block_hash const & hash_a) { - auto & inactive_by_hash (inactive_votes_cache.get ()); - auto existing (inactive_by_hash.find (hash_a)); - if (existing != inactive_by_hash.end ()) - { - inactive_by_hash.erase (existing); - } + inactive_votes_cache.get ().erase (hash_a); } bool nano::active_transactions::inactive_votes_bootstrap_check (std::vector const & voters_a, nano::block_hash const & hash_a, bool & confirmed_a) diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index d1ffcb3d09..bd7f7aa4da 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -195,6 +195,7 @@ class active_transactions final bool inactive_votes_bootstrap_check (std::vector const &, nano::block_hash const &, bool &); boost::thread thread; + friend class active_transactions_dropped_cleanup_Test; friend class confirmation_height_prioritize_frontiers_Test; friend class confirmation_height_prioritize_frontiers_overwrite_Test; friend std::unique_ptr collect_container_info (active_transactions &, const std::string &); diff --git a/nano/node/blockprocessor.cpp b/nano/node/blockprocessor.cpp index 5447645f39..6b50a762c4 100644 --- a/nano/node/blockprocessor.cpp +++ b/nano/node/blockprocessor.cpp @@ -42,7 +42,6 @@ void nano::block_processor::flush () { condition.wait (lock); } - blocks_filter.clear (); flushing = false; } @@ -54,12 +53,12 @@ size_t nano::block_processor::size () bool nano::block_processor::full () { - return size () > node.flags.block_processor_full_size; + return size () >= node.flags.block_processor_full_size; } bool nano::block_processor::half_full () { - return size () > node.flags.block_processor_full_size / 2; + return size () >= node.flags.block_processor_full_size / 2; } void nano::block_processor::add (std::shared_ptr block_a, uint64_t origination) @@ -72,20 +71,14 @@ void nano::block_processor::add (nano::unchecked_info const & info_a) { debug_assert (!nano::work_validate (*info_a.block)); { - auto hash (info_a.block->hash ()); - auto filter_hash (filter_item (hash, info_a.block->block_signature ())); nano::lock_guard lock (mutex); - if (blocks_filter.find (filter_hash) == blocks_filter.end ()) + if (info_a.verified == nano::signature_verification::unknown && (info_a.block->type () == nano::block_type::state || info_a.block->type () == nano::block_type::open || !info_a.account.is_zero ())) { - if (info_a.verified == nano::signature_verification::unknown && (info_a.block->type () == nano::block_type::state || info_a.block->type () == nano::block_type::open || !info_a.account.is_zero ())) - { - state_blocks.push_back (info_a); - } - else - { - blocks.push_back (info_a); - } - blocks_filter.insert (filter_hash); + state_blocks.push_back (info_a); + } + else + { + blocks.push_back (info_a); } } condition.notify_all (); @@ -233,7 +226,6 @@ void nano::block_processor::verify_state_blocks (nano::unique_lock & } else { - blocks_filter.erase (filter_item (hashes[i], blocks_signatures[i])); requeue_invalid (hashes[i], item); } items.pop_front (); @@ -305,7 +297,6 @@ void nano::block_processor::process_batch (nano::unique_lock & lock_ info = blocks.front (); blocks.pop_front (); hash = info.block->hash (); - blocks_filter.erase (filter_item (hash, info.block->block_signature ())); } else { @@ -568,19 +559,6 @@ void nano::block_processor::queue_unchecked (nano::write_transaction const & tra node.gap_cache.erase (hash_a); } -nano::block_hash nano::block_processor::filter_item (nano::block_hash const & hash_a, nano::signature const & signature_a) -{ - static nano::random_constants constants; - nano::block_hash result; - blake2b_state state; - blake2b_init (&state, sizeof (result.bytes)); - blake2b_update (&state, constants.not_an_account.bytes.data (), constants.not_an_account.bytes.size ()); - blake2b_update (&state, signature_a.bytes.data (), signature_a.bytes.size ()); - blake2b_update (&state, hash_a.bytes.data (), hash_a.bytes.size ()); - blake2b_final (&state, result.bytes.data (), sizeof (result.bytes)); - return result; -} - void nano::block_processor::requeue_invalid (nano::block_hash const & hash_a, nano::unchecked_info const & info_a) { debug_assert (hash_a == info_a.block->hash ()); diff --git a/nano/node/blockprocessor.hpp b/nano/node/blockprocessor.hpp index 1395400d00..61484b0406 100644 --- a/nano/node/blockprocessor.hpp +++ b/nano/node/blockprocessor.hpp @@ -62,8 +62,6 @@ class block_processor final std::deque state_blocks; std::deque blocks; std::deque> forced; - nano::block_hash filter_item (nano::block_hash const &, nano::signature const &); - std::unordered_set blocks_filter; nano::condition_variable condition; nano::node & node; nano::write_database_queue & write_database_queue; diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index e8c9cc1168..d4f98f8868 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -408,15 +408,24 @@ void nano::bootstrap_server::receive_publish_action (boost::system::error_code c { if (!ec) { - auto error (false); - nano::bufferstream stream (receive_buffer->data (), size_a); - auto request (std::make_unique (error, stream, header_a)); - if (!error) + nano::uint128_t digest; + if (!node->network.publish_filter.apply (receive_buffer->data (), size_a, &digest)) { - if (is_realtime_connection ()) + auto error (false); + nano::bufferstream stream (receive_buffer->data (), size_a); + auto request (std::make_unique (error, stream, header_a, digest)); + if (!error) { - add_request (std::unique_ptr (request.release ())); + if (is_realtime_connection ()) + { + add_request (std::unique_ptr (request.release ())); + } + receive (); } + } + else + { + node->stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish); receive (); } } diff --git a/nano/node/common.cpp b/nano/node/common.cpp index d57558897e..9b07ffc1b9 100644 --- a/nano/node/common.cpp +++ b/nano/node/common.cpp @@ -312,6 +312,10 @@ std::string nano::message_parser::status_string () { return "invalid_network"; } + case nano::message_parser::parse_status::duplicate_publish_message: + { + return "duplicate_publish_message"; + } } debug_assert (false); @@ -319,7 +323,8 @@ std::string nano::message_parser::status_string () return "[unknown parse_status]"; } -nano::message_parser::message_parser (nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a, nano::message_visitor & visitor_a, nano::work_pool & pool_a) : +nano::message_parser::message_parser (nano::network_filter & publish_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a, nano::message_visitor & visitor_a, nano::work_pool & pool_a) : +publish_filter (publish_filter_a), block_uniquer (block_uniquer_a), vote_uniquer (vote_uniquer_a), visitor (visitor_a), @@ -355,7 +360,15 @@ void nano::message_parser::deserialize_buffer (uint8_t const * buffer_a, size_t } case nano::message_type::publish: { - deserialize_publish (stream, header); + nano::uint128_t digest; + if (!publish_filter.apply (buffer_a + header.size, size_a - header.size, &digest)) + { + deserialize_publish (stream, header, digest); + } + else + { + status = parse_status::duplicate_publish_message; + } break; } case nano::message_type::confirm_req: @@ -412,10 +425,10 @@ void nano::message_parser::deserialize_keepalive (nano::stream & stream_a, nano: } } -void nano::message_parser::deserialize_publish (nano::stream & stream_a, nano::message_header const & header_a) +void nano::message_parser::deserialize_publish (nano::stream & stream_a, nano::message_header const & header_a, nano::uint128_t const & digest_a) { auto error (false); - nano::publish incoming (error, stream_a, header_a, &block_uniquer); + nano::publish incoming (error, stream_a, header_a, digest_a, &block_uniquer); if (!error && at_end (stream_a)) { if (!nano::work_validate (*incoming.block)) @@ -593,8 +606,9 @@ bool nano::keepalive::operator== (nano::keepalive const & other_a) const return peers == other_a.peers; } -nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::block_uniquer * uniquer_a) : -message (header_a) +nano::publish::publish (bool & error_a, nano::stream & stream_a, nano::message_header const & header_a, nano::uint128_t const & digest_a, nano::block_uniquer * uniquer_a) : +message (header_a), +digest (digest_a) { if (!error_a) { diff --git a/nano/node/common.hpp b/nano/node/common.hpp index 4cf7273208..cd9612fda8 100644 --- a/nano/node/common.hpp +++ b/nano/node/common.hpp @@ -7,6 +7,7 @@ #include #include #include +#include #include @@ -247,18 +248,20 @@ class message_parser final invalid_telemetry_ack_message, outdated_version, invalid_magic, - invalid_network + invalid_network, + duplicate_publish_message }; - message_parser (nano::block_uniquer &, nano::vote_uniquer &, nano::message_visitor &, nano::work_pool &); + message_parser (nano::network_filter &, nano::block_uniquer &, nano::vote_uniquer &, nano::message_visitor &, nano::work_pool &); void deserialize_buffer (uint8_t const *, size_t); void deserialize_keepalive (nano::stream &, nano::message_header const &); - void deserialize_publish (nano::stream &, nano::message_header const &); + void deserialize_publish (nano::stream &, nano::message_header const &, nano::uint128_t const & = 0); void deserialize_confirm_req (nano::stream &, nano::message_header const &); void deserialize_confirm_ack (nano::stream &, nano::message_header const &); void deserialize_node_id_handshake (nano::stream &, nano::message_header const &); void deserialize_telemetry_req (nano::stream &, nano::message_header const &); void deserialize_telemetry_ack (nano::stream &, nano::message_header const &); bool at_end (nano::stream &); + nano::network_filter & publish_filter; nano::block_uniquer & block_uniquer; nano::vote_uniquer & vote_uniquer; nano::message_visitor & visitor; @@ -282,13 +285,14 @@ class keepalive final : public message class publish final : public message { public: - publish (bool &, nano::stream &, nano::message_header const &, nano::block_uniquer * = nullptr); + publish (bool &, nano::stream &, nano::message_header const &, nano::uint128_t const & = 0, nano::block_uniquer * = nullptr); explicit publish (std::shared_ptr); void visit (nano::message_visitor &) const override; void serialize (nano::stream &) const override; bool deserialize (nano::stream &, nano::block_uniquer * = nullptr); bool operator== (nano::publish const &) const; std::shared_ptr block; + nano::uint128_t digest{ 0 }; }; class confirm_req final : public message { diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 8481bb0456..7541c4ba3f 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -531,8 +532,9 @@ void nano::election::adjust_dependent_difficulty () } } -void nano::election::clear_blocks () +void nano::election::cleanup () { + bool unconfirmed (!confirmed ()); auto winner_hash (status.winner->hash ()); for (auto const & block : blocks) { @@ -542,11 +544,21 @@ void nano::election::clear_blocks () debug_assert (erased == 1); node.active.erase_inactive_votes_cache (hash); // Notify observers about dropped elections & blocks lost confirmed elections - if (!confirmed () || hash != winner_hash) + if (unconfirmed || hash != winner_hash) { node.observers.active_stopped.notify (hash); } } + if (unconfirmed) + { + // Clear network filter in another thread + node.worker.push_task ([node_l = node.shared (), blocks_l = std::move (blocks)]() { + for (auto const & block : blocks_l) + { + node_l->network.publish_filter.clear (block.second); + } + }); + } } void nano::election::insert_inactive_votes_cache (nano::block_hash const & hash_a) diff --git a/nano/node/election.hpp b/nano/node/election.hpp index 6ea73c11ce..f76569bcd4 100644 --- a/nano/node/election.hpp +++ b/nano/node/election.hpp @@ -79,8 +79,9 @@ class election final : public std::enable_shared_from_this size_t last_votes_size (); void update_dependent (); void adjust_dependent_difficulty (); - void clear_blocks (); void insert_inactive_votes_cache (nano::block_hash const &); + // Erase all blocks from active and, if not confirmed, clear digests from network filters + void cleanup (); public: // State transitions bool transition_time (nano::confirmation_solicitor &, bool const saturated); diff --git a/nano/node/network.cpp b/nano/node/network.cpp index aba02256d9..9c41f60df4 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -15,6 +15,7 @@ buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receiv resolver (node_a.io_ctx), limiter (node_a.config.bandwidth_limit), node (node_a), +publish_filter (256 * 1024), udp_channels (node_a, port_a), tcp_channels (node_a), port (port_a), @@ -390,6 +391,7 @@ class network_message_visitor : public nano::message_visitor } else { + node.network.publish_filter.clear (message_a.digest); node.stats.inc (nano::stat::type::drop, nano::stat::detail::publish, nano::stat::dir::in); } } @@ -427,22 +429,25 @@ class network_message_visitor : public nano::message_visitor node.logger.try_log (boost::str (boost::format ("Received confirm_ack message from %1% for %2%sequence %3%") % channel->to_string () % message_a.vote->hashes_string () % std::to_string (message_a.vote->sequence))); } node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in); - for (auto & vote_block : message_a.vote->blocks) + if (!message_a.vote->account.is_zero ()) { - if (!vote_block.which ()) + for (auto & vote_block : message_a.vote->blocks) { - auto block (boost::get> (vote_block)); - if (!node.block_processor.full ()) + if (!vote_block.which ()) { - node.process_active (block); - } - else - { - node.stats.inc (nano::stat::type::drop, nano::stat::detail::confirm_ack, nano::stat::dir::in); + auto block (boost::get> (vote_block)); + if (!node.block_processor.full ()) + { + node.process_active (block); + } + else + { + node.stats.inc (nano::stat::type::drop, nano::stat::detail::confirm_ack, nano::stat::dir::in); + } } } + node.vote_processor.vote (message_a.vote, channel); } - node.vote_processor.vote (message_a.vote, channel); } void bulk_pull (nano::bulk_pull const &) override { diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 99c68bbc45..705c27aee2 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -3,13 +3,13 @@ #include #include #include +#include #include #include #include #include - namespace nano { class channel; @@ -152,6 +152,7 @@ class network final std::vector packet_processing_threads; nano::bandwidth_limiter limiter; nano::node & node; + nano::network_filter publish_filter; nano::transport::udp_channels udp_channels; nano::transport::tcp_channels tcp_channels; std::atomic port{ 0 }; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 77e5041e59..2581ffe706 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -84,21 +84,18 @@ std::unique_ptr nano::collect_container_info (bl { size_t state_blocks_count; size_t blocks_count; - size_t blocks_filter_count; size_t forced_count; { nano::lock_guard guard (block_processor.mutex); state_blocks_count = block_processor.state_blocks.size (); blocks_count = block_processor.blocks.size (); - blocks_filter_count = block_processor.blocks_filter.size (); forced_count = block_processor.forced.size (); } auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "state_blocks", state_blocks_count, sizeof (decltype (block_processor.state_blocks)::value_type) })); composite->add_component (std::make_unique (container_info{ "blocks", blocks_count, sizeof (decltype (block_processor.blocks)::value_type) })); - composite->add_component (std::make_unique (container_info{ "blocks_filter", blocks_filter_count, sizeof (decltype (block_processor.blocks_filter)::value_type) })); composite->add_component (std::make_unique (container_info{ "forced", forced_count, sizeof (decltype (block_processor.forced)::value_type) })); composite->add_component (collect_container_info (block_processor.generator, "generator")); return composite; @@ -936,6 +933,7 @@ void nano::node::bootstrap_wallet () void nano::node::unchecked_cleanup () { + std::vector> blocks; std::deque cleaning_list; auto attempt (bootstrap_initiator.current_attempt ()); bool long_attempt (attempt != nullptr && std::chrono::duration_cast (std::chrono::steady_clock::now () - attempt->attempt_start).count () > config.unchecked_cutoff_time.count ()); @@ -951,6 +949,7 @@ void nano::node::unchecked_cleanup () nano::unchecked_info const & info (i->second); if ((now - info.modified) > static_cast (config.unchecked_cutoff_time.count ())) { + blocks.push_back (info.block); cleaning_list.push_back (key); } } @@ -976,6 +975,11 @@ void nano::node::unchecked_cleanup () } } } + // Delete from the duplicate filter + for (auto const & block : blocks) + { + network.publish_filter.clear (block); + } } void nano::node::ongoing_unchecked_cleanup () diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index 993ee9bc9c..b2ba6896e0 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -543,9 +543,17 @@ void nano::transport::udp_channels::receive_action (nano::message_buffer * data_ if (allowed_sender) { udp_message_visitor visitor (node, data_a->endpoint); - nano::message_parser parser (node.block_uniquer, node.vote_uniquer, visitor, node.work); + nano::message_parser parser (node.network.publish_filter, node.block_uniquer, node.vote_uniquer, visitor, node.work); parser.deserialize_buffer (data_a->buffer, data_a->size); - if (parser.status != nano::message_parser::parse_status::success) + if (parser.status == nano::message_parser::parse_status::success) + { + node.stats.add (nano::stat::type::traffic_udp, nano::stat::dir::in, data_a->size); + } + else if (parser.status == nano::message_parser::parse_status::duplicate_publish_message) + { + node.stats.inc (nano::stat::type::filter, nano::stat::detail::duplicate_publish); + } + else { node.stats.inc (nano::stat::type::error); @@ -591,15 +599,12 @@ void nano::transport::udp_channels::receive_action (nano::message_buffer * data_ case nano::message_parser::parse_status::outdated_version: node.stats.inc (nano::stat::type::udp, nano::stat::detail::outdated_version); break; + case nano::message_parser::parse_status::duplicate_publish_message: case nano::message_parser::parse_status::success: /* Already checked, unreachable */ break; } } - else - { - node.stats.add (nano::stat::type::traffic_udp, nano::stat::dir::in, data_a->size); - } } else { diff --git a/nano/secure/network_filter.cpp b/nano/secure/network_filter.cpp index b6fc7b3f2b..051ebd566b 100644 --- a/nano/secure/network_filter.cpp +++ b/nano/secure/network_filter.cpp @@ -40,6 +40,19 @@ void nano::network_filter::clear (nano::uint128_t const & digest_a) } } +void nano::network_filter::clear (std::vector const & digests_a) +{ + nano::lock_guard lock (mutex); + for (auto const & digest : digests_a) + { + auto & element (get_element (digest)); + if (element == digest) + { + element = nano::uint128_t{ 0 }; + } + } +} + void nano::network_filter::clear (uint8_t const * bytes_a, size_t count_a) { clear (hash (bytes_a, count_a)); @@ -48,12 +61,7 @@ void nano::network_filter::clear (uint8_t const * bytes_a, size_t count_a) template void nano::network_filter::clear (OBJECT const & object_a) { - std::vector bytes; - { - nano::vectorstream stream (bytes); - object_a->serialize (stream); - } - clear (bytes.data (), bytes.size ()); + clear (hash (object_a)); } void nano::network_filter::clear () @@ -62,6 +70,17 @@ void nano::network_filter::clear () items.assign (items.size (), nano::uint128_t{ 0 }); } +template +nano::uint128_t nano::network_filter::hash (OBJECT const & object_a) const +{ + std::vector bytes; + { + nano::vectorstream stream (bytes); + object_a->serialize (stream); + } + return hash (bytes.data (), bytes.size ()); +} + nano::uint128_t & nano::network_filter::get_element (nano::uint128_t const & hash_a) { debug_assert (!mutex.try_lock ()); @@ -77,3 +96,6 @@ nano::uint128_t nano::network_filter::hash (uint8_t const * bytes_a, size_t coun siphash.CalculateDigest (digest.bytes.data (), bytes_a, count_a); return digest.number (); } + +// Explicitly instantiate +template void nano::network_filter::clear (std::shared_ptr const &); diff --git a/nano/secure/network_filter.hpp b/nano/secure/network_filter.hpp index 60b99dbd08..0e42a2cce7 100644 --- a/nano/secure/network_filter.hpp +++ b/nano/secure/network_filter.hpp @@ -34,6 +34,11 @@ class network_filter final **/ void clear (nano::uint128_t const & digest_a); + /** + * Clear many digests from the filter + **/ + void clear (std::vector const &); + /** * Reads \p count_a bytes starting from \p bytes_a and digests the contents. * Then, sets the corresponding element in the filter to zero, if it matches the digest exactly. @@ -42,7 +47,7 @@ class network_filter final void clear (uint8_t const * bytes_a, size_t count_a); /** - * Serializes \p object_a and runs clears the resulting siphash digest. + * Serializes \p object_a and clears the resulting siphash digest from the filter. * @return a boolean representing the previous existence of the hash in the filter. **/ template @@ -51,6 +56,12 @@ class network_filter final /** Sets every element of the filter to zero, keeping its size and capacity. */ void clear (); + /** + * Serializes \p object_a and returns the resulting siphash digest + */ + template + nano::uint128_t hash (OBJECT const & object_a) const; + private: using siphash_t = CryptoPP::SipHash<2, 4, true>;