Skip to content

Commit

Permalink
Publish prototype (#2468)
Browse files Browse the repository at this point in the history
This is commit is based on a contribution from @Srayman on a recommendation to reduce traffic to principle representatives. The substantive changes are:
- Principle representatives do not relay votes
- Principle representatives are not the target of vote relaying
- Each principle representative broadcasts its votes to all other principle representative and additional non-principle representatives.

An analysis of this methodology as well as other suggestions were posted https://medium.com/nanocurrency/proposal-for-nano-node-network-optimizations-21003e79cdba . 

Testing on beta shows upwards of a 50% reduction in traffic to principle representatives.
  • Loading branch information
clemahieu committed Jan 30, 2020
1 parent 17b9e8f commit 57ce856
Show file tree
Hide file tree
Showing 9 changed files with 158 additions and 40 deletions.
57 changes: 41 additions & 16 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2061,13 +2061,16 @@ TEST (node, rep_weight)
auto & node (*system.nodes[0]);
nano::genesis genesis;
nano::keypair keypair1;
nano::keypair keypair2;
nano::block_builder builder;
auto amount_pr (node.minimum_principal_weight () + 100);
auto amount_not_pr (node.minimum_principal_weight () - 100);
std::shared_ptr<nano::block> block1 = builder
.state ()
.account (nano::test_genesis_key.pub)
.previous (genesis.hash ())
.representative (nano::test_genesis_key.pub)
.balance (nano::genesis_amount - node.minimum_principal_weight () * 4)
.balance (nano::genesis_amount - amount_not_pr)
.link (keypair1.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*system.work.generate (genesis.hash ()))
Expand All @@ -2077,15 +2080,37 @@ TEST (node, rep_weight)
.account (keypair1.pub)
.previous (0)
.representative (keypair1.pub)
.balance (node.minimum_principal_weight () * 4)
.balance (amount_not_pr)
.link (block1->hash ())
.sign (keypair1.prv, keypair1.pub)
.work (*system.work.generate (keypair1.pub))
.build ();
std::shared_ptr<nano::block> block3 = builder
.state ()
.account (nano::test_genesis_key.pub)
.previous (block1->hash ())
.representative (nano::test_genesis_key.pub)
.balance (nano::genesis_amount - amount_not_pr - amount_pr)
.link (keypair2.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*system.work.generate (block1->hash ()))
.build ();
std::shared_ptr<nano::block> block4 = builder
.state ()
.account (keypair2.pub)
.previous (0)
.representative (keypair2.pub)
.balance (amount_pr)
.link (block3->hash ())
.sign (keypair2.prv, keypair2.pub)
.work (*system.work.generate (keypair2.pub))
.build ();
{
auto transaction = node.store.tx_begin_write ();
ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block1).code);
ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block2).code);
ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block3).code);
ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block4).code);
}
node.network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), nano::get_available_port ()), 0);
ASSERT_TRUE (node.rep_crawler.representatives (1).empty ());
Expand All @@ -2095,24 +2120,30 @@ TEST (node, rep_weight)
std::shared_ptr<nano::transport::channel> channel0 (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, endpoint0, node.network_params.protocol.protocol_version));
std::shared_ptr<nano::transport::channel> channel1 (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, endpoint1, node.network_params.protocol.protocol_version));
std::shared_ptr<nano::transport::channel> channel2 (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, endpoint2, node.network_params.protocol.protocol_version));
node.network.udp_channels.insert (endpoint2, node.network_params.protocol.protocol_version);
node.network.udp_channels.insert (endpoint0, node.network_params.protocol.protocol_version);
node.network.udp_channels.insert (endpoint1, node.network_params.protocol.protocol_version);
auto vote1 = std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open);
auto vote2 = std::make_shared<nano::vote> (keypair1.pub, keypair1.prv, 0, genesis.open);
node.network.udp_channels.insert (endpoint2, node.network_params.protocol.protocol_version);
auto vote0 = std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open);
auto vote1 = std::make_shared<nano::vote> (keypair1.pub, keypair1.prv, 0, genesis.open);
auto vote2 = std::make_shared<nano::vote> (keypair2.pub, keypair2.prv, 0, genesis.open);
node.rep_crawler.add (genesis.open->hash ());
node.rep_crawler.response (channel0, vote1);
node.rep_crawler.response (channel1, vote2);
node.rep_crawler.response (channel0, vote0);
node.rep_crawler.response (channel1, vote1);
node.rep_crawler.response (channel2, vote2);
system.deadline_set (5s);
while (node.rep_crawler.representative_count () != 2)
{
ASSERT_NO_ERROR (system.poll ());
}
// Make sure we get the rep with the most weight first
auto reps (node.rep_crawler.representatives (1));
ASSERT_EQ (1, reps.size ());
ASSERT_EQ (nano::genesis_amount - node.minimum_principal_weight () * 4, reps[0].weight.number ());
ASSERT_EQ (node.balance (nano::test_genesis_key.pub), reps[0].weight.number ());
ASSERT_EQ (nano::test_genesis_key.pub, reps[0].account);
ASSERT_EQ (*channel0, reps[0].channel_ref ());
ASSERT_TRUE (node.rep_crawler.is_pr (*channel0));
ASSERT_FALSE (node.rep_crawler.is_pr (*channel1));
ASSERT_TRUE (node.rep_crawler.is_pr (*channel2));
}

