Skip to content

Commit

Permalink
Use attempts list for TCP channels (#2581)
Browse files Browse the repository at this point in the history
* Use attempts list for TCP channels to prevent multiple concurrent connections start to same peer
* Erase from attempts list after realtime TCP connection failure (udp fallback function) or success (insert function)
* Explicitly close sockets after realtime TCP connection start failure
* Use tags for TCP & UDP attempts lists
* New test for max attempts
* Limit max peers per IP for live & beta networks to 5
* Debug assert if there is limit overflow in tests
* And special flag to allow using more connections
  • Loading branch information
SergiySW committed Mar 10, 2020
1 parent b23d7c3 commit c42709d
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 17 deletions.
19 changes: 19 additions & 0 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -927,6 +927,25 @@ TEST (network, replace_port)
node1->stop ();
}

TEST (network, peer_max_tcp_attempts)
{
nano::system system (1);
auto node (system.nodes[0]);
// Add nodes that can accept TCP connection, but not node ID handshake
nano::node_flags node_flags;
node_flags.disable_tcp_realtime = true;
for (auto i (0); i < node->network_params.node.max_peers_per_ip; ++i)
{
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
node2->start ();
system.nodes.push_back (node2);
// Start TCP attempt
node->network.merge_peer (node2->network.endpoint ());
}
ASSERT_EQ (0, node->network.size ());
ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (node->network.endpoint ().address (), nano::get_available_port ())));
}

// The test must be completed in less than 1 second
TEST (bandwidth_limiter, validate)
{
Expand Down
8 changes: 7 additions & 1 deletion nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <numeric>

nano::network::network (nano::node & node_a, uint16_t port_a) :
syn_cookies (node_a.network_params.node.max_peers_per_ip),
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
resolver (node_a.io_ctx),
limiter (node_a.config.bandwidth_limit),
Expand Down Expand Up @@ -815,14 +816,19 @@ void nano::message_buffer_manager::stop ()
condition.notify_all ();
}

nano::syn_cookies::syn_cookies (size_t max_cookies_per_ip_a) :
max_cookies_per_ip (max_cookies_per_ip_a)
{
}

