Skip to content

Commit

Permalink
Generalizing block_processor's primary observer. It handles a batch o…
Browse files Browse the repository at this point in the history
…f blocks and their processing result.
  • Loading branch information
clemahieu committed Mar 2, 2023
1 parent 97abf64 commit a5bd37f
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 5 deletions.
12 changes: 8 additions & 4 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,9 @@ void nano::block_processor::process_blocks ()
{
active = true;
lock.unlock ();
process_batch (lock);
lock.lock ();
auto processed = process_batch (lock);
batch_processed.notify (processed);
lock.lock ();
active = false;
}
else
Expand Down Expand Up @@ -208,8 +209,9 @@ void nano::block_processor::process_verified_state_blocks (std::deque<nano::stat
condition.notify_all ();
}

void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a)
auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock_a) -> std::deque<processed_t>
{
std::deque<processed_t> processed;
auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
block_post_events post_events ([&store = node.store] { return store.tx_begin_read (); });
auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending, tables::unchecked }));
Expand Down Expand Up @@ -278,7 +280,8 @@ void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
}
}
number_of_blocks_processed++;
process_one (transaction, post_events, block, force);
auto result = process_one (transaction, post_events, block, force);
processed.emplace_back (result, block);
lock_a.lock ();
}
awaiting_write = false;
Expand All @@ -288,6 +291,7 @@ void nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
{
node.logger.always_log (boost::str (boost::format ("Processed %1% blocks (%2% blocks were forced) in %3% %4%") % number_of_blocks_processed % number_of_forced_processed % timer_l.value ().count () % timer_l.unit ()));
}
return processed;
}

void nano::block_processor::process_live (nano::transaction const & transaction_a, nano::block_hash const & hash_a, std::shared_ptr<nano::block> const & block_a, nano::process_return const & process_return_a, nano::block_origin const origin_a)
Expand Down
6 changes: 5 additions & 1 deletion nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ class block_processor final
std::atomic<bool> flushing{ false };
// Delay required for average network propagartion before requesting confirmation
static std::chrono::milliseconds constexpr confirmation_request_delay{ 1500 };

public: // Events
using processed_t = std::pair<nano::process_return, std::shared_ptr<nano::block>>;
nano::observer_set<nano::transaction const &, nano::process_return const &, nano::block const &> processed;
nano::observer_set<std::deque<processed_t> const &> batch_processed;

private:
void queue_unchecked (nano::write_transaction const &, nano::hash_or_account const &);
void process_batch (nano::unique_lock<nano::mutex> &);
std::deque<processed_t> process_batch (nano::unique_lock<nano::mutex> &);
void process_live (nano::transaction const &, nano::block_hash const &, std::shared_ptr<nano::block> const &, nano::process_return const &, nano::block_origin const = nano::block_origin::remote);
void process_verified_state_blocks (std::deque<nano::state_block_signature_verification::value_type> &, std::vector<int> const &, std::vector<nano::block_hash> const &, std::vector<nano::signature> const &);
bool stopped{ false };
Expand Down

0 comments on commit a5bd37f

Please sign in to comment.