TEST (node, rep_remove)
Expand Down Expand Up @@ -2976,9 +3007,9 @@ TEST (node, fork_invalid_block_signature)
}
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2));
auto vote_corrupt (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2_corrupt));
node2.network.flood_vote (vote_corrupt);
node2.network.flood_vote (vote_corrupt, 1.0f);
ASSERT_NO_ERROR (system.poll ());
node2.network.flood_vote (vote);
node2.network.flood_vote (vote, 1.0f);
while (node1.block (send1->hash ()))
{
ASSERT_NO_ERROR (system.poll ());
Expand Down Expand Up @@ -3446,11 +3477,6 @@ TEST (node, bidirectional_tcp)
confirmed = node1->ledger.block_confirmed (transaction1, send1->hash ()) && node2->ledger.block_confirmed (transaction2, send1->hash ());
ASSERT_NO_ERROR (system.poll ());
}
// Test block propagation from node 2
{
auto transaction (system.wallet (0)->wallets.tx_begin_write ());
system.wallet (0)->store.erase (transaction, nano::test_genesis_key.pub);
}
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1->work_generate_blocking (send1->hash ())));
node2->process_active (send2);
node2->block_processor.flush ();
Expand All @@ -3460,7 +3486,6 @@ TEST (node, bidirectional_tcp)
ASSERT_NO_ERROR (system.poll ());
}
// Test block confirmation from node 2
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
confirmed = false;
system.deadline_set (20s);
while (!confirmed)
Expand Down
31 changes: 27 additions & 4 deletions nano/core_test/peer_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,37 @@ TEST (channels, fill_random_part)
TEST (peer_container, list_fanout)
{
nano::system system (1);
auto list1 (system.nodes[0]->network.list_fanout ());
auto & node (*system.nodes[0]);
ASSERT_EQ (0, node.network.size ());
ASSERT_EQ (0.0, node.network.size_sqrt ());
ASSERT_EQ (0, node.network.fanout ());
auto list1 (node.network.list (node.network.fanout ()));
ASSERT_TRUE (list1.empty ());
auto add_peer = [&node](const uint16_t port_a) {
ASSERT_NE (nullptr, node.network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), port_a), node.network_params.protocol.protocol_version));
};
add_peer (9998);
ASSERT_EQ (1, node.network.size ());
ASSERT_EQ (1.f, node.network.size_sqrt ());
ASSERT_EQ (1, node.network.fanout ());
auto list2 (node.network.list (node.network.fanout ()));
ASSERT_EQ (1, list2.size ());
add_peer (9999);
ASSERT_EQ (2, node.network.size ());
ASSERT_EQ (std::sqrt (2.f), node.network.size_sqrt ());
ASSERT_EQ (2, node.network.fanout ());
auto list3 (node.network.list (node.network.fanout ()));
ASSERT_EQ (2, list3.size ());
for (auto i (0); i < 1000; ++i)
{
ASSERT_NE (nullptr, system.nodes[0]->network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), 10000 + i), system.nodes[0]->network_params.protocol.protocol_version));
add_peer (10000 + i);
}
auto list2 (system.nodes[0]->network.list_fanout ());
ASSERT_EQ (32, list2.size ());
ASSERT_EQ (1002, node.network.size ());
ASSERT_EQ (std::sqrt (1002.f), node.network.size_sqrt ());
size_t expected_size (std::ceil (std::sqrt (1002.f)));
ASSERT_EQ (expected_size, node.network.fanout ());
auto list4 (node.network.list (node.network.fanout ()));
ASSERT_EQ (expected_size, list4.size ());
}

