diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 6d604a7d95..ae909967dd 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -2061,13 +2061,16 @@ TEST (node, rep_weight) auto & node (*system.nodes[0]); nano::genesis genesis; nano::keypair keypair1; + nano::keypair keypair2; nano::block_builder builder; + auto amount_pr (node.minimum_principal_weight () + 100); + auto amount_not_pr (node.minimum_principal_weight () - 100); std::shared_ptr block1 = builder .state () .account (nano::test_genesis_key.pub) .previous (genesis.hash ()) .representative (nano::test_genesis_key.pub) - .balance (nano::genesis_amount - node.minimum_principal_weight () * 4) + .balance (nano::genesis_amount - amount_not_pr) .link (keypair1.pub) .sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub) .work (*system.work.generate (genesis.hash ())) @@ -2077,15 +2080,37 @@ TEST (node, rep_weight) .account (keypair1.pub) .previous (0) .representative (keypair1.pub) - .balance (node.minimum_principal_weight () * 4) + .balance (amount_not_pr) .link (block1->hash ()) .sign (keypair1.prv, keypair1.pub) .work (*system.work.generate (keypair1.pub)) .build (); + std::shared_ptr block3 = builder + .state () + .account (nano::test_genesis_key.pub) + .previous (block1->hash ()) + .representative (nano::test_genesis_key.pub) + .balance (nano::genesis_amount - amount_not_pr - amount_pr) + .link (keypair2.pub) + .sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub) + .work (*system.work.generate (block1->hash ())) + .build (); + std::shared_ptr block4 = builder + .state () + .account (keypair2.pub) + .previous (0) + .representative (keypair2.pub) + .balance (amount_pr) + .link (block3->hash ()) + .sign (keypair2.prv, keypair2.pub) + .work (*system.work.generate (keypair2.pub)) + .build (); { auto transaction = node.store.tx_begin_write (); ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block1).code); ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block2).code); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block3).code); + ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block4).code); } node.network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), nano::get_available_port ()), 0); ASSERT_TRUE (node.rep_crawler.representatives (1).empty ()); @@ -2095,14 +2120,17 @@ TEST (node, rep_weight) std::shared_ptr channel0 (std::make_shared (node.network.udp_channels, endpoint0, node.network_params.protocol.protocol_version)); std::shared_ptr channel1 (std::make_shared (node.network.udp_channels, endpoint1, node.network_params.protocol.protocol_version)); std::shared_ptr channel2 (std::make_shared (node.network.udp_channels, endpoint2, node.network_params.protocol.protocol_version)); - node.network.udp_channels.insert (endpoint2, node.network_params.protocol.protocol_version); node.network.udp_channels.insert (endpoint0, node.network_params.protocol.protocol_version); node.network.udp_channels.insert (endpoint1, node.network_params.protocol.protocol_version); - auto vote1 = std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open); - auto vote2 = std::make_shared (keypair1.pub, keypair1.prv, 0, genesis.open); + node.network.udp_channels.insert (endpoint2, node.network_params.protocol.protocol_version); + auto vote0 = std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open); + auto vote1 = std::make_shared (keypair1.pub, keypair1.prv, 0, genesis.open); + auto vote2 = std::make_shared (keypair2.pub, keypair2.prv, 0, genesis.open); node.rep_crawler.add (genesis.open->hash ()); - node.rep_crawler.response (channel0, vote1); - node.rep_crawler.response (channel1, vote2); + node.rep_crawler.response (channel0, vote0); + node.rep_crawler.response (channel1, vote1); + node.rep_crawler.response (channel2, vote2); + system.deadline_set (5s); while (node.rep_crawler.representative_count () != 2) { ASSERT_NO_ERROR (system.poll ()); @@ -2110,9 +2138,12 @@ TEST (node, rep_weight) // Make sure we get the rep with the most weight first auto reps (node.rep_crawler.representatives (1)); ASSERT_EQ (1, reps.size ()); - ASSERT_EQ (nano::genesis_amount - node.minimum_principal_weight () * 4, reps[0].weight.number ()); + ASSERT_EQ (node.balance (nano::test_genesis_key.pub), reps[0].weight.number ()); ASSERT_EQ (nano::test_genesis_key.pub, reps[0].account); ASSERT_EQ (*channel0, reps[0].channel_ref ()); + ASSERT_TRUE (node.rep_crawler.is_pr (*channel0)); + ASSERT_FALSE (node.rep_crawler.is_pr (*channel1)); + ASSERT_TRUE (node.rep_crawler.is_pr (*channel2)); } TEST (node, rep_remove) @@ -2976,9 +3007,9 @@ TEST (node, fork_invalid_block_signature) } auto vote (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2)); auto vote_corrupt (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2_corrupt)); - node2.network.flood_vote (vote_corrupt); + node2.network.flood_vote (vote_corrupt, 1.0f); ASSERT_NO_ERROR (system.poll ()); - node2.network.flood_vote (vote); + node2.network.flood_vote (vote, 1.0f); while (node1.block (send1->hash ())) { ASSERT_NO_ERROR (system.poll ()); @@ -3446,11 +3477,6 @@ TEST (node, bidirectional_tcp) confirmed = node1->ledger.block_confirmed (transaction1, send1->hash ()) && node2->ledger.block_confirmed (transaction2, send1->hash ()); ASSERT_NO_ERROR (system.poll ()); } - // Test block propagation from node 2 - { - auto transaction (system.wallet (0)->wallets.tx_begin_write ()); - system.wallet (0)->store.erase (transaction, nano::test_genesis_key.pub); - } auto send2 (std::make_shared (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1->work_generate_blocking (send1->hash ()))); node2->process_active (send2); node2->block_processor.flush (); @@ -3460,7 +3486,6 @@ TEST (node, bidirectional_tcp) ASSERT_NO_ERROR (system.poll ()); } // Test block confirmation from node 2 - system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv); confirmed = false; system.deadline_set (20s); while (!confirmed) diff --git a/nano/core_test/peer_container.cpp b/nano/core_test/peer_container.cpp index 8edef342f5..dcf519bf55 100644 --- a/nano/core_test/peer_container.cpp +++ b/nano/core_test/peer_container.cpp @@ -120,14 +120,37 @@ TEST (channels, fill_random_part) TEST (peer_container, list_fanout) { nano::system system (1); - auto list1 (system.nodes[0]->network.list_fanout ()); + auto & node (*system.nodes[0]); + ASSERT_EQ (0, node.network.size ()); + ASSERT_EQ (0.0, node.network.size_sqrt ()); + ASSERT_EQ (0, node.network.fanout ()); + auto list1 (node.network.list (node.network.fanout ())); ASSERT_TRUE (list1.empty ()); + auto add_peer = [&node](const uint16_t port_a) { + ASSERT_NE (nullptr, node.network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), port_a), node.network_params.protocol.protocol_version)); + }; + add_peer (9998); + ASSERT_EQ (1, node.network.size ()); + ASSERT_EQ (1.f, node.network.size_sqrt ()); + ASSERT_EQ (1, node.network.fanout ()); + auto list2 (node.network.list (node.network.fanout ())); + ASSERT_EQ (1, list2.size ()); + add_peer (9999); + ASSERT_EQ (2, node.network.size ()); + ASSERT_EQ (std::sqrt (2.f), node.network.size_sqrt ()); + ASSERT_EQ (2, node.network.fanout ()); + auto list3 (node.network.list (node.network.fanout ())); + ASSERT_EQ (2, list3.size ()); for (auto i (0); i < 1000; ++i) { - ASSERT_NE (nullptr, system.nodes[0]->network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), 10000 + i), system.nodes[0]->network_params.protocol.protocol_version)); + add_peer (10000 + i); } - auto list2 (system.nodes[0]->network.list_fanout ()); - ASSERT_EQ (32, list2.size ()); + ASSERT_EQ (1002, node.network.size ()); + ASSERT_EQ (std::sqrt (1002.f), node.network.size_sqrt ()); + size_t expected_size (std::ceil (std::sqrt (1002.f))); + ASSERT_EQ (expected_size, node.network.fanout ()); + auto list4 (node.network.list (node.network.fanout ())); + ASSERT_EQ (expected_size, list4.size ()); } // Test to make sure we don't repeatedly send keepalive messages to nodes that aren't responding diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 76cbad8b6e..4e8b944a85 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -593,9 +593,9 @@ nano::vote_code nano::active_transactions::vote (std::shared_ptr vot } if (at_least_one) { - if (processed) + if (processed && !node.wallets.rep_counts ().have_half_rep ()) { - node.network.flood_vote (vote_a); + node.network.flood_vote (vote_a, 0.5f); } return replay ? nano::vote_code::replay : nano::vote_code::vote; } diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 149fa847f4..ffbe20839b 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -215,10 +215,27 @@ bool nano::network::send_votes_cache (std::shared_ptr void nano::network::flood_message (nano::message const & message_a, bool const is_droppable_a) { - auto list (list_fanout ()); - for (auto i (list.begin ()), n (list.end ()); i != n; ++i) + for (auto & i : list (fanout ())) { - (*i)->send (message_a, nullptr, is_droppable_a); + i->send (message_a, nullptr, is_droppable_a); + } +} + +void nano::network::flood_vote (std::shared_ptr const & vote_a, float scale) +{ + nano::confirm_ack message (vote_a); + for (auto & i : list_non_pr (fanout (scale))) + { + i->send (message, nullptr); + } +} + +void nano::network::flood_vote_pr (std::shared_ptr const & vote_a) +{ + nano::confirm_ack message (vote_a); + for (auto const & i : node.rep_crawler.principal_representatives ()) + { + i.channel->send (message, nullptr, false); } } @@ -265,7 +282,7 @@ void nano::network::broadcast_confirm_req (std::shared_ptr block_a) if (list->empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ()) { // broadcast request to all peers (with max limit 2 * sqrt (peers count)) - auto peers (node.network.list (std::min (static_cast (100), 2 * node.network.size_sqrt ()))); + auto peers (node.network.list (std::min (100, node.network.fanout (2.0)))); list->clear (); list->insert (list->end (), peers.begin (), peers.end ()); } @@ -618,13 +635,29 @@ std::deque> nano::network::list (size_ return result; } -// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability -std::deque> nano::network::list_fanout () +std::deque> nano::network::list_non_pr (size_t count_a) { - auto result (list (size_sqrt ())); + std::deque> result; + tcp_channels.list (result); + udp_channels.list (result); + nano::random_pool_shuffle (result.begin (), result.end ()); + result.erase (std::remove_if (result.begin (), result.end (), [this](std::shared_ptr const & channel) { + return this->node.rep_crawler.is_pr (*channel); + }), + result.end ()); + if (result.size () > count_a) + { + result.resize (count_a, nullptr); + } return result; } +// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability +size_t nano::network::fanout (float scale) const +{ + return static_cast (std::ceil (scale * size_sqrt ())); +} + std::unordered_set> nano::network::random_set (size_t count_a, uint8_t min_version_a) const { std::unordered_set> result (tcp_channels.random_set (count_a, min_version_a)); @@ -748,9 +781,9 @@ size_t nano::network::size () const return tcp_channels.size () + udp_channels.size (); } -size_t nano::network::size_sqrt () const +float nano::network::size_sqrt () const { - return (static_cast (std::ceil (std::sqrt (size ())))); + return static_cast (std::sqrt (size ())); } bool nano::network::empty () const diff --git a/nano/node/network.hpp b/nano/node/network.hpp index b6729a3733..9156ab8fce 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -105,11 +105,8 @@ class network final random_fill (message.peers); flood_message (message); } - void flood_vote (std::shared_ptr vote_a) - { - nano::confirm_ack message (vote_a); - flood_message (message); - } + void flood_vote (std::shared_ptr const &, float scale); + void flood_vote_pr (std::shared_ptr const &); void flood_block (std::shared_ptr block_a, bool const is_droppable_a = true) { nano::publish publish (block_a); @@ -136,8 +133,9 @@ class network final // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &, bool = false); std::deque> list (size_t); - // A list of random peers sized for the configured rebroadcast fanout - std::deque> list_fanout (); + std::deque> list_non_pr (size_t); + // Desired fanout for a given scale + size_t fanout (float scale = 1.0f) const; void random_fill (std::array &) const; std::unordered_set> random_set (size_t, uint8_t = 0) const; // Get the next peer for attempting a tcp bootstrap connection @@ -150,7 +148,7 @@ class network final void ongoing_syn_cookie_cleanup (); void ongoing_keepalive (); size_t size () const; - size_t size_sqrt () const; + float size_sqrt () const; bool empty () const; nano::message_buffer_manager buffer_container; boost::asio::ip::udp::resolver resolver; diff --git a/nano/node/repcrawler.cpp b/nano/node/repcrawler.cpp index b8e239b429..8e30473bdc 100644 --- a/nano/node/repcrawler.cpp +++ b/nano/node/repcrawler.cpp @@ -186,6 +186,18 @@ void nano::rep_crawler::query (std::shared_ptr & chann query (peers); } +bool nano::rep_crawler::is_pr (nano::transport::channel const & channel_a) const +{ + nano::lock_guard lock (probable_reps_mutex); + auto existing = probable_reps.get ().find (channel_a); + bool result = false; + if (existing != probable_reps.get ().end ()) + { + result = existing->weight > node.minimum_principal_weight (); + } + return result; +} + void nano::rep_crawler::response (std::shared_ptr & channel_a, std::shared_ptr & vote_a) { nano::lock_guard lock (active_mutex); @@ -312,6 +324,21 @@ std::vector nano::rep_crawler::representatives (size_t cou return result; } +std::vector nano::rep_crawler::principal_representatives (size_t count_a) +{ + std::vector result; + auto minimum = node.minimum_principal_weight (); + nano::lock_guard lock (probable_reps_mutex); + for (auto i (probable_reps.get ().begin ()), n (probable_reps.get ().end ()); i != n && result.size () < count_a; ++i) + { + if (i->weight > minimum) + { + result.push_back (*i); + } + } + return result; +} + std::vector> nano::rep_crawler::representative_endpoints (size_t count_a) { std::vector> result; diff --git a/nano/node/repcrawler.hpp b/nano/node/repcrawler.hpp index b0a59c7925..62e52d6886 100644 --- a/nano/node/repcrawler.hpp +++ b/nano/node/repcrawler.hpp @@ -91,6 +91,9 @@ class rep_crawler final /** Attempt to determine if the peer manages one or more representative accounts */ void query (std::shared_ptr & channel_a); + /** Query if a peer manages a principle representative */ + bool is_pr (nano::transport::channel const &) const; + /** * Called when a non-replay vote on a block previously sent by query() is received. This indiciates * with high probability that the endpoint is a representative node. @@ -104,6 +107,9 @@ class rep_crawler final /** Request a list of the top \p count_a known representatives in descending order of weight, optionally with a minimum version \p opt_version_min_a */ std::vector representatives (size_t count_a = std::numeric_limits::max (), boost::optional const & opt_version_min_a = boost::none); + /** Request a list of the top \p count_a known principal representatives in descending order of weight. */ + std::vector principal_representatives (size_t count_a = std::numeric_limits::max ()); + /** Request a list of the top \p count_a known representative endpoints. */ std::vector> representative_endpoints (size_t count_a); diff --git a/nano/node/voting.cpp b/nano/node/voting.cpp index 2f396aae93..261b0c17c0 100644 --- a/nano/node/voting.cpp +++ b/nano/node/voting.cpp @@ -64,8 +64,10 @@ void nano::vote_generator::send (nano::unique_lock & lock_a) auto transaction (store.tx_begin_read ()); wallets.foreach_representative ([this, &hashes_l, &transaction](nano::public_key const & pub_a, nano::raw_key const & prv_a) { auto vote (this->store.vote_generate (transaction, pub_a, prv_a, hashes_l)); - this->vote_processor.vote (vote, std::make_shared (this->network.udp_channels, this->network.endpoint (), this->network_params.protocol.protocol_version)); this->votes_cache.add (vote); + this->network.flood_vote_pr (vote); + this->network.flood_vote (vote, 2.0f); + this->vote_processor.vote (vote, std::make_shared (this->network.udp_channels, this->network.endpoint (), this->network_params.protocol.protocol_version)); }); } lock_a.lock (); diff --git a/nano/node/wallet.hpp b/nano/node/wallet.hpp index 6ff97cc49a..3d302b8cfe 100644 --- a/nano/node/wallet.hpp +++ b/nano/node/wallet.hpp @@ -185,6 +185,10 @@ class wallet_representative_counts public: uint64_t voting{ 0 }; // Representatives with at least the configured minimum voting weight uint64_t half_principal{ 0 }; // Representatives with at least 50% of principal representative requirements + bool have_half_rep () const + { + return half_principal > 0; + } }; /**