Skip to content

Commit

Permalink
Filter duplicate publish messages before deserializing
Browse files Browse the repository at this point in the history
When a message is unique, the digest is saved and passed around to network processing, which may drop it if the block processor is full.

Cleaning up a long unchecked block erases its digest from the publish filter.

The blocks_filter has been removed due to redundancy. The size of this filter is 256k, which uses about 4MB.
  • Loading branch information
guilhermelawless committed Mar 6, 2020
1 parent f0b8799 commit ab43d93
Show file tree
Hide file tree
Showing 22 changed files with 298 additions and 98 deletions.
51 changes: 51 additions & 0 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -772,3 +772,54 @@ TEST (active_transactions, activate_dependencies)
ASSERT_TRUE (node1->ledger.block_confirmed (node1->store.tx_begin_read (), block2->hash ()));
ASSERT_TRUE (node2->ledger.block_confirmed (node2->store.tx_begin_read (), block2->hash ()));
}

namespace nano
{
// Tests that blocks are correctly cleared from the duplicate filter for unconfirmed elections
TEST (active_transactions, dropped_cleanup)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto & node (*system.add_node (node_config));

nano::genesis genesis;
auto block = genesis.open;

// Add to network filter to ensure proper cleanup after the election is dropped
std::vector<uint8_t> block_bytes;
{
nano::vectorstream stream (block_bytes);
block->serialize (stream);
}
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

auto election (node.active.insert (block).first);
ASSERT_NE (nullptr, election);

// Not yet removed
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

// Now simulate dropping the election, which performs a cleanup in the background using the node worker
ASSERT_FALSE (election->confirmed ());
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
election->cleanup ();
}

// Push a worker task to ensure the cleanup is already performed
std::atomic<bool> flag{ false };
node.worker.push_task ([&flag]() {
flag = true;
});
system.deadline_set (5s);
while (!flag)
{
ASSERT_NO_ERROR (system.poll ());
}

// The filter must have been cleared
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
}
}
15 changes: 10 additions & 5 deletions nano/core_test/message_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ TEST (message_parser, exact_confirm_ack_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
auto vote (std::make_shared<nano::vote> (0, nano::keypair ().prv, 0, std::move (block)));
nano::confirm_ack message (vote);
Expand Down Expand Up @@ -96,9 +97,10 @@ TEST (message_parser, exact_confirm_req_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
nano::confirm_req message (std::move (block));
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -128,9 +130,10 @@ TEST (message_parser, exact_confirm_req_hash_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
nano::send_block block (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1)));
nano::confirm_req message (block.hash (), block.root ());
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -160,9 +163,10 @@ TEST (message_parser, exact_publish_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
auto block (std::make_shared<nano::send_block> (1, 1, 2, nano::keypair ().prv, 4, *system.work.generate (nano::root (1))));
nano::publish message (std::move (block));
std::vector<uint8_t> bytes;
Expand Down Expand Up @@ -192,9 +196,10 @@ TEST (message_parser, exact_keepalive_size)
{
nano::system system (1);
test_visitor visitor;
nano::network_filter filter (1);
nano::block_uniquer block_uniquer;
nano::vote_uniquer vote_uniquer (block_uniquer);
nano::message_parser parser (block_uniquer, vote_uniquer, visitor, system.work);
nano::message_parser parser (filter, block_uniquer, vote_uniquer, visitor, system.work);
nano::keepalive message;
std::vector<uint8_t> bytes;
{
Expand Down
61 changes: 61 additions & 0 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,67 @@ TEST (network, replace_port)
node1->stop ();
}

TEST (network, duplicate_detection)
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_udp = false;
auto & node0 (*system.add_node (node_flags));
auto & node1 (*system.add_node (node_flags));
auto udp_channel (std::make_shared<nano::transport::channel_udp> (node0.network.udp_channels, node1.network.endpoint (), node1.network_params.protocol.protocol_version));
nano::genesis genesis;
nano::publish publish (genesis.open);

// Publish duplicate detection through UDP
ASSERT_EQ (0, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish));
udp_channel->send (publish);
udp_channel->send (publish);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 1)
{
ASSERT_NO_ERROR (system.poll ());
}

// Publish duplicate detection through TCP
auto tcp_channel (node0.network.tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (node1.network.endpoint ())));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish));
tcp_channel->send (publish);
system.deadline_set (2s);
while (node1.stats.count (nano::stat::type::filter, nano::stat::detail::duplicate_publish) < 2)
{
ASSERT_NO_ERROR (system.poll ());
}
}

TEST (network, duplicate_revert_publish)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 0;
auto & node (*system.add_node (node_flags));
ASSERT_TRUE (node.block_processor.full ());
nano::genesis genesis;
nano::publish publish (genesis.open);
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
publish.block->serialize (stream);
}
// Add to the blocks filter
// Should be cleared when dropping due to a full block processor, as long as the message has the optional digest attached
// Test network.duplicate_detection ensures that the digest is attached when deserializing messages
nano::uint128_t digest;
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size (), &digest));
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
auto channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, node.network.endpoint (), node.network_params.protocol.protocol_version));
ASSERT_EQ (0, publish.digest);
node.network.process_message (publish, channel);
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
publish.digest = digest;
node.network.process_message (publish, channel);
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
}

