diff --git a/nano/core_test/confirmation_height.cpp b/nano/core_test/confirmation_height.cpp index fab2e8fd97..486b8fec9d 100644 --- a/nano/core_test/confirmation_height.cpp +++ b/nano/core_test/confirmation_height.cpp @@ -40,14 +40,8 @@ TEST (confirmation_height, single) node->block_processor.flush (); system.deadline_set (10s); - while (true) + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 1) { - auto transaction = node->store.tx_begin_read (); - if (node->ledger.block_confirmed (transaction, send1->hash ())) - { - break; - } - ASSERT_NO_ERROR (system.poll ()); } @@ -142,14 +136,8 @@ TEST (confirmation_height, multiple_accounts) node->block_processor.flush (); system.deadline_set (10s); - while (true) + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 10) { - auto transaction = node->store.tx_begin_read (); - if (node->ledger.block_confirmed (transaction, receive3->hash ())) - { - break; - } - ASSERT_NO_ERROR (system.poll ()); } @@ -157,6 +145,7 @@ TEST (confirmation_height, multiple_accounts) nano::confirmation_height_info confirmation_height_info; auto & store = node->store; auto transaction = node->store.tx_begin_read (); + ASSERT_TRUE (node->ledger.block_confirmed (transaction, receive3->hash ())); ASSERT_FALSE (store.account_get (transaction, nano::test_genesis_key.pub, account_info)); ASSERT_FALSE (node->store.confirmation_height_get (transaction, nano::test_genesis_key.pub, confirmation_height_info)); ASSERT_EQ (4, confirmation_height_info.height); @@ -325,14 +314,8 @@ TEST (confirmation_height, gap_live) node->block_processor.flush (); system.deadline_set (10s); - while (true) + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 6) { - auto transaction = node->store.tx_begin_read (); - if (node->ledger.block_confirmed (transaction, receive2->hash ())) - { - break; - } - ASSERT_NO_ERROR (system.poll ()); } @@ -406,18 +389,13 @@ TEST (confirmation_height, send_receive_between_2_accounts) node->block_processor.flush (); system.deadline_set (10s); - while (true) + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 10) { - auto transaction = node->store.tx_begin_read (); - if (node->ledger.block_confirmed (transaction, receive4->hash ())) - { - break; - } - ASSERT_NO_ERROR (system.poll ()); } auto transaction (node->store.tx_begin_read ()); + ASSERT_TRUE (node->ledger.block_confirmed (transaction, receive4->hash ())); nano::account_info account_info; nano::confirmation_height_info confirmation_height_info; ASSERT_FALSE (node->store.account_get (transaction, nano::test_genesis_key.pub, account_info)); @@ -474,18 +452,13 @@ TEST (confirmation_height, send_receive_self) node->block_confirm (receive3); system.deadline_set (10s); - while (true) + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 6) { - auto transaction = node->store.tx_begin_read (); - if (node->ledger.block_confirmed (transaction, receive3->hash ())) - { - break; - } - ASSERT_NO_ERROR (system.poll ()); } auto transaction (node->store.tx_begin_read ()); + ASSERT_TRUE (node->ledger.block_confirmed (transaction, receive3->hash ())); nano::account_info account_info; ASSERT_FALSE (node->store.account_get (transaction, nano::test_genesis_key.pub, account_info)); nano::confirmation_height_info confirmation_height_info; @@ -569,18 +542,13 @@ TEST (confirmation_height, all_block_types) node->block_confirm (state_send2); system.deadline_set (10s); - while (true) + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 15) { - auto transaction = node->store.tx_begin_read (); - if (node->ledger.block_confirmed (transaction, state_send2->hash ())) - { - break; - } - ASSERT_NO_ERROR (system.poll ()); } auto transaction (node->store.tx_begin_read ()); + ASSERT_TRUE (node->ledger.block_confirmed (transaction, state_send2->hash ())); nano::account_info account_info; nano::confirmation_height_info confirmation_height_info; ASSERT_FALSE (node->store.account_get (transaction, nano::test_genesis_key.pub, account_info)); @@ -688,19 +656,18 @@ TEST (confirmation_height, observers) node1->process_active (send1); node1->block_processor.flush (); - bool confirmed (false); system.deadline_set (10s); - while (!confirmed) + while (node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 1) { - auto transaction = node1->store.tx_begin_read (); - confirmed = node1->ledger.block_confirmed (transaction, send1->hash ()); ASSERT_NO_ERROR (system.poll ()); } + auto transaction = node1->store.tx_begin_read (); + ASSERT_TRUE (node1->ledger.block_confirmed (transaction, send1->hash ())); ASSERT_EQ (1, node1->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); ASSERT_EQ (1, node1->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); } -// This tests when a read has been done and the block no longer exists by the time a write is done +// This tests when a read has been done, but the block no longer exists by the time a write is done TEST (confirmation_height, modified_chain) { nano::system system; @@ -721,18 +688,23 @@ TEST (confirmation_height, modified_chain) } node->confirmation_height_processor.add (send->hash ()); - { // The write guard prevents the confirmation height processor doing any writes + system.deadline_set (10s); auto write_guard = node->write_database_queue.wait (nano::writer::testing); while (!node->write_database_queue.contains (nano::writer::confirmation_height)) - ; + { + ASSERT_NO_ERROR (system.poll ()); + } store.block_del (store.tx_begin_write (), send->hash (), send->type ()); } + system.deadline_set (10s); while (node->write_database_queue.contains (nano::writer::confirmation_height)) - ; + { + ASSERT_NO_ERROR (system.poll ()); + } ASSERT_EQ (1, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block, nano::stat::dir::in)); } @@ -763,23 +735,14 @@ TEST (confirmation_height, pending_observer_callbacks) node->confirmation_height_processor.add (send1->hash ()); - while (true) - { - if (node->pending_confirmation_height.size () == 0) - { - break; - } - } - node->confirmation_height_processor.add (send.hash ()); - system.deadline_set (5s); - while (node->ledger.cache.cemented_count < 3) + system.deadline_set (10s); + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 1 || node->ledger.stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out) != 1) { ASSERT_NO_ERROR (system.poll ()); } - // Confirm the callback is not called under this circumstance + // Confirm the callback is not called under this circumstance because there is no election information ASSERT_EQ (2, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); - ASSERT_EQ (0, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); } TEST (confirmation_height, prioritize_frontiers) @@ -1039,6 +1002,18 @@ TEST (confirmation_height, callback_confirmed_history) auto transaction = node->store.tx_begin_read (); ASSERT_TRUE (node->ledger.block_confirmed (transaction, send->hash ())); + system.deadline_set (10s); + while (node->active.size () > 0) + { + ASSERT_NO_ERROR (system.poll ()); + } + + system.deadline_set (10s); + while (node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out) != 1) + { + ASSERT_NO_ERROR (system.poll ()); + } + ASSERT_EQ (1, node->active.list_confirmed ().size ()); ASSERT_EQ (0, node->active.blocks.size ()); @@ -1048,8 +1023,7 @@ TEST (confirmation_height, callback_confirmed_history) ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out)); ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out)); - nano::lock_guard guard (node->active.mutex); - ASSERT_EQ (0, node->active.pending_conf_height.size ()); + ASSERT_EQ (0, node->active.election_winner_details_size ()); } namespace nano @@ -1089,14 +1063,14 @@ TEST (confirmation_height, dependent_election) } system.deadline_set (10s); - while (node->pending_confirmation_height.size () != 1) + while (node->confirmation_height_processor.awaiting_processing_size () != 1) { ASSERT_NO_ERROR (system.poll ()); } { - nano::lock_guard guard (node->pending_confirmation_height.mutex); - ASSERT_EQ (*node->pending_confirmation_height.pending.begin (), send2->hash ()); + nano::lock_guard guard (node->confirmation_height_processor.mutex); + ASSERT_EQ (*node->confirmation_height_processor.awaiting_processing.begin (), send2->hash ()); } // Now put the other block in active so it can be confirmed as a dependent election @@ -1104,20 +1078,166 @@ TEST (confirmation_height, dependent_election) node->confirmation_height_processor.unpause (); system.deadline_set (10s); - while (node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out) != 1 && node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out) != 1) + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 3) { ASSERT_NO_ERROR (system.poll ()); } - ASSERT_EQ (3, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); + ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out)); + ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out)); ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out)); ASSERT_EQ (3, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); - nano::lock_guard guard (node->active.mutex); - ASSERT_EQ (0, node->active.pending_conf_height.size ()); + ASSERT_EQ (0, node->active.election_winner_details_size ()); +} + +// This test checks that a receive block with uncemented blocks below cements them too. +TEST (confirmation_height, cemented_gap_below_receive) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto node = system.add_node (node_config); + + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + nano::block_hash latest (node->latest (nano::test_genesis_key.pub)); + + nano::keypair key1; + system.wallet (0)->insert_adhoc (key1.prv); + + nano::send_block send (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest)); + nano::send_block send1 (send.hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send.hash ())); + nano::keypair dummy_key; + nano::send_block dummy_send (send1.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1.hash ())); + + nano::open_block open (send.hash (), nano::genesis_account, key1.pub, key1.prv, key1.pub, *system.work.generate (key1.pub)); + nano::receive_block receive1 (open.hash (), send1.hash (), key1.prv, key1.pub, *system.work.generate (open.hash ())); + nano::send_block send2 (receive1.hash (), nano::test_genesis_key.pub, nano::Gxrb_ratio, key1.prv, key1.pub, *system.work.generate (receive1.hash ())); + + nano::receive_block receive2 (dummy_send.hash (), send2.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (dummy_send.hash ())); + nano::send_block dummy_send1 (receive2.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (receive2.hash ())); + + nano::keypair key2; + system.wallet (0)->insert_adhoc (key2.prv); + nano::send_block send3 (dummy_send1.hash (), key2.pub, nano::genesis_amount - nano::Gxrb_ratio * 4, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (dummy_send1.hash ())); + nano::send_block dummy_send2 (send3.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 5, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send3.hash ())); + + auto open1 = std::make_shared (send3.hash (), nano::genesis_account, key2.pub, key2.prv, key2.pub, *system.work.generate (key2.pub)); + + { + auto transaction = node->store.tx_begin_write (); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive2).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send1).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send3).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send2).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *open1).code); + } + + add_callback_stats (*node); + + node->block_confirm (open1); + system.deadline_set (10s); + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 10) + { + ASSERT_NO_ERROR (system.poll ()); + } + + auto transaction = node->store.tx_begin_read (); + ASSERT_TRUE (node->ledger.block_confirmed (transaction, open1->hash ())); + ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out)); + ASSERT_EQ (0, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out)); + ASSERT_EQ (9, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out)); + ASSERT_EQ (10, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); +} + +// This test checks that a receive block with uncemented blocks below cements them too, compared with the test above, this +// is the first write in this chain. +TEST (confirmation_height, cemented_gap_below_no_cache) +{ + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto node = system.add_node (node_config); + + system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); + nano::block_hash latest (node->latest (nano::test_genesis_key.pub)); + + nano::keypair key1; + system.wallet (0)->insert_adhoc (key1.prv); + + nano::send_block send (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest)); + nano::send_block send1 (send.hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send.hash ())); + nano::keypair dummy_key; + nano::send_block dummy_send (send1.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1.hash ())); + + nano::open_block open (send.hash (), nano::genesis_account, key1.pub, key1.prv, key1.pub, *system.work.generate (key1.pub)); + nano::receive_block receive1 (open.hash (), send1.hash (), key1.prv, key1.pub, *system.work.generate (open.hash ())); + nano::send_block send2 (receive1.hash (), nano::test_genesis_key.pub, nano::Gxrb_ratio, key1.prv, key1.pub, *system.work.generate (receive1.hash ())); + + nano::receive_block receive2 (dummy_send.hash (), send2.hash (), nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (dummy_send.hash ())); + nano::send_block dummy_send1 (receive2.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (receive2.hash ())); + + nano::keypair key2; + system.wallet (0)->insert_adhoc (key2.prv); + nano::send_block send3 (dummy_send1.hash (), key2.pub, nano::genesis_amount - nano::Gxrb_ratio * 4, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (dummy_send1.hash ())); + nano::send_block dummy_send2 (send3.hash (), dummy_key.pub, nano::genesis_amount - nano::Gxrb_ratio * 5, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send3.hash ())); + + auto open1 = std::make_shared (send3.hash (), nano::genesis_account, key2.pub, key2.prv, key2.pub, *system.work.generate (key2.pub)); + + { + auto transaction = node->store.tx_begin_write (); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, open).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, receive2).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send1).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send3).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, dummy_send2).code); + + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *open1).code); + } + + // Force some blocks to be cemented so that the cached confirmed info variable is empty + { + auto transaction (node->store.tx_begin_write ()); + node->store.confirmation_height_put (transaction, nano::genesis_account, nano::confirmation_height_info{ 3, send1.hash () }); + node->store.confirmation_height_put (transaction, key1.pub, nano::confirmation_height_info{ 2, receive1.hash () }); + } + + add_callback_stats (*node); + + node->block_confirm (open1); + system.deadline_set (10s); + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 6) + { + ASSERT_NO_ERROR (system.poll ()); + } + + auto transaction = node->store.tx_begin_read (); + ASSERT_TRUE (node->ledger.block_confirmed (transaction, open1->hash ())); + ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out)); + ASSERT_EQ (0, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_conf_height, nano::stat::dir::out)); + ASSERT_EQ (5, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out)); + ASSERT_EQ (6, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); } -TEST (confirmation_height, dependent_election_after_already_cemented) +TEST (confirmation_height, election_winner_details_clearing) { nano::system system; nano::node_config node_config (nano::get_available_port (), system.logging); @@ -1130,60 +1250,65 @@ TEST (confirmation_height, dependent_election_after_already_cemented) nano::keypair key1; auto send = std::make_shared (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest)); auto send1 = std::make_shared (send->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 2, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send->hash ())); + nano::send_block send2 (send1->hash (), key1.pub, nano::genesis_amount - nano::Gxrb_ratio * 3, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (send1->hash ())); { auto transaction = node->store.tx_begin_write (); ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send).code); ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, *send1).code); + ASSERT_EQ (nano::process_result::progress, node->ledger.process (transaction, send2).code); } add_callback_stats (*node); + node->block_confirm (send1); + system.deadline_set (10s); + while (node->active.size () > 0) { - node->block_confirm (send1); - auto write_guard = node->write_database_queue.wait (nano::writer::testing); - system.deadline_set (10s); - while (node->active.size () > 0) - { - ASSERT_NO_ERROR (system.poll ()); - } - - ASSERT_EQ (0, node->active.list_confirmed ().size ()); - { - nano::lock_guard guard (node->active.mutex); - ASSERT_EQ (0, node->active.blocks.size ()); - } + ASSERT_NO_ERROR (system.poll ()); + } - auto transaction = node->store.tx_begin_read (); - ASSERT_FALSE (node->ledger.block_confirmed (transaction, send->hash ())); + ASSERT_EQ (0, node->active.list_confirmed ().size ()); + { + nano::lock_guard guard (node->active.mutex); + ASSERT_EQ (0, node->active.blocks.size ()); + } - system.deadline_set (10s); - while (!node->write_database_queue.contains (nano::writer::confirmation_height)) - { - ASSERT_NO_ERROR (system.poll ()); - } + system.deadline_set (10s); + while (node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out) != 2) + { + ASSERT_NO_ERROR (system.poll ()); + } - node->block_confirm (send); - system.deadline_set (10s); - while (node->active.size () > 0) - { - ASSERT_NO_ERROR (system.poll ()); - } + ASSERT_EQ (0, node->active.election_winner_details_size ()); + node->block_confirm (send); + system.deadline_set (10s); + while (node->active.size () > 0) + { + ASSERT_NO_ERROR (system.poll ()); } + // Wait until this block is confirmed system.deadline_set (10s); - while (node->pending_confirmation_height.size () != 0) + while (node->active.election_winner_details_size () != 1 && !node->confirmation_height_processor.current ().is_zero ()) { ASSERT_NO_ERROR (system.poll ()); } + ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out)); + + // election_winner_details should get cleared during another batch of cementing, so add another block + node->confirmation_height_processor.add (send2.hash ()); + system.deadline_set (10s); - nano::unique_lock lk (node->active.mutex); - while (node->active.pending_conf_height.size () > 0) + while (node->active.election_winner_details_size () > 0) { - lk.unlock (); ASSERT_NO_ERROR (system.poll ()); - lk.lock (); } + + ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_inactive, nano::stat::dir::out)); + ASSERT_EQ (2, node->stats.count (nano::stat::type::http_callback, nano::stat::detail::http_callback, nano::stat::dir::out)); + ASSERT_EQ (1, node->stats.count (nano::stat::type::observer, nano::stat::detail::observer_confirmation_active_quorum, nano::stat::dir::out)); + ASSERT_EQ (3, node->stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in)); } } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 8f7eeb005f..49f5b4f5a3 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -1720,10 +1720,14 @@ TEST (node, broadcast_elected) node_flags.disable_tcp_realtime = true; node_flags.disable_bootstrap_listener = true; } - nano::system system (3, type, node_flags); - auto node0 (system.nodes[0]); - auto node1 (system.nodes[1]); - auto node2 (system.nodes[2]); + nano::system system; + nano::node_config node_config (nano::get_available_port (), system.logging); + node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled; + auto node0 = system.add_node (node_config, node_flags, type); + node_config.peering_port = nano::get_available_port (); + auto node1 = system.add_node (node_config, node_flags, type); + node_config.peering_port = nano::get_available_port (); + auto node2 = system.add_node (node_config, node_flags, type); nano::keypair rep_big; nano::keypair rep_small; nano::keypair rep_other; diff --git a/nano/node/active_transactions.cpp b/nano/node/active_transactions.cpp index 6e0ec1cc4f..12e70b2a07 100644 --- a/nano/node/active_transactions.cpp +++ b/nano/node/active_transactions.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -10,7 +11,8 @@ using namespace std::chrono; -nano::active_transactions::active_transactions (nano::node & node_a) : +nano::active_transactions::active_transactions (nano::node & node_a, nano::confirmation_height_processor & confirmation_height_processor_a) : +confirmation_height_processor (confirmation_height_processor_a), node (node_a), multipliers_cb (20, 1.), trended_active_difficulty (node_a.network_params.network.publish_threshold), @@ -27,6 +29,16 @@ thread ([this]() { request_loop (); }) { + // Register a callback which will get called after a block is cemented + confirmation_height_processor.add_cemented_observer ([this](nano::confirmation_height_processor::callback_data const & callback_data) { + this->block_cemented_callback (callback_data.block, callback_data.sideband); + }); + + // Register a callback which will get called after a batch of blocks is written and observer calls finished + confirmation_height_processor.add_cemented_batch_finished_observer ([this]() { + this->cemented_batch_finished_callback (); + }); + assert (min_time_between_requests > std::chrono::milliseconds (node.network_params.network.request_interval_ms)); assert (min_time_between_floods > std::chrono::milliseconds (node.network_params.network.request_interval_ms)); nano::unique_lock lock (mutex); @@ -90,7 +102,7 @@ void nano::active_transactions::search_frontiers (nano::transaction const & tran error = node.store.confirmation_height_get (transaction_a, cementable_account.account, confirmation_height_info); release_assert (!error); - if (info.block_count > confirmation_height_info.height && !this->node.pending_confirmation_height.is_processing_block (info.head)) + if (info.block_count > confirmation_height_info.height && !this->confirmation_height_processor.is_processing_block (info.head)) { auto block (this->node.store.block_get (transaction_a, info.head)); if (!this->start (block, true)) @@ -112,49 +124,87 @@ void nano::active_transactions::search_frontiers (nano::transaction const & tran next_frontier_check = steady_clock::now () + (agressive_factor / test_network_factor); } } -void nano::active_transactions::post_confirmation_height_set (nano::transaction const & transaction_a, std::shared_ptr block_a, nano::block_sideband const & sideband_a, nano::election_status_type election_status_type_a) + +void nano::active_transactions::block_cemented_callback (std::shared_ptr const & block_a, nano::block_sideband const & sideband_a) { - if (election_status_type_a == nano::election_status_type::inactive_confirmation_height) + auto transaction = node.store.tx_begin_read (); + + boost::optional election_status_type; + if (!confirmation_height_processor.is_processing_block (block_a->hash ())) { - nano::account account (0); - nano::uint128_t amount (0); - bool is_state_send (false); - nano::account pending_account (0); - node.process_confirmed_data (transaction_a, block_a, block_a->hash (), sideband_a, account, amount, is_state_send, pending_account); - node.observers.blocks.notify (nano::election_status{ block_a, 0, std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values::zero (), 0, 1, 0, nano::election_status_type::inactive_confirmation_height }, account, amount, is_state_send); + election_status_type = confirm_block (transaction, block_a); } else { - auto hash (block_a->hash ()); - nano::lock_guard lock (mutex); - auto existing (pending_conf_height.find (hash)); - if (existing != pending_conf_height.end ()) + // This block was explicitly added to the confirmation height_processor + election_status_type = nano::election_status_type::active_confirmed_quorum; + } + + if (election_status_type.is_initialized ()) + { + if (election_status_type == nano::election_status_type::inactive_confirmation_height) { - auto election = existing->second; - if (election->confirmed && !election->stopped && election->status.winner->hash () == hash) + nano::account account (0); + nano::uint128_t amount (0); + bool is_state_send (false); + nano::account pending_account (0); + node.process_confirmed_data (transaction, block_a, block_a->hash (), sideband_a, account, amount, is_state_send, pending_account); + node.observers.blocks.notify (nano::election_status{ block_a, 0, std::chrono::duration_cast (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values::zero (), 0, 1, 0, nano::election_status_type::inactive_confirmation_height }, account, amount, is_state_send); + } + else + { + auto hash (block_a->hash ()); + nano::lock_guard lock (mutex); + auto existing (election_winner_details.find (hash)); + if (existing != election_winner_details.end ()) { - add_confirmed (existing->second->status, block_a->qualified_root ()); - - node.receive_confirmed (transaction_a, block_a, hash); - nano::account account (0); - nano::uint128_t amount (0); - bool is_state_send (false); - nano::account pending_account (0); - node.process_confirmed_data (transaction_a, block_a, hash, sideband_a, account, amount, is_state_send, pending_account); - election->status.type = election_status_type_a; - election->status.confirmation_request_count = election->confirmation_request_count; - node.observers.blocks.notify (election->status, account, amount, is_state_send); - if (amount > 0) + auto election = existing->second; + if (election->confirmed && !election->stopped && election->status.winner->hash () == hash) { - node.observers.account_balance.notify (account, false); - if (!pending_account.is_zero ()) + add_confirmed (existing->second->status, block_a->qualified_root ()); + + node.receive_confirmed (transaction, block_a, hash); + nano::account account (0); + nano::uint128_t amount (0); + bool is_state_send (false); + nano::account pending_account (0); + node.process_confirmed_data (transaction, block_a, hash, sideband_a, account, amount, is_state_send, pending_account); + election->status.type = *election_status_type; + election->status.confirmation_request_count = election->confirmation_request_count; + node.observers.blocks.notify (election->status, account, amount, is_state_send); + if (amount > 0) { - node.observers.account_balance.notify (pending_account, true); + node.observers.account_balance.notify (account, false); + if (!pending_account.is_zero ()) + { + node.observers.account_balance.notify (pending_account, true); + } } } + + election_winner_details.erase (hash); } + } + } +} - pending_conf_height.erase (hash); +void nano::active_transactions::cemented_batch_finished_callback () +{ + // Depending on timing there is a situation where the election_winner_details is not reset. + // This can happen when a block wins an election, and the block is confirmed + observer + // called before the block hash gets added to election_winner_details. If the block is confirmed + // callbacks have already been done, so we can safely just remove it. + auto transaction = node.store.tx_begin_read (); + nano::lock_guard guard (mutex); + for (auto it = election_winner_details.begin (); it != election_winner_details.end ();) + { + if (node.ledger.block_confirmed (transaction, it->first)) + { + it = election_winner_details.erase (it); + } + else + { + ++it; } } } @@ -220,7 +270,7 @@ void nano::active_transactions::request_confirm (nano::unique_lock & // Due to the confirmation height processor working asynchronously and compressing several roots into one frontier, probably_unconfirmed_frontiers can be wrong { - auto pending_confirmation_height_size (node.pending_confirmation_height.size ()); + auto pending_confirmation_height_size (confirmation_height_processor.awaiting_processing_size ()); bool probably_unconfirmed_frontiers (node.ledger.cache.block_count > node.ledger.cache.cemented_count + roots.size () + pending_confirmation_height_size); bool bootstrap_weight_reached (node.ledger.cache.block_count >= node.ledger.bootstrap_weight_max_blocks); if (node.config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled && bootstrap_weight_reached && probably_unconfirmed_frontiers && pending_confirmation_height_size < confirmed_frontiers_max_pending_cut_off) @@ -344,7 +394,7 @@ void nano::active_transactions::request_loop () void nano::active_transactions::prioritize_account_for_confirmation (nano::active_transactions::prioritize_num_uncemented & cementable_frontiers_a, size_t & cementable_frontiers_size_a, nano::account const & account_a, nano::account_info const & info_a, uint64_t confirmation_height) { - if (info_a.block_count > confirmation_height && !node.pending_confirmation_height.is_processing_block (info_a.head)) + if (info_a.block_count > confirmation_height && !confirmation_height_processor.is_processing_block (info_a.head)) { auto num_uncemented = info_a.block_count - confirmation_height; nano::lock_guard guard (mutex); @@ -386,7 +436,7 @@ void nano::active_transactions::prioritize_account_for_confirmation (nano::activ void nano::active_transactions::prioritize_frontiers_for_confirmation (nano::transaction const & transaction_a, std::chrono::milliseconds ledger_accounts_time_a, std::chrono::milliseconds wallet_account_time_a) { // Don't try to prioritize when there are a large number of pending confirmation heights as blocks can be cemented in the meantime, making the prioritization less reliable - if (node.pending_confirmation_height.size () < confirmed_frontiers_max_pending_cut_off) + if (confirmation_height_processor.awaiting_processing_size () < confirmed_frontiers_max_pending_cut_off) { size_t priority_cementable_frontiers_size; size_t priority_wallet_cementable_frontiers_size; @@ -884,12 +934,6 @@ bool nano::active_transactions::publish (std::shared_ptr block_a) return result; } -void nano::active_transactions::clear_block (nano::block_hash const & hash_a) -{ - nano::lock_guard guard (mutex); - pending_conf_height.erase (hash_a); -} - // Returns the type of election status requiring callbacks calling later boost::optional nano::active_transactions::confirm_block (nano::transaction const & transaction_a, std::shared_ptr block_a) { @@ -1030,6 +1074,12 @@ std::chrono::steady_clock::time_point nano::active_transactions::find_dropped_el } } +size_t nano::active_transactions::election_winner_details_size () +{ + nano::lock_guard guard (mutex); + return election_winner_details.size (); +} + nano::cementable_account::cementable_account (nano::account const & account_a, size_t blocks_uncemented_a) : account (account_a), blocks_uncemented (blocks_uncemented_a) { @@ -1040,20 +1090,18 @@ std::unique_ptr nano::collect_container_info (ac size_t roots_count; size_t blocks_count; size_t confirmed_count; - size_t pending_conf_height_count; { nano::lock_guard guard (active_transactions.mutex); roots_count = active_transactions.roots.size (); blocks_count = active_transactions.blocks.size (); confirmed_count = active_transactions.confirmed.size (); - pending_conf_height_count = active_transactions.pending_conf_height.size (); } auto composite = std::make_unique (name); composite->add_component (std::make_unique (container_info{ "roots", roots_count, sizeof (decltype (active_transactions.roots)::value_type) })); composite->add_component (std::make_unique (container_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) })); - composite->add_component (std::make_unique (container_info{ "pending_conf_height", pending_conf_height_count, sizeof (decltype (active_transactions.pending_conf_height)::value_type) })); + composite->add_component (std::make_unique (container_info{ "election_winner_details", active_transactions.election_winner_details_size (), sizeof (decltype (active_transactions.election_winner_details)::value_type) })); composite->add_component (std::make_unique (container_info{ "confirmed", confirmed_count, sizeof (decltype (active_transactions.confirmed)::value_type) })); composite->add_component (std::make_unique (container_info{ "priority_wallet_cementable_frontiers_count", active_transactions.priority_wallet_cementable_frontiers_size (), sizeof (nano::cementable_account) })); composite->add_component (std::make_unique (container_info{ "priority_cementable_frontiers_count", active_transactions.priority_cementable_frontiers_size (), sizeof (nano::cementable_account) })); diff --git a/nano/node/active_transactions.hpp b/nano/node/active_transactions.hpp index 1b390d77a6..af2ebee723 100644 --- a/nano/node/active_transactions.hpp +++ b/nano/node/active_transactions.hpp @@ -34,6 +34,7 @@ class block_sideband; class election; class vote; class transaction; +class confirmation_height_processor; class conflict_info final { @@ -72,7 +73,7 @@ class active_transactions final // clang-format on public: - explicit active_transactions (nano::node &); + explicit active_transactions (nano::node &, nano::confirmation_height_processor &); ~active_transactions (); // Start an election for a block // Call action with confirmed block, may be different than what we started with @@ -96,7 +97,8 @@ class active_transactions final void stop (); bool publish (std::shared_ptr block_a); boost::optional confirm_block (nano::transaction const &, std::shared_ptr); - void post_confirmation_height_set (nano::transaction const & transaction_a, std::shared_ptr block_a, nano::block_sideband const & sideband_a, nano::election_status_type election_status_type_a); + void block_cemented_callback (std::shared_ptr const & block_a, nano::block_sideband const & sideband_a); + void cemented_batch_finished_callback (); // clang-format off boost::multi_index_container multipliers_cb; @@ -122,8 +125,8 @@ class active_transactions final size_t priority_wallet_cementable_frontiers_size (); boost::circular_buffer difficulty_trend (); size_t inactive_votes_cache_size (); - std::unordered_map> pending_conf_height; - void clear_block (nano::block_hash const & hash_a); + std::unordered_map> election_winner_details; + size_t election_winner_details_size (); void add_dropped_elections_cache (nano::qualified_root const &); std::chrono::steady_clock::time_point find_dropped_elections_cache (nano::qualified_root const &); size_t dropped_elections_cache_size (); diff --git a/nano/node/cli.cpp b/nano/node/cli.cpp index b2025f25b9..586e8cbf85 100644 --- a/nano/node/cli.cpp +++ b/nano/node/cli.cpp @@ -1172,7 +1172,7 @@ void reset_confirmation_heights (nano::block_store & store) // Then make sure the confirmation height of the genesis account open block is 1 nano::network_params network_params; - store.confirmation_height_put (transaction, network_params.ledger.genesis_account, { 1, network_params.ledger.genesis_block }); + store.confirmation_height_put (transaction, network_params.ledger.genesis_account, { 1, network_params.ledger.genesis_hash }); } bool is_using_rocksdb (boost::filesystem::path const & data_path, std::error_code & ec) diff --git a/nano/node/confirmation_height_processor.cpp b/nano/node/confirmation_height_processor.cpp index 24a7eaaee3..774270f858 100644 --- a/nano/node/confirmation_height_processor.cpp +++ b/nano/node/confirmation_height_processor.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -15,10 +14,8 @@ #include #include -nano::confirmation_height_processor::confirmation_height_processor (nano::pending_confirmation_height & pending_confirmation_height_a, nano::ledger & ledger_a, nano::active_transactions & active_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a) : -pending_confirmations (pending_confirmation_height_a), +nano::confirmation_height_processor::confirmation_height_processor (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a) : ledger (ledger_a), -active (active_a), logger (logger_a), write_database_queue (write_database_queue_a), batch_separate_pending_min_time (batch_separate_pending_min_time_a), @@ -46,44 +43,46 @@ void nano::confirmation_height_processor::stop () void nano::confirmation_height_processor::run () { - nano::unique_lock lk (pending_confirmations.mutex); + nano::unique_lock lk (mutex); while (!stopped) { - if (!paused && !pending_confirmations.pending.empty ()) + if (!paused && !awaiting_processing.empty ()) { - pending_confirmations.current_hash = *pending_confirmations.pending.begin (); - pending_confirmations.pending.erase (pending_confirmations.current_hash); - // Copy the hash so can be used outside owning the lock - auto current_pending_block = pending_confirmations.current_hash; lk.unlock (); if (pending_writes.empty ()) { - // Separate blocks which are pending confirmation height can be batched by a minimum processing time (to improve disk write performance), so make sure the slate is clean when a new batch is starting. - confirmed_iterated_pairs.clear (); + // Separate blocks which are pending confirmation height can be batched by a minimum processing time (to improve lmdb disk write performance), + // so make sure the slate is clean when a new batch is starting. + accounts_confirmed_info.clear (); + accounts_confirmed_info_size = 0; timer.restart (); } - add_confirmation_height (current_pending_block); + set_next_hash (); + process (); lk.lock (); - pending_confirmations.current_hash = 0; } else { - // If there are no blocks pending confirmation, then make sure we flush out the remaining writes + // If there are blocks pending cementing, then make sure we flush out the remaining writes + lk.unlock (); if (!pending_writes.empty ()) { - lk.unlock (); auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height); - write_pending (pending_writes); + cement_blocks (); lk.lock (); + original_hash.clear (); } else { + lk.lock (); + original_hash.clear (); condition.wait (lk); } } } } +// Pausing only affects processing new blocks, not the current one being processed. Currently only used in tests void nano::confirmation_height_processor::pause () { paused = true; @@ -98,95 +97,139 @@ void nano::confirmation_height_processor::unpause () void nano::confirmation_height_processor::add (nano::block_hash const & hash_a) { { - nano::lock_guard lk (pending_confirmations.mutex); - pending_confirmations.pending.insert (hash_a); + nano::lock_guard lk (mutex); + awaiting_processing.insert (hash_a); } condition.notify_one (); } -/** - * For all the blocks below this height which have been implicitly confirmed check if they - * are open/receive blocks, and if so follow the source blocks and iteratively repeat to genesis. - * To limit write locking and to keep the confirmation height ledger correctly synced, confirmations are - * written from the ground upwards in batches. - */ -void nano::confirmation_height_processor::add_confirmation_height (nano::block_hash const & hash_a) +// The next block hash to iterate over, the priority is as follows: +// 1 - The next block in the account chain for the last processed receive (if there is any) +// 2 - The next receive block which is closest to genesis +// 3 - The last checkpoint hit. +// 4 - The hash that was passed in originally. Either all checkpoints were exhausted (this can happen when there are many accounts to genesis) +// or all other blocks have been processed. +nano::confirmation_height_processor::top_and_next_hash nano::confirmation_height_processor::get_next_block (boost::optional const & next_in_receive_chain_a, boost::circular_buffer_space_optimized const & checkpoints_a, boost::circular_buffer_space_optimized const & receive_source_pairs, boost::optional & receive_details_a) { - boost::optional receive_details; - auto current = hash_a; - assert (receive_source_pairs_size == 0); - release_assert (receive_source_pairs.empty ()); - - auto read_transaction (ledger.store.tx_begin_read ()); - auto last_iteration = false; - // Traverse account chain and all sources for receive blocks iteratively - do + top_and_next_hash next; + if (next_in_receive_chain_a.is_initialized ()) + { + next = *next_in_receive_chain_a; + } + else if (!receive_source_pairs.empty ()) + { + auto next_receive_source_pair = receive_source_pairs.back (); + receive_details_a = next_receive_source_pair.receive_details; + next = { next_receive_source_pair.source_hash, receive_details_a->next, receive_details_a->height + 1 }; + } + else if (!checkpoints_a.empty ()) + { + next = { checkpoints_a.back (), boost::none, 0 }; + } + else + { + next = { original_hash, boost::none, 0 }; + } + + return next; +} + +nano::block_hash nano::confirmation_height_processor::get_least_unconfirmed_hash_from_top_level (nano::transaction const & transaction_a, nano::block_hash const & hash_a, nano::account const & account_a, nano::confirmation_height_info const & confirmation_height_info_a, uint64_t & block_height_a) +{ + nano::block_hash least_unconfirmed_hash = hash_a; + nano::block_sideband sideband; + if (confirmation_height_info_a.height != 0) { - if (!receive_source_pairs.empty ()) + if (block_height_a > confirmation_height_info_a.height) { - receive_details = receive_source_pairs.back ().receive_details; - current = receive_source_pairs.back ().source_hash; + release_assert (ledger.store.block_get (transaction_a, confirmation_height_info_a.frontier, &sideband) != nullptr); + least_unconfirmed_hash = sideband.successor; + block_height_a = sideband.height + 1; } - else + } + else + { + // No blocks have been confirmed, so the first block will be the open block + nano::account_info account_info; + release_assert (!ledger.store.account_get (transaction_a, account_a, account_info)); + least_unconfirmed_hash = account_info.open_block; + block_height_a = 1; + } + return least_unconfirmed_hash; +} + +void nano::confirmation_height_processor::set_next_hash () +{ + nano::lock_guard guard (mutex); + assert (!awaiting_processing.empty ()); + original_hash = *awaiting_processing.begin (); + original_hashes_pending.insert (original_hash); + awaiting_processing.erase (original_hash); +} + +void nano::confirmation_height_processor::process () +{ + nano::block_sideband sideband; + nano::confirmation_height_info confirmation_height_info; + auto transaction (ledger.store.tx_begin_read ()); + + boost::optional next_in_receive_chain; + boost::circular_buffer_space_optimized checkpoints{ max_items }; + boost::circular_buffer_space_optimized receive_source_pairs{ max_items }; + nano::block_hash current; + do + { + boost::optional receive_details; + auto hash_to_process = get_next_block (next_in_receive_chain, checkpoints, receive_source_pairs, receive_details); + current = hash_to_process.top; + + auto top_level_hash = current; + nano::account account (ledger.store.block_account (transaction, current)); + release_assert (!ledger.store.confirmation_height_get (transaction, account, confirmation_height_info)); + + // Checks if we have encountered this account before but not commited changes yet, if so then update the cached confirmation height + auto account_it = accounts_confirmed_info.find (account); + if (account_it != accounts_confirmed_info.cend () && account_it->second.confirmed_height > confirmation_height_info.height) { - // If receive_details is set then this is the final iteration and we are back to the original chain. - // We need to confirm any blocks below the original hash (incl self) and the first receive block - // (if the original block is not already a receive) - if (receive_details) - { - current = hash_a; - receive_details = boost::none; - last_iteration = true; - } + confirmation_height_info.height = account_it->second.confirmed_height; + confirmation_height_info.frontier = account_it->second.iterated_frontier; } - auto block_height (ledger.store.block_account_height (read_transaction, current)); - nano::account account (ledger.store.block_account (read_transaction, current)); - nano::confirmation_height_info confirmation_height_info; - release_assert (!ledger.store.confirmation_height_get (read_transaction, account, confirmation_height_info)); - auto confirmation_height = confirmation_height_info.height; - auto iterated_height = confirmation_height; - auto account_it = confirmed_iterated_pairs.find (account); - if (account_it != confirmed_iterated_pairs.cend ()) + nano::block_sideband sideband; + auto block = ledger.store.block_get (transaction, current, &sideband); + assert (block != nullptr); + auto block_height = sideband.height; + bool already_cemented = confirmation_height_info.height >= block_height; + + // If we are not already at the bottom of the account chain (1 above cemented frontier) then find it + if (!already_cemented && block_height - confirmation_height_info.height > 1) { - if (account_it->second.confirmed_height > confirmation_height) + if (block_height - confirmation_height_info.height == 2) { - confirmation_height = account_it->second.confirmed_height; - iterated_height = confirmation_height; + // If there is 1 uncemented block in-between this block and the cemented frontier, + // we can just use the previous block to get the least unconfirmed hash. + current = block->previous (); + --block_height; } - if (account_it->second.iterated_height > iterated_height) + else if (!next_in_receive_chain.is_initialized ()) { - iterated_height = account_it->second.iterated_height; + current = get_least_unconfirmed_hash_from_top_level (transaction, current, account, confirmation_height_info, block_height); } - } - - if (!last_iteration && current == hash_a && confirmation_height >= block_height) - { - auto it = std::find_if (pending_writes.begin (), pending_writes.end (), [&hash_a](auto & conf_height_details) { - auto it = std::find_if (conf_height_details.block_callbacks_required.begin (), conf_height_details.block_callbacks_required.end (), [&hash_a](auto & callback_data) { - return callback_data.block->hash () == hash_a; - }); - return (it != conf_height_details.block_callbacks_required.end ()); - }); - - if (it == pending_writes.end ()) + else { - // This is a block which has been added to the processor but already has its confirmation height set (or about to be set) - // Just need to perform active cleanup, no callbacks are needed. - active.clear_block (hash_a); + // Use the cached successor of the last receive which saves having to do more IO in get_least_unconfirmed_hash_from_top_level + // as we already know what the next block we should process should be. + current = *hash_to_process.next; + block_height = hash_to_process.next_height; } } - auto count_before_receive = receive_source_pairs.size (); - std::vector block_callbacks_required; - if (block_height > iterated_height) - { - if ((block_height - iterated_height) > 20000) - { - logger.always_log ("Iterating over a large account chain for setting confirmation height. The top block: ", current.to_string ()); - } + auto top_most_non_receive_block_hash = current; - collect_unconfirmed_receive_and_sources_for_account (block_height, iterated_height, current, account, read_transaction, block_callbacks_required); + bool hit_receive = false; + if (!already_cemented) + { + hit_receive = iterate (transaction, block_height, current, checkpoints, top_most_non_receive_block_hash, top_level_hash, receive_source_pairs, account); } // Exit early when the processor has been stopped, otherwise this function may take a @@ -196,307 +239,407 @@ void nano::confirmation_height_processor::add_confirmation_height (nano::block_h break; } - // No longer need the read transaction - read_transaction.reset (); + // next_in_receive_chain can be modified when writing, so need to cache it here before resetting + auto is_set = next_in_receive_chain.is_initialized (); + next_in_receive_chain = boost::none; - // If this adds no more open or receive blocks, then we can now confirm this account as well as the linked open/receive block - // Collect as pending any writes to the database and do them in bulk after a certain time. - auto confirmed_receives_pending = (count_before_receive != receive_source_pairs.size ()); - if (!confirmed_receives_pending) + // Need to also handle the case where we are hitting receives where the sends below should be confirmed + if (!hit_receive || (receive_source_pairs.size () == 1 && top_most_non_receive_block_hash != current)) { - if (block_height > confirmation_height) - { - // Check whether the previous block has been seen. If so, the rest of sends below have already been seen so don't count them - if (account_it != confirmed_iterated_pairs.cend ()) - { - account_it->second.confirmed_height = block_height; - if (block_height > iterated_height) - { - account_it->second.iterated_height = block_height; - } - } - else - { - confirmed_iterated_pairs.emplace (account, confirmed_iterated_pair{ block_height, block_height }); - } + preparation_data preparation_data{ transaction, top_most_non_receive_block_hash, already_cemented, checkpoints, account_it, confirmation_height_info, account, block_height, current, receive_details, next_in_receive_chain }; + prepare_iterated_blocks_for_cementing (preparation_data); - pending_writes.emplace_back (account, current, block_height, block_height - confirmation_height, block_callbacks_required); + // If used the top level, don't pop off the receive source pair because it wasn't used + if (!is_set && !receive_source_pairs.empty ()) + { + receive_source_pairs.pop_back (); } - if (receive_details) + auto total_pending_write_block_count = std::accumulate (pending_writes.cbegin (), pending_writes.cend (), uint64_t (0), [](uint64_t total, auto const & write_details_a) { + return total += write_details_a.top_height - write_details_a.bottom_height + 1; + }); + + auto max_batch_write_size_reached = (total_pending_write_block_count >= batch_write_size); + // When there are a lot of pending confirmation height blocks, it is more efficient to + // bulk some of them up to enable better write performance which becomes the bottleneck. + auto min_time_exceeded = (timer.since_start () >= batch_separate_pending_min_time); + auto finished_iterating = current == original_hash; + auto non_awaiting_processing = awaiting_processing_size () == 0; + auto should_output = finished_iterating && (non_awaiting_processing || min_time_exceeded); + auto force_write = pending_writes.size () >= pending_writes_max_size || accounts_confirmed_info.size () >= pending_writes_max_size; + + if (((max_batch_write_size_reached || should_output) && !pending_writes.empty ()) || force_write) { - // Check whether the previous block has been seen. If so, the rest of sends below have already been seen so don't count them - auto const & receive_account = receive_details->account; - auto receive_account_it = confirmed_iterated_pairs.find (receive_account); - if (receive_account_it != confirmed_iterated_pairs.cend ()) + bool error = false; + // If nothing is currently using the database write lock then write the cemented pending blocks otherwise continue iterating + if (write_database_queue.process (nano::writer::confirmation_height)) { - // Get current height - auto current_height = receive_account_it->second.confirmed_height; - receive_account_it->second.confirmed_height = receive_details->height; - receive_details->num_blocks_confirmed = receive_details->height - current_height; + auto scoped_write_guard = write_database_queue.pop (); + error = cement_blocks (); } - else + else if (force_write) { - confirmed_iterated_pairs.emplace (receive_account, confirmed_iterated_pair{ receive_details->height, receive_details->height }); + auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height); + error = cement_blocks (); + } + // Don't set any more cemented blocks from the original hash if an inconsistency is found + if (error) + { + checkpoints.clear (); + break; } - - pending_writes.push_back (*receive_details); } + } + + transaction.refresh (); + } while (!receive_source_pairs.empty () || current != original_hash); + + assert (checkpoints.empty ()); +} + +bool nano::confirmation_height_processor::iterate (nano::read_transaction const & transaction_a, uint64_t bottom_height_a, nano::block_hash const & bottom_hash_a, boost::circular_buffer_space_optimized & checkpoints_a, nano::block_hash & top_most_non_receive_block_hash_a, nano::block_hash const & top_level_hash_a, boost::circular_buffer_space_optimized & receive_source_pairs_a, nano::account const & account_a) +{ + bool reached_target = false; + bool hit_receive = false; + auto hash = bottom_hash_a; + nano::block_sideband sideband; + uint64_t num_blocks = 0; + while (!hash.is_zero () && !reached_target && !stopped) + { + // Keep iterating upwards until we either reach the desired block or the second receive. + // Once a receive is cemented, we can cement all blocks above it until the next receive, so store those details for later. + ++num_blocks; + auto block = ledger.store.block_get (transaction_a, hash, &sideband); + auto source (block->source ()); + if (source.is_zero ()) + { + source = block->link (); + } - if (!receive_source_pairs.empty ()) + if (!source.is_zero () && !ledger.is_epoch_link (source) && ledger.store.source_exists (transaction_a, source)) + { + hit_receive = true; + reached_target = true; + auto next = !sideband.successor.is_zero () && sideband.successor != top_level_hash_a ? boost::optional (sideband.successor) : boost::none; + receive_source_pairs_a.push_back (receive_source_pair{ receive_chain_details{ account_a, sideband.height, hash, top_level_hash_a, next, bottom_height_a, bottom_hash_a }, source }); + // Store a checkpoint every max_items so that we can always traverse a long number of accounts to genesis + if (receive_source_pairs_a.size () % max_items == 0) { - // Pop from the end - receive_source_pairs.erase (receive_source_pairs.end () - 1); - --receive_source_pairs_size; + checkpoints_a.push_back (top_level_hash_a); } } - else if (block_height > iterated_height) + else { - if (account_it != confirmed_iterated_pairs.cend ()) + // Found a send/change/epoch block which isn't the desired top level + top_most_non_receive_block_hash_a = hash; + if (hash == top_level_hash_a) { - account_it->second.iterated_height = block_height; + reached_target = true; } else { - confirmed_iterated_pairs.emplace (account, confirmed_iterated_pair{ confirmation_height, block_height }); + hash = sideband.successor; } } - auto max_write_size_reached = (pending_writes.size () >= batch_write_size); - // When there are a lot of pending confirmation height blocks, it is more efficient to - // bulk some of them up to enable better write performance which becomes the bottleneck. - auto min_time_exceeded = (timer.since_start () >= batch_separate_pending_min_time); - auto finished_iterating = receive_source_pairs.empty (); - auto no_pending = pending_confirmations.size () == 0; - auto should_output = finished_iterating && (no_pending || min_time_exceeded); - - if ((max_write_size_reached || should_output) && !pending_writes.empty ()) + // We could be traversing a very large account so we don't want to open read transactions for too long. + if ((num_blocks > 0) && num_blocks % batch_read_size == 0) { - if (write_database_queue.process (nano::writer::confirmation_height)) - { - auto scoped_write_guard = write_database_queue.pop (); - auto error = write_pending (pending_writes); - // Don't set any more blocks as confirmed from the original hash if an inconsistency is found - if (error) - { - break; - } - } + transaction_a.refresh (); } + } - read_transaction.renew (); - } while (!receive_source_pairs.empty () || current != hash_a); + return hit_receive; } -/* - * Returns true if there was an error in finding one of the blocks to write a confirmation height for, false otherwise - */ -bool nano::confirmation_height_processor::write_pending (std::deque & all_pending_a) +// Once the path to genesis has been iterated to, we can begin to cement the lowest blocks in the accounts. This sets up +// the non-receive blocks which have been iterated for an account, and the associated receive block. +void nano::confirmation_height_processor::prepare_iterated_blocks_for_cementing (preparation_data & preparation_data_a) { - auto total_pending_write_block_count = std::accumulate (all_pending_a.cbegin (), all_pending_a.cend (), uint64_t (0), [](uint64_t total, conf_height_details const & conf_height_details_a) { - return total += conf_height_details_a.num_blocks_confirmed; - }); - - // Write in batches - while (total_pending_write_block_count > 0) + if (!preparation_data_a.already_cemented) { - uint64_t num_accounts_processed = 0; - auto transaction (ledger.store.tx_begin_write ({}, { nano::tables::confirmation_height })); - while (!all_pending_a.empty ()) + // Add the non-receive blocks iterated for this account + auto block_height = (ledger.store.block_account_height (preparation_data_a.transaction, preparation_data_a.top_most_non_receive_block_hash)); + if (block_height > preparation_data_a.confirmation_height_info.height) { - const auto & pending = all_pending_a.front (); - nano::confirmation_height_info confirmation_height_info; - auto error = ledger.store.confirmation_height_get (transaction, pending.account, confirmation_height_info); - release_assert (!error); - auto confirmation_height = confirmation_height_info.height; - if (pending.height > confirmation_height) + confirmed_info confirmed_info_l{ block_height, preparation_data_a.top_most_non_receive_block_hash }; + if (preparation_data_a.account_it != accounts_confirmed_info.cend ()) { -#ifndef NDEBUG - // Do more thorough checking in Debug mode, indicates programming error. - nano::block_sideband sideband; - auto block = ledger.store.block_get (transaction, pending.hash, &sideband); - static nano::network_constants network_constants; - assert (network_constants.is_test_network () || block != nullptr); - assert (network_constants.is_test_network () || sideband.height == pending.height); -#else - auto block = ledger.store.block_get (transaction, pending.hash); -#endif - // Check that the block still exists as there may have been changes outside this processor. - if (!block) - { - logger.always_log ("Failed to write confirmation height for: ", pending.hash.to_string ()); - ledger.stats.inc (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block); - receive_source_pairs.clear (); - receive_source_pairs_size = 0; - all_pending_a.clear (); - return true; - } - - for (auto & callback_data : pending.block_callbacks_required) - { - active.post_confirmation_height_set (transaction, callback_data.block, callback_data.sideband, callback_data.election_status_type); - } - - ledger.stats.add (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in, pending.height - confirmation_height); - assert (pending.num_blocks_confirmed == pending.height - confirmation_height); - confirmation_height = pending.height; - ledger.cache.cemented_count += pending.num_blocks_confirmed; - ledger.store.confirmation_height_put (transaction, pending.account, { confirmation_height, pending.hash }); + preparation_data_a.account_it->second = confirmed_info_l; } - total_pending_write_block_count -= pending.num_blocks_confirmed; - ++num_accounts_processed; - all_pending_a.erase (all_pending_a.begin ()); - - if (num_accounts_processed >= batch_write_size) + else { - // Commit changes periodically to reduce time holding write locks for long chains - break; + accounts_confirmed_info.emplace (preparation_data_a.account, confirmed_info_l); + accounts_confirmed_info_size = accounts_confirmed_info.size (); } + + preparation_data_a.checkpoints.erase (std::remove (preparation_data_a.checkpoints.begin (), preparation_data_a.checkpoints.end (), preparation_data_a.top_most_non_receive_block_hash), preparation_data_a.checkpoints.end ()); + pending_writes.emplace_back (preparation_data_a.account, preparation_data_a.bottom_height, preparation_data_a.bottom_most, block_height, preparation_data_a.top_most_non_receive_block_hash); + ++pending_writes_size; } } - assert (all_pending_a.empty ()); - return false; + + // Add the receive block and all non-receive blocks above that one + auto & receive_details = preparation_data_a.receive_details; + if (receive_details) + { + auto receive_confirmed_info_it = accounts_confirmed_info.find (receive_details->account); + if (receive_confirmed_info_it != accounts_confirmed_info.cend ()) + { + auto & receive_confirmed_info = receive_confirmed_info_it->second; + receive_confirmed_info.confirmed_height = receive_details->height; + receive_confirmed_info.iterated_frontier = receive_details->hash; + } + else + { + accounts_confirmed_info.emplace (receive_details->account, confirmed_info{ receive_details->height, receive_details->hash }); + accounts_confirmed_info_size = accounts_confirmed_info.size (); + } + + if (receive_details->next.is_initialized ()) + { + preparation_data_a.next_in_receive_chain = top_and_next_hash{ receive_details->top_level, receive_details->next, receive_details->height + 1 }; + } + else + { + preparation_data_a.checkpoints.erase (std::remove (preparation_data_a.checkpoints.begin (), preparation_data_a.checkpoints.end (), receive_details->hash), preparation_data_a.checkpoints.end ()); + } + + pending_writes.emplace_back (receive_details->account, receive_details->bottom_height, receive_details->bottom_most, receive_details->height, receive_details->hash); + ++pending_writes_size; + } } -void nano::confirmation_height_processor::collect_unconfirmed_receive_and_sources_for_account (uint64_t block_height_a, uint64_t confirmation_height_a, nano::block_hash const & hash_a, nano::account const & account_a, nano::read_transaction const & transaction_a, std::vector & block_callbacks_required) +bool nano::confirmation_height_processor::cement_blocks () { - auto hash (hash_a); - auto num_to_confirm = block_height_a - confirmation_height_a; - - // Store heights of blocks - constexpr auto height_not_set = std::numeric_limits::max (); - auto next_height = height_not_set; - while ((num_to_confirm > 0) && !hash.is_zero () && !stopped) + // Will contain all blocks that have been cemented (bounded by batch_write_size) + // and will get run through the cemented observer callback + std::vector cemented_blocks; { - nano::block_sideband sideband; - auto block (ledger.store.block_get (transaction_a, hash, &sideband)); - if (block) + // This only writes to the confirmation_height table and is the only place to do so in a single process + auto transaction (ledger.store.tx_begin_write ({}, { nano::tables::confirmation_height })); + + // Cement all pending entries, each entry is specific to an account and contains the least amount + // of blocks to retain consistent cementing across all account chains to genesis. + while (!pending_writes.empty ()) { - if (!pending_confirmations.is_processing_block (hash)) + const auto & pending = pending_writes.front (); + const auto & account = pending.account; + + auto write_confirmation_height = [&account, &ledger = ledger, &transaction](uint64_t num_blocks_cemented, uint64_t confirmation_height, nano::block_hash const & confirmed_frontier) { +#ifndef NDEBUG + // Extra debug checks + nano::confirmation_height_info confirmation_height_info; + assert (!ledger.store.confirmation_height_get (transaction, account, confirmation_height_info)); + nano::block_sideband sideband; + assert (ledger.store.block_get (transaction, confirmed_frontier, &sideband)); + assert (sideband.height == confirmation_height_info.height + num_blocks_cemented); +#endif + ledger.store.confirmation_height_put (transaction, account, nano::confirmation_height_info{ confirmation_height, confirmed_frontier }); + ledger.cache.cemented_count += num_blocks_cemented; + ledger.stats.add (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in, num_blocks_cemented); + }; + + nano::confirmation_height_info confirmation_height_info; + release_assert (!ledger.store.confirmation_height_get (transaction, pending.account, confirmation_height_info)); + + // Some blocks need to be cemented at least + if (pending.top_height > confirmation_height_info.height) { - auto election_status_type = active.confirm_block (transaction_a, block); - if (election_status_type.is_initialized ()) + nano::block_sideband sideband; + // The highest hash which will be cemented + nano::block_hash new_cemented_frontier; + uint64_t num_blocks_confirmed = 0; + uint64_t start_height = 0; + if (pending.bottom_height > confirmation_height_info.height) { - block_callbacks_required.emplace_back (block, sideband, *election_status_type); + new_cemented_frontier = pending.bottom_hash; + // If we are higher than the cemented frontier, we should be exactly 1 block above + assert (pending.bottom_height == confirmation_height_info.height + 1); + num_blocks_confirmed = pending.top_height - pending.bottom_height + 1; + start_height = pending.bottom_height; + } + else + { + auto block = ledger.store.block_get (transaction, confirmation_height_info.frontier, &sideband); + new_cemented_frontier = sideband.successor; + num_blocks_confirmed = pending.top_height - confirmation_height_info.height; + start_height = confirmation_height_info.height + 1; } - } - else - { - // This block is the original which is having its confirmation height set on - block_callbacks_required.emplace_back (block, sideband, nano::election_status_type::active_confirmed_quorum); - } - auto source (block->source ()); - if (source.is_zero ()) - { - source = block->link (); - } + auto total_blocks_cemented = 0; + auto num_blocks_iterated = 0; - if (!source.is_zero () && !ledger.is_epoch_link (source) && ledger.store.source_exists (transaction_a, source)) - { - auto block_height = confirmation_height_a + num_to_confirm; - // Set the height for the receive block above (if there is one) - if (next_height != height_not_set) + auto block = ledger.store.block_get (transaction, new_cemented_frontier, &sideband); + + // Cementing starts from the bottom of the chain and works upwards. This is because chains can have effectively + // an infinite number of send/change blocks in a row. We don't want to hold the write transaction open for too long. + for (; num_blocks_confirmed - num_blocks_iterated != 0; ++num_blocks_iterated) { - receive_source_pairs.back ().receive_details.num_blocks_confirmed = next_height - block_height; + if (!block) + { + logger.always_log ("Failed to write confirmation height for: ", new_cemented_frontier.to_string ()); + ledger.stats.inc (nano::stat::type::confirmation_height, nano::stat::detail::invalid_block); + pending_writes.clear (); + pending_writes_size = 0; + return true; + } + + cemented_blocks.emplace_back (block, sideband); - auto & receive_callbacks_required = receive_source_pairs.back ().receive_details.block_callbacks_required; + // We have likely hit a long chain, flush these callbacks and continue + if (cemented_blocks.size () == batch_write_size) + { + auto num_blocks_cemented = num_blocks_iterated - total_blocks_cemented + 1; + total_blocks_cemented += num_blocks_cemented; + write_confirmation_height (num_blocks_cemented, start_height + total_blocks_cemented - 1, new_cemented_frontier); + transaction.commit (); + notify_observers (cemented_blocks); + cemented_blocks.clear (); + transaction.renew (); + } - // Don't include the last one as that belongs to the next recieve - std::copy (block_callbacks_required.begin (), block_callbacks_required.end () - 1, std::back_inserter (receive_callbacks_required)); - block_callbacks_required = { block_callbacks_required.back () }; + // Get the next block in the chain until we have reached the final desired one + auto last_iteration = (num_blocks_confirmed - num_blocks_iterated) == 1; + if (!last_iteration) + { + new_cemented_frontier = sideband.successor; + block = ledger.store.block_get (transaction, new_cemented_frontier, &sideband); + } + else + { + // Confirm it is indeed the last one + assert (new_cemented_frontier == pending.top_hash); + } } - receive_source_pairs.emplace_back (conf_height_details{ account_a, hash, block_height, height_not_set, {} }, source); - ++receive_source_pairs_size; - next_height = block_height; + auto num_blocks_cemented = num_blocks_confirmed - total_blocks_cemented; + write_confirmation_height (num_blocks_cemented, pending.top_height, new_cemented_frontier); } - hash = block->previous (); + auto it = accounts_confirmed_info.find (pending.account); + if (it != accounts_confirmed_info.cend () && it->second.confirmed_height == pending.top_height) + { + accounts_confirmed_info.erase (pending.account); + accounts_confirmed_info_size = accounts_confirmed_info.size (); + } + pending_writes.pop_front (); + --pending_writes_size; } + } - // We could be traversing a very large account so we don't want to open read transactions for too long. - if (num_to_confirm % batch_read_size == 0) + notify_observers (cemented_blocks); + + assert (pending_writes.empty ()); + assert (pending_writes_size == 0); + nano::lock_guard guard (mutex); + original_hashes_pending.clear (); + return false; +} + +void nano::confirmation_height_processor::add_cemented_observer (std::function const & callback_a) +{ + nano::lock_guard guard (mutex); + cemented_observers.push_back (callback_a); +} + +void nano::confirmation_height_processor::add_cemented_batch_finished_observer (std::function const & callback_a) +{ + nano::lock_guard guard (mutex); + cemented_batch_finished_observers.push_back (callback_a); +} + +void nano::confirmation_height_processor::notify_observers (std::vector const & cemented_blocks) +{ + for (auto const & block_callback_data : cemented_blocks) + { + for (auto const & observer : cemented_observers) { - transaction_a.refresh (); + observer (block_callback_data); } - - --num_to_confirm; } - // Update the number of blocks confirmed by the last receive block - if (!receive_source_pairs.empty ()) + if (!cemented_blocks.empty ()) { - auto & last_receive_details = receive_source_pairs.back ().receive_details; - last_receive_details.num_blocks_confirmed = last_receive_details.height - confirmation_height_a; - last_receive_details.block_callbacks_required = block_callbacks_required; + for (auto const & observer : cemented_batch_finished_observers) + { + observer (); + } } } -nano::confirmation_height_processor::conf_height_details::conf_height_details (nano::account const & account_a, nano::block_hash const & hash_a, uint64_t height_a, uint64_t num_blocks_confirmed_a, std::vector const & block_callbacks_required_a) : +nano::confirmation_height_processor::receive_chain_details::receive_chain_details (nano::account const & account_a, uint64_t height_a, nano::block_hash const & hash_a, nano::block_hash const & top_level_a, boost::optional next_a, uint64_t bottom_height_a, nano::block_hash const & bottom_most_a) : account (account_a), -hash (hash_a), height (height_a), -num_blocks_confirmed (num_blocks_confirmed_a), -block_callbacks_required (block_callbacks_required_a) +hash (hash_a), +top_level (top_level_a), +next (next_a), +bottom_height (bottom_height_a), +bottom_most (bottom_most_a) { } -nano::confirmation_height_processor::receive_source_pair::receive_source_pair (confirmation_height_processor::conf_height_details const & receive_details_a, const block_hash & source_a) : -receive_details (receive_details_a), -source_hash (source_a) +nano::confirmation_height_processor::write_details::write_details (nano::account const & account_a, uint64_t bottom_height_a, nano::block_hash const & bottom_hash_a, uint64_t top_height_a, nano::block_hash const & top_hash_a) : +bottom_height (bottom_height_a), +bottom_hash (bottom_hash_a), +top_height (top_height_a), +top_hash (top_hash_a), +account (account_a) { } -nano::confirmation_height_processor::confirmed_iterated_pair::confirmed_iterated_pair (uint64_t confirmed_height_a, uint64_t iterated_height_a) : -confirmed_height (confirmed_height_a), iterated_height (iterated_height_a) +std::unique_ptr nano::collect_container_info (confirmation_height_processor & confirmation_height_processor_a, const std::string & name_a) { + auto composite = std::make_unique (name_a); + + size_t cemented_observers_count; + size_t cemented_batch_finished_observer_count; + { + nano::lock_guard guard (confirmation_height_processor_a.mutex); + cemented_observers_count = confirmation_height_processor_a.cemented_observers.size (); + cemented_batch_finished_observer_count = confirmation_height_processor_a.cemented_batch_finished_observers.size (); + } + + composite->add_component (std::make_unique (container_info{ "cemented_observers", cemented_observers_count, sizeof (decltype (confirmation_height_processor_a.cemented_observers)::value_type) })); + composite->add_component (std::make_unique (container_info{ "cemented_batch_finished_observers", cemented_batch_finished_observer_count, sizeof (decltype (confirmation_height_processor_a.cemented_batch_finished_observers)::value_type) })); + composite->add_component (std::make_unique (container_info{ "pending_writes", confirmation_height_processor_a.pending_writes_size, sizeof (decltype (confirmation_height_processor_a.pending_writes)::value_type) })); + composite->add_component (std::make_unique (container_info{ "accounts_confirmed_info", confirmation_height_processor_a.accounts_confirmed_info_size, sizeof (decltype (confirmation_height_processor_a.accounts_confirmed_info)::value_type) })); + composite->add_component (std::make_unique (container_info{ "awaiting_processing", confirmation_height_processor_a.awaiting_processing_size (), sizeof (decltype (confirmation_height_processor_a.awaiting_processing)::value_type) })); + return composite; } -nano::confirmation_height_processor::callback_data::callback_data (std::shared_ptr const & block_a, nano::block_sideband const & sideband_a, nano::election_status_type election_status_type_a) : -block (block_a), -sideband (sideband_a), -election_status_type (election_status_type_a) +size_t nano::confirmation_height_processor::awaiting_processing_size () { + nano::lock_guard guard (mutex); + return awaiting_processing.size (); } -std::unique_ptr nano::collect_container_info (confirmation_height_processor & confirmation_height_processor_a, const std::string & name_a) +bool nano::confirmation_height_processor::is_processing_block (nano::block_hash const & hash_a) { - size_t receive_source_pairs_count = confirmation_height_processor_a.receive_source_pairs_size; - auto composite = std::make_unique (name_a); - composite->add_component (std::make_unique (container_info{ "receive_source_pairs", receive_source_pairs_count, sizeof (decltype (confirmation_height_processor_a.receive_source_pairs)::value_type) })); - return composite; + nano::lock_guard guard (mutex); + return original_hashes_pending.find (hash_a) != original_hashes_pending.cend () || awaiting_processing.find (hash_a) != awaiting_processing.cend (); } -size_t nano::pending_confirmation_height::size () +nano::block_hash nano::confirmation_height_processor::current () { nano::lock_guard lk (mutex); - return pending.size (); + return original_hash; } -bool nano::pending_confirmation_height::is_processing_block (nano::block_hash const & hash_a) +nano::confirmation_height_processor::receive_source_pair::receive_source_pair (confirmation_height_processor::receive_chain_details const & receive_details_a, const block_hash & source_a) : +receive_details (receive_details_a), +source_hash (source_a) { - // First check the hash currently being processed - nano::lock_guard lk (mutex); - if (!current_hash.is_zero () && current_hash == hash_a) - { - return true; - } - - // Check remaining pending confirmations - return pending.find (hash_a) != pending.cend (); } -nano::block_hash nano::pending_confirmation_height::current () +nano::confirmation_height_processor::confirmed_info::confirmed_info (uint64_t confirmed_height_a, nano::block_hash const & iterated_frontier_a) : +confirmed_height (confirmed_height_a), +iterated_frontier (iterated_frontier_a) { - nano::lock_guard lk (mutex); - return current_hash; } -std::unique_ptr nano::collect_container_info (pending_confirmation_height & pending_confirmation_height_a, const std::string & name_a) +nano::confirmation_height_processor::callback_data::callback_data (std::shared_ptr const & block_a, nano::block_sideband const & sideband_a) : +block (block_a), +sideband (sideband_a) { - size_t pending_count = pending_confirmation_height_a.size (); - auto composite = std::make_unique (name_a); - composite->add_component (std::make_unique (container_info{ "pending", pending_count, sizeof (nano::block_hash) })); - return composite; } diff --git a/nano/node/confirmation_height_processor.hpp b/nano/node/confirmation_height_processor.hpp index fc596c2b12..cf5f943a61 100644 --- a/nano/node/confirmation_height_processor.hpp +++ b/nano/node/confirmation_height_processor.hpp @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include @@ -13,112 +12,158 @@ namespace nano { class ledger; -class active_transactions; class read_transaction; class logger_mt; class write_database_queue; -class pending_confirmation_height -{ -public: - size_t size (); - bool is_processing_block (nano::block_hash const &); - nano::block_hash current (); - -private: - std::mutex mutex; - std::unordered_set pending; - /** This is the last block popped off the confirmation height pending collection */ - nano::block_hash current_hash{ 0 }; - friend class confirmation_height_processor; - friend class confirmation_height_pending_observer_callbacks_Test; - friend class confirmation_height_dependent_election_Test; - friend class confirmation_height_dependent_election_after_already_cemented_Test; -}; - -std::unique_ptr collect_container_info (pending_confirmation_height &, const std::string &); - class confirmation_height_processor final { public: - confirmation_height_processor (pending_confirmation_height &, nano::ledger &, nano::active_transactions &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &); + confirmation_height_processor (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &); ~confirmation_height_processor (); - void add (nano::block_hash const &); - void stop (); void pause (); void unpause (); + void stop (); + void add (nano::block_hash const & hash_a); + void run (); + size_t awaiting_processing_size (); + bool is_processing_block (nano::block_hash const &); + nano::block_hash current (); + + class callback_data final + { + public: + callback_data (std::shared_ptr const & block_a, nano::block_sideband const & sideband_a); + std::shared_ptr block; + nano::block_sideband sideband; + }; + + void add_cemented_observer (std::function const &); + void add_cemented_batch_finished_observer (std::function const &); - /** The maximum amount of accounts to iterate over while writing */ - static uint64_t constexpr batch_write_size = 2048; + /** The maximum amount of blocks to iterate over while writing */ + static uint64_t constexpr batch_write_size = 4096; /** The maximum number of blocks to be read in while iterating over a long account chain */ static uint64_t constexpr batch_read_size = 4096; private: - class callback_data final + class top_and_next_hash final { public: - callback_data (std::shared_ptr const &, nano::block_sideband const &, nano::election_status_type); - std::shared_ptr block; - nano::block_sideband sideband; - nano::election_status_type election_status_type; + nano::block_hash top; + boost::optional next; + uint64_t next_height; }; - class conf_height_details final + class confirmed_info { public: - conf_height_details (nano::account const &, nano::block_hash const &, uint64_t, uint64_t, std::vector const &); + confirmed_info (uint64_t confirmed_height_a, nano::block_hash const & iterated_frontier); + uint64_t confirmed_height; + nano::block_hash iterated_frontier; + }; + + /* Holds confirmation height/cemented frontier in memory for accounts while iterating */ + std::unordered_map accounts_confirmed_info; + std::atomic accounts_confirmed_info_size{ 0 }; + class receive_chain_details final + { + public: + receive_chain_details (nano::account const &, uint64_t, nano::block_hash const &, nano::block_hash const &, boost::optional, uint64_t, nano::block_hash const &); nano::account account; - nano::block_hash hash; uint64_t height; - uint64_t num_blocks_confirmed; - std::vector block_callbacks_required; + nano::block_hash hash; + nano::block_hash top_level; + boost::optional next; + uint64_t bottom_height; + nano::block_hash bottom_most; }; - class receive_source_pair final + class preparation_data final { public: - receive_source_pair (conf_height_details const &, const nano::block_hash &); + nano::transaction const & transaction; + nano::block_hash const & top_most_non_receive_block_hash; + bool already_cemented; + boost::circular_buffer_space_optimized & checkpoints; + decltype (accounts_confirmed_info.begin ()) account_it; + nano::confirmation_height_info const & confirmation_height_info; + nano::account const & account; + uint64_t bottom_height; + nano::block_hash const & bottom_most; + boost::optional & receive_details; + boost::optional & next_in_receive_chain; + }; - conf_height_details receive_details; - nano::block_hash source_hash; + class write_details final + { + public: + write_details (nano::account const &, uint64_t, nano::block_hash const &, uint64_t, nano::block_hash const &); + nano::account account; + // This is the first block hash (bottom most) which is not cemented + uint64_t bottom_height; + nano::block_hash bottom_hash; + // Desired cemented frontier + uint64_t top_height; + nano::block_hash top_hash; }; - class confirmed_iterated_pair + class receive_source_pair final { public: - confirmed_iterated_pair (uint64_t confirmed_height_a, uint64_t iterated_height_a); - uint64_t confirmed_height; - uint64_t iterated_height; + receive_source_pair (receive_chain_details const &, const nano::block_hash &); + + receive_chain_details receive_details; + nano::block_hash source_hash; }; + std::mutex mutex; + + // Hashes which have been added to the confirmation height processor, but not yet processed + std::unordered_set awaiting_processing; + // Hashes which have been added and processed, but have not been cemented + std::unordered_set original_hashes_pending; + std::vector> cemented_observers; + std::vector> cemented_batch_finished_observers; + + /** This is the last block popped off the confirmation height pending collection */ + nano::block_hash original_hash{ 0 }; + + nano::ledger & ledger; + nano::logger_mt & logger; nano::condition_variable condition; - nano::pending_confirmation_height & pending_confirmations; std::atomic stopped{ false }; std::atomic paused{ false }; - nano::ledger & ledger; - nano::active_transactions & active; - nano::logger_mt & logger; - std::atomic receive_source_pairs_size{ 0 }; - std::vector receive_source_pairs; - - std::deque pending_writes; - // Store the highest confirmation heights for accounts in pending_writes to reduce unnecessary iterating, - // and iterated height to prevent iterating over the same blocks more than once from self-sends or "circular" sends between the same accounts. - std::unordered_map confirmed_iterated_pairs; - nano::timer timer; nano::write_database_queue & write_database_queue; std::chrono::milliseconds batch_separate_pending_min_time; - std::thread thread; + nano::timer timer; + nano::network_constants network_constants; - void run (); - void add_confirmation_height (nano::block_hash const &); - void collect_unconfirmed_receive_and_sources_for_account (uint64_t, uint64_t, nano::block_hash const &, nano::account const &, nano::read_transaction const &, std::vector &); - bool write_pending (std::deque &); + bool cement_blocks (); + + static uint32_t constexpr max_items{ 65536 }; + + std::deque pending_writes; + std::atomic pending_writes_size{ 0 }; + static uint32_t constexpr pending_writes_max_size{ max_items }; + + std::thread thread; friend std::unique_ptr collect_container_info (confirmation_height_processor &, const std::string &); friend class confirmation_height_pending_observer_callbacks_Test; + friend class confirmation_height_dependent_election_Test; + friend class confirmation_height_dependent_election_after_already_cemented_Test; + +private: + top_and_next_hash get_next_block (boost::optional const &, boost::circular_buffer_space_optimized const &, boost::circular_buffer_space_optimized const & receive_source_pairs, boost::optional &); + nano::block_hash get_least_unconfirmed_hash_from_top_level (nano::transaction const &, nano::block_hash const &, nano::account const &, nano::confirmation_height_info const &, uint64_t &); + void notify_observers (std::vector const & cemented_blocks); + void prepare_iterated_blocks_for_cementing (preparation_data &); + void set_next_hash (); + void process (); + bool iterate (nano::read_transaction const &, uint64_t, nano::block_hash const &, boost::circular_buffer_space_optimized &, nano::block_hash &, nano::block_hash const &, boost::circular_buffer_space_optimized &, nano::account const &); }; std::unique_ptr collect_container_info (confirmation_height_processor &, const std::string &); diff --git a/nano/node/election.cpp b/nano/node/election.cpp index 028b579edc..654bebf9d2 100644 --- a/nano/node/election.cpp +++ b/nano/node/election.cpp @@ -37,15 +37,14 @@ void nano::election::confirm_once (nano::election_status_type type_a) auto status_l (status); auto node_l (node.shared ()); auto confirmation_action_l (confirmation_action); + node.active.election_winner_details.emplace (status.winner->hash (), shared_from_this ()); node.background ([node_l, status_l, confirmation_action_l]() { node_l->process_confirmed (status_l); confirmation_action_l (status_l.winner); }); - auto root (status.winner->qualified_root ()); - node.active.pending_conf_height.emplace (status.winner->hash (), shared_from_this ()); clear_blocks (); clear_dependent (); - node.active.roots.erase (root); + node.active.roots.erase (status.winner->qualified_root ()); } } diff --git a/nano/node/json_handler.cpp b/nano/node/json_handler.cpp index 94cb7aca71..eac194bd38 100644 --- a/nano/node/json_handler.cpp +++ b/nano/node/json_handler.cpp @@ -1761,7 +1761,7 @@ void nano::json_handler::confirmation_active () void nano::json_handler::confirmation_height_currently_processing () { - auto hash = node.pending_confirmation_height.current (); + auto hash = node.confirmation_height_processor.current (); if (!hash.is_zero ()) { response_l.put ("hash", hash.to_string ()); diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 1b9c05a9ac..1bcd82cf77 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -144,9 +144,9 @@ block_processor_thread ([this]() { online_reps (ledger, network_params, config.online_weight_minimum.number ()), votes_cache (wallets), vote_uniquer (block_uniquer), -active (*this), +confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, logger), +active (*this, confirmation_height_processor), aggregator (network_params.network, config, stats, votes_cache, store, wallets), -confirmation_height_processor (pending_confirmation_height, ledger, active, write_database_queue, config.conf_height_processor_batch_min_time, logger), payment_observer_processor (observers.blocks), wallets (wallets_store.init_error (), *this), startup_time (std::chrono::steady_clock::now ()) @@ -597,7 +597,6 @@ std::unique_ptr nano::collect_container_info (no composite->add_component (collect_container_info (node.block_uniquer, "block_uniquer")); composite->add_component (collect_container_info (node.vote_uniquer, "vote_uniquer")); composite->add_component (collect_container_info (node.confirmation_height_processor, "confirmation_height_processor")); - composite->add_component (collect_container_info (node.pending_confirmation_height, "pending_confirmation_height")); composite->add_component (collect_container_info (node.worker, "worker")); composite->add_component (collect_container_info (node.distributed_work, "distributed_work")); composite->add_component (collect_container_info (node.aggregator, "request_aggregator")); @@ -696,8 +695,8 @@ void nano::node::stop () } aggregator.stop (); vote_processor.stop (); - confirmation_height_processor.stop (); active.stop (); + confirmation_height_processor.stop (); network.stop (); telemetry.stop (); if (websocket_server) @@ -1088,7 +1087,7 @@ void nano::node::block_confirm (std::shared_ptr block_a) bool nano::node::block_confirmed_or_being_confirmed (nano::transaction const & transaction_a, nano::block_hash const & hash_a) { - return ledger.block_confirmed (transaction_a, hash_a) || pending_confirmation_height.is_processing_block (hash_a); + return ledger.block_confirmed (transaction_a, hash_a) || confirmation_height_processor.is_processing_block (hash_a); } nano::uint128_t nano::node::delta () const diff --git a/nano/node/node.hpp b/nano/node/node.hpp index 41e674f4ae..0938cb3b05 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -183,10 +183,9 @@ class node final : public std::enable_shared_from_this nano::keypair node_id; nano::block_uniquer block_uniquer; nano::vote_uniquer vote_uniquer; - nano::pending_confirmation_height pending_confirmation_height; // Used by both active and confirmation height processor + nano::confirmation_height_processor confirmation_height_processor; nano::active_transactions active; nano::request_aggregator aggregator; - nano::confirmation_height_processor confirmation_height_processor; nano::payment_observer_processor payment_observer_processor; nano::wallets wallets; const std::chrono::steady_clock::time_point startup_time; diff --git a/nano/rpc_test/rpc.cpp b/nano/rpc_test/rpc.cpp index 5eadf5c68d..96ef64e654 100644 --- a/nano/rpc_test/rpc.cpp +++ b/nano/rpc_test/rpc.cpp @@ -6154,7 +6154,7 @@ TEST (rpc, confirmation_height_currently_processing) rpc.start (); system.deadline_set (10s); - while (!node->pending_confirmation_height.is_processing_block (previous_genesis_chain_hash)) + while (!node->confirmation_height_processor.is_processing_block (previous_genesis_chain_hash)) { ASSERT_NO_ERROR (system.poll ()); } @@ -6172,16 +6172,10 @@ TEST (rpc, confirmation_height_currently_processing) ASSERT_EQ (frontier->hash ().to_string (), hash); } - // Wait until confirmation has been set + // Wait until confirmation has been set and not processing anything system.deadline_set (10s); - while (true) + while (!node->confirmation_height_processor.current ().is_zero ()) { - auto transaction = node->store.tx_begin_read (); - if (node->ledger.block_confirmed (transaction, frontier->hash ())) - { - break; - } - ASSERT_NO_ERROR (system.poll ()); } @@ -7045,7 +7039,7 @@ TEST (rpc, block_confirmed) } // Should no longer be processing the block after confirmation is set - ASSERT_FALSE (node->pending_confirmation_height.is_processing_block (send->hash ())); + ASSERT_FALSE (node->confirmation_height_processor.is_processing_block (send->hash ())); // Requesting confirmation for this should now succeed request.put ("hash", send->hash ().to_string ()); diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 3c4e1d7d14..7938176c57 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -496,18 +496,13 @@ TEST (confirmation_height, many_accounts_single_confirmation) } system.deadline_set (60s); - while (true) + auto transaction = node->store.tx_begin_read (); + while (!node->ledger.block_confirmed (transaction, last_open_hash)) { - auto transaction = node->store.tx_begin_read (); - if (node->ledger.block_confirmed (transaction, last_open_hash)) - { - break; - } - ASSERT_NO_ERROR (system.poll ()); + transaction.refresh (); } - auto transaction (node->store.tx_begin_read ()); // All frontiers (except last) should have 2 blocks and both should be confirmed for (auto i (node->store.latest_begin (transaction)), n (node->store.latest_end ()); i != n; ++i) { @@ -520,7 +515,20 @@ TEST (confirmation_height, many_accounts_single_confirmation) ASSERT_EQ (count, account_info.block_count); } + auto cemented_count = 0; + for (auto i (node->ledger.store.confirmation_height_begin (transaction)), n (node->ledger.store.confirmation_height_end ()); i != n; ++i) + { + cemented_count += i->second.height; + } + + ASSERT_EQ (cemented_count, node->ledger.cache.cemented_count); ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), num_accounts * 2 - 2); + + system.deadline_set (20s); + while ((node->ledger.cache.cemented_count - 1) != node->stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out)) + { + ASSERT_NO_ERROR (system.poll ()); + } } // Can take up to 10 minutes @@ -539,7 +547,7 @@ TEST (confirmation_height, many_accounts_many_confirmations) node->active.next_frontier_check = std::chrono::steady_clock::now () + 7200s; } - auto num_accounts = 10000; + auto num_accounts = nano::confirmation_height_processor::batch_write_size * 2 + 50; auto latest_genesis = node->latest (nano::test_genesis_key.pub); std::vector> open_blocks; { @@ -569,6 +577,21 @@ TEST (confirmation_height, many_accounts_many_confirmations) { ASSERT_NO_ERROR (system.poll ()); } + + auto transaction = node->store.tx_begin_read (); + auto cemented_count = 0; + for (auto i (node->ledger.store.confirmation_height_begin (transaction)), n (node->ledger.store.confirmation_height_end ()); i != n; ++i) + { + cemented_count += i->second.height; + } + + ASSERT_EQ (cemented_count, node->ledger.cache.cemented_count); + + system.deadline_set (20s); + while ((node->ledger.cache.cemented_count - 1) != node->stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out)) + { + ASSERT_NO_ERROR (system.poll ()); + } } TEST (confirmation_height, long_chains) @@ -588,7 +611,7 @@ TEST (confirmation_height, long_chains) node->active.next_frontier_check = std::chrono::steady_clock::now () + 7200s; } - constexpr auto num_blocks = 10000; + constexpr auto num_blocks = nano::confirmation_height_processor::batch_write_size * 2 + 50; // First open the other account nano::send_block send (latest, key1.pub, nano::genesis_amount - nano::Gxrb_ratio + num_blocks + 1, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (latest)); @@ -660,7 +683,20 @@ TEST (confirmation_height, long_chains) ASSERT_EQ (num_blocks + 1, confirmation_height_info.height); ASSERT_EQ (num_blocks + 1, account_info.block_count); + auto cemented_count = 0; + for (auto i (node->ledger.store.confirmation_height_begin (transaction)), n (node->ledger.store.confirmation_height_end ()); i != n; ++i) + { + cemented_count += i->second.height; + } + + ASSERT_EQ (cemented_count, node->ledger.cache.cemented_count); ASSERT_EQ (node->ledger.stats.count (nano::stat::type::confirmation_height, nano::stat::detail::blocks_confirmed, nano::stat::dir::in), num_blocks * 2 + 2); + + system.deadline_set (20s); + while ((node->ledger.cache.cemented_count - 1) != node->stats.count (nano::stat::type::observer, nano::stat::detail::all, nano::stat::dir::out)) + { + ASSERT_NO_ERROR (system.poll ()); + } } // Can take up to 1 hour @@ -678,7 +714,7 @@ TEST (confirmation_height, prioritize_frontiers_overwrite) node->active.next_frontier_check = std::chrono::steady_clock::now () + 7200s; } - auto num_accounts = node->active.max_priority_cementable_frontiers * 2; + auto num_accounts = node->active.max_priority_cementable_frontiers * 2 + 50; nano::keypair last_keypair = nano::test_genesis_key; auto last_open_hash = node->latest (nano::test_genesis_key.pub); // Clear confirmation height so that the genesis account has the same amount of uncemented blocks as the other frontiers