// Test to make sure we don't repeatedly send keepalive messages to nodes that aren't responding
Expand Down
4 changes: 2 additions & 2 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,9 @@ nano::vote_code nano::active_transactions::vote (std::shared_ptr<nano::vote> vot
}
if (at_least_one)
{
if (processed)
if (processed && !node.wallets.rep_counts ().have_half_rep ())
{
node.network.flood_vote (vote_a);
node.network.flood_vote (vote_a, 0.5f);
}
return replay ? nano::vote_code::replay : nano::vote_code::vote;
}
Expand Down
51 changes: 42 additions & 9 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,27 @@ bool nano::network::send_votes_cache (std::shared_ptr<nano::transport::channel>

void nano::network::flood_message (nano::message const & message_a, bool const is_droppable_a)
{
auto list (list_fanout ());
for (auto i (list.begin ()), n (list.end ()); i != n; ++i)
for (auto & i : list (fanout ()))
{
(*i)->send (message_a, nullptr, is_droppable_a);
i->send (message_a, nullptr, is_droppable_a);
}
}

void nano::network::flood_vote (std::shared_ptr<nano::vote> const & vote_a, float scale)
{
nano::confirm_ack message (vote_a);
for (auto & i : list_non_pr (fanout (scale)))
{
i->send (message, nullptr);
}
}

void nano::network::flood_vote_pr (std::shared_ptr<nano::vote> const & vote_a)
{
nano::confirm_ack message (vote_a);
for (auto const & i : node.rep_crawler.principal_representatives ())
{
i.channel->send (message, nullptr, false);
}
}

Expand Down Expand Up @@ -265,7 +282,7 @@ void nano::network::broadcast_confirm_req (std::shared_ptr<nano::block> block_a)
if (list->empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ())
{
// broadcast request to all peers (with max limit 2 * sqrt (peers count))
auto peers (node.network.list (std::min (static_cast<size_t> (100), 2 * node.network.size_sqrt ())));
auto peers (node.network.list (std::min<size_t> (100, node.network.fanout (2.0))));
list->clear ();
list->insert (list->end (), peers.begin (), peers.end ());
}
Expand Down Expand Up @@ -614,13 +631,29 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (size_
return result;
}

// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_fanout ()
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr (size_t count_a)
{
auto result (list (size_sqrt ()));
std::deque<std::shared_ptr<nano::transport::channel>> result;
tcp_channels.list (result);
udp_channels.list (result);
nano::random_pool_shuffle (result.begin (), result.end ());
result.erase (std::remove_if (result.begin (), result.end (), [this](std::shared_ptr<nano::transport::channel> const & channel) {
return this->node.rep_crawler.is_pr (*channel);
}),
result.end ());
if (result.size () > count_a)
{
result.resize (count_a, nullptr);
}
return result;
}

// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability
size_t nano::network::fanout (float scale) const
{
return static_cast<size_t> (std::ceil (scale * size_sqrt ()));
}

std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::random_set (size_t count_a, uint8_t min_version_a) const
{
std::unordered_set<std::shared_ptr<nano::transport::channel>> result (tcp_channels.random_set (count_a, min_version_a));
Expand Down Expand Up @@ -744,9 +777,9 @@ size_t nano::network::size () const
return tcp_channels.size () + udp_channels.size ();
}

size_t nano::network::size_sqrt () const
float nano::network::size_sqrt () const
{
return (static_cast<size_t> (std::ceil (std::sqrt (size ()))));
return static_cast<float> (std::sqrt (size ()));
}

bool nano::network::empty () const
Expand Down
14 changes: 6 additions & 8 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,8 @@ class network final
random_fill (message.peers);
flood_message (message);
}
void flood_vote (std::shared_ptr<nano::vote> vote_a)
{
nano::confirm_ack message (vote_a);
flood_message (message);
}
void flood_vote (std::shared_ptr<nano::vote> const &, float scale);
void flood_vote_pr (std::shared_ptr<nano::vote> const &);
void flood_block (std::shared_ptr<nano::block> block_a, bool const is_droppable_a = true)
{
nano::publish publish (block_a);
Expand All @@ -136,8 +133,9 @@ class network final
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
std::deque<std::shared_ptr<nano::transport::channel>> list (size_t);
// A list of random peers sized for the configured rebroadcast fanout
std::deque<std::shared_ptr<nano::transport::channel>> list_fanout ();
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (size_t);
// Desired fanout for a given scale
size_t fanout (float scale = 1.0f) const;
void random_fill (std::array<nano::endpoint, 8> &) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0) const;
// Get the next peer for attempting a tcp bootstrap connection
Expand All @@ -150,7 +148,7 @@ class network final
void ongoing_syn_cookie_cleanup ();
void ongoing_keepalive ();
size_t size () const;
size_t size_sqrt () const;
float size_sqrt () const;
bool empty () const;
nano::message_buffer_manager buffer_container;
boost::asio::ip::udp::resolver resolver;
Expand Down
27 changes: 27 additions & 0 deletions nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,18 @@ void nano::rep_crawler::query (std::shared_ptr<nano::transport::channel> & chann
query (peers);
}

