Skip to content

Commit

Permalink
frontier_req_server & frontier_req_client accounts deque (#1251)
Browse files Browse the repository at this point in the history
  • Loading branch information
SergiySW authored and rkeene committed Nov 15, 2018
1 parent 78bc8d2 commit 37329fe
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 45 deletions.
28 changes: 24 additions & 4 deletions rai/core_test/network.cpp
Expand Up @@ -736,7 +736,7 @@ TEST (frontier_req, begin)
auto request (std::make_shared<rai::frontier_req_server> (connection, std::move (req)));
ASSERT_EQ (rai::test_genesis_key.pub, request->current);
rai::genesis genesis;
ASSERT_EQ (genesis.hash (), request->info.head);
ASSERT_EQ (genesis.hash (), request->frontier);
}

TEST (frontier_req, end)
Expand All @@ -762,7 +762,17 @@ TEST (frontier_req, time_bound)
req->count = std::numeric_limits<decltype (req->count)>::max ();
connection->requests.push (std::unique_ptr<rai::message>{});
auto request (std::make_shared<rai::frontier_req_server> (connection, std::move (req)));
ASSERT_TRUE (request->current.is_zero ());
ASSERT_EQ (rai::test_genesis_key.pub, request->current);
// Wait for next second when age of account will be > 0 seconds
std::this_thread::sleep_for (std::chrono::milliseconds (1001));
std::unique_ptr<rai::frontier_req> req2 (new rai::frontier_req);
req2->start.clear ();
req2->age = 0;
req2->count = std::numeric_limits<decltype (req->count)>::max ();
auto connection2 (std::make_shared<rai::bootstrap_server> (nullptr, system.nodes[0]));
connection2->requests.push (std::unique_ptr<rai::message>{});
auto request2 (std::make_shared<rai::frontier_req_server> (connection, std::move (req2)));
ASSERT_TRUE (request2->current.is_zero ());
}

TEST (frontier_req, time_cutoff)
Expand All @@ -771,13 +781,23 @@ TEST (frontier_req, time_cutoff)
auto connection (std::make_shared<rai::bootstrap_server> (nullptr, system.nodes[0]));
std::unique_ptr<rai::frontier_req> req (new rai::frontier_req);
req->start.clear ();
req->age = 10;
req->age = 3;
req->count = std::numeric_limits<decltype (req->count)>::max ();
connection->requests.push (std::unique_ptr<rai::message>{});
auto request (std::make_shared<rai::frontier_req_server> (connection, std::move (req)));
ASSERT_EQ (rai::test_genesis_key.pub, request->current);
rai::genesis genesis;
ASSERT_EQ (genesis.hash (), request->info.head);
ASSERT_EQ (genesis.hash (), request->frontier);
// Wait 4 seconds when age of account will be > 3 seconds
std::this_thread::sleep_for (std::chrono::milliseconds (4001));
std::unique_ptr<rai::frontier_req> req2 (new rai::frontier_req);
req2->start.clear ();
req2->age = 3;
req2->count = std::numeric_limits<decltype (req->count)>::max ();
auto connection2 (std::make_shared<rai::bootstrap_server> (nullptr, system.nodes[0]));
connection2->requests.push (std::unique_ptr<rai::message>{});
auto request2 (std::make_shared<rai::frontier_req_server> (connection, std::move (req2)));
ASSERT_TRUE (request2->frontier.is_zero ());
}

