Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bounded memory and redesign in the confirmation height processor #2531

Merged
merged 13 commits into from
Feb 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
331 changes: 228 additions & 103 deletions nano/core_test/confirmation_height.cpp

Large diffs are not rendered by default.

12 changes: 8 additions & 4 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1720,10 +1720,14 @@ TEST (node, broadcast_elected)
node_flags.disable_tcp_realtime = true;
node_flags.disable_bootstrap_listener = true;
}
nano::system system (3, type, node_flags);
auto node0 (system.nodes[0]);
auto node1 (system.nodes[1]);
auto node2 (system.nodes[2]);
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.frontiers_confirmation = nano::frontiers_confirmation_mode::disabled;
auto node0 = system.add_node (node_config, node_flags, type);
node_config.peering_port = nano::get_available_port ();
auto node1 = system.add_node (node_config, node_flags, type);
node_config.peering_port = nano::get_available_port ();
auto node2 = system.add_node (node_config, node_flags, type);
nano::keypair rep_big;
nano::keypair rep_small;
nano::keypair rep_other;
Expand Down
136 changes: 92 additions & 44 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <nano/lib/threading.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/confirmation_height_processor.hpp>
#include <nano/node/election.hpp>
#include <nano/node/node.hpp>

Expand All @@ -10,7 +11,8 @@

using namespace std::chrono;

nano::active_transactions::active_transactions (nano::node & node_a) :
nano::active_transactions::active_transactions (nano::node & node_a, nano::confirmation_height_processor & confirmation_height_processor_a) :
confirmation_height_processor (confirmation_height_processor_a),
node (node_a),
multipliers_cb (20, 1.),
trended_active_difficulty (node_a.network_params.network.publish_threshold),
Expand All @@ -27,6 +29,16 @@ thread ([this]() {
request_loop ();
})
{
// Register a callback which will get called after a block is cemented
confirmation_height_processor.add_cemented_observer ([this](nano::confirmation_height_processor::callback_data const & callback_data) {
this->block_cemented_callback (callback_data.block, callback_data.sideband);
});

// Register a callback which will get called after a batch of blocks is written and observer calls finished
confirmation_height_processor.add_cemented_batch_finished_observer ([this]() {
this->cemented_batch_finished_callback ();
});

assert (min_time_between_requests > std::chrono::milliseconds (node.network_params.network.request_interval_ms));
assert (min_time_between_floods > std::chrono::milliseconds (node.network_params.network.request_interval_ms));
nano::unique_lock<std::mutex> lock (mutex);
Expand Down Expand Up @@ -90,7 +102,7 @@ void nano::active_transactions::search_frontiers (nano::transaction const & tran
error = node.store.confirmation_height_get (transaction_a, cementable_account.account, confirmation_height_info);
release_assert (!error);

if (info.block_count > confirmation_height_info.height && !this->node.pending_confirmation_height.is_processing_block (info.head))
if (info.block_count > confirmation_height_info.height && !this->confirmation_height_processor.is_processing_block (info.head))
{
auto block (this->node.store.block_get (transaction_a, info.head));
if (!this->start (block, true))
Expand All @@ -112,49 +124,87 @@ void nano::active_transactions::search_frontiers (nano::transaction const & tran
next_frontier_check = steady_clock::now () + (agressive_factor / test_network_factor);
}
}
void nano::active_transactions::post_confirmation_height_set (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a, nano::block_sideband const & sideband_a, nano::election_status_type election_status_type_a)

void nano::active_transactions::block_cemented_callback (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a)
{
if (election_status_type_a == nano::election_status_type::inactive_confirmation_height)
auto transaction = node.store.tx_begin_read ();

boost::optional<nano::election_status_type> election_status_type;
if (!confirmation_height_processor.is_processing_block (block_a->hash ()))
{
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction_a, block_a, block_a->hash (), sideband_a, account, amount, is_state_send, pending_account);
node.observers.blocks.notify (nano::election_status{ block_a, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), 0, 1, 0, nano::election_status_type::inactive_confirmation_height }, account, amount, is_state_send);
election_status_type = confirm_block (transaction, block_a);
}
else
{
auto hash (block_a->hash ());
nano::lock_guard<std::mutex> lock (mutex);
auto existing (pending_conf_height.find (hash));
if (existing != pending_conf_height.end ())
// This block was explicitly added to the confirmation height_processor
election_status_type = nano::election_status_type::active_confirmed_quorum;
}

if (election_status_type.is_initialized ())
{
if (election_status_type == nano::election_status_type::inactive_confirmation_height)
{
auto election = existing->second;
if (election->confirmed && !election->stopped && election->status.winner->hash () == hash)
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction, block_a, block_a->hash (), sideband_a, account, amount, is_state_send, pending_account);
node.observers.blocks.notify (nano::election_status{ block_a, 0, std::chrono::duration_cast<std::chrono::milliseconds> (std::chrono::system_clock::now ().time_since_epoch ()), std::chrono::duration_values<std::chrono::milliseconds>::zero (), 0, 1, 0, nano::election_status_type::inactive_confirmation_height }, account, amount, is_state_send);
}
else
{
auto hash (block_a->hash ());
nano::lock_guard<std::mutex> lock (mutex);
auto existing (election_winner_details.find (hash));
if (existing != election_winner_details.end ())
{
add_confirmed (existing->second->status, block_a->qualified_root ());

node.receive_confirmed (transaction_a, block_a, hash);
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction_a, block_a, hash, sideband_a, account, amount, is_state_send, pending_account);
election->status.type = election_status_type_a;
election->status.confirmation_request_count = election->confirmation_request_count;
node.observers.blocks.notify (election->status, account, amount, is_state_send);
if (amount > 0)
auto election = existing->second;
if (election->confirmed && !election->stopped && election->status.winner->hash () == hash)
{
node.observers.account_balance.notify (account, false);
if (!pending_account.is_zero ())
add_confirmed (existing->second->status, block_a->qualified_root ());

node.receive_confirmed (transaction, block_a, hash);
nano::account account (0);
nano::uint128_t amount (0);
bool is_state_send (false);
nano::account pending_account (0);
node.process_confirmed_data (transaction, block_a, hash, sideband_a, account, amount, is_state_send, pending_account);
election->status.type = *election_status_type;
election->status.confirmation_request_count = election->confirmation_request_count;
node.observers.blocks.notify (election->status, account, amount, is_state_send);
if (amount > 0)
{
node.observers.account_balance.notify (pending_account, true);
node.observers.account_balance.notify (account, false);
if (!pending_account.is_zero ())
{
node.observers.account_balance.notify (pending_account, true);
}
}
}

election_winner_details.erase (hash);
}
}
}
}

