Skip to content

Commit

Permalink
Election scheduler (#3208)
Browse files Browse the repository at this point in the history
* This change replaces PoW-difficulty based prioritization with balance * time_since_use based prioritization. The scheduler currently manages two queues. A priority queue where live traffic is managed and scheduled according to vacancy in the active_transactions container and prioritized by the prioritization container. A manual queue where requests that have come through RPCs are enqueued and bypass prioritization.
  • Loading branch information
clemahieu committed Apr 25, 2021
1 parent 88e374a commit c6e52da
Show file tree
Hide file tree
Showing 26 changed files with 486 additions and 129 deletions.
1 change: 1 addition & 0 deletions nano/core_test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_executable(
difficulty.cpp
distributed_work.cpp
election.cpp
election_scheduler.cpp
epochs.cpp
frontiers_confirmation.cpp
gap_cache.cpp
Expand Down
45 changes: 31 additions & 14 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ TEST (active_transactions, keep_local)
auto send5 (wallet.send_action (nano::dev_genesis_key.pub, key5.pub, node.config.receive_minimum.number ()));
auto send6 (wallet.send_action (nano::dev_genesis_key.pub, key6.pub, node.config.receive_minimum.number ()));
// should not drop wallet created transactions
ASSERT_TIMELY (5s, node.active.size () == 6);
ASSERT_TIMELY (5s, node.active.size () == 1);
for (auto const & block : { send1, send2, send3, send4, send5, send6 })
{
ASSERT_TIMELY (1s, node.active.election (block->qualified_root ()));
Expand Down Expand Up @@ -175,8 +175,8 @@ TEST (active_transactions, keep_local)
node.process_active (open3);
node.block_processor.flush ();
// bound elections, should drop after one loop
ASSERT_TIMELY (5s, node.active.size () == node_config.active_elections_size);
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::election_drop));
ASSERT_TIMELY (1s, node.active.size () == node_config.active_elections_size);
ASSERT_EQ (1, node.scheduler.size ());
}

TEST (active_transactions, inactive_votes_cache)
Expand Down Expand Up @@ -342,7 +342,8 @@ TEST (active_transactions, inactive_votes_cache_multiple_votes)
node.vote_processor.vote (vote2, std::make_shared<nano::transport::channel_loopback> (node));
ASSERT_TIMELY (5s, node.active.find_inactive_votes_cache (send1->hash ()).voters.size () == 2);
ASSERT_EQ (1, node.active.inactive_votes_cache_size ());
node.active.insert (send1);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
node.scheduler.flush ();
auto election = node.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (3, election->votes ().size ()); // 2 votes and 1 default not_an_acount
Expand Down Expand Up @@ -510,7 +511,7 @@ TEST (active_transactions, inactive_votes_cache_election_start)
ASSERT_TIMELY (5s, 13 == node.ledger.cache.cemented_count);
}

TEST (active_transactions, update_difficulty)
TEST (active_transactions, DISABLED_update_difficulty)
{
nano::system system (2);
auto & node1 = *system.nodes[0];
Expand Down Expand Up @@ -557,6 +558,7 @@ TEST (active_transactions, update_difficulty)
node1.process_active (send1);
node1.process_active (send2);
node1.block_processor.flush ();
node1.scheduler.flush ();
// Share the updated blocks
node1.network.flood_block (send1);
node1.network.flood_block (send2);
Expand Down Expand Up @@ -701,7 +703,8 @@ TEST (active_transactions, dropped_cleanup)
ASSERT_FALSE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));

node.active.insert (block);
node.block_confirm (block);
node.scheduler.flush ();
auto election = node.active.election (block->qualified_root ());
ASSERT_NE (nullptr, election);