TEST (bulk, genesis)
Expand Down
90 changes: 52 additions & 38 deletions rai/node/bootstrap.cpp
Expand Up @@ -294,14 +294,14 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons
while (!current.is_zero () && current < account)
{
// We know about an account they don't.
unsynced (info.head, 0);
unsynced (frontier, 0);
next (transaction);
}
if (!current.is_zero ())
{
if (account == current)
{
if (latest == info.head)
if (latest == frontier)
{
// In sync
}
Expand All @@ -310,11 +310,11 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons
if (connection->node->store.block_exists (transaction, latest))
{
// We know about a block they don't.
unsynced (info.head, latest);
unsynced (frontier, latest);
}
else
{
connection->attempt->add_pull (rai::pull_info (account, latest, info.head));
connection->attempt->add_pull (rai::pull_info (account, latest, frontier));
// Either we're behind or there's a fork we differ on
// Either way, bulk pushing will probably not be effective
bulk_push_cost += 5;
Expand All @@ -339,7 +339,7 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons
while (!current.is_zero ())
{
// We know about an account they don't.
unsynced (info.head, 0);
unsynced (frontier, 0);
next (transaction);
}
if (connection->node->config.logging.bulk_pull_logging ())
Expand Down Expand Up @@ -369,16 +369,27 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons

void rai::frontier_req_client::next (rai::transaction const & transaction_a)
{
auto iterator (connection->node->store.latest_begin (transaction_a, rai::uint256_union (current.number () + 1)));
if (iterator != connection->node->store.latest_end ())
// Filling accounts deque to prevent often read transactions
if (accounts.empty ())
{
current = rai::account (iterator->first);
info = rai::account_info (iterator->second);
}
else
{
current.clear ();
size_t max_size (128);
for (auto i (connection->node->store.latest_begin (transaction_a, current.number () + 1)), n (connection->node->store.latest_end ()); i != n && accounts.size () != max_size; ++i)
{
rai::account_info info (i->second);
accounts.push_back (std::make_pair (rai::account (i->first), info.head));
}
/* If loop breaks before max_size, then latest_end () is reached
Add empty record to finish frontier_req_server */
if (accounts.size () != max_size)
{
accounts.push_back (std::make_pair (rai::account (0), rai::block_hash (0)));
}
}
// Retrieving accounts from deque
auto account_pair (accounts.front ());
accounts.pop_front ();
current = account_pair.first;
frontier = account_pair.second;
}

rai::bulk_pull_client::bulk_pull_client (std::shared_ptr<rai::bootstrap_client> connection_a, rai::pull_info const & pull_a) :
Expand Down Expand Up @@ -2286,24 +2297,11 @@ void rai::bulk_push_server::received_block (boost::system::error_code const & ec
rai::frontier_req_server::frontier_req_server (std::shared_ptr<rai::bootstrap_server> const & connection_a, std::unique_ptr<rai::frontier_req> request_a) :
connection (connection_a),
current (request_a->start.number () - 1),
info (0, 0, 0, 0, 0, 0, rai::epoch::epoch_0),
frontier (0),
request (std::move (request_a)),
send_buffer (std::make_shared<std::vector<uint8_t>> ())
{
next ();
skip_old ();
}

void rai::frontier_req_server::skip_old ()
{
if (request->age != std::numeric_limits<decltype (request->age)>::max ())
{
auto now (rai::seconds_since_epoch ());
while (!current.is_zero () && (now - info.modified) >= request->age)
{
next ();
}
}
}

void rai::frontier_req_server::send_next ()
Expand All @@ -2314,12 +2312,12 @@ void rai::frontier_req_server::send_next ()
send_buffer->clear ();
rai::vectorstream stream (*send_buffer);
write (stream, current.bytes);
write (stream, info.head.bytes);
write (stream, frontier.bytes);
}
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending frontier for %1% %2%") % current.to_account () % info.head.to_string ());
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Sending frontier for %1% %2%") % current.to_account () % frontier.to_string ());
}
next ();
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
Expand Down Expand Up @@ -2383,15 +2381,31 @@ void rai::frontier_req_server::sent_action (boost::system::error_code const & ec

void rai::frontier_req_server::next ()
{
auto transaction (connection->node->store.tx_begin_read ());
auto iterator (connection->node->store.latest_begin (transaction, current.number () + 1));
if (iterator != connection->node->store.latest_end ())
// Filling accounts deque to prevent often read transactions
if (accounts.empty ())
{
current = rai::uint256_union (iterator->first);
info = rai::account_info (iterator->second);
}
else
{
current.clear ();
auto now (rai::seconds_since_epoch ());
bool skip_old (request->age != std::numeric_limits<decltype (request->age)>::max ());
size_t max_size (128);
auto transaction (connection->node->store.tx_begin_read ());
for (auto i (connection->node->store.latest_begin (transaction, current.number () + 1)), n (connection->node->store.latest_end ()); i != n && accounts.size () != max_size; ++i)
{
rai::account_info info (i->second);
if (!skip_old || (now - info.modified) <= request->age)
{
accounts.push_back (std::make_pair (rai::account (i->first), info.head));
}
}
/* If loop breaks before max_size, then latest_end () is reached
Add empty record to finish frontier_req_server */
if (accounts.size () != max_size)
{
accounts.push_back (std::make_pair (rai::account (0), rai::block_hash (0)));
}
}
// Retrieving accounts from deque
auto account_pair (accounts.front ());
accounts.pop_front ();
current = account_pair.first;
frontier = account_pair.second;
}
7 changes: 4 additions & 3 deletions rai/node/bootstrap.hpp
Expand Up @@ -113,14 +113,15 @@ class frontier_req_client : public std::enable_shared_from_this<rai::frontier_re
void insert_pull (rai::pull_info const &);
std::shared_ptr<rai::bootstrap_client> connection;
rai::account current;
rai::account_info info;
rai::block_hash frontier;
unsigned count;
rai::account landing;
rai::account faucet;
std::chrono::steady_clock::time_point start_time;
std::promise<bool> promise;
/** A very rough estimate of the cost of `bulk_push`ing missing blocks */
uint64_t bulk_push_cost;
std::deque<std::pair<rai::account, rai::block_hash>> accounts;
};
class bulk_pull_client : public std::enable_shared_from_this<rai::bulk_pull_client>
{
Expand Down Expand Up @@ -300,17 +301,17 @@ class frontier_req_server : public std::enable_shared_from_this<rai::frontier_re
{
public:
frontier_req_server (std::shared_ptr<rai::bootstrap_server> const &, std::unique_ptr<rai::frontier_req>);
void skip_old ();
void send_next ();
void sent_action (boost::system::error_code const &, size_t);
void send_finished ();
void no_block_sent (boost::system::error_code const &, size_t);
void next ();
std::shared_ptr<rai::bootstrap_server> connection;
rai::account current;
rai::account_info info;
rai::block_hash frontier;
std::unique_ptr<rai::frontier_req> request;
std::shared_ptr<std::vector<uint8_t>> send_buffer;
size_t count;
std::deque<std::pair<rai::account, rai::block_hash>> accounts;
};
}

0 comments on commit 37329fe

Please sign in to comment.