Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Decouple message deserializer from the TCP socket #4160

Merged
42 changes: 31 additions & 11 deletions nano/node/transport/inproc.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#include <nano/node/network.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/message_deserializer.hpp>

#include <boost/format.hpp>

Expand Down Expand Up @@ -53,23 +55,41 @@ class message_visitor_inbound : public nano::message_visitor
*/
void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::transport::buffer_drop_policy drop_policy_a)
{
// we create a temporary channel for the reply path, in case the receiver of the message wants to reply
auto remote_channel = std::make_shared<nano::transport::inproc::channel> (destination, node);
auto offset = 0u;
auto const buffer_read_fn = [&offset, buffer_v = buffer_a.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
debug_assert (buffer_v.size () >= (offset + size_a));
data_a->resize (size_a);
auto const copy_start = buffer_v.begin () + offset;
std::copy (copy_start, copy_start + size_a, data_a->data ());
offset += size_a;
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size_a);
};

// create an inbound message visitor class to handle incoming messages because that's what the message parser expects
message_visitor_inbound visitor{ destination.network.inbound, remote_channel };
auto const message_deserializer = std::make_shared<nano::transport::message_deserializer> (node.network_params.network, node.network.publish_filter, node.block_uniquer, node.vote_uniquer, buffer_read_fn);
message_deserializer->read (
[this] (boost::system::error_code ec_a, std::unique_ptr<nano::message> message_a) {
if (ec_a || !message_a)
{
return;
}

nano::message_parser parser{ destination.network.publish_filter, destination.block_uniquer, destination.vote_uniquer, visitor, destination.work, destination.network_params.network };
// we create a temporary channel for the reply path, in case the receiver of the message wants to reply
auto remote_channel = std::make_shared<nano::transport::inproc::channel> (destination, node);

// parse the message and action any work that needs to be done on that object via the visitor object
auto bytes = buffer_a.to_bytes ();
auto size = bytes.size ();
parser.deserialize_buffer (bytes.data (), size);
// process message
{
node.stats.inc (nano::stat::type::message, nano::to_stat_detail (message_a->header.type), nano::stat::dir::in);

// create an inbound message visitor class to handle incoming messages
message_visitor_inbound visitor{ destination.network.inbound, remote_channel };
message_a->visit (visitor);
}
});

if (callback_a)
{
node.background ([callback_a, size] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size);
node.background ([callback_l = std::move (callback_a), buffer_size = buffer_a.size ()] () {
callback_l (boost::system::errc::make_error_code (boost::system::errc::success), buffer_size);
});
}
}
Expand Down
27 changes: 12 additions & 15 deletions nano/node/transport/message_deserializer.cpp
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/message_deserializer.hpp>

nano::transport::message_deserializer::message_deserializer (nano::network_constants const & network_constants_a, nano::network_filter & publish_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a) :
nano::transport::message_deserializer::message_deserializer (nano::network_constants const & network_constants_a, nano::network_filter & publish_filter_a, nano::block_uniquer & block_uniquer_a, nano::vote_uniquer & vote_uniquer_a,
read_query read_op) :
read_buffer{ std::make_shared<std::vector<uint8_t>> () },
network_constants_m{ network_constants_a },
publish_filter_m{ publish_filter_a },
block_uniquer_m{ block_uniquer_a },
vote_uniquer_m{ vote_uniquer_a }
vote_uniquer_m{ vote_uniquer_a },
read_op{ std::move (read_op) }
{
debug_assert (this->read_op);
read_buffer->resize (MAX_MESSAGE_SIZE);
}

