Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Publish prototype #2468

Merged
merged 9 commits into from
Jan 30, 2020
57 changes: 41 additions & 16 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2040,13 +2040,16 @@ TEST (node, rep_weight)
auto & node (*system.nodes[0]);
nano::genesis genesis;
nano::keypair keypair1;
nano::keypair keypair2;
nano::block_builder builder;
auto amount_pr (node.minimum_principal_weight () + 100);
auto amount_not_pr (node.minimum_principal_weight () - 100);
std::shared_ptr<nano::block> block1 = builder
.state ()
.account (nano::test_genesis_key.pub)
.previous (genesis.hash ())
.representative (nano::test_genesis_key.pub)
.balance (nano::genesis_amount - node.minimum_principal_weight () * 4)
.balance (nano::genesis_amount - amount_not_pr)
.link (keypair1.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*system.work.generate (genesis.hash ()))
Expand All @@ -2056,15 +2059,37 @@ TEST (node, rep_weight)
.account (keypair1.pub)
.previous (0)
.representative (keypair1.pub)
.balance (node.minimum_principal_weight () * 4)
.balance (amount_not_pr)
.link (block1->hash ())
.sign (keypair1.prv, keypair1.pub)
.work (*system.work.generate (keypair1.pub))
.build ();
std::shared_ptr<nano::block> block3 = builder
.state ()
.account (nano::test_genesis_key.pub)
.previous (block1->hash ())
.representative (nano::test_genesis_key.pub)
.balance (nano::genesis_amount - amount_not_pr - amount_pr)
.link (keypair2.pub)
.sign (nano::test_genesis_key.prv, nano::test_genesis_key.pub)
.work (*system.work.generate (block1->hash ()))
.build ();
std::shared_ptr<nano::block> block4 = builder
.state ()
.account (keypair2.pub)
.previous (0)
.representative (keypair2.pub)
.balance (amount_pr)
.link (block3->hash ())
.sign (keypair2.prv, keypair2.pub)
.work (*system.work.generate (keypair2.pub))
.build ();
{
auto transaction = node.store.tx_begin_write ();
ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block1).code);
ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block2).code);
ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block3).code);
ASSERT_EQ (nano::process_result::progress, node.ledger.process (transaction, *block4).code);
}
node.network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), nano::get_available_port ()), 0);
ASSERT_TRUE (node.rep_crawler.representatives (1).empty ());
Expand All @@ -2074,24 +2099,30 @@ TEST (node, rep_weight)
std::shared_ptr<nano::transport::channel> channel0 (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, endpoint0, node.network_params.protocol.protocol_version));
std::shared_ptr<nano::transport::channel> channel1 (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, endpoint1, node.network_params.protocol.protocol_version));
std::shared_ptr<nano::transport::channel> channel2 (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, endpoint2, node.network_params.protocol.protocol_version));
node.network.udp_channels.insert (endpoint2, node.network_params.protocol.protocol_version);
node.network.udp_channels.insert (endpoint0, node.network_params.protocol.protocol_version);
node.network.udp_channels.insert (endpoint1, node.network_params.protocol.protocol_version);
auto vote1 = std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open);
auto vote2 = std::make_shared<nano::vote> (keypair1.pub, keypair1.prv, 0, genesis.open);
node.network.udp_channels.insert (endpoint2, node.network_params.protocol.protocol_version);
auto vote0 = std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, genesis.open);
auto vote1 = std::make_shared<nano::vote> (keypair1.pub, keypair1.prv, 0, genesis.open);
auto vote2 = std::make_shared<nano::vote> (keypair2.pub, keypair2.prv, 0, genesis.open);
node.rep_crawler.add (genesis.open->hash ());
node.rep_crawler.response (channel0, vote1);
node.rep_crawler.response (channel1, vote2);
node.rep_crawler.response (channel0, vote0);
node.rep_crawler.response (channel1, vote1);
node.rep_crawler.response (channel2, vote2);
system.deadline_set (5s);
while (node.rep_crawler.representative_count () != 2)
{
ASSERT_NO_ERROR (system.poll ());
}
// Make sure we get the rep with the most weight first
auto reps (node.rep_crawler.representatives (1));
ASSERT_EQ (1, reps.size ());
ASSERT_EQ (nano::genesis_amount - node.minimum_principal_weight () * 4, reps[0].weight.number ());
ASSERT_EQ (node.balance (nano::test_genesis_key.pub), reps[0].weight.number ());
ASSERT_EQ (nano::test_genesis_key.pub, reps[0].account);
ASSERT_EQ (*channel0, reps[0].channel_ref ());
ASSERT_TRUE (node.rep_crawler.is_pr (*channel0));
ASSERT_FALSE (node.rep_crawler.is_pr (*channel1));
ASSERT_TRUE (node.rep_crawler.is_pr (*channel2));
}