boost::optional<nano::uint256_union> nano::syn_cookies::assign (nano::endpoint const & endpoint_a)
{
auto ip_addr (endpoint_a.address ());
debug_assert (ip_addr.is_v6 ());
nano::lock_guard<std::mutex> lock (syn_cookie_mutex);
unsigned & ip_cookies = cookies_per_ip[ip_addr];
boost::optional<nano::uint256_union> result;
if (ip_cookies < nano::transport::max_peers_per_ip)
if (ip_cookies < max_cookies_per_ip)
{
if (cookies.find (endpoint_a) == cookies.end ())
{
Expand Down
2 changes: 2 additions & 0 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class message_buffer_manager final
class syn_cookies final
{
public:
syn_cookies (size_t);
void purge (std::chrono::steady_clock::time_point const &);
// Returns boost::none if the IP is rate capped on syn cookie requests,
// or if the endpoint already has a syn cookie query
Expand All @@ -90,6 +91,7 @@ class syn_cookies final
mutable std::mutex syn_cookie_mutex;
std::unordered_map<nano::endpoint, syn_cookie_info> cookies;
std::unordered_map<boost::asio::ip::address, unsigned> cookies_per_ip;
size_t max_cookies_per_ip;
};
class network final
{
Expand Down
1 change: 1 addition & 0 deletions nano/node/nodeconfig.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class node_flags final
bool disable_block_processor_republishing{ false };
bool disable_ongoing_telemetry_requests{ false };
bool allow_bootstrap_peers_duplicates{ false };
bool disable_max_peers_per_ip{ false }; // For testing only
bool fast_bootstrap{ false };
bool read_only{ false };
nano::confirmation_height_mode confirmation_height_processor_mode{ nano::confirmation_height_mode::automatic };
Expand Down
1 change: 1 addition & 0 deletions nano/node/testing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ std::shared_ptr<nano::node> nano::system::add_node (nano::node_config const & no
nodes.push_back (node);
if (nodes.size () > 1)
{
debug_assert (nodes.size () - 1 <= node->network_params.node.max_peers_per_ip); // Check that we don't start more nodes than limit for single IP address
auto begin = nodes.end () - 2;
for (auto i (begin), j (begin + 1), n (nodes.end ()); j != n; ++i, ++j)
{
Expand Down
23 changes: 19 additions & 4 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::cha
channels.get<node_id_tag> ().erase (node_id);
}
channels.get<endpoint_tag> ().emplace (channel_a, socket_a, bootstrap_server_a);
attempts.get<endpoint_tag> ().erase (endpoint);
error = false;
lock.unlock ();
node.network.channel_observer (channel_a);
Expand Down Expand Up @@ -349,8 +350,16 @@ void nano::transport::tcp_channels::stop ()

bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a)
{
nano::unique_lock<std::mutex> lock (mutex);
bool result (channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= nano::transport::max_peers_per_ip);
bool result (false);
if (!node.flags.disable_max_peers_per_ip)
{
nano::unique_lock<std::mutex> lock (mutex);
result = channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= node.network_params.node.max_peers_per_ip;
if (!result)
{
result = attempts.get<ip_address_tag> ().count (endpoint_a.address ()) >= node.network_params.node.max_peers_per_ip;
}
}
return result;
}

Expand Down Expand Up @@ -396,8 +405,8 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point
auto disconnect_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (cutoff_a));
channels.get<last_packet_sent_tag> ().erase (channels.get<last_packet_sent_tag> ().begin (), disconnect_cutoff);
// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<1> ().lower_bound (cutoff_a));
attempts.get<1> ().erase (attempts.get<1> ().begin (), attempts_cutoff);
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_a));
attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);

// Cleanup any sockets which may still be existing from failed node id handshakes
node_id_handshake_sockets.erase (std::remove_if (node_id_handshake_sockets.begin (), node_id_handshake_sockets.end (), [this](auto socket) {
Expand Down Expand Up @@ -545,6 +554,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
if (auto socket_l = channel->socket.lock ())
{
node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l);
socket_l->close ();
}
if (node_l->config.logging.network_node_id_handshake_logging ())
{
Expand Down Expand Up @@ -576,6 +586,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
if (auto socket_l = socket_w.lock ())
{
node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l);
socket_l->close ();
}
}
};
Expand Down Expand Up @@ -677,6 +688,10 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n

void nano::transport::tcp_channels::udp_fallback (nano::endpoint const & endpoint_a, std::function<void(std::shared_ptr<nano::transport::channel>)> const & callback_a)
{
{
nano::lock_guard<std::mutex> lock (mutex);
attempts.get<endpoint_tag> ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a));
}
if (callback_a && !node.flags.disable_udp)
{
auto channel_udp (node.network.udp_channels.create (endpoint_a));
Expand Down
13 changes: 10 additions & 3 deletions nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ namespace transport
class last_bootstrap_attempt_tag
{
};
class last_attempt_tag
{
};
class node_id_tag
{
};
Expand Down Expand Up @@ -171,10 +174,12 @@ namespace transport
{
public:
nano::tcp_endpoint endpoint;
boost::asio::ip::address address;
std::chrono::steady_clock::time_point last_attempt{ std::chrono::steady_clock::now () };

explicit tcp_endpoint_attempt (nano::tcp_endpoint const & endpoint_a) :
endpoint (endpoint_a)
endpoint (endpoint_a),
address (endpoint_a.address ())
{
}
};
Expand All @@ -196,9 +201,11 @@ namespace transport
channels;
boost::multi_index_container<tcp_endpoint_attempt,
mi::indexed_by<
mi::hashed_unique<
mi::hashed_unique<mi::tag<endpoint_tag>,
mi::member<tcp_endpoint_attempt, nano::tcp_endpoint, &tcp_endpoint_attempt::endpoint>>,
mi::ordered_non_unique<
mi::hashed_non_unique<mi::tag<ip_address_tag>,
mi::member<tcp_endpoint_attempt, boost::asio::ip::address, &tcp_endpoint_attempt::address>>,
mi::ordered_non_unique<mi::tag<last_attempt_tag>,
mi::member<tcp_endpoint_attempt, std::chrono::steady_clock::time_point, &tcp_endpoint_attempt::last_attempt>>>>
attempts;
// clang-format on
Expand Down
2 changes: 0 additions & 2 deletions nano/node/transport/transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ namespace transport
nano::tcp_endpoint map_endpoint_to_tcp (nano::endpoint const &);
// Unassigned, reserved, self
bool reserved_address (nano::endpoint const &, bool = false);
// Maximum number of peers per IP
static size_t constexpr max_peers_per_ip = 10;
static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5);
enum class transport_type : uint8_t
{
Expand Down
13 changes: 9 additions & 4 deletions nano/node/transport/udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ std::shared_ptr<nano::transport::channel_udp> nano::transport::udp_channels::ins
{
result = std::make_shared<nano::transport::channel_udp> (*this, endpoint_a, network_version_a);
channels.get<endpoint_tag> ().insert (result);
attempts.get<endpoint_tag> ().erase (endpoint_a);
lock.unlock ();
node.network.channel_observer (result);
}
Expand Down Expand Up @@ -633,8 +634,12 @@ std::shared_ptr<nano::transport::channel> nano::transport::udp_channels::create

bool nano::transport::udp_channels::max_ip_connections (nano::endpoint const & endpoint_a)
{
nano::unique_lock<std::mutex> lock (mutex);
bool result (channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= nano::transport::max_peers_per_ip);
bool result (false);
if (!node.flags.disable_max_peers_per_ip)
{
nano::unique_lock<std::mutex> lock (mutex);
result = channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= node.network_params.node.max_peers_per_ip;
}
return result;
}

Expand Down Expand Up @@ -677,8 +682,8 @@ void nano::transport::udp_channels::purge (std::chrono::steady_clock::time_point
auto disconnect_cutoff (channels.get<last_packet_received_tag> ().lower_bound (cutoff_a));
channels.get<last_packet_received_tag> ().erase (channels.get<last_packet_received_tag> ().begin (), disconnect_cutoff);
// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<1> ().lower_bound (cutoff_a));
attempts.get<1> ().erase (attempts.get<1> ().begin (), attempts_cutoff);
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_a));
attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);
}