void nano::transport::message_deserializer::read (std::shared_ptr<nano::transport::socket> socket, const nano::transport::message_deserializer::callback_type && callback)
void nano::transport::message_deserializer::read (const nano::transport::message_deserializer::callback_type && callback)
{
debug_assert (callback);
debug_assert (read_op);

status = parse_status::none;

// Increase timeout to receive TCP header (idle server socket)
auto prev_timeout = socket->get_default_timeout_value ();
socket->set_default_timeout_value (network_constants_m.idle_timeout);

socket->async_read (read_buffer, HEADER_SIZE, [this_l = shared_from_this (), socket, callback = std::move (callback), prev_timeout] (boost::system::error_code const & ec, std::size_t size_a) {
read_op (read_buffer, HEADER_SIZE, [this_l = shared_from_this (), callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
if (ec)
{
callback (ec, nullptr);
Expand All @@ -32,15 +32,11 @@ void nano::transport::message_deserializer::read (std::shared_ptr<nano::transpor
callback (boost::asio::error::fault, nullptr);
return;
}

// Decrease timeout to default
socket->set_default_timeout_value (prev_timeout);

this_l->received_header (socket, std::move (callback));
this_l->received_header (std::move (callback));
});
}

void nano::transport::message_deserializer::received_header (std::shared_ptr<nano::transport::socket> socket, const nano::transport::message_deserializer::callback_type && callback)
void nano::transport::message_deserializer::received_header (const nano::transport::message_deserializer::callback_type && callback)
{
nano::bufferstream stream{ read_buffer->data (), HEADER_SIZE };
auto error = false;
Expand Down Expand Up @@ -86,7 +82,8 @@ void nano::transport::message_deserializer::received_header (std::shared_ptr<nan
}
else
{
socket->async_read (read_buffer, payload_size, [this_l = shared_from_this (), payload_size, header, callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
debug_assert (read_op);
read_op (read_buffer, payload_size, [this_l = shared_from_this (), payload_size, header, callback = std::move (callback)] (boost::system::error_code const & ec, std::size_t size_a) {
if (ec)
{
callback (ec, nullptr);
Expand Down
13 changes: 6 additions & 7 deletions nano/node/transport/message_deserializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

#include <nano/node/common.hpp>
#include <nano/node/messages.hpp>
#include <nano/node/transport/socket.hpp>

#include <memory>
#include <vector>

namespace nano
{
class socket;

namespace transport
{
class message_deserializer : public std::enable_shared_from_this<nano::transport::message_deserializer>
Expand Down Expand Up @@ -45,19 +42,20 @@ namespace transport

parse_status status;

message_deserializer (network_constants const &, network_filter &, block_uniquer &, vote_uniquer &);
using read_query = std::function<void (std::shared_ptr<std::vector<uint8_t>> const &, size_t, std::function<void (boost::system::error_code const &, std::size_t)>)>;
message_deserializer (network_constants const &, network_filter &, block_uniquer &, vote_uniquer &, read_query read_op);

/*
* Asynchronously read next message from socket.
* Asynchronously read next message from the channel_read_fn.
* If an irrecoverable error is encountered callback will be called with an error code set and null message.
* If a 'soft' error is encountered (eg. duplicate block publish) error won't be set but message will be null. In that case, `status` field will be set to code indicating reason for failure.
* If message is received successfully, error code won't be set and message will be non-null. `status` field will be set to `success`.
* Should not be called until the previous invocation finishes and calls the callback.
*/
void read (std::shared_ptr<nano::transport::socket> socket, callback_type const && callback);
void read (callback_type const && callback);

private:
void received_header (std::shared_ptr<nano::transport::socket> socket, callback_type const && callback);
void received_header (callback_type const && callback);
void received_message (nano::message_header header, std::size_t payload_size, callback_type const && callback);

/*
Expand Down Expand Up @@ -90,6 +88,7 @@ namespace transport
nano::network_filter & publish_filter_m;
nano::block_uniquer & block_uniquer_m;
nano::vote_uniquer & vote_uniquer_m;
read_query read_op;

public:
static stat::detail to_stat_detail (parse_status);
Expand Down
11 changes: 11 additions & 0 deletions nano/node/transport/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,17 @@ void nano::transport::socket::checkup ()
});
}

void nano::transport::socket::read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a)
{
// Increase timeout to receive TCP header (idle server socket)
auto const prev_timeout = get_default_timeout_value ();
set_default_timeout_value (node.network_params.network.idle_timeout);
async_read (data_a, size_a, [callback_l = std::move (callback_a), prev_timeout, this_l = shared_from_this ()] (boost::system::error_code const & ec_a, std::size_t size_a) {
this_l->set_default_timeout_value (prev_timeout);
callback_l (ec_a, size_a);
});
}

bool nano::transport::socket::has_timed_out () const
{
return timed_out;
Expand Down
2 changes: 2 additions & 0 deletions nano/node/transport/socket.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class server_socket;
class socket : public std::enable_shared_from_this<nano::transport::socket>
{
friend class server_socket;
friend class tcp_server;

public:
enum class type_t
Expand Down Expand Up @@ -171,6 +172,7 @@ class socket : public std::enable_shared_from_this<nano::transport::socket>
void set_last_completion ();
void set_last_receive_time ();
void checkup ();
void read_impl (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a);

private:
type_t type_m{ type_t::undefined };
Expand Down
12 changes: 10 additions & 2 deletions nano/node/transport/tcp_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include <boost/format.hpp>

#include <memory>

nano::transport::tcp_listener::tcp_listener (uint16_t port_a, nano::node & node_a) :
node (node_a),
port (port_a)
Expand Down Expand Up @@ -125,7 +127,13 @@ nano::transport::tcp_server::tcp_server (std::shared_ptr<nano::transport::socket
socket{ std::move (socket_a) },
node{ std::move (node_a) },
allow_bootstrap{ allow_bootstrap_a },
message_deserializer{ std::make_shared<nano::transport::message_deserializer> (node->network_params.network, node->network.publish_filter, node->block_uniquer, node->vote_uniquer) }
message_deserializer{
std::make_shared<nano::transport::message_deserializer> (node->network_params.network, node->network.publish_filter, node->block_uniquer, node->vote_uniquer,
[socket_l = socket] (std::shared_ptr<std::vector<uint8_t>> const & data_a, size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
debug_assert (socket_l != nullptr);
socket_l->read_impl (data_a, size_a, callback_a);
})
}
{
debug_assert (socket != nullptr);
}
Expand Down Expand Up @@ -186,7 +194,7 @@ void nano::transport::tcp_server::receive_message ()
return;
}

message_deserializer->read (socket, [this_l = shared_from_this ()] (boost::system::error_code ec, std::unique_ptr<nano::message> message) {
message_deserializer->read ([this_l = shared_from_this ()] (boost::system::error_code ec, std::unique_ptr<nano::message> message) {
if (ec)
{
// IO error or critical error when deserializing message
Expand Down