Skip to content

Commit

Permalink
Move TCP messages processing to network threads (#2613)
Browse files Browse the repository at this point in the history
* Move TCP realtime messages processing to network threads
* Add timeout for run_next ()
* Always log network threads processing exceptions
* Remove unnecessary statements (Wesley)
* Comment for not removing requests.front ()
  • Loading branch information
SergiySW committed Apr 14, 2020
1 parent a4d4675 commit a0233d9
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 64 deletions.
78 changes: 38 additions & 40 deletions nano/node/bootstrap/bootstrap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,22 +520,22 @@ void nano::bootstrap_server::receive_node_id_handshake_action (boost::system::er
void nano::bootstrap_server::add_request (std::unique_ptr<nano::message> message_a)
{
debug_assert (message_a != nullptr);
nano::lock_guard<std::mutex> lock (mutex);
nano::unique_lock<std::mutex> lock (mutex);
auto start (requests.empty ());
requests.push (std::move (message_a));
if (start)
{
run_next ();
run_next (lock);
}
}

void nano::bootstrap_server::finish_request ()
{
nano::lock_guard<std::mutex> lock (mutex);
nano::unique_lock<std::mutex> lock (mutex);
requests.pop ();
if (!requests.empty ())
{
run_next ();
run_next (lock);
}
else
{
Expand Down Expand Up @@ -595,35 +595,19 @@ class request_response_visitor : public nano::message_visitor
}
void keepalive (nano::keepalive const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_keepalive (message_a, connection_l->remote_endpoint);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::keepalive> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void publish (nano::publish const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::publish> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void confirm_req (nano::confirm_req const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::confirm_req> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void confirm_ack (nano::confirm_ack const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::confirm_ack> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void bulk_pull (nano::bulk_pull const &) override
{
Expand All @@ -647,19 +631,11 @@ class request_response_visitor : public nano::message_visitor
}
void telemetry_req (nano::telemetry_req const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::telemetry_req> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void telemetry_ack (nano::telemetry_ack const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::telemetry_ack> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void node_id_handshake (nano::node_id_handshake const & message_a) override
{
Expand Down Expand Up @@ -717,20 +693,42 @@ class request_response_visitor : public nano::message_visitor
nano::account node_id (connection->remote_node_id);
nano::bootstrap_server_type type (connection->type);
debug_assert (node_id.is_zero () || type == nano::bootstrap_server_type::realtime);
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a, node_id, type]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, node_id, connection_l->socket, type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::node_id_handshake> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
std::shared_ptr<nano::bootstrap_server> connection;
};
}

void nano::bootstrap_server::run_next ()
void nano::bootstrap_server::run_next (nano::unique_lock<std::mutex> & lock_a)
{
debug_assert (!requests.empty ());
request_response_visitor visitor (shared_from_this ());
requests.front ()->visit (visitor);
auto type (requests.front ()->header.type);
if (type == nano::message_type::bulk_pull || type == nano::message_type::bulk_pull_account || type == nano::message_type::bulk_push || type == nano::message_type::frontier_req || type == nano::message_type::node_id_handshake)
{
// Bootstrap & node ID (realtime start)
// Request removed from queue in request_response_visitor. For bootstrap with requests.front ().release (), for node ID with finish_request ()
requests.front ()->visit (visitor);
}
else
{
// Realtime
auto request (std::move (requests.front ()));
requests.pop ();
auto timeout_check (requests.empty ());
lock_a.unlock ();
request->visit (visitor);
if (timeout_check)
{
std::weak_ptr<nano::bootstrap_server> this_w (shared_from_this ());
node->alarm.add (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->timeout ();
}
});
}
}
}

bool nano::bootstrap_server::is_bootstrap_connection ()
Expand Down
2 changes: 1 addition & 1 deletion nano/node/bootstrap/bootstrap_server.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class bootstrap_server final : public std::enable_shared_from_this<nano::bootstr
void finish_request ();
void finish_request_async ();
void timeout ();
void run_next ();
void run_next (nano::unique_lock<std::mutex> & lock_a);
bool is_bootstrap_connection ();
bool is_realtime_connection ();
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
Expand Down
103 changes: 98 additions & 5 deletions nano/node/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
nano::network::network (nano::node & node_a, uint16_t port_a) :
syn_cookies (node_a.network_params.node.max_peers_per_ip),
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
tcp_message_manager (node_a.stats, node_a.config.tcp_incoming_connections_max),
resolver (node_a.io_ctx),
limiter (node_a.config.bandwidth_limit_burst_ratio, node_a.config.bandwidth_limit),
node (node_a),
Expand All @@ -24,6 +25,7 @@ disconnect_observer ([]() {})
{
boost::thread::attributes attrs;
nano::thread_attributes::set (attrs);
// UDP
for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_udp; ++i)
{
packet_processing_threads.emplace_back (attrs, [this]() {
Expand All @@ -34,27 +36,62 @@ disconnect_observer ([]() {})
}
catch (boost::system::error_code & ec)
{
this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ());
this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ());
release_assert (false);
}
catch (std::error_code & ec)
{
this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ());
this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ());
release_assert (false);
}
catch (std::runtime_error & err)
{
this->node.logger.try_log (FATAL_LOG_PREFIX, err.what ());
this->node.logger.always_log (FATAL_LOG_PREFIX, err.what ());
release_assert (false);
}
catch (...)
{
this->node.logger.try_log (FATAL_LOG_PREFIX, "Unknown exception");
this->node.logger.always_log (FATAL_LOG_PREFIX, "Unknown exception");
release_assert (false);
}
if (this->node.config.logging.network_packet_logging ())
{
this->node.logger.try_log ("Exiting packet processing thread");
this->node.logger.try_log ("Exiting UDP packet processing thread");
}
});
}
// TCP
for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i)
{
packet_processing_threads.emplace_back (attrs, [this]() {
nano::thread_role::set (nano::thread_role::name::packet_processing);
try
{
tcp_channels.process_messages ();
}
catch (boost::system::error_code & ec)
{
this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ());
release_assert (false);
}
catch (std::error_code & ec)
{
this->node.logger.always_log (FATAL_LOG_PREFIX, ec.message ());
release_assert (false);
}
catch (std::runtime_error & err)
{
this->node.logger.always_log (FATAL_LOG_PREFIX, err.what ());
release_assert (false);
}
catch (...)
{
this->node.logger.always_log (FATAL_LOG_PREFIX, "Unknown exception");
release_assert (false);
}
if (this->node.config.logging.network_packet_logging ())
{
this->node.logger.try_log ("Exiting TCP packet processing thread");
}
});
}
Expand Down Expand Up @@ -89,6 +126,7 @@ void nano::network::stop ()
tcp_channels.stop ();
resolver.cancel ();
buffer_container.stop ();
tcp_message_manager.stop ();
port = 0;
for (auto & thread : packet_processing_threads)
{
Expand Down Expand Up @@ -369,6 +407,13 @@ class network_message_visitor : public nano::message_visitor
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in);
node.network.merge_peers (message_a.peers);
// Check for special node port data
auto peer0 (message_a.peers[0]);
if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
{
nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ());
node.network.merge_peer (new_endpoint);
}
}
void publish (nano::publish const & message_a) override
{
Expand Down Expand Up @@ -836,6 +881,54 @@ void nano::message_buffer_manager::stop ()
condition.notify_all ();
}

