Skip to content

Commit

Permalink
Lazy bootstrap improvements (#1427)
Browse files Browse the repository at this point in the history
  • Loading branch information
SergiySW authored and rkeene committed Dec 8, 2018
1 parent 1ac017d commit 3cffac3
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 48 deletions.
37 changes: 37 additions & 0 deletions rai/core_test/network.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -768,6 +768,43 @@ TEST (bootstrap_processor, lazy_hash)
node1->stop ();
}

TEST (bootstrap_processor, lazy_max_pull_count)
{
rai::system system (24000, 1);
rai::node_init init1;
rai::genesis genesis;
rai::keypair key1;
rai::keypair key2;
// Generating test chain
auto send1 (std::make_shared<rai::state_block> (rai::test_genesis_key.pub, genesis.hash (), rai::test_genesis_key.pub, rai::genesis_amount - rai::Gxrb_ratio, key1.pub, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.nodes[0]->work_generate_blocking (genesis.hash ())));
auto receive1 (std::make_shared<rai::state_block> (key1.pub, 0, key1.pub, rai::Gxrb_ratio, send1->hash (), key1.prv, key1.pub, system.nodes[0]->work_generate_blocking (key1.pub)));
auto send2 (std::make_shared<rai::state_block> (key1.pub, receive1->hash (), key1.pub, 0, key2.pub, key1.prv, key1.pub, system.nodes[0]->work_generate_blocking (receive1->hash ())));
auto receive2 (std::make_shared<rai::state_block> (key2.pub, 0, key2.pub, rai::Gxrb_ratio, send2->hash (), key2.prv, key2.pub, system.nodes[0]->work_generate_blocking (key2.pub)));
auto change1 (std::make_shared<rai::state_block> (key2.pub, receive2->hash (), key1.pub, rai::Gxrb_ratio, 0, key2.prv, key2.pub, system.nodes[0]->work_generate_blocking (receive2->hash ())));
auto change2 (std::make_shared<rai::state_block> (key2.pub, change1->hash (), rai::test_genesis_key.pub, rai::Gxrb_ratio, 0, key2.prv, key2.pub, system.nodes[0]->work_generate_blocking (change1->hash ())));
auto change3 (std::make_shared<rai::state_block> (key2.pub, change2->hash (), key2.pub, rai::Gxrb_ratio, 0, key2.prv, key2.pub, system.nodes[0]->work_generate_blocking (change2->hash ())));
// Processing test chain
system.nodes[0]->block_processor.add (send1, std::chrono::steady_clock::time_point ());
system.nodes[0]->block_processor.add (receive1, std::chrono::steady_clock::time_point ());
system.nodes[0]->block_processor.add (send2, std::chrono::steady_clock::time_point ());
system.nodes[0]->block_processor.add (receive2, std::chrono::steady_clock::time_point ());
system.nodes[0]->block_processor.add (change1, std::chrono::steady_clock::time_point ());
system.nodes[0]->block_processor.add (change2, std::chrono::steady_clock::time_point ());
system.nodes[0]->block_processor.add (change3, std::chrono::steady_clock::time_point ());
system.nodes[0]->block_processor.flush ();
// Start lazy bootstrap with last block in chain known
auto node1 (std::make_shared<rai::node> (init1, system.service, 24001, rai::unique_path (), system.alarm, system.logging, system.work));
node1->peers.insert (system.nodes[0]->network.endpoint (), rai::protocol_version);
node1->bootstrap_initiator.bootstrap_lazy (change3->hash ());
// Check processed blocks
system.deadline_set (10s);
while (node1->block (change3->hash ()) == nullptr)
{
ASSERT_NO_ERROR (system.poll ());
}
node1->stop ();
}

