diff --git a/nano/boost/asio/ip/network_v6.hpp b/nano/boost/asio/ip/network_v6.hpp new file mode 100644 index 0000000000..b9f213ab63 --- /dev/null +++ b/nano/boost/asio/ip/network_v6.hpp @@ -0,0 +1,7 @@ +#pragma once + +#include + +DISABLE_ASIO_WARNINGS +#include +REENABLE_WARNINGS diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index f0464c6767..a5bd86da24 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -918,7 +919,10 @@ namespace transport { TEST (network, peer_max_tcp_attempts_subnetwork) { - nano::system system (1); + nano::node_flags node_flags; + node_flags.disable_max_peers_per_ip = true; + nano::system system; + system.add_node (node_flags); auto node (system.nodes[0]); for (auto i (0); i < node->network_params.network.max_peers_per_subnetwork; ++i) { @@ -927,9 +931,9 @@ namespace transport ASSERT_FALSE (node->network.tcp_channels.reachout (endpoint)); } ASSERT_EQ (0, node->network.size ()); - ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::out)); + ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out)); ASSERT_TRUE (node->network.tcp_channels.reachout (nano::endpoint (boost::asio::ip::make_address_v6 ("::ffff:127.0.0.1"), nano::get_available_port ()))); - ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::out)); + ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out)); } } } diff --git a/nano/core_test/socket.cpp b/nano/core_test/socket.cpp index a5746023b9..32d921dd01 100644 --- a/nano/core_test/socket.cpp +++ b/nano/core_test/socket.cpp @@ -1,3 +1,5 @@ +#include +#include #include #include #include @@ -5,23 +7,22 @@ #include +#include +#include +#include +#include + using namespace std::chrono_literals; 
TEST (socket, max_connections) { - // this is here just so that ASSERT_TIMELY can be used nano::system system; - auto node_flags = nano::inactive_node_flag_defaults (); - node_flags.read_only = false; - nano::inactive_node inactivenode (nano::unique_path (), node_flags); - auto node = inactivenode.node; - - nano::thread_runner runner (node->io_ctx, 1); + auto node = system.add_node (); - auto server_port (nano::get_available_port ()); - boost::asio::ip::tcp::endpoint listen_endpoint (boost::asio::ip::address_v6::any (), server_port); - boost::asio::ip::tcp::endpoint dst_endpoint (boost::asio::ip::address_v6::loopback (), server_port); + auto server_port = nano::get_available_port (); + boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port }; + boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_port }; // start a server socket that allows max 2 live connections auto server_socket = std::make_shared (*node, listen_endpoint, 2); @@ -37,10 +38,10 @@ TEST (socket, max_connections) }); // client side connection tracking - std::atomic connection_attempts = 0; + std::atomic connection_attempts = 0; auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) { ASSERT_EQ (ec_a.value (), 0); - connection_attempts++; + ++connection_attempts; }; // start 3 clients, 2 will persist but 1 will be dropped @@ -102,8 +103,280 @@ TEST (socket, max_connections) ASSERT_TIMELY (5s, server_sockets.size () == 5); // connections accepted by the server node->stop (); - runner.stop_event_processing (); - runner.join (); +} + +TEST (socket, max_connections_per_ip) +{ + nano::system system; + + auto node = system.add_node (); + ASSERT_FALSE (node->flags.disable_max_peers_per_ip); + + auto server_port = nano::get_available_port (); + boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port }; + boost::asio::ip::tcp::endpoint dst_endpoint{ 
boost::asio::ip::address_v6::loopback (), server_port }; + + const auto max_ip_connections = node->network_params.network.max_peers_per_ip; + ASSERT_TRUE (max_ip_connections >= 1); + + const auto max_global_connections = 1000; + + auto server_socket = std::make_shared (*node, listen_endpoint, max_global_connections); + boost::system::error_code ec; + server_socket->start (ec); + ASSERT_FALSE (ec); + + // successful incoming connections are stored in server_sockets to keep them alive (server side) + std::vector> server_sockets; + server_socket->on_connection ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + server_sockets.push_back (new_connection); + return true; + }); + + // client side connection tracking + std::atomic connection_attempts = 0; + auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) { + ASSERT_EQ (ec_a.value (), 0); + ++connection_attempts; + }; + + // start n clients, n-1 will persist but 1 will be dropped, where n == max_ip_connections + std::vector> client_list; + client_list.reserve (max_ip_connections + 1); + + for (auto idx = 0; idx < max_ip_connections + 1; ++idx) + { + auto client = std::make_shared (*node); + client->async_connect (dst_endpoint, connect_handler); + client_list.push_back (client); + } + + auto get_tcp_max_per_ip = [&node] () { + return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in); + }; + + auto get_tcp_accept_successes = [&node] () { + return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); + }; + + ASSERT_TIMELY (5s, get_tcp_accept_successes () == max_ip_connections); + ASSERT_TIMELY (5s, get_tcp_max_per_ip () == 1); + ASSERT_TIMELY (5s, connection_attempts == max_ip_connections + 1); + + node->stop (); +} + +TEST (socket, limited_subnet_address) +{ + auto address = boost::asio::ip::make_address 
("a41d:b7b2:8298:cf45:672e:bd1a:e7fb:f713"); + auto network = nano::socket_functions::get_ipv6_subnet_address (address.to_v6 (), 32); // network prefix = 32. + ASSERT_EQ ("a41d:b7b2:8298:cf45:672e:bd1a:e7fb:f713/32", network.to_string ()); + ASSERT_EQ ("a41d:b7b2::/32", network.canonical ().to_string ()); +} + +TEST (socket, first_ipv6_subnet_address) +{ + auto address = boost::asio::ip::make_address ("a41d:b7b2:8298:cf45:672e:bd1a:e7fb:f713"); + auto first_address = nano::socket_functions::first_ipv6_subnet_address (address.to_v6 (), 32); // network prefix = 32. + ASSERT_EQ ("a41d:b7b2::", first_address.to_string ()); +} + +TEST (socket, last_ipv6_subnet_address) +{ + auto address = boost::asio::ip::make_address ("a41d:b7b2:8298:cf45:672e:bd1a:e7fb:f713"); + auto last_address = nano::socket_functions::last_ipv6_subnet_address (address.to_v6 (), 32); // network prefix = 32. + ASSERT_EQ ("a41d:b7b2:ffff:ffff:ffff:ffff:ffff:ffff", last_address.to_string ()); +} + +TEST (socket, count_subnetwork_connections) +{ + nano::system system; + auto node = system.add_node (); + + auto address0 = boost::asio::ip::make_address ("a41d:b7b1:ffff:ffff:ffff:ffff:ffff:ffff"); // out of network prefix + auto address1 = boost::asio::ip::make_address ("a41d:b7b2:8298:cf45:672e:bd1a:e7fb:f713"); // referece address + auto address2 = boost::asio::ip::make_address ("a41d:b7b2::"); // start of the network range + auto address3 = boost::asio::ip::make_address ("a41d:b7b2::1"); + auto address4 = boost::asio::ip::make_address ("a41d:b7b2:ffff:ffff:ffff:ffff:ffff:ffff"); // end of the network range + auto address5 = boost::asio::ip::make_address ("a41d:b7b3::"); // out of the network prefix + auto address6 = boost::asio::ip::make_address ("a41d:b7b3::1"); // out of the network prefix + + auto connection0 = std::make_shared (*node); + auto connection1 = std::make_shared (*node); + auto connection2 = std::make_shared (*node); + auto connection3 = std::make_shared (*node); + auto connection4 = 
std::make_shared (*node); + auto connection5 = std::make_shared (*node); + auto connection6 = std::make_shared (*node); + + nano::address_socket_mmap connections_per_address; + connections_per_address.emplace (address0, connection0); + connections_per_address.emplace (address1, connection1); + connections_per_address.emplace (address2, connection2); + connections_per_address.emplace (address3, connection3); + connections_per_address.emplace (address4, connection4); + connections_per_address.emplace (address5, connection5); + connections_per_address.emplace (address6, connection6); + + // Asserts it counts only the connections for the specified address and its network prefix. + ASSERT_EQ (4, nano::socket_functions::count_subnetwork_connections (connections_per_address, address1.to_v6 (), 32)); +} + +TEST (socket, max_connections_per_subnetwork) +{ + nano::system system; + + nano::node_flags node_flags; + // Disabling the IP limit because the same IP address will be used, to check that the connections come from the same subnetwork. 
+ node_flags.disable_max_peers_per_ip = true; + node_flags.disable_max_peers_per_subnetwork = false; + auto node = system.add_node (node_flags); + ASSERT_TRUE (node->flags.disable_max_peers_per_ip); + ASSERT_FALSE (node->flags.disable_max_peers_per_subnetwork); + + auto server_port = nano::get_available_port (); + boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port }; + boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_port }; + + const auto max_subnetwork_connections = node->network_params.network.max_peers_per_subnetwork; + ASSERT_TRUE (max_subnetwork_connections >= 1); + + const auto max_global_connections = 1000; + + auto server_socket = std::make_shared (*node, listen_endpoint, max_global_connections); + boost::system::error_code ec; + server_socket->start (ec); + ASSERT_FALSE (ec); + + // successful incoming connections are stored in server_sockets to keep them alive (server side) + std::vector> server_sockets; + server_socket->on_connection ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + server_sockets.push_back (new_connection); + return true; + }); + + // client side connection tracking + std::atomic connection_attempts = 0; + auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) { + ASSERT_EQ (ec_a.value (), 0); + ++connection_attempts; + }; + + // start n clients, n-1 will persist but 1 will be dropped, where n == max_subnetwork_connections + std::vector> client_list; + client_list.reserve (max_subnetwork_connections + 1); + + for (auto idx = 0; idx < max_subnetwork_connections + 1; ++idx) + { + auto client = std::make_shared (*node); + client->async_connect (dst_endpoint, connect_handler); + client_list.push_back (client); + } + + auto get_tcp_max_per_subnetwork = [&node] () { + return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, 
nano::stat::dir::in); + }; + + auto get_tcp_accept_successes = [&node] () { + return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); + }; + + ASSERT_TIMELY (5s, get_tcp_accept_successes () == max_subnetwork_connections); + ASSERT_TIMELY (5s, get_tcp_max_per_subnetwork () == 1); + ASSERT_TIMELY (5s, connection_attempts == max_subnetwork_connections + 1); + + node->stop (); +} + +TEST (socket, disabled_max_peers_per_ip) +{ + nano::system system; + + nano::node_flags node_flags; + node_flags.disable_max_peers_per_ip = true; + auto node = system.add_node (node_flags); + ASSERT_TRUE (node->flags.disable_max_peers_per_ip); + + auto server_port = nano::get_available_port (); + boost::asio::ip::tcp::endpoint listen_endpoint{ boost::asio::ip::address_v6::any (), server_port }; + boost::asio::ip::tcp::endpoint dst_endpoint{ boost::asio::ip::address_v6::loopback (), server_port }; + + const auto max_ip_connections = node->network_params.network.max_peers_per_ip; + ASSERT_TRUE (max_ip_connections >= 1); + + const auto max_global_connections = 1000; + + auto server_socket = std::make_shared (*node, listen_endpoint, max_global_connections); + boost::system::error_code ec; + server_socket->start (ec); + ASSERT_FALSE (ec); + + // successful incoming connections are stored in server_sockets to keep them alive (server side) + std::vector> server_sockets; + server_socket->on_connection ([&server_sockets] (std::shared_ptr const & new_connection, boost::system::error_code const & ec_a) { + server_sockets.push_back (new_connection); + return true; + }); + + // client side connection tracking + std::atomic connection_attempts = 0; + auto connect_handler = [&connection_attempts] (boost::system::error_code const & ec_a) { + ASSERT_EQ (ec_a.value (), 0); + ++connection_attempts; + }; + + // start n clients, n-1 will persist but 1 will be dropped, where n == max_ip_connections + std::vector> client_list; + client_list.reserve 
(max_ip_connections + 1); + + for (auto idx = 0; idx < max_ip_connections + 1; ++idx) + { + auto client = std::make_shared (*node); + client->async_connect (dst_endpoint, connect_handler); + client_list.push_back (client); + } + + auto get_tcp_max_per_ip = [&node] () { + return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in); + }; + + auto get_tcp_accept_successes = [&node] () { + return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); + }; + + ASSERT_TIMELY (5s, get_tcp_accept_successes () == max_ip_connections + 1); + ASSERT_TIMELY (5s, get_tcp_max_per_ip () == 0); + ASSERT_TIMELY (5s, connection_attempts == max_ip_connections + 1); + + node->stop (); +} + +TEST (socket, disconnection_of_silent_connections) +{ + nano::system system; + auto node = system.add_node (); + auto socket = std::make_shared (*node); + // Classify the socket type as real-time as the disconnections are done only for this connection type. + socket->type_set (nano::socket::type_t::realtime); + // Silent connections are connections open by external peers that don't contribute with any data. + socket->set_silent_connection_tolerance_time (std::chrono::seconds{ 5 }); + auto bootstrap_endpoint = node->bootstrap.endpoint (); + std::atomic connected{ false }; + // Opening a connection that will be closed because it remains silent during the tolerance time. + socket->async_connect (bootstrap_endpoint, [socket, &connected] (boost::system::error_code const & ec) { + ASSERT_FALSE (ec); + connected = true; + }); + ASSERT_TIMELY (4s, connected); + // Checking the connection was closed. 
+ ASSERT_TIMELY (10s, socket->is_closed ()); + + auto get_tcp_silent_connection_drops = [&node] () { + return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in); + }; + ASSERT_EQ (1, get_tcp_silent_connection_drops ()); + + node->stop (); } TEST (socket, drop_policy) @@ -173,6 +446,8 @@ TEST (socket, concurrent_writes) { auto node_flags = nano::inactive_node_flag_defaults (); node_flags.read_only = false; + node_flags.disable_max_peers_per_ip = true; + node_flags.disable_max_peers_per_subnetwork = true; nano::inactive_node inactivenode (nano::unique_path (), node_flags); auto node = inactivenode.node; diff --git a/nano/lib/config.hpp b/nano/lib/config.hpp index cd101d921f..e6eb1c50d7 100644 --- a/nano/lib/config.hpp +++ b/nano/lib/config.hpp @@ -156,10 +156,12 @@ class network_constants request_interval_ms = is_dev_network () ? 20 : 500; cleanup_period = is_dev_network () ? std::chrono::seconds (1) : std::chrono::seconds (60); idle_timeout = is_dev_network () ? cleanup_period * 15 : cleanup_period * 2; + silent_connection_tolerance_time = std::chrono::seconds (120); syn_cookie_cutoff = std::chrono::seconds (5); bootstrap_interval = std::chrono::seconds (15 * 60); max_peers_per_ip = is_dev_network () ? 10 : 5; max_peers_per_subnetwork = max_peers_per_ip * 4; + ipv6_subnetwork_prefix_for_limiting = 64; // Equivalent to network prefix /64. peer_dump_interval = is_dev_network () ? std::chrono::seconds (1) : std::chrono::seconds (5 * 60); } @@ -188,12 +190,14 @@ class network_constants } /** Default maximum idle time for a socket before it's automatically closed */ std::chrono::seconds idle_timeout; + std::chrono::seconds silent_connection_tolerance_time; std::chrono::seconds syn_cookie_cutoff; std::chrono::seconds bootstrap_interval; - /** Maximum number of peers per IP */ + /** Maximum number of peers per IP. 
It is also the max number of connections per IP */ size_t max_peers_per_ip; /** Maximum number of peers per subnetwork */ size_t max_peers_per_subnetwork; + size_t ipv6_subnetwork_prefix_for_limiting; std::chrono::seconds peer_dump_interval; /** Returns the network this object contains values for */ diff --git a/nano/lib/stats.cpp b/nano/lib/stats.cpp index 7df9eab711..3325ba0d8b 100644 --- a/nano/lib/stats.cpp +++ b/nano/lib/stats.cpp @@ -775,6 +775,15 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::tcp_max_per_ip: res = "tcp_max_per_ip"; break; + case nano::stat::detail::tcp_max_per_subnetwork: + res = "tcp_max_per_subnetwork"; + break; + case nano::stat::detail::tcp_silent_connection_drop: + res = "tcp_silent_connection_drop"; + break; + case nano::stat::detail::tcp_io_timeout_drop: + res = "tcp_io_timeout_drop"; + break; case nano::stat::detail::unreachable_host: res = "unreachable_host"; break; @@ -808,6 +817,12 @@ std::string nano::stat::detail_to_string (uint32_t key) case nano::stat::detail::outdated_version: res = "outdated_version"; break; + case nano::stat::detail::udp_max_per_ip: + res = "udp_max_per_ip"; + break; + case nano::stat::detail::udp_max_per_subnetwork: + res = "udp_max_per_subnetwork"; + break; case nano::stat::detail::blocks_confirmed: res = "blocks_confirmed"; break; diff --git a/nano/lib/stats.hpp b/nano/lib/stats.hpp index ea5ba2a1fe..3eed7d4014 100644 --- a/nano/lib/stats.hpp +++ b/nano/lib/stats.hpp @@ -337,6 +337,8 @@ class stat final invalid_telemetry_req_message, invalid_telemetry_ack_message, outdated_version, + udp_max_per_ip, + udp_max_per_subnetwork, // tcp tcp_accept_success, @@ -345,6 +347,9 @@ class stat final tcp_write_no_socket_drop, tcp_excluded, tcp_max_per_ip, + tcp_max_per_subnetwork, + tcp_silent_connection_drop, + tcp_io_timeout_drop, // ipc invocations, diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index b7632ef717..a4993cd6d7 
100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -749,5 +749,5 @@ bool nano::bootstrap_server::is_bootstrap_connection () bool nano::bootstrap_server::is_realtime_connection () { - return socket->type () == nano::socket::type_t::realtime || socket->type () == nano::socket::type_t::realtime_response_server; + return socket->is_realtime_connection (); } diff --git a/nano/node/nodeconfig.hpp b/nano/node/nodeconfig.hpp index 8dab45086b..54a219906b 100644 --- a/nano/node/nodeconfig.hpp +++ b/nano/node/nodeconfig.hpp @@ -144,6 +144,7 @@ class node_flags final bool disable_block_processor_republishing{ false }; bool allow_bootstrap_peers_duplicates{ false }; bool disable_max_peers_per_ip{ false }; // For testing only + bool disable_max_peers_per_subnetwork{ false }; // For testing only bool force_use_write_database_queue{ false }; // For testing only. RocksDB does not use the database queue, but some tests rely on it being used. bool disable_search_pending{ false }; // For testing only bool enable_pruning{ false }; diff --git a/nano/node/socket.cpp b/nano/node/socket.cpp index 7e6ad87bd7..1e4d78d7f6 100644 --- a/nano/node/socket.cpp +++ b/nano/node/socket.cpp @@ -1,12 +1,19 @@ #include #include +#include +#include +#include #include #include #include +#include #include +#include +#include #include +#include nano::socket::socket (nano::node & node_a) : strand{ node_a.io_ctx.get_executor () }, @@ -14,7 +21,9 @@ nano::socket::socket (nano::node & node_a) : node{ node_a }, next_deadline{ std::numeric_limits::max () }, last_completion_time{ 0 }, - io_timeout{ node_a.config.tcp_io_timeout } + last_receive_time{ 0 }, + io_timeout{ node_a.config.tcp_io_timeout }, + silent_connection_tolerance_time{ node_a.network_params.network.silent_connection_tolerance_time } { } @@ -51,6 +60,7 @@ void nano::socket::async_read (std::shared_ptr> const & buf [this_l, buffer_a, callback_a] (boost::system::error_code const & ec, std::size_t 
size_a) { this_l->node.stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a); this_l->stop_timer (); + this_l->update_last_receive_time (); callback_a (ec, size_a); })); })); @@ -117,6 +127,11 @@ void nano::socket::stop_timer () last_completion_time = nano::seconds_since_epoch (); } +void nano::socket::update_last_receive_time () +{ + last_receive_time = nano::seconds_since_epoch (); +} + void nano::socket::checkup () { std::weak_ptr this_w (shared_from_this ()); @@ -124,7 +139,18 @@ void nano::socket::checkup () if (auto this_l = this_w.lock ()) { uint64_t now (nano::seconds_since_epoch ()); + auto condition_to_disconnect{ false }; + if (this_l->is_realtime_connection () && now - this_l->last_receive_time > this_l->silent_connection_tolerance_time.count ()) + { + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_silent_connection_drop, nano::stat::dir::in); + condition_to_disconnect = true; + } if (this_l->next_deadline != std::numeric_limits::max () && now - this_l->last_completion_time > this_l->next_deadline) + { + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_io_timeout_drop, nano::stat::dir::in); + condition_to_disconnect = true; + } + if (condition_to_disconnect) { if (this_l->node.config.logging.network_timeout_logging ()) { @@ -157,6 +183,14 @@ void nano::socket::timeout_set (std::chrono::seconds io_timeout_a) io_timeout = io_timeout_a; } +void nano::socket::set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a) +{ + auto this_l (shared_from_this ()); + boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, tolerance_time_a] () { + this_l->silent_connection_tolerance_time = tolerance_time_a; + })); +} + void nano::socket::close () { auto this_l (shared_from_this ()); @@ -221,17 +255,80 @@ void nano::server_socket::close () boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l] () { this_l->close_internal (); this_l->acceptor.close 
(); - for (auto & connection_w : this_l->connections) + for (auto & address_connection_pair : this_l->connections_per_address) { - if (auto connection_l = connection_w.lock ()) + if (auto connection_l = address_connection_pair.second.lock ()) { connection_l->close (); } } - this_l->connections.clear (); + this_l->connections_per_address.clear (); })); } +boost::asio::ip::network_v6 nano::socket_functions::get_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, size_t network_prefix) +{ + return boost::asio::ip::make_network_v6 (ip_address, network_prefix); +} + +boost::asio::ip::address nano::socket_functions::first_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, size_t network_prefix) +{ + auto range = get_ipv6_subnet_address (ip_address, network_prefix).hosts (); + debug_assert (!range.empty ()); + return *(range.begin ()); +} + +boost::asio::ip::address nano::socket_functions::last_ipv6_subnet_address (boost::asio::ip::address_v6 const & ip_address, size_t network_prefix) +{ + auto range = get_ipv6_subnet_address (ip_address, network_prefix).hosts (); + debug_assert (!range.empty ()); + return *(--range.end ()); +} + +size_t nano::socket_functions::count_subnetwork_connections ( +nano::address_socket_mmap const & per_address_connections, +boost::asio::ip::address_v6 const & remote_address, +size_t network_prefix) +{ + auto range = get_ipv6_subnet_address (remote_address, network_prefix).hosts (); + if (range.empty ()) + { + return 0; + } + auto const first_ip = first_ipv6_subnet_address (remote_address, network_prefix); + auto const last_ip = last_ipv6_subnet_address (remote_address, network_prefix); + auto const counted_connections = std::distance (per_address_connections.lower_bound (first_ip), per_address_connections.upper_bound (last_ip)); + return counted_connections; +} + +bool nano::server_socket::limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection) +{ + debug_assert 
(strand.running_in_this_thread ()); + if (node.flags.disable_max_peers_per_subnetwork) + { + // If the limit is disabled, then it is unreachable. + return false; + } + auto const counted_connections = socket_functions::count_subnetwork_connections ( + connections_per_address, + nano::transport::mapped_from_v4_or_v6 (new_connection->remote.address ()), + node.network_params.network.ipv6_subnetwork_prefix_for_limiting); + return counted_connections >= node.network_params.network.max_peers_per_subnetwork; +} + +bool nano::server_socket::limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection) +{ + debug_assert (strand.running_in_this_thread ()); + if (node.flags.disable_max_peers_per_ip) + { + // If the limit is disabled, then it is unreachable. + return false; + } + auto const address_connections_range = connections_per_address.equal_range (new_connection->remote.address ()); + auto const counted_connections = std::distance (address_connections_range.first, address_connections_range.second); + return counted_connections >= node.network_params.network.max_peers_per_ip; +} + void nano::server_socket::on_connection (std::function const &, boost::system::error_code const &)> callback_a) { auto this_l (std::static_pointer_cast (shared_from_this ())); @@ -250,14 +347,41 @@ void nano::server_socket::on_connection (std::functionevict_dead_connections (); - if (this_l->connections.size () >= this_l->max_inbound_connections) + if (this_l->connections_per_address.size () >= this_l->max_inbound_connections) { - this_l->node.logger.always_log ("Network: max_inbound_connections reached, unable to open new connection"); + this_l->node.logger.try_log ("Network: max_inbound_connections reached, unable to open new connection"); this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in); this_l->on_connection_requeue_delayed (callback_a); return; } + if (this_l->limit_reached_for_incoming_ip_connections 
(new_connection)) + { + auto const remote_ip_address = new_connection->remote_endpoint ().address (); + auto const log_message = boost::str ( + boost::format ("Network: max connections per IP (max_peers_per_ip) was reached for %1%, unable to open new connection") + % remote_ip_address.to_string ()); + this_l->node.logger.try_log (log_message); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_ip, nano::stat::dir::in); + this_l->on_connection_requeue_delayed (callback_a); + return; + } + + if (this_l->limit_reached_for_incoming_subnetwork_connections (new_connection)) + { + auto const remote_ip_address = new_connection->remote_endpoint ().address (); + debug_assert (remote_ip_address.is_v6 ()); + auto const remote_subnet = socket_functions::get_ipv6_subnet_address (remote_ip_address.to_v6 (), this_l->node.network_params.network.ipv6_subnetwork_prefix_for_limiting); + auto const log_message = boost::str ( + boost::format ("Network: max connections per subnetwork (max_peers_per_subnetwork) was reached for subnetwork %1% (remote IP: %2%), unable to open new connection") + % remote_subnet.canonical ().to_string () + % remote_ip_address.to_string ()); + this_l->node.logger.try_log (log_message); + this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::in); + this_l->on_connection_requeue_delayed (callback_a); + return; + } + + if (!ec_a) + { + // Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start @@ -265,7 +389,7 @@ void nano::server_socket::on_connection (std::functioncheckup (); new_connection->start_timer (this_l->node.network_params.network.is_dev_network () ? 
std::chrono::seconds (2) : this_l->node.network_params.network.idle_timeout); this_l->node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in); - this_l->connections.push_back (new_connection); + this_l->connections_per_address.emplace (new_connection->remote.address (), new_connection); if (callback_a (new_connection, ec_a)) { this_l->on_connection (callback_a); @@ -329,5 +453,13 @@ bool nano::server_socket::is_temporary_error (boost::system::error_code const ec void nano::server_socket::evict_dead_connections () { debug_assert (strand.running_in_this_thread ()); - connections.erase (std::remove_if (connections.begin (), connections.end (), [] (auto & connection) { return connection.expired (); }), connections.end ()); + for (auto it = connections_per_address.begin (); it != connections_per_address.end ();) + { + if (it->second.expired ()) + { + it = connections_per_address.erase (it); + continue; + } + ++it; + } } diff --git a/nano/node/socket.hpp b/nano/node/socket.hpp index 1b4868fd3d..cda8c2bc03 100644 --- a/nano/node/socket.hpp +++ b/nano/node/socket.hpp @@ -8,9 +8,21 @@ #include #include +#include #include #include +namespace boost +{ +namespace asio +{ + namespace ip + { + class network_v6; + } +} +} + namespace nano { /** Policy to affect at which stage a buffer can be dropped */ @@ -60,6 +72,7 @@ class socket : public std::enable_shared_from_this /** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. 
*/ void timeout_set (std::chrono::seconds io_timeout_a); void start_timer (std::chrono::seconds deadline_a); + void set_silent_connection_tolerance_time (std::chrono::seconds tolerance_time_a); bool max () const { return queue_size >= queue_size_max; @@ -76,6 +89,14 @@ class socket : public std::enable_shared_from_this { type_m = type_a; } + bool is_realtime_connection () + { + return type () == nano::socket::type_t::realtime || type () == nano::socket::type_t::realtime_response_server; + } + bool is_closed () + { + return closed; + } protected: /** Holds the buffer and callback for queued writes */ @@ -95,8 +116,10 @@ class socket : public std::enable_shared_from_this std::atomic next_deadline; std::atomic last_completion_time; + std::atomic last_receive_time; std::atomic timed_out{ false }; std::atomic io_timeout; + std::chrono::seconds silent_connection_tolerance_time; std::atomic queue_size{ 0 }; /** Set by close() - completion handlers must check this. This is more reliable than checking @@ -105,6 +128,7 @@ class socket : public std::enable_shared_from_this void close_internal (); void start_timer (); void stop_timer (); + void update_last_receive_time (); void checkup (); private: @@ -114,6 +138,16 @@ class socket : public std::enable_shared_from_this static std::size_t constexpr queue_size_max = 128; }; +using address_socket_mmap = std::multimap>; + +namespace socket_functions +{ + boost::asio::ip::network_v6 get_ipv6_subnet_address (boost::asio::ip::address_v6 const &, size_t); + boost::asio::ip::address first_ipv6_subnet_address (boost::asio::ip::address_v6 const &, size_t); + boost::asio::ip::address last_ipv6_subnet_address (boost::asio::ip::address_v6 const &, size_t); + size_t count_subnetwork_connections (nano::address_socket_mmap const &, boost::asio::ip::address_v6 const &, size_t); +} + /** Socket class for TCP servers */ class server_socket final : public socket { @@ -138,12 +172,15 @@ class server_socket final : public socket } private: - 
std::vector> connections; + nano::address_socket_mmap connections_per_address; boost::asio::ip::tcp::acceptor acceptor; boost::asio::ip::tcp::endpoint local; std::size_t max_inbound_connections; void evict_dead_connections (); bool is_temporary_error (boost::system::error_code const ec_a); void on_connection_requeue_delayed (std::function const & new_connection, boost::system::error_code const &)>); + /** Checks whether the maximum number of connections per IP was reached. If so, it returns true. */ + bool limit_reached_for_incoming_ip_connections (std::shared_ptr const & new_connection); + bool limit_reached_for_incoming_subnetwork_connections (std::shared_ptr const & new_connection); }; } diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 75ba49ded6..d5bfffef08 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -362,17 +362,17 @@ void nano::transport::tcp_channels::stop () bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const & endpoint_a) { - bool result (false); - if (!node.flags.disable_max_peers_per_ip) + if (node.flags.disable_max_peers_per_ip) { - auto const address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ())); - auto const subnet (nano::transport::map_address_to_subnetwork (endpoint_a.address ())); - nano::unique_lock lock (mutex); - result = channels.get ().count (address) >= node.network_params.network.max_peers_per_ip || channels.get ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork; - if (!result) - { - result = attempts.get ().count (address) >= node.network_params.network.max_peers_per_ip || attempts.get ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork; - } + return false; + } + bool result{ false }; + auto const address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ())); + nano::unique_lock lock (mutex); + result = channels.get ().count (address) >= 
node.network_params.network.max_peers_per_ip; + if (!result) + { + result = attempts.get ().count (address) >= node.network_params.network.max_peers_per_ip; } if (result) { @@ -381,11 +381,37 @@ bool nano::transport::tcp_channels::max_ip_connections (nano::tcp_endpoint const return result; } +bool nano::transport::tcp_channels::max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a) +{ + if (node.flags.disable_max_peers_per_subnetwork) + { + return false; + } + bool result{ false }; + auto const subnet (nano::transport::map_address_to_subnetwork (endpoint_a.address ())); + nano::unique_lock lock (mutex); + result = channels.get ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork; + if (!result) + { + result = attempts.get ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork; + } + if (result) + { + node.stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_max_per_subnetwork, nano::stat::dir::out); + } + return result; +} + +bool nano::transport::tcp_channels::max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a) +{ + return max_ip_connections (endpoint_a) || max_subnetwork_connections (endpoint_a); +} + bool nano::transport::tcp_channels::reachout (nano::endpoint const & endpoint_a) { auto tcp_endpoint (nano::transport::map_endpoint_to_tcp (endpoint_a)); // Don't overload single IP - bool error = node.network.excluded_peers.check (tcp_endpoint) || max_ip_connections (tcp_endpoint); + bool error = node.network.excluded_peers.check (tcp_endpoint) || max_ip_or_subnetwork_connections (tcp_endpoint); if (!error && !node.flags.disable_tcp_realtime) { // Don't keepalive to nodes that already sent us something diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 611791b508..353f35de37 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -91,7 +91,9 @@ namespace transport void stop (); void process_messages (); void process_message (nano::message 
const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr const &); - bool max_ip_connections (nano::tcp_endpoint const &); + bool max_ip_connections (nano::tcp_endpoint const & endpoint_a); + bool max_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); + bool max_ip_or_subnetwork_connections (nano::tcp_endpoint const & endpoint_a); // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &); std::unique_ptr collect_container_info (std::string const &); diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 718df5d39b..79998ec594 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -2,6 +2,9 @@ #include #include +#include +#include +#include #include #include @@ -82,7 +85,7 @@ nano::tcp_endpoint nano::transport::map_endpoint_to_tcp (nano::endpoint const & boost::asio::ip::address nano::transport::map_address_to_subnetwork (boost::asio::ip::address const & address_a) { debug_assert (address_a.is_v6 ()); - static short const ipv6_subnet_prefix_length = 32; // Limits for /32 IPv6 subnetwork + static short const ipv6_subnet_prefix_length = 32; // Equivalent to network prefix /32. static short const ipv4_subnet_prefix_length = (128 - 32) + 24; // Limits for /24 IPv4 subnetwork return address_a.to_v6 ().is_v4_mapped () ? 
boost::asio::ip::make_network_v6 (address_a.to_v6 (), ipv4_subnet_prefix_length).network () : boost::asio::ip::make_network_v6 (address_a.to_v6 (), ipv6_subnet_prefix_length).network (); } @@ -159,12 +162,14 @@ std::string nano::transport::channel_loopback::to_string () const return boost::str (boost::format ("%1%") % endpoint); } -namespace -{ -boost::asio::ip::address_v6 mapped_from_v4_bytes (unsigned long address_a) +boost::asio::ip::address_v6 nano::transport::mapped_from_v4_bytes (unsigned long address_a) { return boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (address_a)); } + +boost::asio::ip::address_v6 nano::transport::mapped_from_v4_or_v6 (boost::asio::ip::address const & address_a) +{ + return address_a.is_v4 () ? boost::asio::ip::address_v6::v4_mapped (address_a.to_v4 ()) : address_a.to_v6 (); } bool nano::transport::reserved_address (nano::endpoint const & endpoint_a, bool allow_local_peers) diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 56e5f6b134..66af0baa48 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -30,6 +30,8 @@ namespace transport nano::tcp_endpoint map_endpoint_to_tcp (nano::endpoint const &); boost::asio::ip::address map_address_to_subnetwork (boost::asio::ip::address const &); boost::asio::ip::address ipv4_address_or_ipv6_subnet (boost::asio::ip::address const &); + boost::asio::ip::address_v6 mapped_from_v4_bytes (unsigned long); + boost::asio::ip::address_v6 mapped_from_v4_or_v6 (boost::asio::ip::address const &); // Unassigned, reserved, self bool reserved_address (nano::endpoint const &, bool = false); static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5); diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index 9b4885ba9b..8013e1a81c 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -100,7 +100,7 @@ std::shared_ptr nano::transport::udp_channels::ins { 
debug_assert (endpoint_a.address ().is_v6 ()); std::shared_ptr result; - if (!node.network.not_a_peer (endpoint_a, node.config.allow_local_peers) && (node.network_params.network.is_dev_network () || !max_ip_connections (endpoint_a))) + if (!node.network.not_a_peer (endpoint_a, node.config.allow_local_peers) && (node.network_params.network.is_dev_network () || !max_ip_or_subnetwork_connections (endpoint_a))) { nano::unique_lock lock (mutex); auto existing (channels.get ().find (endpoint_a)); @@ -373,7 +373,7 @@ class udp_message_visitor : public nano::message_visitor } void keepalive (nano::keepalive const & message_a) override { - if (!node.network.udp_channels.max_ip_connections (endpoint)) + if (!node.network.udp_channels.max_ip_or_subnetwork_connections (endpoint)) { auto cookie (node.network.syn_cookies.assign (endpoint)); if (cookie) @@ -630,21 +630,45 @@ std::shared_ptr nano::transport::udp_channels::create bool nano::transport::udp_channels::max_ip_connections (nano::endpoint const & endpoint_a) { - bool result (false); - if (!node.flags.disable_max_peers_per_ip) + if (node.flags.disable_max_peers_per_ip) { - auto const address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ())); - auto const subnet (nano::transport::map_address_to_subnetwork (endpoint_a.address ())); - nano::unique_lock lock (mutex); - result = channels.get ().count (address) >= node.network_params.network.max_peers_per_ip || channels.get ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork; + return false; + } + auto const address (nano::transport::ipv4_address_or_ipv6_subnet (endpoint_a.address ())); + nano::unique_lock lock (mutex); + auto const result = channels.get ().count (address) >= node.network_params.network.max_peers_per_ip; + if (!result) + { + node.stats.inc (nano::stat::type::udp, nano::stat::detail::udp_max_per_ip, nano::stat::dir::out); } return result; } +bool nano::transport::udp_channels::max_subnetwork_connections (nano::endpoint 
const & endpoint_a) +{ + if (node.flags.disable_max_peers_per_subnetwork) + { + return false; + } + auto const subnet (nano::transport::map_address_to_subnetwork (endpoint_a.address ())); + nano::unique_lock lock (mutex); + auto const result = channels.get ().count (subnet) >= node.network_params.network.max_peers_per_subnetwork; + if (!result) + { + node.stats.inc (nano::stat::type::udp, nano::stat::detail::udp_max_per_subnetwork, nano::stat::dir::out); + } + return result; +} + +bool nano::transport::udp_channels::max_ip_or_subnetwork_connections (nano::endpoint const & endpoint_a) +{ + return max_ip_connections (endpoint_a) || max_subnetwork_connections (endpoint_a); +} + bool nano::transport::udp_channels::reachout (nano::endpoint const & endpoint_a) { // Don't overload single IP - bool error = max_ip_connections (endpoint_a); + bool error = max_ip_or_subnetwork_connections (endpoint_a); if (!error && !node.flags.disable_udp) { auto endpoint_l (nano::transport::map_endpoint_to_v6 (endpoint_a)); diff --git a/nano/node/transport/udp.hpp b/nano/node/transport/udp.hpp index 6eb0e9bc3c..25e88e56d1 100644 --- a/nano/node/transport/udp.hpp +++ b/nano/node/transport/udp.hpp @@ -96,7 +96,9 @@ namespace transport void receive_action (nano::message_buffer *); void process_packets (); std::shared_ptr create (nano::endpoint const &); - bool max_ip_connections (nano::endpoint const &); + bool max_ip_connections (nano::endpoint const & endpoint_a); + bool max_subnetwork_connections (nano::endpoint const & endpoint_a); + bool max_ip_or_subnetwork_connections (nano::endpoint const & endpoint_a); // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &); std::unique_ptr collect_container_info (std::string const &);