Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release write_guard lock when no longer required #2716

Merged
merged 21 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
7a50000
Release write_guard lock when no longer required
wezrule Apr 13, 2020
a24388c
Use move constructor for write_guard
wezrule Apr 14, 2020
152d669
Increase maximum values for various settings
wezrule Apr 14, 2020
8402c32
Formatting
wezrule Apr 14, 2020
cb2903a
Remove unnecessary extra space in comment
wezrule Apr 14, 2020
78c7684
Update slow_tests
wezrule Apr 14, 2020
b2ee204
Fix slow_test check
wezrule Apr 14, 2020
aed0766
Dynamically set batch_write_size based on previous write performance.…
wezrule Apr 14, 2020
34ff425
Add a tolerance in case amount to cement is just above to save waitin…
wezrule Apr 14, 2020
309a944
Reduce batch_write_size in slow_tests now that it's configurable so t…
wezrule Apr 14, 2020
2b3129c
Don't call release if there's no blocks which were cemented at the end
wezrule Apr 14, 2020
284b8aa
Typo in comment (thanks Gui)
wezrule Apr 14, 2020
0356320
Prevent yoyoing as much
wezrule Apr 14, 2020
bff12ee
(Unrelated) Fix prioritize_frontiers_overwrite test
wezrule Apr 14, 2020
ee14918
Increase amount of time spent searching for frontiers when there is a…
wezrule Apr 14, 2020
fc39bdc
Have a force_write in unbounded to be consistent with bounded which i…
wezrule Apr 15, 2020
2684367
Typo in comment
wezrule Apr 15, 2020
7f8874e
Modify heuristics for updating active multiplier (Gui comment)
wezrule Apr 16, 2020
1a3f8e3
Give magic number a variable (gui)
wezrule Apr 16, 2020
21e55c3
Fix incorrect comparison (Gui)
wezrule Apr 16, 2020
6d7ff55
Add public function to determine if write_guard is owned and use that…
wezrule Apr 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions nano/node/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,9 @@ void nano::active_transactions::frontiers_confirmation (nano::unique_lock<std::m
{
// Spend some time prioritizing accounts with the most uncemented blocks to reduce voting traffic
auto request_interval = std::chrono::milliseconds (node.network_params.network.request_interval_ms);
auto time_spent_prioritizing_ledger_accounts = request_interval / 100;
// Spend longer searching ledger accounts when there is a low amount of elections going on
auto low_active = roots.size () < 1000;
auto time_spent_prioritizing_ledger_accounts = request_interval / (low_active ? 20 : 100);
auto time_spent_prioritizing_wallet_accounts = request_interval / 250;
lock_a.unlock ();
auto transaction = node.store.tx_begin_read ();
Expand Down Expand Up @@ -800,7 +802,7 @@ void nano::active_transactions::update_active_multiplier (nano::unique_lock<std:
last_prioritized_multiplier.reset ();
double multiplier (1.);
// Heurestic to filter out non-saturated network and frontier confirmation
if (roots.size () > prioritized_cutoff / 2 || (node.network_params.network.is_test_network () && !roots.empty ()))
if (roots.size () >= prioritized_cutoff || (node.network_params.network.is_test_network () && !roots.empty ()))
{
auto & sorted_roots = roots.get<tag_difficulty> ();
std::vector<double> prioritized;
Expand Down
66 changes: 53 additions & 13 deletions nano/node/confirmation_height_bounded.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@

#include <numeric>

nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
nano::confirmation_height_bounded::confirmation_height_bounded (nano::ledger & ledger_a, nano::write_database_queue & write_database_queue_a, std::chrono::milliseconds batch_separate_pending_min_time_a, nano::logger_mt & logger_a, std::atomic<bool> & stopped_a, nano::block_hash const & original_hash_a, uint64_t & batch_write_size_a, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const & notify_observers_callback_a, std::function<void(nano::block_hash const &)> const & notify_block_already_cemented_observers_callback_a, std::function<uint64_t ()> const & awaiting_processing_size_callback_a) :
ledger (ledger_a),
write_database_queue (write_database_queue_a),
batch_separate_pending_min_time (batch_separate_pending_min_time_a),
logger (logger_a),
stopped (stopped_a),
original_hash (original_hash_a),
batch_write_size (batch_write_size_a),
notify_observers_callback (notify_observers_callback_a),
notify_block_already_cemented_observers_callback (notify_block_already_cemented_observers_callback_a),
awaiting_processing_size_callback (awaiting_processing_size_callback_a)
Expand Down Expand Up @@ -156,7 +157,7 @@ void nano::confirmation_height_bounded::process ()
return total += write_details_a.top_height - write_details_a.bottom_height + 1;
});

auto max_batch_write_size_reached = (total_pending_write_block_count >= confirmation_height::batch_write_size);
auto max_batch_write_size_reached = (total_pending_write_block_count >= batch_write_size);
// When there are a lot of pending confirmation height blocks, it is more efficient to
// bulk some of them up to enable better write performance which becomes the bottleneck.
auto min_time_exceeded = (timer.since_start () >= batch_separate_pending_min_time);
Expand All @@ -165,19 +166,19 @@ void nano::confirmation_height_bounded::process ()
auto should_output = finished_iterating && (non_awaiting_processing || min_time_exceeded);
auto force_write = pending_writes.size () >= pending_writes_max_size || accounts_confirmed_info.size () >= pending_writes_max_size;

if (((max_batch_write_size_reached || should_output) && !pending_writes.empty ()) || force_write)
if ((max_batch_write_size_reached || should_output || force_write) && !pending_writes.empty ())
{
bool error = false;
// If nothing is currently using the database write lock then write the cemented pending blocks otherwise continue iterating
if (write_database_queue.process (nano::writer::confirmation_height))
{
auto scoped_write_guard = write_database_queue.pop ();
error = cement_blocks ();
error = cement_blocks (scoped_write_guard);
}
else if (force_write)
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
error = cement_blocks ();
error = cement_blocks (scoped_write_guard);
}
// Don't set any more cemented blocks from the original hash if an inconsistency is found
if (error)
Expand Down Expand Up @@ -332,15 +333,16 @@ void nano::confirmation_height_bounded::prepare_iterated_blocks_for_cementing (p
}
}

bool nano::confirmation_height_bounded::cement_blocks ()
bool nano::confirmation_height_bounded::cement_blocks (nano::write_guard & scoped_write_guard_a)
{
// Will contain all blocks that have been cemented (bounded by batch_write_size)
// and will get run through the cemented observer callback
std::vector<std::shared_ptr<nano::block>> cemented_blocks;
{
// This only writes to the confirmation_height table and is the only place to do so in a single process
auto transaction (ledger.store.tx_begin_write ({}, { nano::tables::confirmation_height }));

nano::timer<> timer;
timer.start ();
// Cement all pending entries, each entry is specific to an account and contains the least amount
// of blocks to retain consistent cementing across all account chains to genesis.
while (!pending_writes.empty ())
Expand Down Expand Up @@ -407,22 +409,52 @@ bool nano::confirmation_height_bounded::cement_blocks ()
return true;
}

auto last_iteration = (num_blocks_confirmed - num_blocks_iterated) == 1;

cemented_blocks.emplace_back (block);

// We have likely hit a long chain, flush these callbacks and continue
if (cemented_blocks.size () == confirmation_height::batch_write_size)
// Flush these callbacks and continue as we write in batches (ideally maximum 250ms) to not hold write db transaction for too long.
// Include a tolerance to save having to potentially wait on the block processor if the number of blocks to cement is only a bit higher than the max.
if (cemented_blocks.size () > batch_write_size + (batch_write_size / 10))
{
auto num_blocks_cemented = num_blocks_iterated - total_blocks_cemented + 1;
total_blocks_cemented += num_blocks_cemented;
write_confirmation_height (num_blocks_cemented, start_height + total_blocks_cemented - 1, new_cemented_frontier);
transaction.commit ();

// Update the maximum amount of blocks to write next time based on the time it took to cement this batch.
if (!network_params.network.is_test_network ())
{
auto const amount_to_change = batch_write_size / 10; // 10%
auto const maximum_batch_write_time = 250; // milliseconds
auto const maximum_batch_write_time_increase_cutoff = maximum_batch_write_time - (maximum_batch_write_time / 5);
if (timer.since_start ().count () > maximum_batch_write_time)
{
// Reduce (unless we have hit a floor)
auto const minimum_batch_size = 16384u;
batch_write_size = std::max<uint64_t> (minimum_batch_size, batch_write_size - amount_to_change);
}
else if (timer.since_start ().count () < maximum_batch_write_time_increase_cutoff)
{
// Increase amount of blocks written for next batch if the time for writing this one is sufficiently lower than the max time to warrant changing
batch_write_size += amount_to_change;
}
}

scoped_write_guard_a.release ();
notify_observers_callback (cemented_blocks);
cemented_blocks.clear ();
transaction.renew ();

// Only aquire transaction if there are any blocks left
if (!(last_iteration && pending_writes.size () == 1))
{
scoped_write_guard_a = write_database_queue.wait (nano::writer::confirmation_height);
transaction.renew ();
timer.restart ();
}
}

// Get the next block in the chain until we have reached the final desired one
auto last_iteration = (num_blocks_confirmed - num_blocks_iterated) == 1;
if (!last_iteration)
{
new_cemented_frontier = block->sideband ().successor;
Expand All @@ -436,7 +468,10 @@ bool nano::confirmation_height_bounded::cement_blocks ()
}

auto num_blocks_cemented = num_blocks_confirmed - total_blocks_cemented;
write_confirmation_height (num_blocks_cemented, pending.top_height, new_cemented_frontier);
if (num_blocks_cemented > 0)
{
write_confirmation_height (num_blocks_cemented, pending.top_height, new_cemented_frontier);
}
}

auto it = accounts_confirmed_info.find (pending.account);
Expand All @@ -450,7 +485,12 @@ bool nano::confirmation_height_bounded::cement_blocks ()
}
}

notify_observers_callback (cemented_blocks);
// Scope guard could have been released earlier (0 cemented_blocks would indicate that)
if (scoped_write_guard_a.is_owned () && !cemented_blocks.empty ())
{
scoped_write_guard_a.release ();
notify_observers_callback (cemented_blocks);
}

debug_assert (pending_writes.empty ());
debug_assert (pending_writes_size == 0);
Expand Down
14 changes: 9 additions & 5 deletions nano/node/confirmation_height_bounded.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ class ledger;
class read_transaction;
class logger_mt;
class write_database_queue;
class write_guard;

class confirmation_height_bounded final
{
public:
confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
confirmation_height_bounded (nano::ledger &, nano::write_database_queue &, std::chrono::milliseconds, nano::logger_mt &, std::atomic<bool> &, nano::block_hash const &, uint64_t &, std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> const &, std::function<void(nano::block_hash const &)> const &, std::function<uint64_t ()> const &);
bool pending_empty () const;
void prepare_new ();
void process ();
bool cement_blocks ();
bool cement_blocks (nano::write_guard & scoped_write_guard_a);

private:
class top_and_next_hash final
Expand Down Expand Up @@ -53,9 +54,10 @@ class confirmation_height_bounded final
};

/** The maximum number of blocks to be read in while iterating over a long account chain */
static uint64_t constexpr batch_read_size = 4096;
uint64_t const batch_read_size = 65536;