pending_conf_height.erase (hash);
void nano::active_transactions::cemented_batch_finished_callback ()
{
// Depending on timing there is a situation where the election_winner_details is not reset.
// This can happen when a block wins an election, and the block is confirmed + observer
// called before the block hash gets added to election_winner_details. If the block is confirmed
// callbacks have already been done, so we can safely just remove it.
auto transaction = node.store.tx_begin_read ();
nano::lock_guard<std::mutex> guard (mutex);
for (auto it = election_winner_details.begin (); it != election_winner_details.end ();)
{
if (node.ledger.block_confirmed (transaction, it->first))
{
it = election_winner_details.erase (it);
}
else
{
++it;
}
}
}
Expand Down Expand Up @@ -220,7 +270,7 @@ void nano::active_transactions::request_confirm (nano::unique_lock<std::mutex> &

// Due to the confirmation height processor working asynchronously and compressing several roots into one frontier, probably_unconfirmed_frontiers can be wrong
{
auto pending_confirmation_height_size (node.pending_confirmation_height.size ());
auto pending_confirmation_height_size (confirmation_height_processor.awaiting_processing_size ());
bool probably_unconfirmed_frontiers (node.ledger.cache.block_count > node.ledger.cache.cemented_count + roots.size () + pending_confirmation_height_size);
bool bootstrap_weight_reached (node.ledger.cache.block_count >= node.ledger.bootstrap_weight_max_blocks);
if (node.config.frontiers_confirmation != nano::frontiers_confirmation_mode::disabled && bootstrap_weight_reached && probably_unconfirmed_frontiers && pending_confirmation_height_size < confirmed_frontiers_max_pending_cut_off)
Expand Down Expand Up @@ -344,7 +394,7 @@ void nano::active_transactions::request_loop ()

void nano::active_transactions::prioritize_account_for_confirmation (nano::active_transactions::prioritize_num_uncemented & cementable_frontiers_a, size_t & cementable_frontiers_size_a, nano::account const & account_a, nano::account_info const & info_a, uint64_t confirmation_height)
{
if (info_a.block_count > confirmation_height && !node.pending_confirmation_height.is_processing_block (info_a.head))
if (info_a.block_count > confirmation_height && !confirmation_height_processor.is_processing_block (info_a.head))
{
auto num_uncemented = info_a.block_count - confirmation_height;
nano::lock_guard<std::mutex> guard (mutex);
Expand Down Expand Up @@ -386,7 +436,7 @@ void nano::active_transactions::prioritize_account_for_confirmation (nano::activ
void nano::active_transactions::prioritize_frontiers_for_confirmation (nano::transaction const & transaction_a, std::chrono::milliseconds ledger_accounts_time_a, std::chrono::milliseconds wallet_account_time_a)
{
// Don't try to prioritize when there are a large number of pending confirmation heights as blocks can be cemented in the meantime, making the prioritization less reliable
if (node.pending_confirmation_height.size () < confirmed_frontiers_max_pending_cut_off)
if (confirmation_height_processor.awaiting_processing_size () < confirmed_frontiers_max_pending_cut_off)
{
size_t priority_cementable_frontiers_size;
size_t priority_wallet_cementable_frontiers_size;
Expand Down Expand Up @@ -884,12 +934,6 @@ bool nano::active_transactions::publish (std::shared_ptr<nano::block> block_a)
return result;
}

void nano::active_transactions::clear_block (nano::block_hash const & hash_a)
{
nano::lock_guard<std::mutex> guard (mutex);
pending_conf_height.erase (hash_a);
}

// Returns the type of election status requiring callbacks calling later
boost::optional<nano::election_status_type> nano::active_transactions::confirm_block (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a)
{
Expand Down Expand Up @@ -1030,6 +1074,12 @@ std::chrono::steady_clock::time_point nano::active_transactions::find_dropped_el
}
}

size_t nano::active_transactions::election_winner_details_size ()
{
nano::lock_guard<std::mutex> guard (mutex);
return election_winner_details.size ();
}

nano::cementable_account::cementable_account (nano::account const & account_a, size_t blocks_uncemented_a) :
account (account_a), blocks_uncemented (blocks_uncemented_a)
{
Expand All @@ -1040,20 +1090,18 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (ac
size_t roots_count;
size_t blocks_count;
size_t confirmed_count;
size_t pending_conf_height_count;

{
nano::lock_guard<std::mutex> guard (active_transactions.mutex);
roots_count = active_transactions.roots.size ();
blocks_count = active_transactions.blocks.size ();
confirmed_count = active_transactions.confirmed.size ();
pending_conf_height_count = active_transactions.pending_conf_height.size ();
}

auto composite = std::make_unique<container_info_composite> (name);
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "roots", roots_count, sizeof (decltype (active_transactions.roots)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "blocks", blocks_count, sizeof (decltype (active_transactions.blocks)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "pending_conf_height", pending_conf_height_count, sizeof (decltype (active_transactions.pending_conf_height)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "election_winner_details", active_transactions.election_winner_details_size (), sizeof (decltype (active_transactions.election_winner_details)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "confirmed", confirmed_count, sizeof (decltype (active_transactions.confirmed)::value_type) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "priority_wallet_cementable_frontiers_count", active_transactions.priority_wallet_cementable_frontiers_size (), sizeof (nano::cementable_account) }));
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "priority_cementable_frontiers_count", active_transactions.priority_cementable_frontiers_size (), sizeof (nano::cementable_account) }));
Expand Down
11 changes: 7 additions & 4 deletions nano/node/active_transactions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class block_sideband;
class election;
class vote;
class transaction;
class confirmation_height_processor;

class conflict_info final
{
Expand Down Expand Up @@ -72,7 +73,7 @@ class active_transactions final
// clang-format on

public:
explicit active_transactions (nano::node &);
explicit active_transactions (nano::node &, nano::confirmation_height_processor &);
~active_transactions ();
// Start an election for a block
// Call action with confirmed block, may be different than what we started with
Expand All @@ -96,7 +97,8 @@ class active_transactions final
void stop ();
bool publish (std::shared_ptr<nano::block> block_a);
boost::optional<nano::election_status_type> confirm_block (nano::transaction const &, std::shared_ptr<nano::block>);
void post_confirmation_height_set (nano::transaction const & transaction_a, std::shared_ptr<nano::block> block_a, nano::block_sideband const & sideband_a, nano::election_status_type election_status_type_a);
void block_cemented_callback (std::shared_ptr<nano::block> const & block_a, nano::block_sideband const & sideband_a);
void cemented_batch_finished_callback ();
// clang-format off
boost::multi_index_container<nano::conflict_info,
mi::indexed_by<
Expand All @@ -114,6 +116,7 @@ class active_transactions final
void add_inactive_votes_cache (nano::block_hash const &, nano::account const &);
nano::gap_information find_inactive_votes_cache (nano::block_hash const &);
void erase_inactive_votes_cache (nano::block_hash const &);
nano::confirmation_height_processor & confirmation_height_processor;
nano::node & node;
std::mutex mutex;
boost::circular_buffer<double> multipliers_cb;
Expand All @@ -122,8 +125,8 @@ class active_transactions final
size_t priority_wallet_cementable_frontiers_size ();
boost::circular_buffer<double> difficulty_trend ();
size_t inactive_votes_cache_size ();
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> pending_conf_height;
void clear_block (nano::block_hash const & hash_a);
std::unordered_map<nano::block_hash, std::shared_ptr<nano::election>> election_winner_details;
size_t election_winner_details_size ();
void add_dropped_elections_cache (nano::qualified_root const &);
std::chrono::steady_clock::time_point find_dropped_elections_cache (nano::qualified_root const &);
size_t dropped_elections_cache_size ();
Expand Down
2 changes: 1 addition & 1 deletion nano/node/cli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ void reset_confirmation_heights (nano::block_store & store)

// Then make sure the confirmation height of the genesis account open block is 1
nano::network_params network_params;
store.confirmation_height_put (transaction, network_params.ledger.genesis_account, { 1, network_params.ledger.genesis_block });
store.confirmation_height_put (transaction, network_params.ledger.genesis_account, { 1, network_params.ledger.genesis_hash });
}

bool is_using_rocksdb (boost::filesystem::path const & data_path, std::error_code & ec)
Expand Down
Loading