Skip to content

Commit

Permalink
Externally connect to the tcp socket instead of within the bootstrap …
Browse files Browse the repository at this point in the history
…client. (#1828)
  • Loading branch information
clemahieu committed Mar 15, 2019
1 parent db4ba1f commit 686c901
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 52 deletions.
98 changes: 49 additions & 49 deletions nano/node/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,11 @@ nano::tcp_endpoint nano::socket::remote_endpoint ()
return endpoint;
}

nano::bootstrap_client::bootstrap_client (std::shared_ptr<nano::node> node_a, std::shared_ptr<nano::bootstrap_attempt> attempt_a, nano::tcp_endpoint const & endpoint_a) :
nano::bootstrap_client::bootstrap_client (std::shared_ptr<nano::node> node_a, std::shared_ptr<nano::bootstrap_attempt> attempt_a, std::shared_ptr<nano::socket> socket_a) :
node (node_a),
attempt (attempt_a),
socket (std::make_shared<nano::socket> (node_a)),
socket (socket_a),
receive_buffer (std::make_shared<std::vector<uint8_t>> ()),
endpoint (endpoint_a),
start_time (std::chrono::steady_clock::now ()),
block_count (0),
pending_stop (false),
Expand Down Expand Up @@ -171,39 +170,6 @@ void nano::bootstrap_client::stop (bool force)
}
}

void nano::bootstrap_client::run ()
{
auto this_l (shared_from_this ());
socket->async_connect (endpoint, [this_l](boost::system::error_code const & ec) {
if (!ec)
{
if (this_l->node->config.logging.bulk_pull_logging ())
{
this_l->node->logger.try_log (boost::str (boost::format ("Connection established to %1%") % this_l->endpoint));
}
this_l->attempt->pool_connection (this_l->shared_from_this ());
}
else
{
if (this_l->node->config.logging.network_logging ())
{
switch (ec.value ())
{
default:
this_l->node->logger.try_log (boost::str (boost::format ("Error initiating bootstrap connection to %1%: %2%") % this_l->endpoint % ec.message ()));
break;
case boost::system::errc::connection_refused:
case boost::system::errc::operation_canceled:
case boost::system::errc::timed_out:
case 995: //Windows The I/O operation has been aborted because of either a thread exit or an application request
case 10061: //Windows No connection could be made because the target machine actively refused it
break;
}
}
}
});
}