TEST (node, rep_remove)
Expand Down Expand Up @@ -2953,9 +2984,9 @@ TEST (node, fork_invalid_block_signature)
}
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2));
auto vote_corrupt (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, send2_corrupt));
node2.network.flood_vote (vote_corrupt);
node2.network.flood_vote (vote_corrupt, 1.0f);
ASSERT_NO_ERROR (system.poll ());
node2.network.flood_vote (vote);
node2.network.flood_vote (vote, 1.0f);
while (node1.block (send1->hash ()))
{
ASSERT_NO_ERROR (system.poll ());
Expand Down Expand Up @@ -3396,11 +3427,6 @@ TEST (node, bidirectional_tcp)
confirmed = node1->ledger.block_confirmed (transaction1, send1->hash ()) && node2->ledger.block_confirmed (transaction2, send1->hash ());
ASSERT_NO_ERROR (system.poll ());
}
// Test block propagation from node 2
{
auto transaction (system.wallet (0)->wallets.tx_begin_write ());
system.wallet (0)->store.erase (transaction, nano::test_genesis_key.pub);
}
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1->work_generate_blocking (send1->hash ())));
node2->process_active (send2);
node2->block_processor.flush ();
Expand All @@ -3410,7 +3436,6 @@ TEST (node, bidirectional_tcp)
ASSERT_NO_ERROR (system.poll ());
}
// Test block confirmation from node 2
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
confirmed = false;
system.deadline_set (10s);
while (!confirmed)
Expand Down
31 changes: 27 additions & 4 deletions nano/core_test/peer_container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,37 @@ TEST (channels, fill_random_part)
TEST (peer_container, list_fanout)
{
nano::system system (1);
auto list1 (system.nodes[0]->network.list_fanout ());
auto & node (*system.nodes[0]);
ASSERT_EQ (0, node.network.size ());
ASSERT_EQ (0.0, node.network.size_sqrt ());
ASSERT_EQ (0, node.network.fanout ());
auto list1 (node.network.list (node.network.fanout ()));
ASSERT_TRUE (list1.empty ());
auto add_peer = [&node](const uint16_t port_a) {
ASSERT_NE (nullptr, node.network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), port_a), node.network_params.protocol.protocol_version));
};
add_peer (9998);
ASSERT_EQ (1, node.network.size ());
ASSERT_EQ (1.0, node.network.size_sqrt ());
ASSERT_EQ (1, node.network.fanout ());
auto list2 (node.network.list (node.network.fanout ()));
ASSERT_EQ (1, list2.size ());
add_peer (9999);
ASSERT_EQ (2, node.network.size ());
ASSERT_EQ (std::sqrt (2.F), node.network.size_sqrt ());
ASSERT_EQ (2, node.network.fanout ());
auto list3 (node.network.list (node.network.fanout ()));
ASSERT_EQ (2, list3.size ());
for (auto i (0); i < 1000; ++i)
{
ASSERT_NE (nullptr, system.nodes[0]->network.udp_channels.insert (nano::endpoint (boost::asio::ip::address_v6::loopback (), 10000 + i), system.nodes[0]->network_params.protocol.protocol_version));
add_peer (10000 + i);
}
auto list2 (system.nodes[0]->network.list_fanout ());
ASSERT_EQ (32, list2.size ());
ASSERT_EQ (1002, node.network.size ());
ASSERT_EQ (std::sqrt (1002.f), node.network.size_sqrt ());
size_t expected_size (std::ceil (std::sqrt (1002.f)));
ASSERT_EQ (expected_size, node.network.fanout ());
auto list4 (node.network.list (node.network.fanout ()));
ASSERT_EQ (expected_size, list4.size ());
}