/**
 * Set up the queue of incoming TCP messages.
 * The entry limit is max_entries_per_connection for each allowed incoming connection, plus one.
 */
nano::tcp_message_manager::tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a) :
stats (stats_a),
max_entries (1 + max_entries_per_connection * incoming_connections_max_a)
{
	// The limit must never be zero
	debug_assert (max_entries > 0);
}

/**
 * Append a message item to the queue.
 * While the queue already holds max_entries items and the manager is not stopped, the caller waits on the
 * condition variable and re-checks the limit each time it is woken.
 * Once stopped, the item is appended without waiting.
 * @param item_a item to copy into the queue
 */
void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item_a)
{
	{
		nano::unique_lock<std::mutex> lock (mutex);
		// Wait at >= rather than > so the queue never grows beyond max_entries
		while (entries.size () >= max_entries && !stopped)
		{
			condition.wait (lock);
		}
		entries.push_back (item_a);
	}
	// Notify after releasing the mutex; notify_all because every waiter shares this one condition variable
	condition.notify_all ();
}

/**
 * Remove and return the oldest queued item.
 * Waits while the queue is empty and the manager is not stopped.
 * @return the front item; if the queue is empty after waiting (only possible once stopped), a placeholder item
 * carrying a default keepalive, an any-address endpoint with port 0, a zero node ID, no socket and an undefined type
 */
nano::tcp_message_item nano::tcp_message_manager::get_message ()
{
	nano::unique_lock<std::mutex> lock (mutex);
	while (entries.empty () && !stopped)
	{
		condition.wait (lock);
	}
	if (entries.empty ())
	{
		// Stopped with nothing left to hand out
		return nano::tcp_message_item{ std::make_shared<nano::keepalive> (), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined };
	}
	auto result (entries.front ());
	entries.pop_front ();
	lock.unlock ();
	// A slot was freed: wake threads waiting in put_message so they re-check the limit.
	// notify_all because producers and consumers wait on the same condition variable.
	condition.notify_all ();
	return result;
}

