Skip to content

Commit

Permalink
Socket write queue fixes and improvements (#4202)
Browse files Browse the repository at this point in the history
Fixes a socket data error where partial simultaneous writes can be interleaved corrupting the connection.
Adds ability to prioritise non-bootstrap traffic
  • Loading branch information
pwojcikdev committed Apr 3, 2023
1 parent 168367a commit 62bdaba
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 120 deletions.
28 changes: 14 additions & 14 deletions nano/core_test/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ TEST (socket, max_connections)
return node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
};

ASSERT_TIMELY (5s, get_tcp_accept_failures () == 1);
ASSERT_TIMELY (5s, get_tcp_accept_successes () == 2);
ASSERT_TIMELY (5s, connection_attempts == 3);
ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 1);
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 2);
ASSERT_TIMELY_EQ (5s, connection_attempts, 3);

// create space for one socket and fill the connections table again

Expand All @@ -79,9 +79,9 @@ TEST (socket, max_connections)
auto client5 = std::make_shared<nano::transport::client_socket> (*node);
client5->async_connect (dst_endpoint, connect_handler);

ASSERT_TIMELY (5s, get_tcp_accept_failures () == 2);
ASSERT_TIMELY (5s, get_tcp_accept_successes () == 3);
ASSERT_TIMELY (5s, connection_attempts == 5);
ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 2);
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 3);
ASSERT_TIMELY_EQ (5s, connection_attempts, 5);

// close all existing sockets and fill the connections table again
// start counting form 1 because 0 is the already closed socket
Expand All @@ -99,10 +99,10 @@ TEST (socket, max_connections)
auto client8 = std::make_shared<nano::transport::client_socket> (*node);
client8->async_connect (dst_endpoint, connect_handler);

ASSERT_TIMELY (5s, get_tcp_accept_failures () == 3);
ASSERT_TIMELY (5s, get_tcp_accept_successes () == 5);
ASSERT_TIMELY (5s, connection_attempts == 8); // connections initiated by the client
ASSERT_TIMELY (5s, server_sockets.size () == 5); // connections accepted by the server
ASSERT_TIMELY_EQ (5s, get_tcp_accept_failures (), 3);
ASSERT_TIMELY_EQ (5s, get_tcp_accept_successes (), 5);
ASSERT_TIMELY_EQ (5s, connection_attempts, 8); // connections initiated by the client
ASSERT_TIMELY_EQ (5s, server_sockets.size (), 5); // connections accepted by the server

node->stop ();
}
Expand Down Expand Up @@ -459,11 +459,11 @@ TEST (socket, drop_policy)

// We're going to write twice the queue size + 1, and the server isn't reading
// The total number of drops should thus be 1 (the socket allows doubling the queue size for no_socket_drop)
func (nano::transport::socket::queue_size_max * 2 + 1, nano::transport::buffer_drop_policy::no_socket_drop);
func (nano::transport::socket::default_max_queue_size * 2 + 1, nano::transport::buffer_drop_policy::no_socket_drop);
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out));
ASSERT_EQ (0, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out));