// The test must be completed in less than 1 second
TEST (bandwidth_limiter, validate)
{
Expand Down
14 changes: 12 additions & 2 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3217,7 +3217,7 @@ TEST (node, block_processor_full)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 2;
node_flags.block_processor_full_size = 3;
auto & node = *system.add_node (nano::node_config (nano::get_available_port (), system.logging), node_flags);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
Expand Down Expand Up @@ -3245,7 +3245,7 @@ TEST (node, block_processor_half_full)
{
nano::system system;
nano::node_flags node_flags;
node_flags.block_processor_full_size = 4;
node_flags.block_processor_full_size = 6;
auto & node = *system.add_node (nano::node_config (nano::get_available_port (), system.logging), node_flags);
nano::genesis genesis;
auto send1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, 0));
Expand Down Expand Up @@ -3421,6 +3421,14 @@ TEST (node, unchecked_cleanup)
nano::keypair key;
auto & node (*system.nodes[0]);
auto open (std::make_shared<nano::state_block> (key.pub, 0, key.pub, 1, key.pub, key.prv, key.pub, *system.work.generate (key.pub)));
std::vector<uint8_t> bytes;
{
nano::vectorstream stream (bytes);
open->serialize (stream);
}
// Add to the blocks filter
// Should be cleared after unchecked cleanup
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
node.process_active (open);
node.block_processor.flush ();
node.config.unchecked_cutoff_time = std::chrono::seconds (2);
Expand All @@ -3432,6 +3440,7 @@ TEST (node, unchecked_cleanup)
}
std::this_thread::sleep_for (std::chrono::seconds (1));
node.unchecked_cleanup ();
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
{
auto transaction (node.store.tx_begin_read ());
auto unchecked_count (node.store.unchecked_count (transaction));
Expand All @@ -3440,6 +3449,7 @@ TEST (node, unchecked_cleanup)
}
std::this_thread::sleep_for (std::chrono::seconds (2));
node.unchecked_cleanup ();
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
{
auto transaction (node.store.tx_begin_read ());
auto unchecked_count (node.store.unchecked_count (transaction));
Expand Down
7 changes: 5 additions & 2 deletions nano/lib/numbers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,11 @@ bool nano::validate_message (nano::public_key const & public_key, nano::uint256_

bool nano::validate_message_batch (const unsigned char ** m, size_t * mlen, const unsigned char ** pk, const unsigned char ** RS, size_t num, int * valid)
{
bool result (0 == ed25519_sign_open_batch (m, mlen, pk, RS, num, valid));
return result;
for (size_t i{ 0 }; i < num; ++i)
{
valid[i] = (0 == ed25519_sign_open (m[i], mlen[i], pk[i], RS[i]));
}
return true;
}

nano::uint128_union::uint128_union (std::string const & string_a)
Expand Down
6 changes: 6 additions & 0 deletions nano/lib/stats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ std::string nano::stat::type_to_string (uint32_t key)
case nano::stat::type::requests:
res = "requests";
break;
case nano::stat::type::filter:
res = "filter";
break;
}
return res;
}
Expand Down Expand Up @@ -694,6 +697,9 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::requests_unknown:
res = "requests_unknown";
break;
case nano::stat::detail::duplicate_publish:
res = "duplicate_publish";
break;
}
return res;
}
Expand Down
8 changes: 6 additions & 2 deletions nano/lib/stats.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,8 @@ class stat final
confirmation_height,
drop,
aggregator,
requests
requests,
filter,
};

/** Optional detail type */
Expand Down Expand Up @@ -314,7 +315,10 @@ class stat final
requests_generated_hashes,
requests_cached_votes,
requests_generated_votes,
requests_unknown
requests_unknown,

// duplicate
duplicate_publish
};

/** Direction of the stat. If the direction is irrelevant, use in */
Expand Down
13 changes: 4 additions & 9 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &
bool const overflow_l (count_l >= node.config.active_elections_size && election_l->election_start < election_ttl_cutoff_l && !node.wallets.watcher->is_watched (i->root));
if (overflow_l || election_l->transition_time (solicitor, saturated_l))
{
election_l->clear_blocks ();
election_l->cleanup ();
i = sorted_roots_l.erase (i);
}
else
Expand Down Expand Up @@ -791,7 +791,7 @@ void nano::active_transactions::erase (nano::block const & block_a)
auto root_it (roots.get<tag_root> ().find (block_a.qualified_root ()));
if (root_it != roots.get<tag_root> ().end ())
{
root_it->election->clear_blocks ();
root_it->election->cleanup ();
root_it->election->adjust_dependent_difficulty ();
roots.get<tag_root> ().erase (root_it);
node.logger.try_log (boost::str (boost::format ("Election erased for block block %1% root %2%") % block_a.hash ().to_string () % block_a.root ().to_string ()));
Expand Down Expand Up @@ -937,18 +937,13 @@ nano::inactive_cache_information nano::active_transactions::find_inactive_votes_
}
else
{
return nano::inactive_cache_information{ std::chrono::steady_clock::time_point{}, 0, std::vector<nano::account>{} };
return nano::inactive_cache_information{};
}
}

void nano::active_transactions::erase_inactive_votes_cache (nano::block_hash const & hash_a)
{
auto & inactive_by_hash (inactive_votes_cache.get<tag_hash> ());
auto existing (inactive_by_hash.find (hash_a));
if (existing != inactive_by_hash.end ())
{
inactive_by_hash.erase (existing);
}
inactive_votes_cache.get<tag_hash> ().erase (hash_a);
}

bool nano::active_transactions::inactive_votes_bootstrap_check (std::vector<nano::account> const & voters_a, nano::block_hash const & hash_a, bool & confirmed_a)
Expand Down
1 change: 1 addition & 0 deletions nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ class active_transactions final
bool inactive_votes_bootstrap_check (std::vector<nano::account> const &, nano::block_hash const &, bool &);
boost::thread thread;

friend class active_transactions_dropped_cleanup_Test;
friend class confirmation_height_prioritize_frontiers_Test;
friend class confirmation_height_prioritize_frontiers_overwrite_Test;
friend std::unique_ptr<container_info_component> collect_container_info (active_transactions &, const std::string &);
Expand Down
Loading

0 comments on commit ab43d93

Please sign in to comment.