diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index d4f98f8868..83d6b10b16 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -507,22 +507,22 @@ void nano::bootstrap_server::receive_node_id_handshake_action (boost::system::er void nano::bootstrap_server::add_request (std::unique_ptr message_a) { debug_assert (message_a != nullptr); - nano::lock_guard lock (mutex); + nano::unique_lock lock (mutex); auto start (requests.empty ()); requests.push (std::move (message_a)); if (start) { - run_next (); + run_next (lock); } } void nano::bootstrap_server::finish_request () { - nano::lock_guard lock (mutex); + nano::unique_lock lock (mutex); requests.pop (); if (!requests.empty ()) { - run_next (); + run_next (lock); } else { @@ -582,35 +582,19 @@ class request_response_visitor : public nano::message_visitor } void keepalive (nano::keepalive const & message_a) override { - connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_keepalive (message_a, connection_l->remote_endpoint); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void publish (nano::publish const & message_a) override { - connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void confirm_req (nano::confirm_req const & message_a) override { - connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void confirm_ack (nano::confirm_ack const & message_a) override { - connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void bulk_pull (nano::bulk_pull const &) override { @@ -634,19 +618,11 @@ class request_response_visitor : public nano::message_visitor } void telemetry_req (nano::telemetry_req const & message_a) override { - connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void telemetry_ack (nano::telemetry_ack const & message_a) override { - connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void node_id_handshake (nano::node_id_handshake const & message_a) override { @@ -704,20 +680,42 @@ class request_response_visitor : public nano::message_visitor nano::account node_id (connection->remote_node_id); nano::bootstrap_server_type type (connection->type); debug_assert (node_id.is_zero () || type == nano::bootstrap_server_type::realtime); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a, node_id, type]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, node_id, connection_l->socket, type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } std::shared_ptr connection; }; } -void nano::bootstrap_server::run_next () +void nano::bootstrap_server::run_next (nano::unique_lock & lock_a) { debug_assert (!requests.empty ()); request_response_visitor visitor (shared_from_this ()); - requests.front ()->visit (visitor); + auto type (requests.front ()->header.type); + if (type == nano::message_type::bulk_pull || type == nano::message_type::bulk_pull_account || type == nano::message_type::bulk_push || type == nano::message_type::frontier_req || type == nano::message_type::node_id_handshake) + { + // Bootstrap & node ID (realtime start) + // Request removed from queue in request_response_visitor. For bootstrap with requests.front ().release (), for node ID with finish_request () + requests.front ()->visit (visitor); + } + else + { + // Realtime + auto request (std::move (requests.front ())); + requests.pop (); + auto timeout_check (requests.empty ()); + lock_a.unlock (); + request->visit (visitor); + if (timeout_check) + { + std::weak_ptr this_w (shared_from_this ()); + node->alarm.add (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() { + if (auto this_l = this_w.lock ()) + { + this_l->timeout (); + } + }); + } + } } bool nano::bootstrap_server::is_bootstrap_connection () diff --git a/nano/node/bootstrap/bootstrap_server.hpp b/nano/node/bootstrap/bootstrap_server.hpp index 9e520955bb..699efb887a 100644 --- a/nano/node/bootstrap/bootstrap_server.hpp +++ b/nano/node/bootstrap/bootstrap_server.hpp @@ -62,7 +62,7 @@ class bootstrap_server final : public std::enable_shared_from_this & lock_a); bool is_bootstrap_connection (); bool is_realtime_connection (); std::shared_ptr> receive_buffer; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index f2b9ff6cdd..3cfac9f909 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -13,6 +13,7 @@ nano::network::network (nano::node & node_a, uint16_t port_a) : syn_cookies (node_a.network_params.node.max_peers_per_ip), buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer +tcp_message_manager (node_a.stats, node_a.config.tcp_incoming_connections_max), resolver (node_a.io_ctx), limiter (node_a.config.bandwidth_limit_burst_ratio, node_a.config.bandwidth_limit), node (node_a), @@ -24,6 +25,7 @@ disconnect_observer ([]() {}) { boost::thread::attributes attrs; nano::thread_attributes::set (attrs); + // UDP for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_udp; ++i) { packet_processing_threads.emplace_back (attrs, [this]() { @@ -34,27 +36,62 @@ disconnect_observer ([]() {}) } catch (boost::system::error_code & ec) { - this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); + this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ()); release_assert (false); } catch (std::error_code & ec) { - this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); + this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ()); release_assert (false); } catch (std::runtime_error & err) { - this->node.logger.try_log (FATAL_LOG_PREFIX, err.what ()); + this->node.logger.always_log (FATAL_LOG_PREFIX, err.what ()); release_assert (false); } catch (...) { - this->node.logger.try_log (FATAL_LOG_PREFIX, "Unknown exception"); + this->node.logger.always_log (FATAL_LOG_PREFIX, "Unknown exception"); release_assert (false); } if (this->node.config.logging.network_packet_logging ()) { - this->node.logger.try_log ("Exiting packet processing thread"); + this->node.logger.try_log ("Exiting UDP packet processing thread"); + } + }); + } + // TCP + for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i) + { + packet_processing_threads.emplace_back (attrs, [this]() { + nano::thread_role::set (nano::thread_role::name::packet_processing); + try + { + tcp_channels.process_messages (); + } + catch (boost::system::error_code & ec) + { + this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ()); + release_assert (false); + } + catch (std::error_code & ec) + { + this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ()); + release_assert (false); + } + catch (std::runtime_error & err) + { + this->node.logger.always_log (FATAL_LOG_PREFIX, err.what ()); + release_assert (false); + } + catch (...) + { + this->node.logger.always_log (FATAL_LOG_PREFIX, "Unknown exception"); + release_assert (false); + } + if (this->node.config.logging.network_packet_logging ()) + { + this->node.logger.try_log ("Exiting TCP packet processing thread"); } }); } @@ -89,6 +126,7 @@ void nano::network::stop () tcp_channels.stop (); resolver.cancel (); buffer_container.stop (); + tcp_message_manager.stop (); port = 0; for (auto & thread : packet_processing_threads) { @@ -369,6 +407,13 @@ class network_message_visitor : public nano::message_visitor } node.stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in); node.network.merge_peers (message_a.peers); + // Check for special node port data + auto peer0 (message_a.peers[0]); + if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) + { + nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ()); + node.network.merge_peer (new_endpoint); + } } void publish (nano::publish const & message_a) override { @@ -824,6 +869,54 @@ void nano::message_buffer_manager::stop () condition.notify_all (); } +nano::tcp_message_manager::tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a) : +stats (stats_a), +max_entries (incoming_connections_max_a * nano::tcp_message_manager::max_entries_per_connection + 1) +{ + debug_assert (max_entries > 0); +} + +void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item_a) +{ + { + nano::unique_lock lock (mutex); + while (entries.size () > max_entries && !stopped) + { + condition.wait (lock); + } + entries.push_back (item_a); + } + condition.notify_all (); +} + +nano::tcp_message_item nano::tcp_message_manager::get_message () +{ + nano::unique_lock lock (mutex); + while (entries.empty () && !stopped) + { + condition.wait (lock); + } + if (!entries.empty ()) + { + auto result (entries.front ()); + entries.pop_front (); + return result; + } + else + { + return nano::tcp_message_item{ std::make_shared (), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined }; + } +} + +void nano::tcp_message_manager::stop () +{ + { + nano::lock_guard lock (mutex); + stopped = true; + } + condition.notify_all (); +} + nano::syn_cookies::syn_cookies (size_t max_cookies_per_ip_a) : max_cookies_per_ip (max_cookies_per_ip_a) { diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 0fb3b5a777..14ce4f6e44 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -66,6 +66,24 @@ class message_buffer_manager final std::vector entries; bool stopped; }; +class tcp_message_manager final +{ +public: + tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a); + void put_message (nano::tcp_message_item const & item_a); + nano::tcp_message_item get_message (); + // Stop container and notify waiting threads + void stop (); + +private: + nano::stat & stats; + std::mutex mutex; + nano::condition_variable condition; + std::deque entries; + unsigned max_entries; + static unsigned const max_entries_per_connection = 16; + bool stopped{ false }; +}; /** * Node ID cookies for node ID handshakes */ @@ -156,6 +174,7 @@ class network final std::vector packet_processing_threads; nano::bandwidth_limiter limiter; nano::peer_exclusion excluded_peers; + nano::tcp_message_manager tcp_message_manager; nano::node & node; nano::network_filter publish_filter; nano::transport::udp_channels udp_channels; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index f38343d98f..a675a03856 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -259,6 +259,18 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer (uint8_t connec return result; } +void nano::transport::tcp_channels::process_messages () +{ + while (!stopped) + { + auto item (node.network.tcp_message_manager.get_message ()); + if (item.message != nullptr) + { + process_message (*item.message, item.endpoint, item.node_id, item.socket, item.type); + } + } +} + void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a, std::shared_ptr socket_a, nano::bootstrap_server_type type_a) { if (!stopped) @@ -305,23 +317,6 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa } } -void nano::transport::tcp_channels::process_keepalive (nano::keepalive const & message_a, nano::tcp_endpoint const & endpoint_a) -{ - if (!max_ip_connections (endpoint_a)) - { - // Check for special node port data - auto peer0 (message_a.peers[0]); - if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) - { - nano::endpoint new_endpoint (endpoint_a.address (), peer0.port ()); - node.network.merge_peer (new_endpoint); - } - // Used to store sender endpoint information only - auto udp_channel (std::make_shared (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a), node.network_params.protocol.protocol_version)); - node.network.process_message (message_a, udp_channel); - } -} - void nano::transport::tcp_channels::start () { ongoing_keepalive (); diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 6dd4738a19..41c7e50014 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -18,6 +18,15 @@ namespace nano { class bootstrap_server; enum class bootstrap_server_type; +class tcp_message_item final +{ +public: + std::shared_ptr message; + nano::tcp_endpoint endpoint; + nano::account node_id; + std::shared_ptr socket; + nano::bootstrap_server_type type; +}; namespace transport { class tcp_channels; @@ -95,8 +104,8 @@ namespace transport void receive (); void start (); void stop (); + void process_messages (); void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr, nano::bootstrap_server_type); - void process_keepalive (nano::keepalive const &, nano::tcp_endpoint const &); bool max_ip_connections (nano::tcp_endpoint const &); // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &);