Release write_guard lock when no longer required (#2716)
* Release write_guard lock when no longer required

* Use move constructor for write_guard

* Increase maximum values for various settings

* Formatting

* Remove unnecessary extra space in comment

* Update slow_tests

* Fix slow_test check

* Dynamically set batch_write_size based on previous write performance; the change is gradual to account for random spikes/slowdowns (a sketch of this adjustment appears below, just before the file diffs)

* Add a tolerance for when the number of blocks to cement is only slightly above the batch size, to save waiting on the block processor for a small number of blocks

* Reduce batch_write_size in slow_tests, now that it's configurable, so they take less time

* Don't call release if no blocks were cemented at the end

* Typo in comment (thanks Gui)

* Prevent the batch size from yo-yoing as much

* (Unrelated) Fix prioritize_frontiers_overwrite test

* Increase the amount of time spent searching for frontiers when few elections are active

* Have a force_write in unbounded, to be consistent with bounded, which is based on block counts

* Typo in comment

* Modify heuristics for updating active multiplier (Gui comment)

* Give the magic number a variable (Gui)

* Fix incorrect comparison (Gui)

* Add public function to determine if write_guard is owned and use that (Gui)
wezrule committed Apr 22, 2020
1 parent 95bfae4 commit 996d903
Showing 11 changed files with 196 additions and 81 deletions.
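The core of the dynamic batching change is a feedback loop in cement_blocks (see the confirmation_height_bounded.cpp diff below): measure how long a batch took to write, then nudge the next batch size up or down by 10%. A minimal standalone sketch of that adjustment, using the same constants as the diff:

#include <algorithm>
#include <cstdint>

// Illustrative sketch only; the real logic lives in
// nano::confirmation_height_bounded::cement_blocks below.
uint64_t adjust_batch_write_size (uint64_t batch_write_size, int64_t batch_write_time_ms)
{
	auto const amount_to_change = batch_write_size / 10; // change gradually: 10% per batch
	auto const maximum_batch_write_time = 250;           // target ceiling in milliseconds
	auto const increase_cutoff = maximum_batch_write_time - (maximum_batch_write_time / 5); // 200 ms
	auto const minimum_batch_size = 16384u;
	if (batch_write_time_ms > maximum_batch_write_time)
	{
		// Too slow: shrink, but never below the floor
		return std::max<uint64_t> (minimum_batch_size, batch_write_size - amount_to_change);
	}
	if (batch_write_time_ms < increase_cutoff)
	{
		// Comfortably fast: grow the next batch
		return batch_write_size + amount_to_change;
	}
	// Between the cutoff and the ceiling: leave it alone, which limits yo-yoing
	return batch_write_size;
}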
6 changes: 4 additions & 2 deletions nano/node/active_transactions.cpp
@@ -275,7 +275,9 @@ void nano::active_transactions::frontiers_confirmation (nano::unique_lock<std::m
{
// Spend some time prioritizing accounts with the most uncemented blocks to reduce voting traffic
auto request_interval = std::chrono::milliseconds (node.network_params.network.request_interval_ms);
auto time_spent_prioritizing_ledger_accounts = request_interval / 100;
// Spend longer searching ledger accounts when there is a low number of elections going on
auto low_active = roots.size () < 1000;
auto time_spent_prioritizing_ledger_accounts = request_interval / (low_active ? 20 : 100);
auto time_spent_prioritizing_wallet_accounts = request_interval / 250;
lock_a.unlock ();
auto transaction = node.store.tx_begin_read ();
@@ -800,7 +802,7 @@ void nano::active_transactions::update_active_multiplier (nano::unique_lock<std:
last_prioritized_multiplier.reset ();
double multiplier (1.);
// Heuristic to filter out non-saturated network and frontier confirmation
if (roots.size () > prioritized_cutoff / 2 || (node.network_params.network.is_test_network () && !roots.empty ()))
if (roots.size () >= prioritized_cutoff || (node.network_params.network.is_test_network () && !roots.empty ()))
{
auto & sorted_roots = roots.get<tag_difficulty> ();
std::vector<double> prioritized;
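Purely to illustrate the arithmetic in the frontiers_confirmation hunk above (the 500 ms below is an example interval, not necessarily the node's configured request_interval_ms):

#include <chrono>
#include <cstddef>

// How long to spend prioritizing ledger accounts per request interval.
// Mirrors the hunk above: few active elections -> 1/20 of the interval,
// otherwise 1/100 (e.g. 25 ms vs 5 ms for a 500 ms interval).
std::chrono::milliseconds time_spent_prioritizing_ledger_accounts (std::chrono::milliseconds request_interval, std::size_t active_roots)
{
	bool const low_active = active_roots < 1000;
	return request_interval / (low_active ? 20 : 100);
}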
66 changes: 53 additions & 13 deletions nano/node/confirmation_height_bounded.cpp
@@ -8,13 +8,14 @@

#include <numeric>

nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, uint64_t & batch_write_size_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
ledger (ledger_a),
write_database_queue (write_database_queue_a),
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
logger (logger_a),
stopped (stopped_a),
original_hash (original_hash_a),
batch_write_size (batch_write_size_a),
notify_observers_callback (notify_observers_callback_a),
notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a),
awaiting_processing_size_callback (awaiting_processing_size_callback_a)
@@ -156,7 +157,7 @@ void nano::confirmation_height_bounded::process ()
return total += write_details_a.top_height - write_details_a.bottom_height + 1;
});

auto max_batch_write_size_reached = (total_pending_write_block_count >= confirmation_height::batch_write_size);
auto max_batch_write_size_reached = (total_pending_write_block_count >= batch_write_size);
// When there are a lot of pending confirmation height blocks, it is more efficient to
// bulk some of them up to enable better write performance which becomes the bottleneck.
auto min_time_exceeded = (timer.since_start () >= batch_separate_pending_min_time);
@@ -165,19 +166,19 @@
auto should_output = finished_iterating && (non_awaiting_processing || min_time_exceeded);
auto force_write = pending_writes.size () >= pending_writes_max_size || accounts_confirmed_info.size () >= pending_writes_max_size;

if (((max_batch_write_size_reached || should_output) && !pending_writes.empty ()) || force_write)
if ((max_batch_write_size_reached || should_output || force_write) && !pending_writes.empty ())
{
bool error = false;
// If nothing is currently using the database write lock then write the cemented pending blocks otherwise continue iterating
if (write_database_queue.process (nano::writer::confirmation_height))
{
auto scoped_write_guard = write_database_queue.pop ();
error = cement_blocks ();
error = cement_blocks (scoped_write_guard);
}
else if (force_write)
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
error = cement_blocks ();
error = cement_blocks (scoped_write_guard);
}
// Don't set any more cemented blocks from the original hash if an inconsistency is found
if (error)
@@ -332,15 +333,16 @@ void nano::confirmation_height_bounded::prepare_iterated_blocks_for_cementing (p
}
}

bool nano::confirmation_height_bounded::cement_blocks ()
bool nano::confirmation_height_bounded::cement_blocks (nano::write_guard & scoped_write_guard_a)
{
// Will contain all blocks that have been cemented (bounded by batch_write_size)
// and will get run through the cemented observer callback
std::vector<std::shared_ptr<nano::block>> cemented_blocks;
{
// This only writes to the confirmation_height table and is the only place to do so in a single process
auto transaction (ledger.store.tx_begin_write ({}, { nano::tables::confirmation_height }));

nano::timer<> timer;
timer.start ();
// Cement all pending entries, each entry is specific to an account and contains the least amount
// of blocks to retain consistent cementing across all account chains to genesis.
while (!pending_writes.empty ())
@@ -407,22 +409,52 @@ bool nano::confirmation_height_bounded::cement_blocks ()
return true;
}

auto last_iteration = (num_blocks_confirmed - num_blocks_iterated) == 1;

cemented_blocks.emplace_back (block);

// We have likely hit a long chain, flush these callbacks and continue
if (cemented_blocks.size () == confirmation_height::batch_write_size)
// Flush these callbacks and continue as we write in batches (ideally maximum 250ms) to not hold write db transaction for too long.
// Include a tolerance to save having to potentially wait on the block processor if the number of blocks to cement is only a bit higher than the max.
if (cemented_blocks.size () > batch_write_size + (batch_write_size / 10))
{
auto num_blocks_cemented = num_blocks_iterated - total_blocks_cemented + 1;
total_blocks_cemented += num_blocks_cemented;
write_confirmation_height (num_blocks_cemented, start_height + total_blocks_cemented - 1, new_cemented_frontier);
transaction.commit ();

// Update the maximum amount of blocks to write next time based on the time it took to cement this batch.
if (!network_params.network.is_test_network ())
{
auto const amount_to_change = batch_write_size / 10; // 10%
auto const maximum_batch_write_time = 250; // milliseconds
auto const maximum_batch_write_time_increase_cutoff = maximum_batch_write_time - (maximum_batch_write_time / 5);
if (timer.since_start ().count () > maximum_batch_write_time)
{
// Reduce (unless we have hit a floor)
auto const minimum_batch_size = 16384u;
batch_write_size = std::max<uint64_t> (minimum_batch_size, batch_write_size - amount_to_change);
}
else if (timer.since_start ().count () < maximum_batch_write_time_increase_cutoff)
{
// Increase amount of blocks written for next batch if the time for writing this one is sufficiently lower than the max time to warrant changing
batch_write_size += amount_to_change;
}
}

scoped_write_guard_a.release ();
notify_observers_callback (cemented_blocks);
cemented_blocks.clear ();
transaction.renew ();

// Only acquire the transaction if there are any blocks left
if (!(last_iteration && pending_writes.size () == 1))
{
scoped_write_guard_a = write_database_queue.wait (nano::writer::confirmation_height);
transaction.renew ();
timer.restart ();
}
}

// Get the next block in the chain until we have reached the final desired one
auto last_iteration = (num_blocks_confirmed - num_blocks_iterated) == 1;
if (!last_iteration)
{
new_cemented_frontier = block->sideband ().successor;
@@ -436,7 +468,10 @@ bool nano::confirmation_height_bounded::cement_blocks ()
}

auto num_blocks_cemented = num_blocks_confirmed - total_blocks_cemented;
write_confirmation_height (num_blocks_cemented, pending.top_height, new_cemented_frontier);
if (num_blocks_cemented > 0)
{
write_confirmation_height (num_blocks_cemented, pending.top_height, new_cemented_frontier);
}
}

auto it = accounts_confirmed_info.find (pending.account);
@@ -450,7 +485,12 @@ bool nano::confirmation_height_bounded::cement_blocks ()
}
}

notify_observers_callback (cemented_blocks);
// Scope guard could have been released earlier (0 cemented_blocks would indicate that)
if (scoped_write_guard_a.is_owned () && !cemented_blocks.empty ())
{
scoped_write_guard_a.release ();
notify_observers_callback (cemented_blocks);
}

debug_assert (pending_writes.empty ());
debug_assert (pending_writes_size == 0);
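cement_blocks now releases the write guard before running the (potentially slow) cemented-block observer callbacks and only re-acquires it when more batches remain. The write_guard itself is not part of this excerpt (it lives in nano/node/write_database_queue.hpp); the toy class below is only an assumed stand-in, built over a plain mutex, to show the shape of the interface this commit relies on: move support, release (), and is_owned ().

#include <mutex>

// Toy stand-in for nano::write_guard; the real class coordinates with the
// write database queue rather than owning a mutex directly.
class write_guard final
{
public:
	explicit write_guard (std::mutex & mutex_a) :
	mutex (&mutex_a)
	{
		mutex->lock ();
	}
	write_guard (write_guard && other_a) noexcept :
	mutex (other_a.mutex), owns (other_a.owns)
	{
		other_a.owns = false; // moved-from guard no longer owns the slot
	}
	write_guard & operator= (write_guard && other_a) noexcept
	{
		if (this != &other_a)
		{
			release ();
			mutex = other_a.mutex;
			owns = other_a.owns;
			other_a.owns = false;
		}
		return *this;
	}
	~write_guard ()
	{
		release ();
	}
	// Hand the write slot back early, e.g. before observer callbacks run
	void release ()
	{
		if (owns)
		{
			mutex->unlock ();
			owns = false;
		}
	}
	// Lets callers check whether the slot is still held (used at the end of cement_blocks)
	bool is_owned () const
	{
		return owns;
	}

private:
	std::mutex * mutex;
	bool owns{ true };
};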
14 changes: 9 additions & 5 deletions nano/node/confirmation_height_bounded.hpp
@@ -12,15 +12,16 @@ class ledger;
class read_transaction;
class logger_mt;
class write_database_queue;
class write_guard;

class confirmation_height_bounded final
{
public:
confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, uint64_t &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
bool pending_empty () const;
void prepare_new ();
void process ();
bool cement_blocks ();
bool cement_blocks (nano::write_guard & scoped_write_guard_a);

private:
class top_and_next_hash final
@@ -53,9 +54,10 @@ class confirmation_height_bounded final
};

/** The maximum number of blocks to be read in while iterating over a long account chain */
static uint64_t constexpr batch_read_size = 4096;
uint64_t const batch_read_size = 65536;

static uint32_t constexpr max_items{ 65536 };
/** The maximum number of various containers to keep the memory bounded */
uint32_t const max_items{ 131072 };

// All of the atomic variables here just track the size for use in collect_container_info.
// This is so that no mutexes are needed during the algorithm itself, which would otherwise be needed
@@ -64,7 +66,7 @@ class confirmation_height_bounded final
// This allows the load and stores to use relaxed atomic memory ordering.
std::deque<write_details> pending_writes;
nano::relaxed_atomic_integral<uint64_t> pending_writes_size{ 0 };
static uint32_t constexpr pending_writes_max_size{ max_items };
uint32_t const pending_writes_max_size{ max_items };
/* Holds confirmation height/cemented frontier in memory for accounts while iterating */
std::unordered_map<account, confirmed_info> accounts_confirmed_info;
nano::relaxed_atomic_integral<uint64_t> accounts_confirmed_info_size{ 0 };
@@ -120,9 +122,11 @@ class confirmation_height_bounded final
nano::logger_mt & logger;
std::atomic<bool> & stopped;
nano::block_hash const & original_hash;
uint64_t & batch_write_size;
std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> notify_observers_callback;
std::function<void(nano::block_hash const &)> notify_block_already_cemented_observers_callback;
std::function<uint64_t ()> awaiting_processing_size_callback;
nano::network_params network_params;

friend std::unique_ptr<nano::container_info_component> collect_container_info (confirmation_height_bounded &, const std::string & name_a);
};
16 changes: 10 additions & 6 deletions nano/node/confirmation_height_processor.cpp
@@ -15,8 +15,8 @@ nano::confirmation_height_processor::confirmation_height_processor (nano::ledger
ledger (ledger_a),
write_database_queue (write_database_queue_a),
// clang-format off
confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
// clang-format on
thread ([this, &latch, mode_a]() {
nano::thread_role::set (nano::thread_role::name::confirmation_height_processing);
@@ -106,15 +106,19 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a)
if (!confirmation_height_bounded_processor.pending_empty ())
{
debug_assert (confirmation_height_unbounded_processor.pending_empty ());
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
confirmation_height_bounded_processor.cement_blocks ();
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
confirmation_height_bounded_processor.cement_blocks (scoped_write_guard);
}
lock_and_cleanup ();
}
else if (!confirmation_height_unbounded_processor.pending_empty ())
{
debug_assert (confirmation_height_bounded_processor.pending_empty ());
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
confirmation_height_unbounded_processor.cement_blocks ();
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
confirmation_height_unbounded_processor.cement_blocks (scoped_write_guard);
}
lock_and_cleanup ();
}
else
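The extra braces in run () above make the write guard's destructor run, and therefore the write slot be given back, before lock_and_cleanup () is called, instead of holding the slot across the cleanup. A generic, self-contained illustration of that ordering:

#include <iostream>
#include <mutex>

int main ()
{
	std::mutex write_queue_mutex; // stands in for the write database queue slot
	std::mutex processor_mutex;   // stands in for the lock taken during cleanup
	{
		std::lock_guard<std::mutex> write_guard (write_queue_mutex); // ~ write_database_queue.wait (...)
		std::cout << "cement blocks while holding the write slot\n";
	} // write_guard destroyed here, releasing the write slot, as in the commit
	std::lock_guard<std::mutex> cleanup_guard (processor_mutex); // ~ lock_and_cleanup ()
	std::cout << "clean up without holding the write slot\n";
}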
6 changes: 6 additions & 0 deletions nano/node/confirmation_height_processor.hpp
@@ -57,6 +57,9 @@ class confirmation_height_processor final

nano::ledger & ledger;
nano::write_database_queue & write_database_queue;
/** The maximum number of blocks to write at once. This is dynamically modified by the bounded processor based on previous write performance **/
uint64_t batch_write_size{ 65536 };

confirmation_height_unbounded confirmation_height_unbounded_processor;
confirmation_height_bounded confirmation_height_bounded_processor;
std::thread thread;
@@ -70,6 +73,9 @@ class confirmation_height_processor final
friend class confirmation_height_dependent_election_Test;
friend class confirmation_height_dependent_election_after_already_cemented_Test;
friend class confirmation_height_dynamic_algorithm_no_transition_while_pending_Test;
friend class confirmation_height_many_accounts_many_confirmations_Test;
friend class confirmation_height_long_chains_Test;
friend class confirmation_height_many_accounts_single_confirmation_Test;
};

std::unique_ptr<container_info_component> collect_container_info (confirmation_height_processor &, const std::string &);
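batch_write_size is owned by the processor and handed to both algorithms by reference; the bounded processor tunes it from write timings while the unbounded one only reads it. A simplified sketch of that wiring (struct and member names here are illustrative, not the real ones):

#include <cstdint>

struct bounded_algo
{
	uint64_t & batch_write_size; // tuned after each timed batch write
};

struct unbounded_algo
{
	uint64_t & batch_write_size; // shared value, only read here
};

struct processor
{
	uint64_t batch_write_size{ 65536 }; // single owner, as in confirmation_height_processor.hpp
	bounded_algo bounded{ batch_write_size };
	unbounded_algo unbounded{ batch_write_size };
};

int main ()
{
	processor p;
	p.bounded.batch_write_size += p.bounded.batch_write_size / 10; // bounded adjusts the shared value...
	return p.unbounded.batch_write_size == 72089 ? 0 : 1; // ...and unbounded sees the new value (65536 + 6553)
}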
(Diffs for the remaining changed files are not included in this excerpt.)