void nano::transport::udp_channels::ongoing_keepalive ()
Expand Down
7 changes: 5 additions & 2 deletions nano/node/transport/udp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ namespace transport
class last_bootstrap_attempt_tag
{
};
class last_attempt_tag
{
};
class node_id_tag
{
};
Expand Down Expand Up @@ -191,9 +194,9 @@ namespace transport
boost::multi_index_container<
endpoint_attempt,
mi::indexed_by<
mi::hashed_unique<
mi::hashed_unique<mi::tag<endpoint_tag>,
mi::member<endpoint_attempt, nano::endpoint, &endpoint_attempt::endpoint>>,
mi::ordered_non_unique<
mi::ordered_non_unique<mi::tag<last_attempt_tag>,
mi::member<endpoint_attempt, std::chrono::steady_clock::time_point, &endpoint_attempt::last_attempt>>>>
attempts;
// clang-format on
Expand Down
1 change: 1 addition & 0 deletions nano/secure/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ nano::node_constants::node_constants (nano::network_constants & network_constant
peer_interval = search_pending_interval;
unchecked_cleaning_interval = std::chrono::minutes (30);
process_confirmed_interval = network_constants.is_test_network () ? std::chrono::milliseconds (50) : std::chrono::milliseconds (500);
max_peers_per_ip = network_constants.is_test_network () ? 10 : 5;
max_weight_samples = network_constants.is_live_network () ? 4032 : 864;
weight_period = 5 * 60; // 5 minutes
}
Expand Down
2 changes: 2 additions & 0 deletions nano/secure/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ class node_constants
std::chrono::seconds peer_interval;
std::chrono::minutes unchecked_cleaning_interval;
std::chrono::milliseconds process_confirmed_interval;
/** Maximum number of peers per IP */
size_t max_peers_per_ip;

/** The maximum amount of samples for a 2 week period on live or 3 days on beta */
uint64_t max_weight_samples;
Expand Down
4 changes: 3 additions & 1 deletion nano/slow_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ TEST (store, load)
// ulimit -n increasing may be required
TEST (node, fork_storm)
{
nano::system system (64);
nano::node_flags flags;
flags.disable_max_peers_per_ip = true;
nano::system system (64, nano::transport::transport_type::tcp, flags);
system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv);
auto previous (system.nodes[0]->latest (nano::test_genesis_key.pub));
auto balance (system.nodes[0]->balance (nano::test_genesis_key.pub));
Expand Down

0 comments on commit c42709d

Please sign in to comment.