Move TCP messages processing to network threads #2613

Merged
Changes from 7 commits
77 changes: 37 additions & 40 deletions nano/node/bootstrap/bootstrap_server.cpp
@@ -498,22 +498,22 @@ void nano::bootstrap_server::receive_node_id_handshake_action (boost::system::er
void nano::bootstrap_server::add_request (std::unique_ptr<nano::message> message_a)
{
debug_assert (message_a != nullptr);
nano::lock_guard<std::mutex> lock (mutex);
nano::unique_lock<std::mutex> lock (mutex);
auto start (requests.empty ());
requests.push (std::move (message_a));
if (start)
{
run_next ();
run_next (lock);
}
}

void nano::bootstrap_server::finish_request ()
{
nano::lock_guard<std::mutex> lock (mutex);
nano::unique_lock<std::mutex> lock (mutex);
requests.pop ();
if (!requests.empty ())
{
run_next ();
run_next (lock);
}
else
{
@@ -573,35 +573,19 @@ class request_response_visitor : public nano::message_visitor
}
void keepalive (nano::keepalive const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_keepalive (message_a, connection_l->remote_endpoint);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::keepalive> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void publish (nano::publish const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::publish> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void confirm_req (nano::confirm_req const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::confirm_req> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void confirm_ack (nano::confirm_ack const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::confirm_ack> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void bulk_pull (nano::bulk_pull const &) override
{
@@ -625,19 +609,11 @@ class request_response_visitor : public nano::message_visitor
}
void telemetry_req (nano::telemetry_req const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::telemetry_req> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void telemetry_ack (nano::telemetry_ack const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::telemetry_ack> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
void node_id_handshake (nano::node_id_handshake const & message_a) override
{
@@ -695,20 +671,41 @@ class request_response_visitor : public nano::message_visitor
nano::account node_id (connection->remote_node_id);
nano::bootstrap_server_type type (connection->type);
debug_assert (node_id.is_zero () || type == nano::bootstrap_server_type::realtime);
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a, node_id, type]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, node_id, connection_l->socket, type);
});
connection->node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::make_shared<nano::node_id_handshake> (message_a), connection->remote_endpoint, connection->remote_node_id, connection->socket, connection->type });
}
std::shared_ptr<nano::bootstrap_server> connection;
};
}

void nano::bootstrap_server::run_next ()
void nano::bootstrap_server::run_next (nano::unique_lock<std::mutex> & lock_a)
{
debug_assert (!requests.empty ());
request_response_visitor visitor (shared_from_this ());
requests.front ()->visit (visitor);
auto type (requests.front ()->header.type);
if (type == nano::message_type::bulk_pull || type == nano::message_type::bulk_pull_account || type == nano::message_type::bulk_push || type == nano::message_type::frontier_req || type == nano::message_type::node_id_handshake)
{
// Bootstrap & node ID (realtime start)
requests.front ()->visit (visitor);
}
else
{
// Realtime
auto request (std::move (requests.front ()));
requests.pop ();
auto timeout_check (requests.empty ());
lock_a.unlock ();
request->visit (visitor);
if (timeout_check)
{
std::weak_ptr<nano::bootstrap_server> this_w (shared_from_this ());
node->alarm.add (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->timeout ();
}
});
}
}
}

bool nano::bootstrap_server::is_bootstrap_connection ()
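The reworked `run_next` above splits request handling into two paths. Bootstrap requests and the node ID handshake are still visited while the request sits at the front of the queue, under the lock, and `finish_request` pops them once the response completes. Realtime messages are popped immediately, the lock is released before visiting, and the visitor only enqueues the message into `tcp_message_manager`; if that empties the queue, the connection timeout is re-armed via the node's alarm. Below is a minimal, self-contained sketch of this two-path queue; the names (`request_queue`, `blocking`, `work`) are illustrative and not the node's API, and the alarm re-arming is omitted.

```cpp
// Minimal sketch of the two-path request queue (illustrative names only).
// "Blocking" requests stay at the queue front until finish () pops them;
// realtime items are popped and handed off with the lock released.
#include <functional>
#include <iostream>
#include <mutex>
#include <queue>

struct request
{
	bool blocking; // bootstrap / handshake style: keep at front until finished
	std::function<void ()> work; // what the visitor would do
};

class request_queue
{
public:
	void add (request r)
	{
		std::unique_lock<std::mutex> lock (mutex);
		bool start = requests.empty ();
		requests.push (std::move (r));
		if (start)
		{
			run_next (lock);
		}
	}
	// Called when a blocking request has produced its full response
	void finish ()
	{
		std::unique_lock<std::mutex> lock (mutex);
		requests.pop ();
		if (!requests.empty ())
		{
			run_next (lock);
		}
	}

private:
	void run_next (std::unique_lock<std::mutex> & lock)
	{
		if (requests.front ().blocking)
		{
			// Stays at the front; finish () pops it later
			requests.front ().work ();
		}
		else
		{
			// Realtime: pop first, drop the lock, then hand off
			auto r = std::move (requests.front ());
			requests.pop ();
			lock.unlock ();
			r.work ();
		}
	}
	std::mutex mutex;
	std::queue<request> requests;
};

int main ()
{
	request_queue q;
	q.add ({ false, [] { std::cout << "realtime message handed off\n"; } });
	q.add ({ true, [] { std::cout << "bootstrap request being served\n"; } });
	q.finish (); // response done, pop the bootstrap request
}
```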
2 changes: 1 addition & 1 deletion nano/node/bootstrap/bootstrap_server.hpp
@@ -62,7 +62,7 @@ class bootstrap_server final : public std::enable_shared_from_this<nano::bootstr
void finish_request ();
void finish_request_async ();
void timeout ();
void run_next ();
void run_next (nano::unique_lock<std::mutex> & lock_a);
bool is_bootstrap_connection ();
bool is_realtime_connection ();
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
95 changes: 94 additions & 1 deletion nano/node/network.cpp
@@ -12,6 +12,7 @@

nano::network::network (nano::node & node_a, uint16_t port_a) :
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
tcp_message_manager (node_a.stats, node_a.config.tcp_incoming_connections_max),
resolver (node_a.io_ctx),
limiter (node_a.config.bandwidth_limit),
node (node_a),
@@ -22,6 +23,7 @@ disconnect_observer ([]() {})
{
boost::thread::attributes attrs;
nano::thread_attributes::set (attrs);
// UDP
for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_udp; ++i)
{
packet_processing_threads.emplace_back (attrs, [this]() {
@@ -52,7 +54,42 @@ disconnect_observer ([]() {})
}
if (this->node.config.logging.network_packet_logging ())
{
this->node.logger.try_log ("Exiting packet processing thread");
this->node.logger.try_log ("Exiting UDP packet processing thread");
}
});
}
// TCP
for (size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i)
{
packet_processing_threads.emplace_back (attrs, [this]() {
nano::thread_role::set (nano::thread_role::name::packet_processing);
try
{
tcp_channels.process_messages ();
}
catch (boost::system::error_code & ec)
{
this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ());
release_assert (false);
}
catch (std::error_code & ec)
{
this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ());
release_assert (false);
}
catch (std::runtime_error & err)
{
this->node.logger.try_log (FATAL_LOG_PREFIX, err.what ());
release_assert (false);
}
catch (...)
{
this->node.logger.try_log (FATAL_LOG_PREFIX, "Unknown exception");
release_assert (false);
}
if (this->node.config.logging.network_packet_logging ())
{
this->node.logger.try_log ("Exiting TCP packet processing thread");
}
});
}
@@ -87,6 +124,7 @@ void nano::network::stop ()
tcp_channels.stop ();
resolver.cancel ();
buffer_container.stop ();
tcp_message_manager.stop ();
port = 0;
for (auto & thread : packet_processing_threads)
{
@@ -376,6 +414,13 @@ class network_message_visitor : public nano::message_visitor
}
node.stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in);
node.network.merge_peers (message_a.peers);
// Check for special node port data
auto peer0 (message_a.peers[0]);
if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
{
nano::endpoint new_endpoint (channel->get_tcp_endpoint ().address (), peer0.port ());
node.network.merge_peer (new_endpoint);
}
}
void publish (nano::publish const & message_a) override
{
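The block moved into `network_message_visitor::keepalive` above (previously in `tcp_channels::process_keepalive`) implements the "special node port" convention: a peer that cannot advertise a concrete address puts the unspecified IPv6 address (`::`) together with its listening port into the first slot of the keepalive peer list, and the receiver combines that port with the address it actually observes on the connection. A small sketch of the receiving-side check using plain Boost.Asio types; the helper name `advertised_endpoint` and the sample port are illustrative, not part of the node.

```cpp
// Sketch of the "special node port" check (illustrative helper, not node API).
// peer0 is the first entry of the keepalive peer list; remote_tcp is the
// endpoint observed on the TCP connection.
#include <boost/asio/ip/address.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ip/udp.hpp>
#include <boost/optional.hpp>

using udp_endpoint = boost::asio::ip::udp::endpoint;
using tcp_endpoint = boost::asio::ip::tcp::endpoint;

boost::optional<udp_endpoint> advertised_endpoint (udp_endpoint const & peer0, tcp_endpoint const & remote_tcp)
{
	// address_v6{} is the unspecified address "::"; a non-zero port in that slot
	// means "reach me on this port at whatever address you see me from"
	if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
	{
		return udp_endpoint (remote_tcp.address (), peer0.port ());
	}
	return boost::none;
}

int main ()
{
	udp_endpoint peer0 (boost::asio::ip::address_v6 (), 7075); // example port only
	tcp_endpoint remote (boost::asio::ip::make_address ("::ffff:192.0.2.1"), 54321);
	auto ep (advertised_endpoint (peer0, remote));
	// ep now holds the sender's observed address combined with its advertised port
	return ep ? 0 : 1;
}
```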
@@ -815,6 +860,54 @@ void nano::message_buffer_manager::stop ()
condition.notify_all ();
}

nano::tcp_message_manager::tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a) :
stats (stats_a),
max_entries (incoming_connections_max_a * nano::tcp_message_manager::max_entries_per_connection + 1)
{
debug_assert (max_entries > 0);
}

void nano::tcp_message_manager::put_message (nano::tcp_message_item const & item_a)
{
{
nano::unique_lock<std::mutex> lock (mutex);
if (entries.size () > max_entries && !stopped)
{
condition.wait (lock, [& stopped = stopped, &entries = entries, &max_entries = max_entries] { return stopped || entries.size () < max_entries; });
}
entries.push_back (item_a);
}
condition.notify_all ();
}

nano::tcp_message_item nano::tcp_message_manager::get_message ()
{
nano::unique_lock<std::mutex> lock (mutex);
if (entries.empty () && !stopped)
{
condition.wait (lock, [& stopped = stopped, &entries = entries] { return stopped || !entries.empty (); });
}
if (!entries.empty ())
{
auto result (entries.front ());
entries.pop_front ();
return result;
}
else
{
return nano::tcp_message_item{ std::make_shared<nano::keepalive> (nano::keepalive ()), nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0), 0, nullptr, nano::bootstrap_server_type::undefined };
}
}

void nano::tcp_message_manager::stop ()
{
{
nano::lock_guard<std::mutex> lock (mutex);
stopped = true;
}
condition.notify_all ();
}

boost::optional<nano::uint256_union> nano::syn_cookies::assign (nano::endpoint const & endpoint_a)
{
auto ip_addr (endpoint_a.address ());
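`nano::tcp_message_manager`, added above, is a bounded blocking producer/consumer queue: `put_message` (called from the bootstrap server's receive path) blocks once `entries` reaches `max_entries`, `get_message` blocks while the queue is empty, and `stop` wakes all waiters, with `get_message` handing back a placeholder keepalive item so consumer threads can unwind. Capacity is `tcp_incoming_connections_max * 16 + 1`, i.e. up to 16 queued messages per allowed incoming connection before producers back off. A generic sketch of the same pattern follows; it uses `std::optional` as the shutdown sentinel instead of the placeholder keepalive, and the names are illustrative rather than the node's API.

```cpp
// Generic sketch of the bounded blocking queue behind tcp_message_manager.
// Producers block when full, consumers block when empty, stop () wakes both.
#include <condition_variable>
#include <deque>
#include <mutex>
#include <optional>

template <typename T>
class bounded_queue
{
public:
	explicit bounded_queue (size_t max_entries_a) :
	max_entries (max_entries_a)
	{
	}
	void push (T item)
	{
		{
			std::unique_lock<std::mutex> lock (mutex);
			condition.wait (lock, [this] { return stopped || entries.size () < max_entries; });
			if (!stopped)
			{
				entries.push_back (std::move (item)); // dropped if stopping
			}
		}
		condition.notify_all ();
	}
	std::optional<T> pop ()
	{
		std::optional<T> result;
		{
			std::unique_lock<std::mutex> lock (mutex);
			condition.wait (lock, [this] { return stopped || !entries.empty (); });
			if (!entries.empty ())
			{
				result = std::move (entries.front ());
				entries.pop_front ();
			}
		}
		condition.notify_all (); // a producer may be waiting on capacity
		return result;
	}
	void stop ()
	{
		{
			std::lock_guard<std::mutex> lock (mutex);
			stopped = true;
		}
		condition.notify_all ();
	}

private:
	std::mutex mutex;
	std::condition_variable condition;
	std::deque<T> entries;
	size_t max_entries;
	bool stopped{ false };
};
```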
20 changes: 20 additions & 0 deletions nano/node/network.hpp
@@ -65,6 +65,25 @@ class message_buffer_manager final
std::vector<nano::message_buffer> entries;
bool stopped;
};
class tcp_message_manager final
{
public:
// Stats - Statistics
tcp_message_manager (nano::stat & stats_a, unsigned incoming_connections_max_a);
void put_message (nano::tcp_message_item const & item_a);
nano::tcp_message_item get_message ();
// Stop container and notify waiting threads
void stop ();

private:
nano::stat & stats;
std::mutex mutex;
nano::condition_variable condition;
std::deque<nano::tcp_message_item> entries;
unsigned max_entries;
static unsigned const max_entries_per_connection = 16;
bool stopped{ false };
};
/**
* Node ID cookies for node ID handshakes
*/
@@ -151,6 +170,7 @@ class network final
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
nano::bandwidth_limiter limiter;
nano::tcp_message_manager tcp_message_manager;
nano::node & node;
nano::transport::udp_channels udp_channels;
nano::transport::tcp_channels tcp_channels;
29 changes: 12 additions & 17 deletions nano/node/transport/tcp.cpp
@@ -258,6 +258,18 @@ nano::tcp_endpoint nano::transport::tcp_channels::bootstrap_peer (uint8_t connec
return result;
}

void nano::transport::tcp_channels::process_messages ()
{
while (!stopped)
{
auto item (node.network.tcp_message_manager.get_message ());
if (item.message != nullptr)
{
process_message (*item.message, item.endpoint, item.node_id, item.socket, item.type);
}
}
}

void nano::transport::tcp_channels::process_message (nano::message const & message_a, nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a, std::shared_ptr<nano::socket> socket_a, nano::bootstrap_server_type type_a)
{
if (!stopped)
@@ -304,23 +316,6 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
}
}

void nano::transport::tcp_channels::process_keepalive (nano::keepalive const & message_a, nano::tcp_endpoint const & endpoint_a)
{
if (!max_ip_connections (endpoint_a))
{
// Check for special node port data
auto peer0 (message_a.peers[0]);
if (peer0.address () == boost::asio::ip::address_v6{} && peer0.port () != 0)
{
nano::endpoint new_endpoint (endpoint_a.address (), peer0.port ());
node.network.merge_peer (new_endpoint);
}
// Used to store sender endpoint information only
auto udp_channel (std::make_shared<nano::transport::channel_udp> (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a), node.network_params.protocol.protocol_version));
node.network.process_message (message_a, udp_channel);
}
}

void nano::transport::tcp_channels::start ()
{
ongoing_keepalive ();
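`process_messages`, added above, is the body each new TCP network thread runs: it blocks in `get_message` and feeds every dequeued item through the existing `process_message` path until the channels container is stopped. Clean shutdown relies on `network::stop` calling `tcp_message_manager.stop ()`, which wakes the blocked consumers so the loop can observe the stop condition and return, after which the threads are joined. A compact sketch of that consumer lifecycle, with illustrative names and `std::thread` standing in for the node's `boost::thread` usage:

```cpp
// Sketch of the consumer-thread lifecycle around process_messages.
// Queued items still drain after stop (); an empty string plays the role of
// the placeholder keepalive returned by tcp_message_manager on shutdown.
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

struct message_queue
{
	std::mutex mutex;
	std::condition_variable condition;
	std::deque<std::string> entries;
	bool stopped{ false };

	void put (std::string item)
	{
		{
			std::lock_guard<std::mutex> lock (mutex);
			entries.push_back (std::move (item));
		}
		condition.notify_all ();
	}
	std::string get ()
	{
		std::unique_lock<std::mutex> lock (mutex);
		condition.wait (lock, [this] { return stopped || !entries.empty (); });
		if (entries.empty ())
		{
			return {}; // stopped and drained
		}
		auto result (entries.front ());
		entries.pop_front ();
		return result;
	}
	void stop ()
	{
		{
			std::lock_guard<std::mutex> lock (mutex);
			stopped = true;
		}
		condition.notify_all ();
	}
};

int main ()
{
	message_queue queue;
	std::vector<std::thread> threads;
	for (int i = 0; i < 2; ++i)
	{
		// Equivalent of a network thread running tcp_channels::process_messages
		threads.emplace_back ([&queue] {
			while (true)
			{
				auto item (queue.get ());
				if (item.empty ())
				{
					break; // queue stopped
				}
				// process_message (item, ...) would run here
			}
		});
	}
	queue.put ("publish");
	queue.put ("confirm_ack");
	queue.stop (); // network::stop () -> tcp_message_manager.stop ()
	for (auto & t : threads)
	{
		t.join ();
	}
}
```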
11 changes: 10 additions & 1 deletion nano/node/transport/tcp.hpp
@@ -18,6 +18,15 @@ namespace nano
{
class bootstrap_server;
enum class bootstrap_server_type;
class tcp_message_item final
{
public:
std::shared_ptr<nano::message> message;
nano::tcp_endpoint endpoint;
nano::account node_id;
std::shared_ptr<nano::socket> socket;
nano::bootstrap_server_type type;
};
namespace transport
{
class tcp_channels;
@@ -95,8 +104,8 @@ namespace transport
void receive ();
void start ();
void stop ();
void process_messages ();
void process_message (nano::message const &, nano::tcp_endpoint const &, nano::account const &, std::shared_ptr<nano::socket>, nano::bootstrap_server_type);
void process_keepalive (nano::keepalive const &, nano::tcp_endpoint const &);
bool max_ip_connections (nano::tcp_endpoint const &);
// Should we reach out to this endpoint with a keepalive message
bool reachout (nano::endpoint const &);