// Test to make sure we don't repeatedly send keepalive messages to nodes that aren't responding
Expand Down
4 changes: 2 additions & 2 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,9 @@ nano::vote_code nano::active_transactions::vote (std::shared_ptr<nano::vote> vot
}
if (at_least_one)
{
if (processed)
if (processed && !node.wallets.rep_counts ().have_half_rep ())
{
node.network.flood_vote (vote_a);
node.network.flood_vote (vote_a, 0.5f);
}
return replay ? nano::vote_code::replay : nano::vote_code::vote;
}
Expand Down
51 changes: 42 additions & 9 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,27 @@ bool nano::network::send_votes_cache (std::shared_ptr<nano::transport::channel>

void nano::network::flood_message (nano::message const & message_a, bool const is_droppable_a)
{
auto list (list_fanout ());
for (auto i (list.begin ()), n (list.end ()); i != n; ++i)
for (auto & i : list (fanout ()))
{
(*i)->send (message_a, nullptr, is_droppable_a);
i->send (message_a, nullptr, is_droppable_a);
}
}

void nano::network::flood_vote (std::shared_ptr<nano::vote> vote_a, float scale)
{
nano::confirm_ack message (vote_a);
for (auto & i : list_non_pr (fanout (scale)))
{
i->send (message, nullptr);
}
}

void nano::network::flood_vote_pr (std::shared_ptr<nano::vote> vote_a)
guilhermelawless marked this conversation as resolved.
Show resolved Hide resolved
{
nano::confirm_ack message (vote_a);
for (auto const & i : node.rep_crawler.principal_representatives ())
{
i.channel->send (message, nullptr, false);
}
}

Expand Down Expand Up @@ -264,7 +281,7 @@ void nano::network::broadcast_confirm_req (std::shared_ptr<nano::block> block_a)
if (list->empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ())
{
// broadcast request to all peers (with max limit 2 * sqrt (peers count))
auto peers (node.network.list (std::min (static_cast<size_t> (100), 2 * node.network.size_sqrt ())));
auto peers (node.network.list (std::min<size_t> (100, node.network.fanout (2.F))));
guilhermelawless marked this conversation as resolved.
Show resolved Hide resolved
list->clear ();
list->insert (list->end (), peers.begin (), peers.end ());
}
Expand Down Expand Up @@ -583,13 +600,29 @@ std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list (size_
return result;
}

// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_fanout ()
std::deque<std::shared_ptr<nano::transport::channel>> nano::network::list_non_pr (size_t count_a)
{
auto result (list (size_sqrt ()));
std::deque<std::shared_ptr<nano::transport::channel>> result;
tcp_channels.list (result);
udp_channels.list (result);
nano::random_pool_shuffle (result.begin (), result.end ());
result.erase (std::remove_if (result.begin (), result.end (), [this](std::shared_ptr<nano::transport::channel> const & channel) {
return this->node.rep_crawler.is_pr (*channel);
}),
result.end ());
if (result.size () > count_a)
{
result.resize (count_a, nullptr);
}
return result;
}

// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability
size_t nano::network::fanout (float scale) const
{
return static_cast<size_t> (std::ceil (scale * size_sqrt ()));
}

std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::random_set (size_t count_a) const
{
std::unordered_set<std::shared_ptr<nano::transport::channel>> result (tcp_channels.random_set (count_a));
Expand Down Expand Up @@ -713,9 +746,9 @@ size_t nano::network::size () const
return tcp_channels.size () + udp_channels.size ();
}

size_t nano::network::size_sqrt () const
float nano::network::size_sqrt () const
{
return (static_cast<size_t> (std::ceil (std::sqrt (size ()))));
return static_cast<float> (std::sqrt (size ()));
}

bool nano::network::empty () const
Expand Down
14 changes: 6 additions & 8 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,8 @@ class network final
random_fill (message.peers);
flood_message (message);
}
void flood_vote (std::shared_ptr<nano::vote> vote_a)
{
nano::confirm_ack message (vote_a);
flood_message (message);
}
void flood_vote (std::shared_ptr<nano::vote>, float scale);
void flood_vote_pr (std::shared_ptr<nano::vote>);
void flood_block (std::shared_ptr<nano::block> block_a, bool const is_droppable_a = true)
{
nano::publish publish (block_a);
Expand All @@ -136,8 +133,9 @@ class network final
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &, bool = false);
std::deque<std::shared_ptr<nano::transport::channel>> list (size_t);
// A list of random peers sized for the configured rebroadcast fanout
std::deque<std::shared_ptr<nano::transport::channel>> list_fanout ();
std::deque<std::shared_ptr<nano::transport::channel>> list_non_pr (size_t);
// Desired fanout for a given scale
size_t fanout (float scale = 1.0f) const;
void random_fill (std::array<nano::endpoint, 8> &) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t) const;
// Get the next peer for attempting a tcp bootstrap connection
Expand All @@ -150,7 +148,7 @@ class network final
void ongoing_syn_cookie_cleanup ();
void ongoing_keepalive ();
size_t size () const;
size_t size_sqrt () const;
float size_sqrt () const;
bool empty () const;
nano::message_buffer_manager buffer_container;
boost::asio::ip::udp::resolver resolver;
Expand Down
27 changes: 27 additions & 0 deletions nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,18 @@ void nano::rep_crawler::query (std::shared_ptr<nano::transport::channel> & chann
query (peers);
}