bool nano::rep_crawler::is_pr (nano::transport::channel const & channel_a) const
{
nano::lock_guard<std::mutex> lock (probable_reps_mutex);
auto existing = probable_reps.get<tag_channel_ref> ().find (channel_a);
bool result = false;
if (existing != probable_reps.get<tag_channel_ref> ().end ())
{
result = existing->weight > node.minimum_principal_weight ();
}
return result;
}

void nano::rep_crawler::response (std::shared_ptr<nano::transport::channel> & channel_a, std::shared_ptr<nano::vote> & vote_a)
{
nano::lock_guard<std::mutex> lock (active_mutex);
Expand Down Expand Up @@ -312,6 +324,21 @@ std::vector<nano::representative> nano::rep_crawler::representatives (size_t cou
return result;
}

std::vector<nano::representative> nano::rep_crawler::principal_representatives (size_t count_a)
{
std::vector<representative> result;
auto minimum = node.minimum_principal_weight ();
nano::lock_guard<std::mutex> lock (probable_reps_mutex);
for (auto i (probable_reps.get<tag_weight> ().begin ()), n (probable_reps.get<tag_weight> ().end ()); i != n && result.size () < count_a; ++i)
{
if (i->weight > minimum)
{
result.push_back (*i);
}
}
return result;
}

std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::representative_endpoints (size_t count_a)
{
std::vector<std::shared_ptr<nano::transport::channel>> result;
Expand Down
6 changes: 6 additions & 0 deletions nano/node/repcrawler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class rep_crawler final
/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::shared_ptr<nano::transport::channel> & channel_a);

/** Query if a peer manages a principle representative */
bool is_pr (nano::transport::channel const &) const;

/**
* Called when a non-replay vote on a block previously sent by query() is received. This indiciates
* with high probability that the endpoint is a representative node.
Expand All @@ -104,6 +107,9 @@ class rep_crawler final
/** Request a list of the top \p count_a known representatives in descending order of weight, optionally with a minimum version \p opt_version_min_a */
std::vector<representative> representatives (size_t count_a = std::numeric_limits<size_t>::max (), boost::optional<decltype (nano::protocol_constants::protocol_version_min)> const & opt_version_min_a = boost::none);

/** Request a list of the top \p count_a known principal representatives in descending order of weight. */
std::vector<representative> principal_representatives (size_t count_a = std::numeric_limits<size_t>::max ());

/** Request a list of the top \p count_a known representative endpoints. */
std::vector<std::shared_ptr<nano::transport::channel>> representative_endpoints (size_t count_a);

Expand Down
4 changes: 3 additions & 1 deletion nano/node/voting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ void nano::vote_generator::send (nano::unique_lock<std::mutex> & lock_a)
auto transaction (store.tx_begin_read ());
wallets.foreach_representative ([this, &hashes_l, &transaction](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
auto vote (this->store.vote_generate (transaction, pub_a, prv_a, hashes_l));
this->vote_processor.vote (vote, std::make_shared<nano::transport::channel_udp> (this->network.udp_channels, this->network.endpoint (), this->network_params.protocol.protocol_version));
this->votes_cache.add (vote);
this->network.flood_vote_pr (vote);
this->network.flood_vote (vote, 2.0f);
this->vote_processor.vote (vote, std::make_shared<nano::transport::channel_udp> (this->network.udp_channels, this->network.endpoint (), this->network_params.protocol.protocol_version));
});
}
lock_a.lock ();
Expand Down
4 changes: 4 additions & 0 deletions nano/node/wallet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ class wallet_representative_counts
public:
uint64_t voting{ 0 }; // Representatives with at least the configured minimum voting weight
uint64_t half_principal{ 0 }; // Representatives with at least 50% of principal representative requirements
bool have_half_rep () const
{
return half_principal > 0;
}
};

/**
Expand Down

0 comments on commit 57ce856

Please sign in to comment.