TEST (frontier_req_response, DISABLED_destruction)
{
{
Expand Down
146 changes: 114 additions & 32 deletions rai/node/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,10 @@ void rai::frontier_req_client::next (rai::transaction const & transaction_a)

rai::bulk_pull_client::bulk_pull_client (std::shared_ptr<rai::bootstrap_client> connection_a, rai::pull_info const & pull_a) :
connection (connection_a),
total_blocks (0),
pull (pull_a)
{
std::lock_guard<std::mutex> mutex (connection->attempt->mutex);
++connection->attempt->pulling;
connection->attempt->condition.notify_all ();
}

Expand All @@ -407,6 +407,10 @@ rai::bulk_pull_client::~bulk_pull_client ()
if (expected != pull.end)
{
pull.head = expected;
if (connection->attempt->lazy_mode)
{
pull.account = expected;
}
connection->attempt->requeue_pull (pull);
if (connection->node->config.logging.bulk_pull_logging ())
{
Expand Down Expand Up @@ -552,25 +556,32 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec
block->serialize_json (block_l);
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l);
}
bool block_expected (false);
if (hash == expected)
{
expected = block->previous ();
block_expected = true;
}
if (connection->block_count++ == 0)
{
connection->start_time = std::chrono::steady_clock::now ();
}
connection->attempt->total_blocks++;
bool stop_pull (connection->attempt->process_block (block));
total_blocks++;
bool stop_pull (connection->attempt->process_block (block, total_blocks, block_expected));
if (!stop_pull && !connection->hard_stop.load ())
{
receive_block ();
}
else if (stop_pull && expected == block->previous ())
else if (stop_pull && block_expected)
{
expected = pull.end;
connection->attempt->pool_connection (connection);
}
if (stop_pull)
{
connection->attempt->lazy_stopped++;
}
}
else
{
Expand Down Expand Up @@ -738,17 +749,11 @@ pulling (0),
node (node_a),
account_count (0),
total_blocks (0),
lazy_stopped (0),
stopped (false),
lazy_mode (false)
{
if (lazy_mode)
{
BOOST_LOG (node->log) << "Starting lazy-bootstrap attempt";
}
else
{
BOOST_LOG (node->log) << "Starting bootstrap attempt";
}
BOOST_LOG (node->log) << "Starting bootstrap attempt";
node->bootstrap_initiator.notify_listeners (true);
}

Expand Down Expand Up @@ -814,6 +819,17 @@ void rai::bootstrap_attempt::request_pull (std::unique_lock<std::mutex> & lock_a
{
auto pull (pulls.front ());
pulls.pop_front ();
if (lazy_mode)
{
// Check if pull is obsolete (head was processed)
std::unique_lock<std::mutex> lock (lazy_mutex);
while (!pulls.empty () && !pull.head.is_zero () && lazy_blocks.find (pull.head) != lazy_blocks.end ())
{
pull = pulls.front ();
pulls.pop_front ();
}
}
++pulling;
// The bulk_pull_client destructor attempt to requeue_pull which can cause a deadlock if this is the last reference
// Dispatch request in an external thread in case it needs to be destroyed
node->background ([connection_l, pull]() {
Expand Down Expand Up @@ -900,15 +916,16 @@ void rai::bootstrap_attempt::run ()
if (!stopped)
{
BOOST_LOG (node->log) << "Completed pulls";
request_push (lock);
// Start lazy bootstrap if some lazy keys were inserted
if (!lazy_keys.empty ())
{
lock.unlock ();
lazy_mode = true;
lazy_run ();
lock.lock ();
}
}
request_push (lock);
stopped = true;
condition.notify_all ();
idle.clear ();
Expand Down Expand Up @@ -1156,19 +1173,19 @@ void rai::bootstrap_attempt::add_bulk_push_target (rai::block_hash const & head,
void rai::bootstrap_attempt::lazy_start (rai::block_hash const & hash_a)
{
std::unique_lock<std::mutex> lock (lazy_mutex);
// Add start blocks
if (lazy_keys.find (hash_a) == lazy_keys.end ())
// Add start blocks, limit 1024 (32k with disabled legacy bootstrap)
size_t max_keys (node->flags.disable_legacy_bootstrap ? 32 * 1024 : 1024);
if (lazy_keys.size () < max_keys && lazy_keys.find (hash_a) == lazy_keys.end () && lazy_blocks.find (hash_a) == lazy_blocks.end ())
{
lazy_keys.insert (hash_a);
lazy_pulls.push_back (hash_a);
}
lazy_add (hash_a);
}

void rai::bootstrap_attempt::lazy_add (rai::block_hash const & hash_a)
{
// Add only unknown blocks
assert (!lazy_mutex.try_lock ());

if (lazy_blocks.find (hash_a) == lazy_blocks.end ())
{
lazy_pulls.push_back (hash_a);
Expand All @@ -1183,7 +1200,7 @@ void rai::bootstrap_attempt::lazy_pull_flush ()
// Recheck if block was already processed
if (lazy_blocks.find (pull_start) == lazy_blocks.end ())
{
add_pull (rai::pull_info (pull_start, pull_start, rai::block_hash (0)));
add_pull (rai::pull_info (pull_start, pull_start, rai::block_hash (0), lazy_max_pull_blocks));
}
}
lazy_pulls.clear ();
Expand All @@ -1194,7 +1211,7 @@ bool rai::bootstrap_attempt::lazy_finished ()
bool result (true);
auto transaction (node->store.tx_begin_read ());
std::unique_lock<std::mutex> lock (lazy_mutex);
for (auto it (lazy_keys.begin ()), end (lazy_keys.end ()); it != end;)
for (auto it (lazy_keys.begin ()), end (lazy_keys.end ()); it != end && !stopped;)
{
if (node->store.block_exists (transaction, *it))
{
Expand All @@ -1208,18 +1225,24 @@ bool rai::bootstrap_attempt::lazy_finished ()
// No need to increment `it` as we break above.
}
}
// Finish lazy bootstrap without lazy pulls (in combination with still_pulling ())
if (!result && lazy_pulls.empty ())
{
result = true;
}
return result;
}

void rai::bootstrap_attempt::lazy_run ()
{
populate_connections ();
auto start_time (std::chrono::steady_clock::now ());
auto max_time (std::chrono::milliseconds (30 * 60 * 1000));
auto max_time (std::chrono::minutes (node->flags.disable_legacy_bootstrap ? 48 * 60 : 30));
std::unique_lock<std::mutex> lock (mutex);
while ((still_pulling () || !lazy_finished ()) && std::chrono::steady_clock::now () - start_time < max_time)
{
while (still_pulling ())
unsigned iterations (0);
while (still_pulling () && std::chrono::steady_clock::now () - start_time < max_time)
{
if (!pulls.empty ())
{
Expand All @@ -1236,6 +1259,14 @@ void rai::bootstrap_attempt::lazy_run ()
{
condition.wait (lock);
}
++iterations;
// Flushing lazy pulls
if (iterations % 100 == 0)
{
lock.unlock ();
lazy_pull_flush ();
lock.lock ();
}
}
// Flushing may resolve forks which can add more pulls
// Flushing lazy pulls
Expand All @@ -1253,10 +1284,10 @@ void rai::bootstrap_attempt::lazy_run ()
idle.clear ();
}

bool rai::bootstrap_attempt::process_block (std::shared_ptr<rai::block> block_a)
bool rai::bootstrap_attempt::process_block (std::shared_ptr<rai::block> block_a, uint64_t total_blocks, bool block_expected)
{
bool stop_pull (false);
if (lazy_mode)
if (lazy_mode && block_expected)
{
auto hash (block_a->hash ());
std::unique_lock<std::mutex> lock (lazy_mutex);
Expand All @@ -1267,49 +1298,90 @@ bool rai::bootstrap_attempt::process_block (std::shared_ptr<rai::block> block_a)
auto transaction (node->store.tx_begin_read ());
if (!node->store.block_exists (transaction, hash))
{
lazy_blocks.insert (hash);
rai::uint128_t balance (std::numeric_limits<rai::uint128_t>::max ());
node->block_processor.add (block_a, std::chrono::steady_clock::time_point ());
// Search for new dependencies
if (!block_a->source ().is_zero () && !node->store.block_exists (transaction, block_a->source ()))
{
lazy_add (block_a->source ());
}
else if (block_a->type () == rai::block_type::send)
{
// Calculate balance for legacy send blocks
std::shared_ptr<rai::send_block> block_l (std::static_pointer_cast<rai::send_block> (block_a));
if (block_l != nullptr)
{
balance = block_l->hashables.balance.number ();
}
}
else if (block_a->type () == rai::block_type::state)
{
std::shared_ptr<rai::state_block> block_l (std::static_pointer_cast<rai::state_block> (block_a));
if (block_l != nullptr)
{
balance = block_l->hashables.balance.number ();
rai::block_hash link (block_l->hashables.link);
// If link is not epoch link or 0. And if block from link unknown
if (!link.is_zero () && link != node->ledger.epoch_link && lazy_blocks.find (link) == lazy_blocks.end () && !node->store.block_exists (transaction, link))
{
rai::block_hash previous (block_l->hashables.previous);
// If state block previous is 0 then source block required
if (block_l->hashables.previous.is_zero ())
if (previous.is_zero ())
{
lazy_add (link);
}
// In other cases previous block balance required to find out subtype of state block
else if (node->store.block_exists (transaction, block_l->hashables.previous))
else if (node->store.block_exists (transaction, previous))
{
rai::amount prev_balance (node->ledger.balance (transaction, block_l->hashables.previous));
if (prev_balance.number () <= block_l->hashables.balance.number ())
rai::amount prev_balance (node->ledger.balance (transaction, previous));
if (prev_balance.number () <= balance)
{
lazy_add (link);
}
}
// Search balance of already processed previous blocks
else if (lazy_blocks.find (previous) != lazy_blocks.end ())
{
auto previous_balance (lazy_balances.find (previous));
if (previous_balance != lazy_balances.end ())
{
if (previous_balance->second <= balance)
{
lazy_add (link);
}
lazy_balances.erase (previous_balance);
}
}
// Insert in unknown state blocks if previous wasn't already processed
else
{
lazy_state_unknown.insert (std::make_pair (block_l->hashables.previous, block_l));
lazy_state_unknown.insert (std::make_pair (previous, std::make_pair (link, balance)));
}
}
}
}
lazy_blocks.insert (hash);
// Adding lazy balances
if (total_blocks == 0)
{
lazy_balances.insert (std::make_pair (hash, balance));
}
// Removing lazy balances
if (!block_a->previous ().is_zero () && lazy_balances.find (block_a->previous ()) != lazy_balances.end ())
{
lazy_balances.erase (block_a->previous ());
}
}
// Drop bulk_pull if block is already known (ledger)
else
{
// Disabled until server rewrite
// stop_pull = true;
// Force drop lazy bootstrap connection for long bulk_pull
if (total_blocks > lazy_max_pull_blocks)
{
stop_pull = true;
}
}
//Search unknown state blocks balances
auto find_state (lazy_state_unknown.find (hash));
Expand All @@ -1321,18 +1393,18 @@ bool rai::bootstrap_attempt::process_block (std::shared_ptr<rai::block> block_a)
if (block_a->type () == rai::block_type::state)
{
std::shared_ptr<rai::state_block> block_l (std::static_pointer_cast<rai::state_block> (block_a));
if (block_l->hashables.balance.number () <= next_block->hashables.balance.number ())
if (block_l->hashables.balance.number () <= next_block.second)
{
lazy_add (next_block->hashables.link);
lazy_add (next_block.first);
}
}
// Retrieve balance for previous legacy send blocks
else if (block_a->type () == rai::block_type::send)
{
std::shared_ptr<rai::send_block> block_l (std::static_pointer_cast<rai::send_block> (block_a));
if (block_l->hashables.balance.number () <= next_block->hashables.balance.number ())
if (block_l->hashables.balance.number () <= next_block.second)
{
lazy_add (next_block->hashables.link);
lazy_add (next_block.first);
}
}
// Weak assumption for other legacy block types
Expand All @@ -1347,8 +1419,18 @@ bool rai::bootstrap_attempt::process_block (std::shared_ptr<rai::block> block_a)
{
// Disabled until server rewrite
// stop_pull = true;
// Force drop lazy bootstrap connection for long bulk_pull
if (total_blocks > lazy_max_pull_blocks)
{
stop_pull = true;
}
}
}
else if (lazy_mode)
{
// Drop connection with unexpected block for lazy bootstrap
stop_pull = true;
}
else
{
node->block_processor.add (block_a, std::chrono::steady_clock::time_point ());
Expand Down
Loading

0 comments on commit 3cffac3

Please sign in to comment.