Skip to content

Commit

Permalink
Bounded memory and redesign in the confirmation height processor (#2531)
Browse files Browse the repository at this point in the history
* Bounded memory and redesign in the confirmation height processor

* Disable frontiers confirmation for test, block doesn't exist during callback in fork resolution

* Fix rpc.confirmation_height_currently_processing test

* Cement blocks below receives not above

* Fixes gcc build (hopefully)

* Store start and end hash in pending write to remove extraneous IO

* Optimise for the case where the top hash is 2 above cemented frontier

* Fix TSAN issues

* Serg comments

* Use cached genesis_hash in CLI --confirmation_height_clear option

* Set accounts_confirmed_info_size to 0 when clearing

* Remove const for prepare_iterated_blocks_for_cementing as confusing
  • Loading branch information
wezrule committed Feb 7, 2020
1 parent 9627853 commit 1e594ba
Show file tree
Hide file tree
Showing 13 changed files with 941 additions and 546 deletions.
331 changes: 228 additions & 103 deletions nano/core_test/confirmation_height.cpp

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1720,10 +1720,14 @@ TEST (node, broadcast_elected)
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
}
nano::system system (3, type, node_flags);
auto node0 (system.nodes[0]);
auto node1 (system.nodes[1]);
auto node2 (system.nodes[2]);
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node0 = system.add_node (node_config, node_flags, type);
node_config.peering_port = nano::get_available_port ();
auto node1 = system.add_node (node_config, node_flags, type);
node_config.peering_port = nano::get_available_port ();
auto node2 = system.add_node (node_config, node_flags, type);
nano::keypair rep_big;
nano::keypair rep_small;
nano::keypair rep_other;
Expand Down
136 changes: 92 additions & 44 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <nano/lib/threading.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/confirmation_height_processor.hpp>
#include <nano/node/election.hpp>
#include <nano/node/node.hpp>

Expand All @@ -10,7 +11,8 @@

using namespace std::chrono;

nano::active_transactions::active_transactions (nano::node & node_a) :
nano::active_transactions::active_transactions (nano::node & node_a, nano::confirmation_height_processor & confirmation_height_processor_a) :
confirmation_height_processor (confirmation_height_processor_a),
node (node_a),
multipliers_cb (20, 1.),
trended_active_difficulty (node_a.network_params.network.publish_threshold),
Expand All @@ -27,6 +29,16 @@ thread ([this]() {
request_loop ();
})
{
// Register a callback which will get called after a block is cemented
confirmation_height_processor.add_cemented_observer ([this](nano::confirmation_height_processor::callback_data const & callback_data) {
this->block_cemented_callback (callback_data.block, callback_data.sideband);
});

// Register a callback which will get called after a batch of blocks is written and observer calls finished
confirmation_height_processor.add_cemented_batch_finished_observer ([this]() {
this->cemented_batch_finished_callback ();
});

assert (min_time_between_requests > std::chrono::milliseconds (node.network_params.network.request_interval_ms));
assert (min_time_between_floods > std::chrono::milliseconds (node.network_params.network.request_interval_ms));
nano::unique_lock<std::mutex> lock (mutex);
Expand Down Expand Up @@ -90,7 +102,7 @@ void nano::active_transactions::search_frontiers (nano::transaction const & tran
error = node.store.confirmation_height_get (transaction_a, cementable_account.account, confirmation_height_info);
release_assert (!error);

if (info.block_count > confirmation_height_info.height && !this->node.pending_confirmation_height.is_processing_block (info.head))
if (info.block_count > confirmation_height_info.height && !this->confirmation_height_processor.is_processing_block (info.head))
{
auto block (this->node.store.block_get (transaction_a, info.head));
if (!this->start (block, true))
Expand All @@ -112,49 +124,87 @@ void nano::active_transactions::search_frontiers (nano::transaction const & tran
next_frontier_check = steady_clock::now () + (agressive_factor / test_network_factor);
}
}
void nano::active_transactions::post_confirmation_height_set (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a, nano::block_sideband const & sideband_a, nano::election_status_type election_status_type_a)

void nano::active_transactions::block_cemented_callback (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a)
{
if (election_status_type_a == nano::election_status_type::inactive_confirmation_height)
auto transaction = node.store.tx_begin_read ();

boost::optional<nano::election_status_type> election_status_type;
if (!confirmation_height_processor.is_processing_block (block_a->hash ()))
{
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction_a, block_a, block_a->hash (), sideband_a, account, amount, is_state_send, pending_account);
node.observers.blocks.notify (nano::election_status{ block_a, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), 0, 1, 0, nano::election_status_type::inactive_confirmation_height }, account, amount, is_state_send);
election_status_type = confirm_block (transaction, block_a);
}
else
{
auto hash (block_a->hash ());
nano::lock_guard<std::mutex> lock (mutex);
auto existing (pending_conf_height.find (hash));
if (existing != pending_conf_height.end ())
// This block was explicitly added to the confirmation height_processor
election_status_type = nano::election_status_type::active_confirmed_quorum;
}

if (election_status_type.is_initialized ())
{
if (election_status_type == nano::election_status_type::inactive_confirmation_height)
{
auto election = existing->second;
if (election->confirmed && !election->stopped && election->status.winner->hash () == hash)
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction, block_a, block_a->hash (), sideband_a, account, amount, is_state_send, pending_account);
node.observers.blocks.notify (nano::election_status{ block_a, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), 0, 1, 0, nano::election_status_type::inactive_confirmation_height }, account, amount, is_state_send);
}
else
{
auto hash (block_a->hash ());
nano::lock_guard<std::mutex> lock (mutex);
auto existing (election_winner_details.find (hash));
if (existing != election_winner_details.end ())
{
add_confirmed (existing->second->status, block_a->qualified_root ());

node.receive_confirmed (transaction_a, block_a, hash);
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction_a, block_a, hash, sideband_a, account, amount, is_state_send, pending_account);
election->status.type = election_status_type_a;
election->status.confirmation_request_count = election->confirmation_request_count;
node.observers.blocks.notify (election->status, account, amount, is_state_send);
if (amount > 0)
auto election = existing->second;
if (election->confirmed && !election->stopped && election->status.winner->hash () == hash)
{
node.observers.account_balance.notify (account, false);
if (!pending_account.is_zero ())
add_confirmed (existing->second->status, block_a->qualified_root ());

node.receive_confirmed (transaction, block_a, hash);
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction, block_a, hash, sideband_a, account, amount, is_state_send, pending_account);
election->status.type = *election_status_type;
election->status.confirmation_request_count = election->confirmation_request_count;
node.observers.blocks.notify (election->status, account, amount, is_state_send);
if (amount > 0)
{
node.observers.account_balance.notify (pending_account, true);
node.observers.account_balance.notify (account, false);
if (!pending_account.is_zero ())
{
node.observers.account_balance.notify (pending_account, true);
}
}
}