void nano::frontier_req_client::run ()
{
std::unique_ptr<nano::frontier_req> request (new nano::frontier_req);
Expand Down Expand Up @@ -465,7 +431,7 @@ void nano::bulk_pull_client::request ()
if (connection->node->config.logging.bulk_pull_logging ())
{
std::unique_lock<std::mutex> lock (connection->attempt->mutex);
connection->node->logger.try_log (boost::str (boost::format ("Requesting account %1% from %2%. %3% accounts in queue") % req.start.to_account () % connection->endpoint % connection->attempt->pulls.size ()));
connection->node->logger.try_log (boost::str (boost::format ("Requesting account %1% from %2%. %3% accounts in queue") % req.start.to_account () % connection->socket->remote_endpoint () % connection->attempt->pulls.size ()));
}
else if (connection->node->config.logging.network_logging () && connection->attempt->should_log ())
{
Expand All @@ -482,7 +448,7 @@ void nano::bulk_pull_client::request ()
{
if (this_l->connection->node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error sending bulk pull request to %1%: to %2%") % ec.message () % this_l->connection->endpoint));
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error sending bulk pull request to %1%: to %2%") % ec.message () % this_l->connection->socket->remote_endpoint ()));
}
}
});
Expand Down Expand Up @@ -789,7 +755,7 @@ void nano::bulk_pull_account_client::request ()
if (connection->node->config.logging.bulk_pull_logging ())
{
std::unique_lock<std::mutex> lock (connection->attempt->mutex);
connection->node->logger.try_log (boost::str (boost::format ("Requesting pending for account %1% from %2%. %3% accounts in queue") % req.account.to_account () % connection->endpoint % connection->attempt->wallet_accounts.size ()));
connection->node->logger.try_log (boost::str (boost::format ("Requesting pending for account %1% from %2%. %3% accounts in queue") % req.account.to_account () % connection->socket->remote_endpoint () % connection->attempt->wallet_accounts.size ()));
}
else if (connection->node->config.logging.network_logging () && connection->attempt->should_log ())
{
Expand All @@ -807,7 +773,7 @@ void nano::bulk_pull_account_client::request ()
this_l->connection->attempt->requeue_pending (this_l->account);
if (this_l->connection->node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error starting bulk pull request to %1%: to %2%") % ec.message () % this_l->connection->endpoint));
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error starting bulk pull request to %1%: to %2%") % ec.message () % this_l->connection->socket->remote_endpoint ()));
}
}
});
Expand Down Expand Up @@ -956,7 +922,7 @@ bool nano::bootstrap_attempt::request_frontier (std::unique_lock<std::mutex> & l
{
if (!result)
{
node->logger.try_log (boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % pulls.size () % connection_l->endpoint));
node->logger.try_log (boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % pulls.size () % connection_l->socket->remote_endpoint ()));
}
else
{
Expand Down Expand Up @@ -1172,7 +1138,7 @@ void nano::bootstrap_attempt::populate_connections ()
if (auto client = c.lock ())
{
new_clients.push_back (client);
endpoints.insert (client->endpoint);
endpoints.insert (client->socket->remote_endpoint ());
double elapsed_sec = client->elapsed_seconds ();
auto blocks_per_sec = client->block_rate ();
rate_sum += blocks_per_sec;
Expand All @@ -1186,7 +1152,7 @@ void nano::bootstrap_attempt::populate_connections ()
{
if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log (boost::str (boost::format ("Stopping slow peer %1% (elapsed sec %2%s > %3%s and %4% blocks per second < %5%)") % client->endpoint.address ().to_string () % elapsed_sec % bootstrap_minimum_termination_time_sec % blocks_per_sec % bootstrap_minimum_blocks_per_sec));
node->logger.try_log (boost::str (boost::format ("Stopping slow peer %1% (elapsed sec %2%s > %3%s and %4% blocks per second < %5%)") % client->socket->remote_endpoint ().address ().to_string () % elapsed_sec % bootstrap_minimum_termination_time_sec % blocks_per_sec % bootstrap_minimum_blocks_per_sec));
}

client->stop (true);
Expand Down Expand Up @@ -1217,7 +1183,7 @@ void nano::bootstrap_attempt::populate_connections ()

if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log (boost::str (boost::format ("Dropping peer with block rate %1%, block count %2% (%3%) ") % client->block_rate () % client->block_count % client->endpoint.address ().to_string ()));
node->logger.try_log (boost::str (boost::format ("Dropping peer with block rate %1%, block count %2% (%3%) ") % client->block_rate () % client->block_count % client->socket->remote_endpoint ().address ().to_string ()));
}

client->stop (false);
Expand All @@ -1242,10 +1208,8 @@ void nano::bootstrap_attempt::populate_connections ()
auto endpoint (nano::tcp_endpoint (peer.address (), peer.port ()));
if (peer != nano::endpoint (boost::asio::ip::address_v6::any (), 0) && endpoints.find (endpoint) == endpoints.end ())
{
auto client (std::make_shared<nano::bootstrap_client> (node, shared_from_this (), endpoint));
client->run ();
connect_client (endpoint);
std::lock_guard<std::mutex> lock (mutex);
clients.push_back (client);
endpoints.insert (endpoint);
}
else if (connections == 0)
Expand All @@ -1270,8 +1234,44 @@ void nano::bootstrap_attempt::populate_connections ()

void nano::bootstrap_attempt::add_connection (nano::endpoint const & endpoint_a)
{
auto client (std::make_shared<nano::bootstrap_client> (node, shared_from_this (), nano::tcp_endpoint (endpoint_a.address (), endpoint_a.port ())));
client->run ();
connect_client (nano::tcp_endpoint (endpoint_a.address (), endpoint_a.port ()));
}

void nano::bootstrap_attempt::connect_client (nano::tcp_endpoint const & endpoint_a)
{
++connections;
auto socket (std::make_shared<nano::socket> (node));
auto this_l (shared_from_this ());
socket->async_connect (endpoint_a, [this_l, socket, endpoint_a](boost::system::error_code const & ec) {
if (!ec)
{
if (this_l->node->config.logging.bulk_pull_logging ())
{
this_l->node->logger.try_log (boost::str (boost::format ("Connection established to %1%") % endpoint_a));
}
auto client (std::make_shared<nano::bootstrap_client> (this_l->node, this_l, socket));
this_l->pool_connection (client);
}
else
{
if (this_l->node->config.logging.network_logging ())
{
switch (ec.value ())
{
default:
this_l->node->logger.try_log (boost::str (boost::format ("Error initiating bootstrap connection to %1%: %2%") % endpoint_a % ec.message ()));
break;
case boost::system::errc::connection_refused:
case boost::system::errc::operation_canceled:
case boost::system::errc::timed_out:
case 995: //Windows The I/O operation has been aborted because of either a thread exit or an application request
case 10061: //Windows No connection could be made because the target machine actively refused it
break;
}
}
}
--this_l->connections;
});
}

void nano::bootstrap_attempt::pool_connection (std::shared_ptr<nano::bootstrap_client> client_a)
Expand Down
5 changes: 2 additions & 3 deletions nano/node/bootstrap.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class bootstrap_attempt : public std::enable_shared_from_this<bootstrap_attempt>
void request_pull (std::unique_lock<std::mutex> &);
void request_push (std::unique_lock<std::mutex> &);
void add_connection (nano::endpoint const &);
void connect_client (nano::tcp_endpoint const &);
void pool_connection (std::shared_ptr<nano::bootstrap_client>);
void stop ();
void requeue_pull (nano::pull_info const &);
Expand Down Expand Up @@ -172,9 +173,8 @@ class bulk_pull_client : public std::enable_shared_from_this<nano::bulk_pull_cli
class bootstrap_client : public std::enable_shared_from_this<bootstrap_client>
{
public:
bootstrap_client (std::shared_ptr<nano::node>, std::shared_ptr<nano::bootstrap_attempt>, nano::tcp_endpoint const &);
bootstrap_client (std::shared_ptr<nano::node>, std::shared_ptr<nano::bootstrap_attempt>, std::shared_ptr<nano::socket>);
~bootstrap_client ();
void run ();
std::shared_ptr<nano::bootstrap_client> shared ();
void stop (bool force);
double block_rate () const;
Expand All @@ -183,7 +183,6 @@ class bootstrap_client : public std::enable_shared_from_this<bootstrap_client>
std::shared_ptr<nano::bootstrap_attempt> attempt;
std::shared_ptr<nano::socket> socket;
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
nano::tcp_endpoint endpoint;
std::chrono::steady_clock::time_point start_time;
std::atomic<uint64_t> block_count;
std::atomic<bool> pending_stop;
Expand Down

0 comments on commit 686c901

Please sign in to comment.