Expand All @@ -725,6 +728,7 @@ TEST (active_transactions, dropped_cleanup)
// Repeat test for a confirmed election
ASSERT_TRUE (node.network.publish_filter.apply (block_bytes.data (), block_bytes.size ()));
node.block_confirm (block);
node.scheduler.flush ();
election = node.active.election (block->qualified_root ());
ASSERT_NE (nullptr, election);
election->force_confirm ();
Expand Down Expand Up @@ -848,6 +852,7 @@ TEST (active_transactions, fork_filter_cleanup)
.build_shared ();
node1.process_active (fork);
node1.block_processor.flush ();
node1.scheduler.flush ();
}
ASSERT_EQ (1, node1.active.size ());

Expand Down Expand Up @@ -1028,7 +1033,7 @@ TEST (active_transactions, confirmation_consistency)
system.deadline_set (5s);
while (!node.ledger.block_confirmed (node.store.tx_begin_read (), block->hash ()))
{
node.active.insert (block);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
ASSERT_NO_ERROR (system.poll (5ms));
}
ASSERT_NO_ERROR (system.poll_until_true (1s, [&node, &block, i] {
Expand Down Expand Up @@ -1128,24 +1133,31 @@ TEST (active_transactions, insertion_prioritization)
};

node.block_confirm (blocks[2]);
node.scheduler.flush ();
ASSERT_TRUE (node.active.election (blocks[2]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[3]);
node.scheduler.flush ();
ASSERT_FALSE (node.active.election (blocks[3]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[1]);
node.scheduler.flush ();
ASSERT_TRUE (node.active.election (blocks[1]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[4]);
node.scheduler.flush ();
ASSERT_FALSE (node.active.election (blocks[4]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[0]);
node.scheduler.flush ();
ASSERT_TRUE (node.active.election (blocks[0]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[5]);
node.scheduler.flush ();
ASSERT_FALSE (node.active.election (blocks[5]->qualified_root ())->prioritized ());
update_active_multiplier ();
node.block_confirm (blocks[6]);
node.scheduler.flush ();
ASSERT_FALSE (node.active.election (blocks[6]->qualified_root ())->prioritized ());

ASSERT_EQ (4, node.stats.count (nano::stat::type::election, nano::stat::detail::election_non_priority));
Expand Down Expand Up @@ -1262,6 +1274,7 @@ TEST (active_transactions, election_difficulty_update_old)
auto send1_copy = builder.make_block ().from (*send1).build_shared ();
node.process_active (send1);
node.block_processor.flush ();
node.scheduler.flush ();
ASSERT_EQ (1, node.active.size ());
auto multiplier = node.active.roots.begin ()->multiplier;
{
Expand Down Expand Up @@ -1373,6 +1386,7 @@ TEST (active_transactions, election_difficulty_update_fork)

node.process_active (fork_change);
node.block_processor.flush ();
node.scheduler.flush ();
ASSERT_EQ (1, node.active.size ());
auto multiplier_change = node.active.roots.begin ()->multiplier;
node.process_active (fork_send);
Expand Down Expand Up @@ -1421,6 +1435,7 @@ TEST (active_transactions, confirm_new)
.build_shared ();
node1.process_active (send);
node1.block_processor.flush ();
node1.scheduler.flush ();
ASSERT_EQ (1, node1.active.size ());
auto & node2 = *system.add_node ();
// Add key to node2
Expand Down Expand Up @@ -1516,6 +1531,7 @@ TEST (active_transactions, conflicting_block_vote_existing_election)
auto vote_fork (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, std::numeric_limits<uint64_t>::max (), fork));

ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
node.scheduler.flush ();
ASSERT_EQ (1, node.active.size ());

// Vote for conflicting block, but the block does not yet exist in the ledger
Expand Down Expand Up @@ -1591,18 +1607,19 @@ TEST (active_transactions, activate_account_chain)
ASSERT_EQ (nano::process_result::progress, node.process (*open).code);
ASSERT_EQ (nano::process_result::progress, node.process (*receive).code);

node.active.activate (nano::dev_genesis_key.pub);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
node.scheduler.flush ();
auto election1 = node.active.election (send->qualified_root ());
ASSERT_EQ (1, node.active.size ());
ASSERT_EQ (1, election1->blocks ().count (send->hash ()));
node.active.activate (nano::dev_genesis_key.pub);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
auto election2 = node.active.election (send->qualified_root ());
ASSERT_EQ (election2, election1);
election1->force_confirm ();
ASSERT_TIMELY (3s, node.block_confirmed (send->hash ()));
// On cementing, the next election is started
ASSERT_TIMELY (3s, node.active.active (send2->qualified_root ()));
node.active.activate (nano::dev_genesis_key.pub);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
auto election3 = node.active.election (send2->qualified_root ());
ASSERT_NE (nullptr, election3);
ASSERT_EQ (1, election3->blocks ().count (send2->hash ()));
Expand All @@ -1611,19 +1628,19 @@ TEST (active_transactions, activate_account_chain)
// On cementing, the next election is started
ASSERT_TIMELY (3s, node.active.active (open->qualified_root ()));
ASSERT_TIMELY (3s, node.active.active (send3->qualified_root ()));
node.active.activate (nano::dev_genesis_key.pub);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
auto election4 = node.active.election (send3->qualified_root ());
ASSERT_NE (nullptr, election4);
ASSERT_EQ (1, election4->blocks ().count (send3->hash ()));
node.active.activate (key.pub);
node.scheduler.activate (key.pub, node.store.tx_begin_read ());
auto election5 = node.active.election (open->qualified_root ());
ASSERT_NE (nullptr, election5);
ASSERT_EQ (1, election5->blocks ().count (open->hash ()));
election5->force_confirm ();
ASSERT_TIMELY (3s, node.block_confirmed (open->hash ()));
// Until send3 is also confirmed, the receive block should not activate
std::this_thread::sleep_for (200ms);
node.active.activate (key.pub);
node.scheduler.activate (key.pub, node.store.tx_begin_read ());
election4->force_confirm ();
ASSERT_TIMELY (3s, node.block_confirmed (send3->hash ()));
ASSERT_TIMELY (3s, node.active.active (receive->qualified_root ()));
Expand Down Expand Up @@ -1926,7 +1943,7 @@ TEST (active_transactions, vacancy)
ASSERT_EQ (nano::process_result::progress, node.process (*send).code);
ASSERT_EQ (1, node.active.vacancy ());
ASSERT_EQ (0, node.active.size ());
node.active.activate (nano::dev_genesis_key.pub);
node.scheduler.activate (nano::dev_genesis_key.pub, node.store.tx_begin_read ());
ASSERT_TIMELY (1s, updated);
updated = false;
ASSERT_EQ (0, node.active.vacancy ());
Expand Down
1 change: 1 addition & 0 deletions nano/core_test/bootstrap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1759,6 +1759,7 @@ TEST (bulk, offline_send)
ASSERT_NE (nullptr, send1);
ASSERT_NE (std::numeric_limits<nano::uint256_t>::max (), node1->balance (nano::dev_genesis_key.pub));
node1->block_processor.flush ();
node1->scheduler.flush ();
// Wait to finish election background tasks
ASSERT_TIMELY (10s, node1->active.empty ());
ASSERT_TRUE (node1->block_confirmed (send1->hash ()));
Expand Down
2 changes: 2 additions & 0 deletions nano/core_test/confirmation_height.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -663,9 +663,11 @@ TEST (confirmation_height, conflict_rollback_cemented)
auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ()));
node1->network.process_message (publish1, channel1);
node1->block_processor.flush ();
node1->scheduler.flush ();
auto channel2 (node2->network.udp_channels.create (node1->network.endpoint ()));
node2->network.process_message (publish2, channel2);
node2->block_processor.flush ();
node2->scheduler.flush ();
ASSERT_EQ (1, node1->active.size ());
ASSERT_EQ (1, node2->active.size ());
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
Expand Down
11 changes: 7 additions & 4 deletions nano/core_test/conflicts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ TEST (conflicts, start_stop)
node1.work_generate_blocking (*send1);
ASSERT_EQ (nano::process_result::progress, node1.process (*send1).code);
ASSERT_EQ (0, node1.active.size ());
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
node1.scheduler.flush ();
auto election1 = node1.active.election (send1->qualified_root ());
ASSERT_EQ (1, node1.active.size ());
ASSERT_NE (nullptr, election1);
Expand All @@ -34,11 +35,11 @@ TEST (conflicts, add_existing)
auto send1 (std::make_shared<nano::send_block> (genesis.hash (), key1.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0));
node1.work_generate_blocking (*send1);
ASSERT_EQ (nano::process_result::progress, node1.process (*send1).code);
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
nano::keypair key2;
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0));
send2->sideband_set ({});
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto election1 = node1.active.election (send2->qualified_root ());
ASSERT_EQ (1, node1.active.size ());
auto vote1 (std::make_shared<nano::vote> (key2.pub, key2.prv, 0, send2));
Expand All @@ -64,7 +65,8 @@ TEST (conflicts, add_two)
auto send2 (std::make_shared<nano::send_block> (send1->hash (), key2.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, 0));
node1.work_generate_blocking (*send2);
ASSERT_EQ (nano::process_result::progress, node1.process (*send2).code);
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
node1.scheduler.flush ();
ASSERT_EQ (2, node1.active.size ());
}

Expand Down Expand Up @@ -168,6 +170,7 @@ TEST (conflicts, reprioritize)
nano::send_block send1_copy (*send1);
node1.process_active (send1);
node1.block_processor.flush ();
node1.scheduler.flush ();
{
nano::lock_guard<nano::mutex> guard (node1.active.mutex);
auto existing1 (node1.active.roots.find (send1->qualified_root ()));
Expand Down
11 changes: 8 additions & 3 deletions nano/core_test/election.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ TEST (election, construction)
auto & node = *system.nodes[0];
genesis.open->sideband_set (nano::block_sideband (nano::genesis_account, 0, nano::genesis_amount, 1, nano::seconds_since_epoch (), nano::epoch::epoch_0, false, false, false, nano::epoch::epoch_0));
node.block_confirm (genesis.open);
node.scheduler.flush ();
auto election = node.active.election (genesis.open->qualified_root ());
election->transition_active ();
}
Expand Down Expand Up @@ -49,8 +50,10 @@ TEST (election, quorum_minimum_flip_success)
node1.work_generate_blocking (*send2);
node1.process_active (send1);
node1.block_processor.flush ();
node1.scheduler.flush ();
node1.process_active (send2);
node1.block_processor.flush ();
node1.scheduler.flush ();
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (2, election->blocks ().size ());
Expand Down Expand Up @@ -93,8 +96,10 @@ TEST (election, quorum_minimum_flip_fail)
node1.work_generate_blocking (*send2);
node1.process_active (send1);
node1.block_processor.flush ();
node1.scheduler.flush ();
node1.process_active (send2);
node1.block_processor.flush ();
node1.scheduler.flush ();
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (2, election->blocks ().size ());
Expand Down Expand Up @@ -126,7 +131,7 @@ TEST (election, quorum_minimum_confirm_success)
node1.work_generate_blocking (*send1);
node1.process_active (send1);
node1.block_processor.flush ();
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (1, election->blocks ().size ());
Expand Down Expand Up @@ -158,7 +163,7 @@ TEST (election, quorum_minimum_confirm_fail)
node1.work_generate_blocking (*send1);
node1.process_active (send1);
node1.block_processor.flush ();
node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (1, election->blocks ().size ());
Expand Down Expand Up @@ -228,7 +233,7 @@ TEST (election, quorum_minimum_update_weight_before_quorum_checks)
node2.block_processor.flush ();
ASSERT_EQ (node2.ledger.cache.block_count, 4);

node1.active.activate (nano::dev_genesis_key.pub);
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
auto election = node1.active.election (send1->qualified_root ());
ASSERT_NE (nullptr, election);
ASSERT_EQ (1, election->blocks ().size ());
Expand Down
Loading

0 comments on commit c6e52da

Please sign in to comment.