Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use attempts list for TCP channels #2581

Merged
merged 8 commits into from
Mar 10, 2020
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions nano/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -916,6 +916,25 @@ TEST (network, replace_port)
node1->stop ();
}

TEST (network, peer_max_tcp_attempts)
{
nano::system system (1);
auto node (system.nodes[0]);
// Add nodes that can accept TCP connection, but not node ID handshake
nano::node_flags node_flags;
node_flags.disable_tcp_realtime = true;
for (auto i (0); i < node->network_params.node.max_peers_per_ip; ++i)
{
auto node2 (std::make_shared<nano::node> (system.io_ctx, nano::get_available_port (), nano::unique_path (), system.alarm, system.logging, system.work, node_flags));
node2->start ();
system.nodes.push_back (node2);
// Start TCP attempt
node->network.merge_peer (node2->network.endpoint ());
}
ASSERT_EQ (0, node->network.size ());
ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (node->network.endpoint ().address (), nano::get_available_port ())));
}

// The test must be completed in less than 1 second
TEST (bandwidth_limiter, validate)
{
Expand Down
8 changes: 7 additions & 1 deletion nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <numeric>

nano::network::network (nano::node & node_a, uint16_t port_a) :
syn_cookies (node_a.network_params.node.max_peers_per_ip),
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
resolver (node_a.io_ctx),
limiter (node_a.config.bandwidth_limit),
Expand Down Expand Up @@ -793,14 +794,19 @@ void nano::message_buffer_manager::stop ()
condition.notify_all ();
}

nano::syn_cookies::syn_cookies (size_t max_cookies_per_ip_a) :
max_cookies_per_ip (max_cookies_per_ip_a)
{
}

