Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Respond to keepalive messages within a TCP socket #1742

Merged
merged 3 commits into from Feb 18, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 28 additions & 0 deletions nano/core_test/network.cpp
Expand Up @@ -1392,3 +1392,31 @@ TEST (bulk_pull_account, basics)
ASSERT_EQ (nullptr, block_data.second.get ());
}
}

TEST (bootstrap, keepalive)
{
nano::system system (24000, 1);
auto socket (std::make_shared<nano::socket> (system.nodes[0]));
nano::keepalive keepalive;
auto input (keepalive.to_bytes ());
socket->async_connect (system.nodes[0]->bootstrap.endpoint (), [&input, socket](boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
socket->async_write (input, [&input](boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
ASSERT_EQ (input->size (), size_a);
});
});

auto output (keepalive.to_bytes ());
bool done (false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably should be atomic ?

socket->async_read (output, output->size (), [&output, &done](boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
ASSERT_EQ (output->size (), size_a);
done = true;
});
system.deadline_set (std::chrono::seconds (5));
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
}
62 changes: 59 additions & 3 deletions nano/node/bootstrap.cpp
Expand Up @@ -2062,7 +2062,7 @@ receive_buffer (std::make_shared<std::vector<uint8_t>> ()),
socket (socket_a),
node (node_a)
{
receive_buffer->resize (128);
receive_buffer->resize (512);
}

void nano::bootstrap_server::receive ()
Expand Down Expand Up @@ -2119,6 +2119,14 @@ void nano::bootstrap_server::receive_header_action (boost::system::error_code co
add_request (std::unique_ptr<nano::message> (new nano::bulk_push (header)));
break;
}
case nano::message_type::keepalive:
{
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_keepalive_action (ec, size_a, header);
});
break;
}
default:
{
if (node->config.logging.network_logging ())
Expand Down Expand Up @@ -2178,6 +2186,28 @@ void nano::bootstrap_server::receive_bulk_pull_account_action (boost::system::er
}
}

void nano::bootstrap_server::receive_keepalive_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
{
if (!ec)
{
auto error (false);
nano::bufferstream stream (receive_buffer->data (), header_a.payload_length_bytes ());
std::unique_ptr<nano::keepalive> request (new nano::keepalive (error, stream, header_a));
if (!error)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't this just be: add_request (std::move (request)); ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, a lot of these need to be fixed to use c++13 semantics.

receive ();
}
}
else
{
if (node->config.logging.network_keepalive_logging ())
{
BOOST_LOG (node->log) << boost::str (boost::format ("Error receiving keepalive from: %1%") % ec.message ());
}
}
}

void nano::bootstrap_server::receive_frontier_req_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
{
if (!ec)
Expand Down Expand Up @@ -2235,9 +2265,35 @@ class request_response_visitor : public nano::message_visitor
{
}
virtual ~request_response_visitor () = default;
void keepalive (nano::keepalive const &) override
void keepalive (nano::keepalive const & message_a) override
{
assert (false);
if (connection->node->config.logging.network_keepalive_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Received keepalive message from %1%") % connection->socket->remote_endpoint ());
}
connection->node->stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in);
connection->node->network.merge_peers (message_a.peers);
nano::keepalive message;
connection->node->peers.random_fill (message.peers);
auto bytes = message.to_bytes ();
if (connection->node->config.logging.network_keepalive_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Keepalive req sent to %1%") % connection->socket->remote_endpoint ());
}
connection->socket->async_write (bytes, [connection = connection](boost::system::error_code const & ec, size_t size_a) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connection = connection (?)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

connection is a member variable, so if you want to pass it in without passing this this is the way to do it (and [&var = var] for reference)

if (ec)
{
if (connection->node->config.logging.network_keepalive_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Error sending keepalive to %1%: %2%") % connection->socket->remote_endpoint () % ec.message ());
}
}
else
{
connection->node->stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::out);
connection->finish_request ();
}
});
}
void publish (nano::publish const &) override
{
Expand Down
1 change: 1 addition & 0 deletions nano/node/bootstrap.hpp
Expand Up @@ -284,6 +284,7 @@ class bootstrap_server : public std::enable_shared_from_this<nano::bootstrap_ser
void receive_bulk_pull_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_bulk_pull_account_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_frontier_req_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_keepalive_action (boost::system::error_code const &, size_t, nano::message_header const &);
void add_request (std::unique_ptr<nano::message>);
void finish_request ();
void run_next ();
Expand Down
4 changes: 4 additions & 0 deletions nano/node/common.cpp
Expand Up @@ -119,6 +119,10 @@ size_t nano::message_header::payload_length_bytes () const
{
return nano::bulk_pull_account::size;
}
case nano::message_type::keepalive:
{
return nano::keepalive::size;
}
// Add realtime network messages once they get framing support; currently the
// realtime messages all fit in a datagram from which they're deserialized.
default:
Expand Down
1 change: 1 addition & 0 deletions nano/node/common.hpp
Expand Up @@ -280,6 +280,7 @@ class keepalive : public message
bool deserialize (nano::stream &);
bool operator== (nano::keepalive const &) const;
std::array<nano::endpoint, 8> peers;
static size_t constexpr size = 8 * (16 + 2);
};
class publish : public message
{
Expand Down