From 2a040993713e059c03469b90cfe4aab5dae817b9 Mon Sep 17 00:00:00 2001 From: Sergey Kroshnin Date: Fri, 28 Feb 2020 19:26:43 +0300 Subject: [PATCH 1/8] Move TCP realtime messages prosessing to network threads --- nano/node/bootstrap/bootstrap_server.cpp | 35 ++------- nano/node/network.cpp | 95 +++++++++++++++++++++++- nano/node/network.hpp | 20 +++++ nano/node/transport/tcp.cpp | 29 +++----- nano/node/transport/tcp.hpp | 11 ++- 5 files changed, 143 insertions(+), 47 deletions(-) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index e8c9cc1168..6809b8d7ce 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -574,34 +574,22 @@ class request_response_visitor : public nano::message_visitor void keepalive (nano::keepalive const & message_a) override { connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_keepalive (message_a, connection_l->remote_endpoint); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); } void publish (nano::publish const & message_a) override { connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); } void confirm_req (nano::confirm_req const & message_a) override { connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); } void confirm_ack (nano::confirm_ack const & message_a) override { connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); } void bulk_pull (nano::bulk_pull const &) override { @@ -626,18 +614,12 @@ class request_response_visitor : public nano::message_visitor void telemetry_req (nano::telemetry_req const & message_a) override { connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); } void telemetry_ack (nano::telemetry_ack const & message_a) override { connection->finish_request_async (); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); } void node_id_handshake (nano::node_id_handshake const & message_a) override { @@ -695,10 +677,7 @@ class request_response_visitor : public nano::message_visitor nano::account node_id (connection->remote_node_id); nano::bootstrap_server_type type (connection->type); debug_assert (node_id.is_zero () || type == nano::bootstrap_server_type::realtime); - auto connection_l (connection->shared_from_this ()); - connection->node->background ([connection_l, message_a, node_id, type]() { - connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, node_id, connection_l->socket, type); - }); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); } std::shared_ptr connection; }; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index aba02256d9..2c30389ec6 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -12,6 +12,7 @@ nano::network::network (nano::node & node_a, uint16_t port_a) : buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer +tcp_message_manager (node_a.stats, node_a.config.tcp_incoming_connections_max), resolver (node_a.io_ctx), limiter (node_a.config.bandwidth_limit), node (node_a), @@ -22,6 +23,7 @@ disconnect_observer ([]() {}) { boost::thread::attributes attrs; nano::thread_attributes::set (attrs); + // UDP for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_udp; ++i) { packet_processing_threads.emplace_back (attrs, [this]() { @@ -52,7 +54,42 @@ disconnect_observer ([]() {}) } if (this->node.config.logging.network_packet_logging ()) { - this->node.logger.try_log ("Exiting packet processing thread"); + this->node.logger.try_log ("Exiting UDP packet processing thread"); + } + }); + } + // TCP + for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i) + { + packet_processing_threads.emplace_back (attrs, [this]() { + nano::thread_role::set (nano::thread_role::name::packet_processing); + try + { + tcp_channels.process_messages (); + } + catch (boost::system::error_code & ec) + { + this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); + release_assert (false); + } + catch (std::error_code & ec) + { + this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); + release_assert (false); + } + catch (std::runtime_error & err) + { + this->node.logger.try_log (FATAL_LOG_PREFIX, err.what ()); + release_assert (false); + } + catch (...) + { + this->node.logger.try_log (FATAL_LOG_PREFIX, "Unknown exception"); + release_assert (false); + } + if (this->node.config.logging.network_packet_logging ()) + { + this->node.logger.try_log ("Exiting TCP packet processing thread"); } }); } @@ -87,6 +124,7 @@ void nano::network::stop () tcp_channels.stop (); resolver.cancel (); buffer_container.stop (); + tcp_message_manager.stop (); port = 0; for (auto & thread : packet_processing_threads) { @@ -376,6 +414,13 @@ class network_message_visitor : public nano::message_visitor } node.stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in); node.network.merge_peers (message_a.peers); + // Check for special node port data + auto peer0 (message_a.peers[0]); + if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) + { + nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ()); + node.network.merge_peer (new_endpoint); + } } void publish (nano::publish const & message_a) override { @@ -815,6 +860,54 @@ void nano::message_buffer_manager::stop () condition.notify_all (); } +nano::tcp_message_manager::tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a) : +stats (stats_a), +max_entries (incoming_connections_max_a * max_entries_per_connection + 1) +{ + debug_assert (max_entries > 0); +} + +void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item_a) +{ + { + nano::unique_lock lock (mutex); + if (entries.size () > max_entries && !stopped) + { + condition.wait (lock, [& stopped = stopped, &entries = entries, &max_entries = max_entries] { return stopped || entries.size () < max_entries; }); + } + entries.push_back (item_a); + } + condition.notify_all (); +} + +nano::tcp_message_item nano::tcp_message_manager::get_message () +{ + nano::unique_lock lock (mutex); + if (entries.empty () && !stopped) + { + condition.wait (lock, [& stopped = stopped, &entries = entries] { return stopped || !entries.empty (); }); + } + if (!entries.empty ()) + { + auto result (entries.front ()); + entries.pop_front (); + return result; + } + else + { + return nano::tcp_message_item{ std::make_shared (nano::keepalive ()), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined }; + } +} + +void nano::tcp_message_manager::stop () +{ + { + nano::lock_guard lock (mutex); + stopped = true; + } + condition.notify_all (); +} + boost::optional nano::syn_cookies::assign (nano::endpoint const & endpoint_a) { auto ip_addr (endpoint_a.address ()); diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 99c68bbc45..0d0f70df9b 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -65,6 +65,25 @@ class message_buffer_manager final std::vector entries; bool stopped; }; +class tcp_message_manager final +{ +public: + // Stats - Statistics + tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a); + void put_message (nano::tcp_message_item const & item_a); + nano::tcp_message_item get_message (); + // Stop container and notify waiting threads + void stop (); + +private: + nano::stat & stats; + std::mutex mutex; + nano::condition_variable condition; + std::deque entries; + unsigned max_entries; + unsigned const max_entries_per_connection = 16; + bool stopped{ false }; +}; /** * Node ID cookies for node ID handshakes */ @@ -151,6 +170,7 @@ class network final boost::asio::ip::udp::resolver resolver; std::vector packet_processing_threads; nano::bandwidth_limiter limiter; + nano::tcp_message_manager tcp_message_manager; nano::node & node; nano::transport::udp_channels udp_channels; nano::transport::tcp_channels tcp_channels; diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index 450a266d6a..656d2105ec 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -258,6 +258,18 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer (uint8_t connec return result; } +void nano::transport::tcp_channels::process_messages () +{ + while (!stopped) + { + auto item (node.network.tcp_message_manager.get_message ()); + if (item.message != nullptr) + { + process_message (*item.message, item.endpoint, item.node_id, item.socket, item.type); + } + } +} + void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a, std::shared_ptr socket_a, nano::bootstrap_server_type type_a) { if (!stopped) @@ -304,23 +316,6 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa } } -void nano::transport::tcp_channels::process_keepalive (nano::keepalive const & message_a, nano::tcp_endpoint const & endpoint_a) -{ - if (!max_ip_connections (endpoint_a)) - { - // Check for special node port data - auto peer0 (message_a.peers[0]); - if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0) - { - nano::endpoint new_endpoint (endpoint_a.address (), peer0.port ()); - node.network.merge_peer (new_endpoint); - } - // Used to store sender endpoint information only - auto udp_channel (std::make_shared (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a), node.network_params.protocol.protocol_version)); - node.network.process_message (message_a, udp_channel); - } -} - void nano::transport::tcp_channels::start () { ongoing_keepalive (); diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index b71dda5a0d..b2ab31afb8 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -18,6 +18,15 @@ namespace nano { class bootstrap_server; enum class bootstrap_server_type; +class tcp_message_item final +{ +public: + std::shared_ptr message; + nano::tcp_endpoint endpoint; + nano::account node_id; + std::shared_ptr socket; + nano::bootstrap_server_type type; +}; namespace transport { class tcp_channels; @@ -95,8 +104,8 @@ namespace transport void receive (); void start (); void stop (); + void process_messages (); void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr, nano::bootstrap_server_type); - void process_keepalive (nano::keepalive const &, nano::tcp_endpoint const &); bool max_ip_connections (nano::tcp_endpoint const &); // Should we reach out to this endpoint with a keepalive message bool reachout (nano::endpoint const &); From 07eb19b3aa806aea775ee9aae53fb75fc0f2333a Mon Sep 17 00:00:00 2001 From: Sergey Kroshnin Date: Fri, 28 Feb 2020 20:52:17 +0300 Subject: [PATCH 2/8] Fix some compilers issues --- nano/node/bootstrap/bootstrap_server.cpp | 14 +++++++------- nano/node/network.cpp | 2 +- nano/node/network.hpp | 2 +- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index 6809b8d7ce..90334d5030 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -574,22 +574,22 @@ class request_response_visitor : public nano::message_visitor void keepalive (nano::keepalive const & message_a) override { connection->finish_request_async (); - connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void publish (nano::publish const & message_a) override { connection->finish_request_async (); - connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void confirm_req (nano::confirm_req const & message_a) override { connection->finish_request_async (); - connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void confirm_ack (nano::confirm_ack const & message_a) override { connection->finish_request_async (); - connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void bulk_pull (nano::bulk_pull const &) override { @@ -614,12 +614,12 @@ class request_response_visitor : public nano::message_visitor void telemetry_req (nano::telemetry_req const & message_a) override { connection->finish_request_async (); - connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void telemetry_ack (nano::telemetry_ack const & message_a) override { connection->finish_request_async (); - connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void node_id_handshake (nano::node_id_handshake const & message_a) override { @@ -677,7 +677,7 @@ class request_response_visitor : public nano::message_visitor nano::account node_id (connection->remote_node_id); nano::bootstrap_server_type type (connection->type); debug_assert (node_id.is_zero () || type == nano::bootstrap_server_type::realtime); - connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type}); + connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } std::shared_ptr connection; }; diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 2c30389ec6..2d56ed0ff6 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -862,7 +862,7 @@ void nano::message_buffer_manager::stop () nano::tcp_message_manager::tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a) : stats (stats_a), -max_entries (incoming_connections_max_a * max_entries_per_connection + 1) +max_entries (incoming_connections_max_a * nano::tcp_message_manager::max_entries_per_connection + 1) { debug_assert (max_entries > 0); } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 0d0f70df9b..bae9bf7d1f 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -81,7 +81,7 @@ class tcp_message_manager final nano::condition_variable condition; std::deque entries; unsigned max_entries; - unsigned const max_entries_per_connection = 16; + static unsigned const max_entries_per_connection = 16; bool stopped{ false }; }; /** From 016d6318f9d01bca255d9af9aee4ecb56a5ceabf Mon Sep 17 00:00:00 2001 From: Sergey Kroshnin Date: Sat, 7 Mar 2020 02:46:46 +0300 Subject: [PATCH 3/8] Improve run_next () function mutex lock for different request types --- nano/node/bootstrap/bootstrap_server.cpp | 31 +++++++++++++++--------- nano/node/bootstrap/bootstrap_server.hpp | 2 +- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index 90334d5030..d78900e49f 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -498,22 +498,22 @@ void nano::bootstrap_server::receive_node_id_handshake_action (boost::system::er void nano::bootstrap_server::add_request (std::unique_ptr message_a) { debug_assert (message_a != nullptr); - nano::lock_guard lock (mutex); + nano::unique_lock lock (mutex); auto start (requests.empty ()); requests.push (std::move (message_a)); if (start) { - run_next (); + run_next (lock); } } void nano::bootstrap_server::finish_request () { - nano::lock_guard lock (mutex); + nano::unique_lock lock (mutex); requests.pop (); if (!requests.empty ()) { - run_next (); + run_next (lock); } else { @@ -573,22 +573,18 @@ class request_response_visitor : public nano::message_visitor } void keepalive (nano::keepalive const & message_a) override { - connection->finish_request_async (); connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void publish (nano::publish const & message_a) override { - connection->finish_request_async (); connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void confirm_req (nano::confirm_req const & message_a) override { - connection->finish_request_async (); connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void confirm_ack (nano::confirm_ack const & message_a) override { - connection->finish_request_async (); connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void bulk_pull (nano::bulk_pull const &) override @@ -613,12 +609,10 @@ class request_response_visitor : public nano::message_visitor } void telemetry_req (nano::telemetry_req const & message_a) override { - connection->finish_request_async (); connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void telemetry_ack (nano::telemetry_ack const & message_a) override { - connection->finish_request_async (); connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type }); } void node_id_handshake (nano::node_id_handshake const & message_a) override @@ -683,11 +677,24 @@ class request_response_visitor : public nano::message_visitor }; } -void nano::bootstrap_server::run_next () +void nano::bootstrap_server::run_next (nano::unique_lock & lock_a) { debug_assert (!requests.empty ()); request_response_visitor visitor (shared_from_this ()); - requests.front ()->visit (visitor); + auto type (requests.front ()->header.type); + if (type == nano::message_type::bulk_pull || type == nano::message_type::bulk_pull_account || type == nano::message_type::bulk_push || type == nano::message_type::frontier_req || type == nano::message_type::node_id_handshake) + { + // Bootstrap & node ID (realtime start) + requests.front ()->visit (visitor); + } + else + { + // Realtime + auto request (std::move (requests.front ())); + requests.pop (); + lock_a.unlock (); + request->visit (visitor); + } } bool nano::bootstrap_server::is_bootstrap_connection () diff --git a/nano/node/bootstrap/bootstrap_server.hpp b/nano/node/bootstrap/bootstrap_server.hpp index 9e520955bb..699efb887a 100644 --- a/nano/node/bootstrap/bootstrap_server.hpp +++ b/nano/node/bootstrap/bootstrap_server.hpp @@ -62,7 +62,7 @@ class bootstrap_server final : public std::enable_shared_from_this & lock_a); bool is_bootstrap_connection (); bool is_realtime_connection (); std::shared_ptr> receive_buffer; From e28dad0266107b04e7b7701ea53b9760f496f1ca Mon Sep 17 00:00:00 2001 From: Sergey Kroshnin Date: Sat, 7 Mar 2020 04:33:34 +0300 Subject: [PATCH 4/8] Add timeout for run_next () --- nano/node/bootstrap/bootstrap_server.cpp | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index d78900e49f..7cae35b34f 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -692,8 +692,19 @@ void nano::bootstrap_server::run_next (nano::unique_lock & lock_a) // Realtime auto request (std::move (requests.front ())); requests.pop (); + auto timeout_check (requests.empty ()); lock_a.unlock (); request->visit (visitor); + if (timeout_check) + { + std::weak_ptr this_w (shared_from_this ()); + node->alarm.add (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() { + if (auto this_l = this_w.lock ()) + { + this_l->timeout (); + } + }); + } } } From dfddb3df43896cd178554f2fe87995429f1f20dc Mon Sep 17 00:00:00 2001 From: Sergey Kroshnin Date: Wed, 25 Mar 2020 18:00:12 +0300 Subject: [PATCH 5/8] Always log network threads processing exceptions --- nano/node/network.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/nano/node/network.cpp b/nano/node/network.cpp index af69f1e275..0ca64bf0fa 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -36,22 +36,22 @@ disconnect_observer ([]() {}) } catch (boost::system::error_code & ec) { - this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); + this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ()); release_assert (false); } catch (std::error_code & ec) { - this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); + this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ()); release_assert (false); } catch (std::runtime_error & err) { - this->node.logger.try_log (FATAL_LOG_PREFIX, err.what ()); + this->node.logger.always_log (FATAL_LOG_PREFIX, err.what ()); release_assert (false); } catch (...) { - this->node.logger.try_log (FATAL_LOG_PREFIX, "Unknown exception"); + this->node.logger.always_log (FATAL_LOG_PREFIX, "Unknown exception"); release_assert (false); } if (this->node.config.logging.network_packet_logging ()) @@ -71,22 +71,22 @@ disconnect_observer ([]() {}) } catch (boost::system::error_code & ec) { - this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); + this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ()); release_assert (false); } catch (std::error_code & ec) { - this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); + this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ()); release_assert (false); } catch (std::runtime_error & err) { - this->node.logger.try_log (FATAL_LOG_PREFIX, err.what ()); + this->node.logger.always_log (FATAL_LOG_PREFIX, err.what ()); release_assert (false); } catch (...) { - this->node.logger.try_log (FATAL_LOG_PREFIX, "Unknown exception"); + this->node.logger.always_log (FATAL_LOG_PREFIX, "Unknown exception"); release_assert (false); } if (this->node.config.logging.network_packet_logging ()) From 9db2158e85bf7dd58bacca7cf60cbe3b820c3d27 Mon Sep 17 00:00:00 2001 From: Sergey Kroshnin Date: Wed, 25 Mar 2020 18:39:24 +0300 Subject: [PATCH 6/8] Remove unnecessary statements (Wesley) --- nano/node/network.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 0ca64bf0fa..5cad819546 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -890,9 +890,9 @@ void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item { { nano::unique_lock lock (mutex); - if (entries.size () > max_entries && !stopped) + while (entries.size () > max_entries && !stopped) { - condition.wait (lock, [& stopped = stopped, &entries = entries, &max_entries = max_entries] { return stopped || entries.size () < max_entries; }); + condition.wait (lock); } entries.push_back (item_a); } @@ -902,9 +902,9 @@ void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item nano::tcp_message_item nano::tcp_message_manager::get_message () { nano::unique_lock lock (mutex); - if (entries.empty () && !stopped) + while (entries.empty () && !stopped) { - condition.wait (lock, [& stopped = stopped, &entries = entries] { return stopped || !entries.empty (); }); + condition.wait (lock); } if (!entries.empty ()) { From faf1cb67e809f29e313b3a09acc3d05a8944fc07 Mon Sep 17 00:00:00 2001 From: Sergey Kroshnin Date: Wed, 8 Apr 2020 05:25:53 +0300 Subject: [PATCH 7/8] Comment for not removing requests.front () --- nano/node/bootstrap/bootstrap_server.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/nano/node/bootstrap/bootstrap_server.cpp b/nano/node/bootstrap/bootstrap_server.cpp index c06e68e28d..83d6b10b16 100644 --- a/nano/node/bootstrap/bootstrap_server.cpp +++ b/nano/node/bootstrap/bootstrap_server.cpp @@ -694,6 +694,7 @@ void nano::bootstrap_server::run_next (nano::unique_lock & lock_a) if (type == nano::message_type::bulk_pull || type == nano::message_type::bulk_pull_account || type == nano::message_type::bulk_push || type == nano::message_type::frontier_req || type == nano::message_type::node_id_handshake) { // Bootstrap & node ID (realtime start) + // Request removed from queue in request_response_visitor. For bootstrap with requests.front ().release (), for node ID with finish_request () requests.front ()->visit (visitor); } else From 2d6858f519bbc510f1cf7c256fd139e854e05358 Mon Sep 17 00:00:00 2001 From: Sergey Kroshnin Date: Tue, 14 Apr 2020 14:36:06 +0300 Subject: [PATCH 8/8] Remove leftover code & comments --- nano/node/network.cpp | 2 +- nano/node/network.hpp | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/nano/node/network.cpp b/nano/node/network.cpp index d38192aa10..3cfac9f909 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -904,7 +904,7 @@ nano::tcp_message_item nano::tcp_message_manager::get_message () } else { - return nano::tcp_message_item{ std::make_shared (nano::keepalive ()), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined }; + return nano::tcp_message_item{ std::make_shared (), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined }; } } diff --git a/nano/node/network.hpp b/nano/node/network.hpp index 1aa242cf8b..14ce4f6e44 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -69,7 +69,6 @@ class message_buffer_manager final class tcp_message_manager final { public: - // Stats - Statistics tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a); void put_message (nano::tcp_message_item const & item_a); nano::tcp_message_item get_message ();