Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Network duplicate filter for publish messages #2643

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,54 @@ TEST (active_transactions, activate_dependencies)
ASSERT_TRUE (node1->ledger.block_confirmed (node1->store.tx_begin_read (), block2->hash ()));
ASSERT_TRUE (node2->ledger.block_confirmed (node2->store.tx_begin_read (), block2->hash ()));
}

namespace nano
{
// Tests that blocks are correctly cleared from the duplicate filter for unconfirmed elections
TEST (active_transactions, dropped_cleanup)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));

nano::genesis genesis;
auto block = genesis.open;

// Add to network filter to ensure proper cleanup after the election is dropped
std::vector<uint8_t> block_bytes;
{
nano::vectorstream stream (block_bytes);
block->serialize (stream);
}
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

auto election (node.active.insert (block).first);
ASSERT_NE (nullptr, election);

// Not yet removed
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

// Now simulate dropping the election, which performs a cleanup in the background using the node worker
ASSERT_FALSE (election->confirmed ());
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
election->cleanup ();
}

// Push a worker task to ensure the cleanup is already performed
std::atomic<bool> flag{ false };
node.worker.push_task ([&flag]() {
flag = true;
});
system.deadline_set (5s);
while (!flag)
{
ASSERT_NO_ERROR (system.poll ());
}

// The filter must have been cleared
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
}
}
15 changes: 10 additions & 5 deletions nano/core_test/message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ TEST (message_parser, exact_confirm_ack_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
auto vote (std::make_shared<nano::vote> (0, nano::keypair ().prv, 0, std::move (block)));
nano::confirm_ack message (vote);
Expand Down Expand Up @@ -96,9 +97,10 @@ TEST (message_parser, exact_confirm_req_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
nano::confirm_req message (std::move (block));
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -128,9 +130,10 @@ TEST (message_parser, exact_confirm_req_hash_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
nano::send_block block (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1)));
nano::confirm_req message (block.hash (), block.root ());
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -160,9 +163,10 @@ TEST (message_parser, exact_publish_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
nano::publish message (std::move (block));
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -192,9 +196,10 @@ TEST (message_parser, exact_keepalive_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
nano::keepalive message;
std::vector<uint8_t> bytes;
{
Expand Down
61 changes: 61 additions & 0 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -946,6 +946,67 @@ TEST (network, peer_max_tcp_attempts)
ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (node->network.endpoint ().address (), nano::get_available_port ())));
}

TEST (network, duplicate_detection)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_udp = false;
auto & node0 (*system.add_node (node_flags));
auto & node1 (*system.add_node (node_flags));
auto udp_channel (std::make_shared<nano::transport::channel_udp> (node0.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
nano::genesis genesis;
nano::publish publish (genesis.open);

// Publish duplicate detection through UDP
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish));
udp_channel->send (publish);
udp_channel->send (publish);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}

// Publish duplicate detection through TCP
auto tcp_channel (node0.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node1.network.endpoint ())));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish));
tcp_channel->send (publish);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
}

TEST (network, duplicate_revert_publish)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 0;
auto & node (*system.add_node (node_flags));
ASSERT_TRUE (node.block_processor.full ());
nano::genesis genesis;
nano::publish publish (genesis.open);
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
publish.block->serialize (stream);
}
// Add to the blocks filter
// Should be cleared when dropping due to a full block processor, as long as the message has the optional digest attached
// Test network.duplicate_detection ensures that the digest is attached when deserializing messages
nano::uint128_t digest;
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size (), &digest));
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
auto channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
ASSERT_EQ (0, publish.digest);
node.network.process_message (publish, channel);
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
publish.digest = digest;
node.network.process_message (publish, channel);
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
}

// The test must be completed in less than 1 second
TEST (bandwidth_limiter, validate)
{
Expand Down
14 changes: 12 additions & 2 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3217,7 +3217,7 @@ TEST (node, block_processor_full)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 2;
node_flags.block_processor_full_size = 3;
auto & node = *system.add_node (nano::node_config (nano::get_available_port (), system.logging), node_flags);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
Expand Down Expand Up @@ -3245,7 +3245,7 @@ TEST (node, block_processor_half_full)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 4;
node_flags.block_processor_full_size = 6;
auto & node = *system.add_node (nano::node_config (nano::get_available_port (), system.logging), node_flags);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
Expand Down Expand Up @@ -3421,6 +3421,14 @@ TEST (node, unchecked_cleanup)
nano::keypair key;
auto & node (*system.nodes[0]);
auto open (std::make_shared<nano::state_block> (key.pub, 0, key.pub, 1, key.pub, key.prv, key.pub, *system.work.generate (key.pub)));
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
open->serialize (stream);
}
// Add to the blocks filter
// Should be cleared after unchecked cleanup
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
node.process_active (open);
node.block_processor.flush ();
node.config.unchecked_cutoff_time = std::chrono::seconds (2);
Expand All @@ -3432,6 +3440,7 @@ TEST (node, unchecked_cleanup)
}
std::this_thread::sleep_for (std::chrono::seconds (1));
node.unchecked_cleanup ();
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
{
auto transaction (node.store.tx_begin_read ());
auto unchecked_count (node.store.unchecked_count (transaction));
Expand All @@ -3440,6 +3449,7 @@ TEST (node, unchecked_cleanup)
}
std::this_thread::sleep_for (std::chrono::seconds (2));
node.unchecked_cleanup ();
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
{
auto transaction (node.store.tx_begin_read ());
auto unchecked_count (node.store.unchecked_count (transaction));
Expand Down
7 changes: 5 additions & 2 deletions nano/lib/numbers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,11 @@ bool nano::validate_message (nano::public_key const & public_key, nano::uint256_

bool nano::validate_message_batch (const unsigned char ** m, size_t * mlen, const unsigned char ** pk, const unsigned char ** RS, size_t num, int * valid)
{
bool result (0 == ed25519_sign_open_batch (m, mlen, pk, RS, num, valid));
return result;
for (size_t i{ 0 }; i < num; ++i)
{
valid[i] = (0 == ed25519_sign_open (m[i], mlen[i], pk[i], RS[i]));
}
return true;
}

nano::uint128_union::uint128_union (std::string const & string_a)
Expand Down
6 changes: 6 additions & 0 deletions nano/lib/stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ std::string nano::stat::type_to_string (uint32_t key)
case nano::stat::type::requests:
res = "requests";
break;
case nano::stat::type::filter:
res = "filter";
break;
}
return res;
}
Expand Down Expand Up @@ -694,6 +697,9 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::requests_unknown:
res = "requests_unknown";
break;
case nano::stat::detail::duplicate_publish:
res = "duplicate_publish";
break;
}
return res;
}
Expand Down
8 changes: 6 additions & 2 deletions nano/lib/stats.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ class stat final
confirmation_height,
drop,
aggregator,
requests
requests,
filter,
};

/** Optional detail type */
Expand Down Expand Up @@ -314,7 +315,10 @@ class stat final
requests_generated_hashes,
requests_cached_votes,
requests_generated_votes,
requests_unknown
requests_unknown,

// duplicate
duplicate_publish
};

/** Direction of the stat. If the direction is irrelevant, use in */
Expand Down
13 changes: 4 additions & 9 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
bool const overflow_l (count_l >= node.config.active_elections_size && election_l->election_start < election_ttl_cutoff_l && !node.wallets.watcher->is_watched (i->root));
if (overflow_l || election_l->transition_time (solicitor, saturated_l))
{
election_l->clear_blocks ();
election_l->cleanup ();
i = sorted_roots_l.erase (i);
}
else
Expand Down Expand Up @@ -790,7 +790,7 @@ void nano::active_transactions::erase (nano::block const & block_a)
auto root_it (roots.get<tag_root> ().find (block_a.qualified_root ()));
if (root_it != roots.get<tag_root> ().end ())
{
root_it->election->clear_blocks ();
root_it->election->cleanup ();
root_it->election->adjust_dependent_difficulty ();
roots.get<tag_root> ().erase (root_it);
node.logger.try_log (boost::str (boost::format ("Election erased for block block %1% root %2%") % block_a.hash ().to_string () % block_a.root ().to_string ()));
Expand Down Expand Up @@ -936,18 +936,13 @@ nano::inactive_cache_information nano::active_transactions::find_inactive_votes_
}
else
{
return nano::inactive_cache_information{ std::chrono::steady_clock::time_point{}, 0, std::vector<nano::account>{} };
return nano::inactive_cache_information{};
}
}

void nano::active_transactions::erase_inactive_votes_cache (nano::block_hash const & hash_a)
{
auto & inactive_by_hash (inactive_votes_cache.get<tag_hash> ());
auto existing (inactive_by_hash.find (hash_a));
if (existing != inactive_by_hash.end ())
{
inactive_by_hash.erase (existing);
}
inactive_votes_cache.get<tag_hash> ().erase (hash_a);
}

bool nano::active_transactions::inactive_votes_bootstrap_check (std::vector<nano::account> const & voters_a, nano::block_hash const & hash_a, bool & confirmed_a)
Expand Down
1 change: 1 addition & 0 deletions nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class active_transactions final
bool inactive_votes_bootstrap_check (std::vector<nano::account> const &, nano::block_hash const &, bool &);
boost::thread thread;

friend class active_transactions_dropped_cleanup_Test;
friend class confirmation_height_prioritize_frontiers_Test;
friend class confirmation_height_prioritize_frontiers_overwrite_Test;
friend std::unique_ptr<container_info_component> collect_container_info (active_transactions &, const std::string &);
Expand Down
Loading