static uint32_t constexpr max_items{ 65536 };
/** The maximum number of various containers to keep the memory bounded */
uint32_t const max_items{ 131072 };

// All of the atomic variables here just track the size for use in collect_container_info.
// This is so that no mutexes are needed during the algorithm itself, which would otherwise be needed
Expand All @@ -64,7 +66,7 @@ class confirmation_height_bounded final
// This allows the load and stores to use relaxed atomic memory ordering.
std::deque<write_details> pending_writes;
nano::relaxed_atomic_integral<uint64_t> pending_writes_size{ 0 };
static uint32_t constexpr pending_writes_max_size{ max_items };
uint32_t const pending_writes_max_size{ max_items };
/* Holds confirmation height/cemented frontier in memory for accounts while iterating */
std::unordered_map<account, confirmed_info> accounts_confirmed_info;
nano::relaxed_atomic_integral<uint64_t> accounts_confirmed_info_size{ 0 };
Expand Down Expand Up @@ -120,9 +122,11 @@ class confirmation_height_bounded final
nano::logger_mt & logger;
std::atomic<bool> & stopped;
nano::block_hash const & original_hash;
uint64_t & batch_write_size;
std::function<void(std::vector<std::shared_ptr<nano::block>> const &)> notify_observers_callback;
std::function<void(nano::block_hash const &)> notify_block_already_cemented_observers_callback;
std::function<uint64_t ()> awaiting_processing_size_callback;
nano::network_params network_params;