func (nano::transport::socket::queue_size_max + 1, nano::transport::buffer_drop_policy::limiter);
func (nano::transport::socket::default_max_queue_size + 1, nano::transport::buffer_drop_policy::limiter);
// The stats are accumulated from before
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_no_socket_drop, nano::stat::dir::out));
ASSERT_EQ (1, node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_write_drop, nano::stat::dir::out));
Expand Down Expand Up @@ -716,7 +716,7 @@ TEST (socket_timeout, write)
// create a client socket and send lots of data to fill the socket queue on the local and remote side
// eventually, the all tcp queues should fill up and async_write will not be able to progress
// and the timeout should kick in and close the socket, which will cause the async_write to return an error
auto socket = std::make_shared<nano::transport::client_socket> (*node);
auto socket = std::make_shared<nano::transport::client_socket> (*node, 1024 * 64); // socket with a max queue size much larger than OS buffers
std::atomic<bool> done = false;
boost::system::error_code ec;
socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
Expand Down Expand Up @@ -826,7 +826,7 @@ TEST (socket_timeout, write_overlapped)
// create a client socket and send lots of data to fill the socket queue on the local and remote side
// eventually, the all tcp queues should fill up and async_write will not be able to progress
// and the timeout should kick in and close the socket, which will cause the async_write to return an error
auto socket = std::make_shared<nano::transport::client_socket> (*node);
auto socket = std::make_shared<nano::transport::client_socket> (*node, 1024 * 64); // socket with a max queue size much larger than OS buffers
std::atomic<bool> done = false;
boost::system::error_code ec;
socket->async_connect (endpoint, [&socket, &ec, &done] (boost::system::error_code const & ec_a) {
Expand Down
15 changes: 15 additions & 0 deletions nano/node/bandwidth_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,19 @@ void nano::outbound_bandwidth_limiter::reset (std::size_t limit, double burst_ra
{
auto & limiter = select_limiter (type);
limiter.reset (limit, burst_ratio);
}

nano::bandwidth_limit_type nano::to_bandwidth_limit_type (const nano::transport::traffic_type & traffic_type)
{
switch (traffic_type)
{
case nano::transport::traffic_type::generic:
return nano::bandwidth_limit_type::standard;
break;
case nano::transport::traffic_type::bootstrap:
return nano::bandwidth_limit_type::bootstrap;
break;
}
debug_assert (false);
return {};
}
3 changes: 3 additions & 0 deletions nano/node/bandwidth_limiter.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <nano/lib/rate_limiting.hpp>
#include <nano/node/transport/traffic_type.hpp>

namespace nano
{
Expand All @@ -15,6 +16,8 @@ enum class bandwidth_limit_type
bootstrap
};

nano::bandwidth_limit_type to_bandwidth_limit_type (nano::transport::traffic_type const &);

/**
* Class that tracks and manages bandwidth limits for IO operations
*/
Expand Down
4 changes: 2 additions & 2 deletions nano/node/bootstrap/bootstrap_ascending.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ void nano::bootstrap_ascending::send (std::shared_ptr<nano::transport::channel>
// TODO: There is no feedback mechanism if bandwidth limiter starts dropping our requests
channel->send (
request, nullptr,
nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap);
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
}

size_t nano::bootstrap_ascending::priority_size () const
Expand Down Expand Up @@ -562,7 +562,7 @@ std::shared_ptr<nano::transport::channel> nano::bootstrap_ascending::available_c
auto channels = network.random_set (32, node.network_params.network.bootstrap_protocol_version_min, /* include temporary channels */ true);
for (auto & channel : channels)
{
if (!channel->max ())
if (!channel->max (nano::transport::traffic_type::bootstrap))
{
return channel;
}
Expand Down
6 changes: 3 additions & 3 deletions nano/node/bootstrap/bootstrap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ bool nano::bootstrap_server::request (nano::asc_pull_req const & message, std::s

// If channel is full our response will be dropped anyway, so filter that early
// TODO: Add per channel limits (this ideally should be done on the channel message processing side)
if (channel->max ())
if (channel->max (nano::transport::traffic_type::bootstrap))
{
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::channel_full, nano::stat::dir::in);
return false;
Expand Down Expand Up @@ -125,7 +125,7 @@ void nano::bootstrap_server::respond (nano::asc_pull_ack & response, std::shared
stats.inc (nano::stat::type::bootstrap_server, nano::stat::detail::write_error, nano::stat::dir::out);
}
},
nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type::bootstrap);
nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type::bootstrap);
}

/*
Expand All @@ -138,7 +138,7 @@ void nano::bootstrap_server::process_batch (std::deque<request_t> & batch)

for (auto & [request, channel] : batch)
{
if (!channel->max ())
if (!channel->max (nano::transport::traffic_type::bootstrap))
{
auto response = process (transaction, request);
respond (response, channel);
Expand Down
6 changes: 3 additions & 3 deletions nano/node/transport/channel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ nano::transport::channel::channel (nano::node & node_a) :
set_network_version (node_a.network_params.network.protocol_version);
}

void nano::transport::channel::send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::bandwidth_limit_type limiter_type)
void nano::transport::channel::send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
{
auto buffer (message_a.to_shared_const_buffer ());
auto detail = nano::to_stat_detail (message_a.header.type);
auto is_droppable_by_limiter = (drop_policy_a == nano::transport::buffer_drop_policy::limiter);
auto should_pass (node.outbound_limiter.should_pass (buffer.size (), limiter_type));
auto should_pass (node.outbound_limiter.should_pass (buffer.size (), to_bandwidth_limit_type (traffic_type)));
if (!is_droppable_by_limiter || should_pass)
{
send_buffer (buffer, callback_a, drop_policy_a);
send_buffer (buffer, callback_a, drop_policy_a, traffic_type);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
}
else
Expand Down
18 changes: 14 additions & 4 deletions nano/node/transport/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,25 @@ class channel

virtual std::size_t hash_code () const = 0;
virtual bool operator== (nano::transport::channel const &) const = 0;
void send (nano::message & message_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a = nullptr, nano::transport::buffer_drop_policy policy_a = nano::transport::buffer_drop_policy::limiter, nano::bandwidth_limit_type = nano::bandwidth_limit_type::standard);

void send (nano::message & message_a,
std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a = nullptr,
nano::transport::buffer_drop_policy policy_a = nano::transport::buffer_drop_policy::limiter,
nano::transport::traffic_type = nano::transport::traffic_type::generic);

// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
//
virtual void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter) = 0;
virtual void send_buffer (nano::shared_const_buffer const &,
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
nano::transport::traffic_type = nano::transport::traffic_type::generic)
= 0;

virtual std::string to_string () const = 0;
virtual nano::endpoint get_endpoint () const = 0;
virtual nano::tcp_endpoint get_tcp_endpoint () const = 0;
virtual nano::transport::transport_type get_type () const = 0;
virtual bool max ()

virtual bool max (nano::transport::traffic_type = nano::transport::traffic_type::generic)
{
return false;
}
Expand Down
6 changes: 3 additions & 3 deletions nano/node/transport/fake.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ nano::transport::fake::channel::channel (nano::node & node) :

/**
* The send function behaves like a null device, it throws the data away and returns success.
*/
void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a)
*/
void nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
{
//auto bytes = buffer_a.to_bytes ();
// auto bytes = buffer_a.to_bytes ();
auto size = buffer_a.size ();
if (callback_a)
{
Expand Down
10 changes: 4 additions & 6 deletions nano/node/transport/fake.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ namespace transport
std::string to_string () const override;
std::size_t hash_code () const override;

// clang-format off
void send_buffer (
nano::shared_const_buffer const &,
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter
) override;
// clang-format on
nano::shared_const_buffer const &,
std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr,
nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter,
nano::transport::traffic_type = nano::transport::traffic_type::generic) override;

bool operator== (nano::transport::channel const &) const override;
bool operator== (nano::transport::fake::channel const & other_a) const;
Expand Down
2 changes: 1 addition & 1 deletion nano/node/transport/inproc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class message_visitor_inbound : public nano::message_visitor
* Send the buffer to the peer and call the callback function when done. The call never fails.
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
*/
void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a)
void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a, nano::transport::traffic_type traffic_type)
{
std::size_t offset{ 0 };
auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
Expand Down
5 changes: 3 additions & 2 deletions nano/node/transport/inproc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ namespace transport
explicit channel (nano::node & node, nano::node & destination);
std::size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;

// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
//
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter) override;
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::transport::buffer_drop_policy = nano::transport::buffer_drop_policy::limiter, nano::transport::traffic_type = nano::transport::traffic_type::generic) override;

std::string to_string () const override;
bool operator== (nano::transport::inproc::channel const & other_a) const
{
Expand Down

0 comments on commit 62bdaba

Please sign in to comment.