Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow rep crawler targets with ephemeral ports #2527

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 34 additions & 11 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3428,16 +3428,15 @@ TEST (node, dont_write_lock_node)
finished_promise.set_value ();
}

// Test is unstable on github actions for windows, disable if CI detected
#if (defined(_WIN32) && CI)
TEST (node, DISABLED_bidirectional_tcp)
#else
TEST (node, bidirectional_tcp)
#endif
{
nano::system system;
nano::node_flags node_flags;
node_flags.disable_udp = true; // Disable UDP connections
// Disable bootstrap to start elections for new blocks
node_flags.disable_legacy_bootstrap = true;
node_flags.disable_lazy_bootstrap = true;
node_flags.disable_wallet_bootstrap = true;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node1 = system.add_node (node_config, node_flags);
Expand Down Expand Up @@ -3466,17 +3465,32 @@ TEST (node, bidirectional_tcp)
{
ASSERT_NO_ERROR (system.poll ());
}
// Test block confirmation from node 1
// Test block confirmation from node 1 (add representative to node 1)
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
// Wait to find new reresentative
system.deadline_set (10s);
while (node2->rep_crawler.representative_count () == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
/* Wait for confirmation
To check connection we need only node 2 confirmation status
Node 1 election can be unconfirmed because representative private key was inserted after election start (and node 2 isn't flooding new votes to principal representatives) */
bool confirmed (false);
system.deadline_set (10s);
while (!confirmed)
{
auto transaction1 (node1->store.tx_begin_read ());
auto transaction2 (node2->store.tx_begin_read ());
confirmed = node1->ledger.block_confirmed (transaction1, send1->hash ()) && node2->ledger.block_confirmed (transaction2, send1->hash ());
confirmed = node2->ledger.block_confirmed (transaction2, send1->hash ());
ASSERT_NO_ERROR (system.poll ());
}
// Test block propagation & confirmation from node 2 (remove representative from node 1)
{
auto transaction (system.wallet (0)->wallets.tx_begin_write ());
system.wallet (0)->store.erase (transaction, nano::test_genesis_key.pub);
}
/* Test block propagation from node 2
Node 2 has only ephemeral TCP port open. Node 1 cannot establish connection to node 2 listening port */
auto send2 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, send1->hash (), nano::test_genesis_key.pub, nano::genesis_amount - 2 * nano::Gxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *node1->work_generate_blocking (send1->hash ())));
node2->process_active (send2);
node2->block_processor.flush ();
Expand All @@ -3485,14 +3499,23 @@ TEST (node, bidirectional_tcp)
{
ASSERT_NO_ERROR (system.poll ());
}
// Test block confirmation from node 2
// Test block confirmation from node 2 (add representative to node 2)
system.wallet (1)->insert_adhoc (nano::test_genesis_key.prv);
// Wait to find changed reresentative
system.deadline_set (10s);
while (node1->rep_crawler.representative_count () == 0)
{
ASSERT_NO_ERROR (system.poll ());
}
/* Wait for confirmation
To check connection we need only node 1 confirmation status
Node 2 election can be unconfirmed because representative private key was inserted after election start (and node 1 isn't flooding new votes to principal representatives) */
confirmed = false;
system.deadline_set (20s);
while (!confirmed)
{
auto transaction1 (node1->store.tx_begin_read ());
auto transaction2 (node2->store.tx_begin_read ());
confirmed = node1->ledger.block_confirmed (transaction1, send2->hash ()) && node2->ledger.block_confirmed (transaction2, send2->hash ());
confirmed = node1->ledger.block_confirmed (transaction1, send2->hash ());
ASSERT_NO_ERROR (system.poll ());
}
}
Expand Down
2 changes: 1 addition & 1 deletion nano/node/bootstrap/bootstrap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ nano::bootstrap_server::~bootstrap_server ()
auto exisiting_response_channel (node->network.tcp_channels.find_channel (remote_endpoint));
if (exisiting_response_channel != nullptr)
{
exisiting_response_channel->server = false;
exisiting_response_channel->temporary = false;
node->network.tcp_channels.erase (remote_endpoint);
}
}
Expand Down
6 changes: 3 additions & 3 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,9 @@ size_t nano::network::fanout (float scale) const
return static_cast<size_t> (std::ceil (scale * size_sqrt ()));
}

std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::random_set (size_t count_a, uint8_t min_version_a) const
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::random_set (size_t count_a, uint8_t min_version_a, bool include_temporary_channels_a) const
{
std::unordered_set<std::shared_ptr<nano::transport::channel>> result (tcp_channels.random_set (count_a, min_version_a));
std::unordered_set<std::shared_ptr<nano::transport::channel>> result (tcp_channels.random_set (count_a, min_version_a, include_temporary_channels_a));
std::unordered_set<std::shared_ptr<nano::transport::channel>> udp_random (udp_channels.random_set (count_a, min_version_a));
for (auto i (udp_random.begin ()), n (udp_random.end ()); i != n && result.size () < count_a * 1.5; ++i)
{
Expand All @@ -671,7 +671,7 @@ std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::network::ran

void nano::network::random_fill (std::array<nano::endpoint, 8> & target_a) const
{
auto peers (random_set (target_a.size ()));
auto peers (random_set (target_a.size (), 0, false)); // Don't include channels with ephemeral remote ports
assert (peers.size () <= target_a.size ());
auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0));
assert (endpoint.address ().is_v6 ());
Expand Down
2 changes: 1 addition & 1 deletion nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class network final
// Desired fanout for a given scale
size_t fanout (float scale = 1.0f) const;
void random_fill (std::array<nano::endpoint, 8> &) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0, bool = false) const;
// Get the next peer for attempting a tcp bootstrap connection
nano::tcp_endpoint bootstrap_peer (bool = false);
nano::endpoint endpoint ();
Expand Down
2 changes: 1 addition & 1 deletion nano/node/repcrawler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ std::vector<std::shared_ptr<nano::transport::channel>> nano::rep_crawler::get_cr
required_peer_count += required_peer_count / 2;