friend std::unique_ptr<nano::container_info_component> collect_container_info (confirmation_height_bounded &, const std::string & name_a);
};
Expand Down
16 changes: 10 additions & 6 deletions nano/node/confirmation_height_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ nano::confirmation_height_processor::confirmation_height_processor (nano::ledger
ledger (ledger_a),
write_database_queue (write_database_queue_a),
// clang-format off
confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
confirmation_height_unbounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
confirmation_height_bounded_processor (ledger_a, write_database_queue_a, batch_separate_pending_min_time_a, logger_a, stopped, original_hash, batch_write_size, [this](auto & cemented_blocks) { this->notify_observers (cemented_blocks); }, [this](auto const & block_hash_a) { this->notify_observers (block_hash_a); }, [this]() { return this->awaiting_processing_size (); }),
// clang-format on
thread ([this, &latch, mode_a]() {
nano::thread_role::set (nano::thread_role::name::confirmation_height_processing);
Expand Down Expand Up @@ -106,15 +106,19 @@ void nano::confirmation_height_processor::run (confirmation_height_mode mode_a)
if (!confirmation_height_bounded_processor.pending_empty ())
{
debug_assert (confirmation_height_unbounded_processor.pending_empty ());
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
confirmation_height_bounded_processor.cement_blocks ();
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
confirmation_height_bounded_processor.cement_blocks (scoped_write_guard);
}
lock_and_cleanup ();
}
else if (!confirmation_height_unbounded_processor.pending_empty ())
{
debug_assert (confirmation_height_bounded_processor.pending_empty ());
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
confirmation_height_unbounded_processor.cement_blocks ();
{
auto scoped_write_guard = write_database_queue.wait (nano::writer::confirmation_height);
confirmation_height_unbounded_processor.cement_blocks (scoped_write_guard);
}
lock_and_cleanup ();
}
else
Expand Down
6 changes: 6 additions & 0 deletions nano/node/confirmation_height_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class confirmation_height_processor final

nano::ledger & ledger;
nano::write_database_queue & write_database_queue;
/** The maximum amount of blocks to write at once. This is dynamically modified by the bounded processor based on previous write performance **/
uint64_t batch_write_size{ 65536 };

confirmation_height_unbounded confirmation_height_unbounded_processor;
confirmation_height_bounded confirmation_height_bounded_processor;
std::thread thread;
Expand All @@ -70,6 +73,9 @@ class confirmation_height_processor final
friend class confirmation_height_dependent_election_Test;
friend class confirmation_height_dependent_election_after_already_cemented_Test;
friend class confirmation_height_dynamic_algorithm_no_transition_while_pending_Test;
friend class confirmation_height_many_accounts_many_confirmations_Test;
friend class confirmation_height_long_chains_Test;
friend class confirmation_height_many_accounts_single_confirmation_Test;
};

std::unique_ptr<container_info_component> collect_container_info (confirmation_height_processor &, const std::string &);
Expand Down
Loading