bool nano::rep_crawler::is_pr (nano::transport::channel const & channel_a) const
{
nano::lock_guard<std::mutex> lock (probable_reps_mutex);
auto existing = probable_reps.get<tag_channel_ref> ().find (channel_a);
bool result = false;
if (existing != probable_reps.get<tag_channel_ref> ().end ())
{
result = existing->weight > node.minimum_principal_weight ();
}
return result;
}

void nano::rep_crawler::response (std::shared_ptr<nano::transport::channel> & channel_a, std::shared_ptr<nano::vote> & vote_a)
{
nano::lock_guard<std::mutex> lock (active_mutex);
Expand Down Expand Up @@ -302,6 +314,21 @@ std::vector<nano::representative> nano::rep_crawler::representatives (size_t cou
return result;
}

std::vector<nano::representative> nano::rep_crawler::principal_representatives (size_t count_a)
{
std::vector<representative> result;
auto minimum = node.minimum_principal_weight ();
nano::lock_guard<std::mutex> lock (probable_reps_mutex);
for (auto i (probable_reps.get<tag_weight> ().begin ()), n (probable_reps.get<tag_weight> ().end ()); i != n && result.size () < count_a; ++i)
{
if (i->weight > minimum)
{
result.push_back (*i);
}
}
return result;
}

std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::representative_endpoints (size_t count_a)
{
std::vector<std::shared_ptr<nano::transport::channel>> result;
Expand Down
6 changes: 6 additions & 0 deletions nano/node/repcrawler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ class rep_crawler final
/** Attempt to determine if the peer manages one or more representative accounts */
void query (std::shared_ptr<nano::transport::channel> & channel_a);

/** Query if a peer manages a principle representative */
bool is_pr (nano::transport::channel const &) const;

/**
* Called when a non-replay vote on a block previously sent by query() is received. This indiciates
* with high probability that the endpoint is a representative node.
Expand All @@ -104,6 +107,9 @@ class rep_crawler final
/** Request a list of the top \p count_a known representatives in descending order of weight, optionally with a minimum version \p opt_version_min_a */
std::vector<representative> representatives (size_t count_a = std::numeric_limits<size_t>::max (), boost::optional<decltype (nano::protocol_constants::protocol_version_min)> const & opt_version_min_a = boost::none);

/** Request a list of the top \p count_a known principal representatives in descending order of weight. */
std::vector<representative> principal_representatives (size_t count_a = std::numeric_limits<size_t>::max ());

/** Request a list of the top \p count_a known representative endpoints. */
std::vector<std::shared_ptr<nano::transport::channel>> representative_endpoints (size_t count_a);

Expand Down
4 changes: 3 additions & 1 deletion nano/node/voting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ void nano::vote_generator::send (nano::unique_lock<std::mutex> & lock_a)
auto transaction (store.tx_begin_read ());
wallets.foreach_representative ([this, &hashes_l, &transaction](nano::public_key const & pub_a, nano::raw_key const & prv_a) {
auto vote (this->store.vote_generate (transaction, pub_a, prv_a, hashes_l));
this->vote_processor.vote (vote, std::make_shared<nano::transport::channel_udp> (this->network.udp_channels, this->network.endpoint (), this->network_params.protocol.protocol_version));
this->votes_cache.add (vote);
this->network.flood_vote_pr (vote);
this->network.flood_vote (vote, 2.0f);
this->vote_processor.vote (vote, std::make_shared<nano::transport::channel_udp> (this->network.udp_channels, this->network.endpoint (), this->network_params.protocol.protocol_version));
});
}
lock_a.lock ();
Expand Down
4 changes: 4 additions & 0 deletions nano/node/wallet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ class wallet_representative_counts
public:
uint64_t voting{ 0 }; // Representatives with at least the configured minimum voting weight
uint64_t half_principal{ 0 }; // Representatives with at least 50% of principal representative requirements
bool have_half_rep () const
guilhermelawless marked this conversation as resolved.
Show resolved Hide resolved
{
return half_principal > 0;
}
};

/**
Expand Down