boost::optional<nano::uint256_union> nano::syn_cookies::assign (nano::endpoint const & endpoint_a)
{
auto ip_addr (endpoint_a.address ());
debug_assert (ip_addr.is_v6 ());
nano::lock_guard<std::mutex> lock (syn_cookie_mutex);
unsigned & ip_cookies = cookies_per_ip[ip_addr];
boost::optional<nano::uint256_union> result;
if (ip_cookies < nano::transport::max_peers_per_ip)
if (ip_cookies < max_cookies_per_ip)
{
if (cookies.find (endpoint_a) == cookies.end ())
{
Expand Down
2 changes: 2 additions & 0 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class message_buffer_manager final
class syn_cookies final
{
public:
syn_cookies (size_t);
void purge (std::chrono::steady_clock::time_point const &);
// Returns boost::none if the IP is rate capped on syn cookie requests,
// or if the endpoint already has a syn cookie query
Expand All @@ -90,6 +91,7 @@ class syn_cookies final
mutable std::mutex syn_cookie_mutex;
std::unordered_map<nano::endpoint, syn_cookie_info> cookies;
std::unordered_map<boost::asio::ip::address, unsigned> cookies_per_ip;
size_t max_cookies_per_ip;
};
class network final
{
Expand Down
17 changes: 14 additions & 3 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ bool nano::transport::tcp_channels::insert (std::shared_ptr<nano::transport::cha
channels.get<node_id_tag> ().erase (node_id);
}
channels.get<endpoint_tag> ().emplace (channel_a, socket_a, bootstrap_server_a);
attempts.get<endpoint_tag> ().erase (endpoint);
error = false;
lock.unlock ();
node.network.channel_observer (channel_a);
Expand Down Expand Up @@ -350,7 +351,11 @@ void nano::transport::tcp_channels::stop ()
bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a)
{
nano::unique_lock<std::mutex> lock (mutex);
bool result (channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= nano::transport::max_peers_per_ip);
bool result (channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= node.network_params.node.max_peers_per_ip);
if (!result)
{
result = attempts.get<ip_address_tag> ().count (endpoint_a.address ()) >= node.network_params.node.max_peers_per_ip;
}
return result;
}

Expand Down Expand Up @@ -396,8 +401,8 @@ void nano::transport::tcp_channels::purge (std::chrono::steady_clock::time_point
auto disconnect_cutoff (channels.get<last_packet_sent_tag> ().lower_bound (cutoff_a));
channels.get<last_packet_sent_tag> ().erase (channels.get<last_packet_sent_tag> ().begin (), disconnect_cutoff);
// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<1> ().lower_bound (cutoff_a));
attempts.get<1> ().erase (attempts.get<1> ().begin (), attempts_cutoff);
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_a));
attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);

// Cleanup any sockets which may still be existing from failed node id handshakes
node_id_handshake_sockets.erase (std::remove_if (node_id_handshake_sockets.begin (), node_id_handshake_sockets.end (), [this](auto socket) {
Expand Down Expand Up @@ -545,6 +550,7 @@ void nano::transport::tcp_channels::start_tcp (nano::endpoint const & endpoint_a
if (auto socket_l = channel->socket.lock ())
{
node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l);
socket_l->close ();
}
if (node_l->config.logging.network_node_id_handshake_logging ())
{
Expand Down Expand Up @@ -576,6 +582,7 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n
if (auto socket_l = socket_w.lock ())
{
node_l->network.tcp_channels.remove_node_id_handshake_socket (socket_l);
socket_l->close ();
}
}
};
Expand Down Expand Up @@ -677,6 +684,10 @@ void nano::transport::tcp_channels::start_tcp_receive_node_id (std::shared_ptr<n

void nano::transport::tcp_channels::udp_fallback (nano::endpoint const & endpoint_a, std::function<void(std::shared_ptr<nano::transport::channel>)> const & callback_a)
{
{
nano::lock_guard<std::mutex> lock (mutex);
attempts.get<endpoint_tag> ().erase (nano::transport::map_endpoint_to_tcp (endpoint_a));
}
if (callback_a && !node.flags.disable_udp)
{
auto channel_udp (node.network.udp_channels.create (endpoint_a));
Expand Down
13 changes: 10 additions & 3 deletions nano/node/transport/tcp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ namespace transport
class last_bootstrap_attempt_tag
{
};
class last_attempt_tag
{
};
class node_id_tag
{
};
Expand Down Expand Up @@ -171,10 +174,12 @@ namespace transport
{
public:
nano::tcp_endpoint endpoint;
boost::asio::ip::address address;
std::chrono::steady_clock::time_point last_attempt{ std::chrono::steady_clock::now () };

explicit tcp_endpoint_attempt (nano::tcp_endpoint const & endpoint_a) :
endpoint (endpoint_a)
endpoint (endpoint_a),
address (endpoint_a.address ())
{
}
};
Expand All @@ -196,9 +201,11 @@ namespace transport
channels;
boost::multi_index_container<tcp_endpoint_attempt,
mi::indexed_by<
mi::hashed_unique<
mi::hashed_unique<mi::tag<endpoint_tag>,
mi::member<tcp_endpoint_attempt, nano::tcp_endpoint, &tcp_endpoint_attempt::endpoint>>,
mi::ordered_non_unique<
mi::hashed_non_unique<mi::tag<ip_address_tag>,
mi::member<tcp_endpoint_attempt, boost::asio::ip::address, &tcp_endpoint_attempt::address>>,
mi::ordered_non_unique<mi::tag<last_attempt_tag>,
mi::member<tcp_endpoint_attempt, std::chrono::steady_clock::time_point, &tcp_endpoint_attempt::last_attempt>>>>
attempts;
// clang-format on
Expand Down
2 changes: 0 additions & 2 deletions nano/node/transport/transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ namespace transport
nano::tcp_endpoint map_endpoint_to_tcp (nano::endpoint const &);
// Unassigned, reserved, self
bool reserved_address (nano::endpoint const &, bool = false);
// Maximum number of peers per IP
static size_t constexpr max_peers_per_ip = 10;
static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5);
enum class transport_type : uint8_t
{
Expand Down
7 changes: 4 additions & 3 deletions nano/node/transport/udp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ std::shared_ptr<nano::transport::channel_udp> nano::transport::udp_channels::ins
{
result = std::make_shared<nano::transport::channel_udp> (*this, endpoint_a, network_version_a);
channels.get<endpoint_tag> ().insert (result);
attempts.get<endpoint_tag> ().erase (endpoint_a);
lock.unlock ();
node.network.channel_observer (result);
}
Expand Down Expand Up @@ -634,7 +635,7 @@ std::shared_ptr<nano::transport::channel> nano::transport::udp_channels::create
bool nano::transport::udp_channels::max_ip_connections (nano::endpoint const & endpoint_a)
{
nano::unique_lock<std::mutex> lock (mutex);
bool result (channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= nano::transport::max_peers_per_ip);
bool result (channels.get<ip_address_tag> ().count (endpoint_a.address ()) >= node.network_params.node.max_peers_per_ip);
return result;
}

Expand Down Expand Up @@ -677,8 +678,8 @@ void nano::transport::udp_channels::purge (std::chrono::steady_clock::time_point
auto disconnect_cutoff (channels.get<last_packet_received_tag> ().lower_bound (cutoff_a));
channels.get<last_packet_received_tag> ().erase (channels.get<last_packet_received_tag> ().begin (), disconnect_cutoff);
// Remove keepalive attempt tracking for attempts older than cutoff
auto attempts_cutoff (attempts.get<1> ().lower_bound (cutoff_a));
attempts.get<1> ().erase (attempts.get<1> ().begin (), attempts_cutoff);
auto attempts_cutoff (attempts.get<last_attempt_tag> ().lower_bound (cutoff_a));
attempts.get<last_attempt_tag> ().erase (attempts.get<last_attempt_tag> ().begin (), attempts_cutoff);
}

void nano::transport::udp_channels::ongoing_keepalive ()
Expand Down
7 changes: 5 additions & 2 deletions nano/node/transport/udp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ namespace transport
class last_bootstrap_attempt_tag
{
};
class last_attempt_tag
{
};
class node_id_tag
{
};
Expand Down Expand Up @@ -191,9 +194,9 @@ namespace transport
boost::multi_index_container<
endpoint_attempt,
mi::indexed_by<
mi::hashed_unique<
mi::hashed_unique<mi::tag<endpoint_tag>,
mi::member<endpoint_attempt, nano::endpoint, &endpoint_attempt::endpoint>>,
mi::ordered_non_unique<
mi::ordered_non_unique<mi::tag<last_attempt_tag>,
mi::member<endpoint_attempt, std::chrono::steady_clock::time_point, &endpoint_attempt::last_attempt>>>>
attempts;
// clang-format on
Expand Down
1 change: 1 addition & 0 deletions nano/secure/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ nano::node_constants::node_constants (nano::network_constants & network_constant
peer_interval = search_pending_interval;
unchecked_cleaning_interval = std::chrono::minutes (30);
process_confirmed_interval = network_constants.is_test_network () ? std::chrono::milliseconds (50) : std::chrono::milliseconds (500);
max_peers_per_ip = network_constants.is_test_network () ? 10 : 5;
max_weight_samples = network_constants.is_live_network () ? 4032 : 864;
weight_period = 5 * 60; // 5 minutes
}
Expand Down
2 changes: 2 additions & 0 deletions nano/secure/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ class node_constants
std::chrono::seconds peer_interval;
std::chrono::minutes unchecked_cleaning_interval;
std::chrono::milliseconds process_confirmed_interval;
/** Maximum number of peers per IP */
size_t max_peers_per_ip;

/** The maximum amount of samples for a 2 week period on live or 3 days on beta */
uint64_t max_weight_samples;
Expand Down