election_winner_details.erase (hash);
}
}
}
}

pending_conf_height.erase (hash);
void nano::active_transactions::cemented_batch_finished_callback ()
{
// Depending on timing there is a situation where the election_winner_details is not reset.
// This can happen when a block wins an election, and the block is confirmed + observer
// called before the block hash gets added to election_winner_details. If the block is confirmed
// callbacks have already been done, so we can safely just remove it.
auto transaction = node.store.tx_begin_read ();
nano::lock_guard<std::mutex> guard (mutex);
for (auto it = election_winner_details.begin (); it != election_winner_details.end ();)
{
if (node.ledger.block_confirmed (transaction, it->first))
{
it = election_winner_details.erase (it);
}
else
{
++it;
}
}
}
Expand Down Expand Up @@ -220,7 +270,7 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &

// Due to the confirmation height processor working asynchronously and compressing several roots into one frontier, probably_unconfirmed_frontiers can be wrong
{
auto pending_confirmation_height_size (node.pending_confirmation_height.size ());
auto pending_confirmation_height_size (confirmation_height_processor.awaiting_processing_size ());
bool probably_unconfirmed_frontiers (node.ledger.cache.block_count > node.ledger.cache.cemented_count + roots.size () + pending_confirmation_height_size);
bool bootstrap_weight_reached (node.ledger.cache.block_count >= node.ledger.bootstrap_weight_max_blocks);
if (node.config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled && bootstrap_weight_reached && probably_unconfirmed_frontiers && pending_confirmation_height_size < confirmed_frontiers_max_pending_cut_off)
Expand Down Expand Up @@ -344,7 +394,7 @@ void nano::active_transactions::request_loop ()

void nano::active_transactions::prioritize_account_for_confirmation (nano::active_transactions::prioritize_num_uncemented & cementable_frontiers_a, size_t & cementable_frontiers_size_a, nano::account const & account_a, nano::account_info const & info_a, uint64_t confirmation_height)
{
if (info_a.block_count > confirmation_height && !node.pending_confirmation_height.is_processing_block (info_a.head))
if (info_a.block_count > confirmation_height && !confirmation_height_processor.is_processing_block (info_a.head))
{
auto num_uncemented = info_a.block_count - confirmation_height;
nano::lock_guard<std::mutex> guard (mutex);
Expand Down Expand Up @@ -386,7 +436,7 @@ void nano::active_transactions::prioritize_account_for_confirmation (nano::activ
void nano::active_transactions::prioritize_frontiers_for_confirmation (nano::transaction const & transaction_a, std::chrono::milliseconds ledger_accounts_time_a, std::chrono::milliseconds wallet_account_time_a)
{
// Don't try to prioritize when there are a large number of pending confirmation heights as blocks can be cemented in the meantime, making the prioritization less reliable
if (node.pending_confirmation_height.size () < confirmed_frontiers_max_pending_cut_off)
if (confirmation_height_processor.awaiting_processing_size () < confirmed_frontiers_max_pending_cut_off)
{
size_t priority_cementable_frontiers_size;
size_t priority_wallet_cementable_frontiers_size;
Expand Down Expand Up @@ -884,12 +934,6 @@ bool nano::active_transactions::publish (std::shared_ptr<nano::block> block_a)
return result;
}

void nano::active_transactions::clear_block (nano::block_hash const & hash_a)
{
nano::lock_guard<std::mutex> guard (mutex);
pending_conf_height.erase (hash_a);
}

// Returns the type of election status requiring callbacks calling later
boost::optional<nano::election_status_type> nano::active_transactions::confirm_block (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a)
{
Expand Down Expand Up @@ -1030,6 +1074,12 @@ std::chrono::steady_clock::time_point nano::active_transactions::find_dropped_el
}
}

size_t nano::active_transactions::election_winner_details_size ()
{
nano::lock_guard<std::mutex> guard (mutex);
return election_winner_details.size ();
}

nano::cementable_account::cementable_account (nano::account const & account_a, size_t blocks_uncemented_a) :
account (account_a), blocks_uncemented (blocks_uncemented_a)
{
Expand All @@ -1040,20 +1090,18 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (ac
size_t roots_count;
size_t blocks_count;
size_t confirmed_count;
size_t pending_conf_height_count;

{
nano::lock_guard<std::mutex> guard (active_transactions.mutex);
roots_count = active_transactions.roots.size ();
blocks_count = active_transactions.blocks.size ();
confirmed_count = active_transactions.confirmed.size ();
pending_conf_height_count = active_transactions.pending_conf_height.size ();
}

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "roots", roots_count, sizeof (decltype (active_transactions.roots)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "pending_conf_height", pending_conf_height_count, sizeof (decltype (active_transactions.pending_conf_height)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "election_winner_details", active_transactions.election_winner_details_size (), sizeof (decltype (active_transactions.election_winner_details)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "confirmed", confirmed_count, sizeof (decltype (active_transactions.confirmed)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "priority_wallet_cementable_frontiers_count", active_transactions.priority_wallet_cementable_frontiers_size (), sizeof (nano::cementable_account) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "priority_cementable_frontiers_count", active_transactions.priority_cementable_frontiers_size (), sizeof (nano::cementable_account) }));
Expand Down
11 changes: 7 additions & 4 deletions nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class block_sideband;
class election;
class vote;
class transaction;
class confirmation_height_processor;

class conflict_info final
{
Expand Down Expand Up @@ -72,7 +73,7 @@ class active_transactions final
// clang-format on

public:
explicit active_transactions (nano::node &);
explicit active_transactions (nano::node &, nano::confirmation_height_processor &);
~active_transactions ();
// Start an election for a block
// Call action with confirmed block, may be different than what we started with
Expand All @@ -96,7 +97,8 @@ class active_transactions final
void stop ();
bool publish (std::shared_ptr<nano::block> block_a);
boost::optional<nano::election_status_type> confirm_block (nano::transaction const &, std::shared_ptr<nano::block>);
void post_confirmation_height_set (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a, nano::block_sideband const & sideband_a, nano::election_status_type election_status_type_a);
void block_cemented_callback (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a);
void cemented_batch_finished_callback ();
// clang-format off
boost::multi_index_container<nano::conflict_info,
mi::indexed_by<
Expand All @@ -114,6 +116,7 @@ class active_transactions final
void add_inactive_votes_cache (nano::block_hash const &, nano::account const &);
nano::gap_information find_inactive_votes_cache (nano::block_hash const &);
void erase_inactive_votes_cache (nano::block_hash const &);
nano::confirmation_height_processor & confirmation_height_processor;
nano::node & node;
std::mutex mutex;
boost::circular_buffer<double> multipliers_cb;
Expand All @@ -122,8 +125,8 @@ class active_transactions final
size_t priority_wallet_cementable_frontiers_size ();
boost::circular_buffer<double> difficulty_trend ();
size_t inactive_votes_cache_size ();
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> pending_conf_height;
void clear_block (nano::block_hash const & hash_a);
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> election_winner_details;
size_t election_winner_details_size ();
void add_dropped_elections_cache (nano::qualified_root const &);
std::chrono::steady_clock::time_point find_dropped_elections_cache (nano::qualified_root const &);
size_t dropped_elections_cache_size ();
Expand Down
2 changes: 1 addition & 1 deletion nano/node/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ void reset_confirmation_heights (nano::block_store & store)

// Then make sure the confirmation height of the genesis account open block is 1
nano::network_params network_params;
store.confirmation_height_put (transaction, network_params.ledger.genesis_account, { 1, network_params.ledger.genesis_block });
store.confirmation_height_put (transaction, network_params.ledger.genesis_account, { 1, network_params.ledger.genesis_hash });
}

bool is_using_rocksdb (boost::filesystem::path const & data_path, std::error_code & ec)
Expand Down
Loading

0 comments on commit 1e594ba

Please sign in to comment.