/** Mark the container as stopped and wake every thread waiting on it */
void nano::tcp_message_manager::stop ()
{
	nano::unique_lock<std::mutex> lock (mutex);
	stopped = true;
	// Release the mutex before notifying, as the flag is already set
	lock.unlock ();
	condition.notify_all ();
}

nano::syn_cookies::syn_cookies (size_t max_cookies_per_ip_a) :
max_cookies_per_ip (max_cookies_per_ip_a)
{
Expand Down
19 changes: 19 additions & 0 deletions nano/node/network.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ class message_buffer_manager final
std::vector<nano::message_buffer> entries;
bool stopped;
};
/**
 * Queue of incoming TCP message items guarded by a mutex and a condition variable.
 * Items are appended with put_message () and taken from the front with get_message ().
 */
class tcp_message_manager final
{
public:
// The entry limit is derived from incoming_connections_max_a and max_entries_per_connection
tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a);
// Append an item; may wait while the queue is at its limit and the container is not stopped
void put_message (nano::tcp_message_item const & item_a);
// Take the oldest item; waits while the queue is empty and the container is not stopped
nano::tcp_message_item get_message ();
// Stop container and notify waiting threads
void stop ();

private:
// NOTE(review): stored but not used by the member functions visible here
nano::stat & stats;
// Protects entries and stopped
std::mutex mutex;
// Waited on by both put_message () and get_message ()
nano::condition_variable condition;
std::deque<nano::tcp_message_item> entries;
// Queue size limit checked by put_message ()
unsigned max_entries;
// Per-connection multiplier used to compute max_entries
static unsigned const max_entries_per_connection = 16;
// Set by stop (); ends the waits in put_message () and get_message ()
bool stopped{ false };
};
/**
* Node ID cookies for node ID handshakes
*/
Expand Down Expand Up @@ -157,6 +175,7 @@ class network final
std::vector<boost::thread> packet_processing_threads;
nano::bandwidth_limiter limiter;
nano::peer_exclusion excluded_peers;
nano::tcp_message_manager tcp_message_manager;
nano::node & node;
nano::network_filter publish_filter;
nano::transport::udp_channels udp_channels;
Expand Down
29 changes: 12 additions & 17 deletions nano/node/transport/tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,18 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer (uint8_t connec
return result;
}

/**
 * Processing loop: repeatedly takes the next item from the node's TCP message manager and passes it to
 * process_message (), until this container is stopped. Items without a message are ignored.
 */
void nano::transport::tcp_channels::process_messages ()
{
	while (!stopped)
	{
		auto next (node.network.tcp_message_manager.get_message ());
		if (next.message == nullptr)
		{
			// Nothing to process in this item
			continue;
		}
		process_message (*next.message, next.endpoint, next.node_id, next.socket, next.type);
	}
}

void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a, std::shared_ptr<nano::socket> socket_a, nano::bootstrap_server_type type_a)
{
if (!stopped && message_a.header.version_using >= protocol_constants ().protocol_version_min (node.ledger.cache.epoch_2_started))
Expand Down Expand Up @@ -308,23 +320,6 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
}
}

/**
 * Handle a keepalive received over TCP from endpoint_a.
 * Nothing is done when max_ip_connections (endpoint_a) returns true.
 */
void nano::transport::tcp_channels::process_keepalive (nano::keepalive const & message_a, nano::tcp_endpoint const & endpoint_a)
{
	if (max_ip_connections (endpoint_a))
	{
		return;
	}
	// Check for special node port data: first peer has a default IPv6 address and a non-zero port
	auto const first_peer (message_a.peers[0]);
	bool const address_is_default (first_peer.address () == boost::asio::ip::address_v6{});
	if (address_is_default && first_peer.port () != 0)
	{
		// Combine the sender's address with the port it sent
		nano::endpoint advertised (endpoint_a.address (), first_peer.port ());
		node.network.merge_peer (advertised);
	}
	// Used to store sender endpoint information only
	auto sender_channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a), node.network_params.protocol.protocol_version));
	node.network.process_message (message_a, sender_channel);
}

void nano::transport::tcp_channels::start ()
{
ongoing_keepalive ();
Expand Down
Loading

0 comments on commit a0233d9

Please sign in to comment.