// The rest of the endpoints are picked randomly
auto random_peers (node.network.random_set (required_peer_count));
auto random_peers (node.network.random_set (required_peer_count, 0, true)); // Include channels with ephemeral remote ports
std::vector<std::shared_ptr<nano::transport::channel>> result;
result.insert (result.end (), random_peers.begin (), random_peers.end ());
return result;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void nano::telemetry::add (nano::telemetry_data const & telemetry_data_a, nano::
void nano::telemetry::get_metrics_random_peers_async (std::function<void(telemetry_data_responses const &)> const & callback_a)
{
// These peers will only be used if there isn't an already ongoing batch telemetry request round
auto random_peers = network.random_set (network.size_sqrt (), network_params.protocol.telemetry_protocol_version_min);
auto random_peers = network.random_set (network.size_sqrt (), network_params.protocol.telemetry_protocol_version_min, true);
nano::lock_guard<std::mutex> guard (mutex);
if (!stopped && !random_peers.empty ())
{
Expand Down
12 changes: 6 additions & 6 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ nano::transport::channel_tcp::~channel_tcp ()
// Close socket. Exception: socket is used by bootstrap_server
if (auto socket_l = socket.lock ())
{
if (!server)
if (!temporary)
{
socket_l->close ();
}
Expand Down Expand Up @@ -111,7 +111,7 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::cha
if (existing == channels.get<endpoint_tag> ().end ())
{
auto node_id (channel_a->get_node_id ());
if (!channel_a->server)
if (!channel_a->temporary)
{
channels.get<node_id_tag> ().erase (node_id);
}
Expand Down Expand Up @@ -152,7 +152,7 @@ std::shared_ptr<nano::transport::channel_tcp> nano::transport::tcp_channels::fin
return result;
}

std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::random_set (size_t count_a, uint8_t min_version) const
std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::tcp_channels::random_set (size_t count_a, uint8_t min_version, bool include_temporary_channels_a) const
{
std::unordered_set<std::shared_ptr<nano::transport::channel>> result;
result.reserve (count_a);
Expand All @@ -169,7 +169,7 @@ std::unordered_set<std::shared_ptr<nano::transport::channel>> nano::transport::t
auto index (nano::random_pool::generate_word32 (0, static_cast<CryptoPP::word32> (peers_size - 1)));

auto channel = channels.get<random_access_tag> ()[index].channel;
if (channel->get_network_version () >= min_version && !channel->server)
if (channel->get_network_version () >= min_version && (include_temporary_channels_a || !channel->temporary))
{
result.insert (channel);
}
Expand Down Expand Up @@ -284,7 +284,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
temporary_channel->set_network_version (message_a.header.version_using);
temporary_channel->set_last_packet_received (std::chrono::steady_clock::now ());
temporary_channel->set_last_packet_sent (std::chrono::steady_clock::now ());
temporary_channel->server = true;
temporary_channel->temporary = true;
assert (type_a == nano::bootstrap_server_type::realtime || type_a == nano::bootstrap_server_type::realtime_response_server);
// Don't insert temporary channels for response_server
if (type_a == nano::bootstrap_server_type::realtime)
Expand Down Expand Up @@ -603,7 +603,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
auto existing_channel (node_l->network.tcp_channels.find_node_id (node_id));
if (existing_channel)
{
process = existing_channel->server;
process = existing_channel->temporary;
}
}
if (process)
Expand Down
6 changes: 4 additions & 2 deletions nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ namespace transport
}
std::weak_ptr<nano::socket> socket;
std::weak_ptr<nano::bootstrap_server> response_server;
bool server{ false };
/* Mark for temporary channels. Usually remote ports of these channels are ephemeral and received from incoming connections to server.
If remote part has open listening port, temporary channel will be replaced with direct connection to listening port soon. But if other side is behing NAT or firewall this connection can be pemanent. */
std::atomic<bool> temporary{ false };

nano::endpoint get_endpoint () const override
{
Expand Down Expand Up @@ -84,7 +86,7 @@ namespace transport
size_t size () const;
std::shared_ptr<nano::transport::channel_tcp> find_channel (nano::tcp_endpoint const &) const;
void random_fill (std::array<nano::endpoint, 8> &) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0) const;
std::unordered_set<std::shared_ptr<nano::transport::channel>> random_set (size_t, uint8_t = 0, bool = false) const;
bool store_all (bool = true);
std::shared_ptr<nano::transport::channel_tcp> find_node_id (nano::account const &);
// Get the next peer for attempting a tcp connection
Expand Down