Skip to content

Commit

Permalink
Add the process_live_dipatcher class (#4181)
Browse files Browse the repository at this point in the history
* Add the process_live_dispatcher class

* Move from block_processor::process_live to process_live_dispatcher class

* Remove block_post_events class
  • Loading branch information
thsfs committed Mar 14, 2023
1 parent 4b4c271 commit 3004649
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 54 deletions.
2 changes: 2 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,8 @@ add_library(
portmapping.cpp
prioritization.cpp
prioritization.hpp
process_live_dispatcher.cpp
process_live_dispatcher.hpp
repcrawler.hpp
repcrawler.cpp
request_aggregator.hpp
Expand Down
42 changes: 2 additions & 40 deletions nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,12 @@
#include <nano/lib/timer.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/node.hpp>
#include <nano/node/websocket.hpp>
#include <nano/secure/store.hpp>

#include <boost/format.hpp>

std::chrono::milliseconds constexpr nano::block_processor::confirmation_request_delay;

nano::block_post_events::block_post_events (std::function<nano::read_transaction ()> && get_transaction_a) :
get_transaction (std::move (get_transaction_a))
{
}

nano::block_post_events::~block_post_events ()
{
debug_assert (get_transaction != nullptr);
auto transaction (get_transaction ());
for (auto const & i : events)
{
i (transaction);
}
}

nano::block_processor::block_processor (nano::node & node_a, nano::write_database_queue & write_database_queue_a) :
next_log (std::chrono::steady_clock::now ()),
node (node_a),
Expand Down Expand Up @@ -238,7 +222,6 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
{
std::deque<processed_t> processed;
auto scoped_write_guard = write_database_queue.wait (nano::writer::process_batch);
block_post_events post_events ([&store = node.store] { return store.tx_begin_read (); });
auto transaction (node.store.tx_begin_write ({ tables::accounts, tables::blocks, tables::frontiers, tables::pending, tables::unchecked }));
nano::timer<std::chrono::milliseconds> timer_l;
lock_a.lock ();
Expand Down Expand Up @@ -305,7 +288,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
}
}
number_of_blocks_processed++;
auto result = process_one (transaction, post_events, block, force);
auto result = process_one (transaction, block, force);
processed.emplace_back (result, block);
lock_a.lock ();
}
Expand All @@ -318,25 +301,7 @@ auto nano::block_processor::process_batch (nano::unique_lock<nano::mutex> & lock
return processed;
}

void nano::block_processor::process_live (nano::transaction const & transaction_a, std::shared_ptr<nano::block> const & block_a)
{
// Start collecting quorum on block
if (node.ledger.dependents_confirmed (transaction_a, *block_a))
{
auto account = block_a->account ().is_zero () ? block_a->sideband ().account : block_a->account ();
node.scheduler.activate (account, transaction_a);
}

// Notify inactive vote cache about a new live block
node.inactive_vote_cache.trigger (block_a->hash ());

if (node.websocket.server && node.websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
{
node.websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (*block_a));
}
}

nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, block_post_events & events_a, std::shared_ptr<nano::block> block, bool const forced_a)
nano::process_return nano::block_processor::process_one (nano::write_transaction const & transaction_a, std::shared_ptr<nano::block> block, bool const forced_a)
{
nano::process_return result;
auto hash (block->hash ());
Expand All @@ -351,9 +316,6 @@ nano::process_return nano::block_processor::process_one (nano::write_transaction
block->serialize_json (block_string, node.config.logging.single_line_record ());
node.logger.try_log (boost::str (boost::format ("Processing block %1%: %2%") % hash.to_string () % block_string));
}
events_a.events.emplace_back ([this, block] (nano::transaction const & post_event_transaction_a) {
process_live (post_event_transaction_a, block);
});
queue_unchecked (transaction_a, hash);
/* For send blocks check epoch open unchecked (gap pending).
For state blocks check only send subtype and only if block epoch is not last epoch.
Expand Down
14 changes: 1 addition & 13 deletions nano/node/blockprocessor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,6 @@ class transaction;
class write_transaction;
class write_database_queue;

class block_post_events final
{
public:
explicit block_post_events (std::function<nano::read_transaction ()> &&);
~block_post_events ();
std::deque<std::function<void (nano::read_transaction const &)>> events;

private:
std::function<nano::read_transaction ()> get_transaction;
};

/**
* Processing blocks is a potentially long IO operation.
* This class isolates block insertion from other operations like servicing network operations
Expand Down Expand Up @@ -65,10 +54,9 @@ class block_processor final
blocking_observer blocking;

private:
nano::process_return process_one (nano::write_transaction const &, block_post_events &, std::shared_ptr<nano::block> block, bool const = false);
nano::process_return process_one (nano::write_transaction const &, std::shared_ptr<nano::block> block, bool const = false);
void queue_unchecked (nano::write_transaction const &, nano::hash_or_account const &);
std::deque<processed_t> process_batch (nano::unique_lock<nano::mutex> &);
void process_live (nano::transaction const &, std::shared_ptr<nano::block> const &);
void process_verified_state_blocks (std::deque<nano::state_block_signature_verification::value_type> &, std::vector<int> const &, std::vector<nano::block_hash> const &, std::vector<nano::signature> const &);
void add_impl (std::shared_ptr<nano::block> block);
bool stopped{ false };
Expand Down
4 changes: 3 additions & 1 deletion nano/node/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,13 @@ nano::node::node (boost::asio::io_context & io_ctx_a, boost::filesystem::path co
node_seq (seq),
block_broadcast{ network, block_arrival, !flags.disable_block_processor_republishing },
block_publisher{ active },
gap_tracker{ gap_cache }
gap_tracker{ gap_cache },
process_live_dispatcher{ ledger, scheduler, inactive_vote_cache, websocket }
{
block_broadcast.connect (block_processor);
block_publisher.connect (block_processor);
gap_tracker.connect (block_processor);
process_live_dispatcher.connect (block_processor);
unchecked.use_memory = [this] () { return ledger.bootstrap_weight_reached (); };
unchecked.satisfied = [this] (nano::unchecked_info const & info) {
this->block_processor.add (info.block);
Expand Down
2 changes: 2 additions & 0 deletions nano/node/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <nano/node/online_reps.hpp>
#include <nano/node/optimistic_scheduler.hpp>
#include <nano/node/portmapping.hpp>
#include <nano/node/process_live_dispatcher.hpp>
#include <nano/node/repcrawler.hpp>
#include <nano/node/request_aggregator.hpp>
#include <nano/node/signatures.hpp>
Expand Down Expand Up @@ -196,6 +197,7 @@ class node final : public std::enable_shared_from_this<nano::node>
nano::block_broadcast block_broadcast;
nano::block_publisher block_publisher;
nano::gap_tracker gap_tracker;
nano::process_live_dispatcher process_live_dispatcher;

std::chrono::steady_clock::time_point const startup_time;
std::chrono::seconds unchecked_cutoff = std::chrono::seconds (7 * 24 * 60 * 60); // Week
Expand Down
59 changes: 59 additions & 0 deletions nano/node/process_live_dispatcher.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#include <nano/lib/blocks.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/election_scheduler.hpp>
#include <nano/node/process_live_dispatcher.hpp>
#include <nano/node/vote_cache.hpp>
#include <nano/node/websocket.hpp>
#include <nano/secure/common.hpp>
#include <nano/secure/ledger.hpp>
#include <nano/secure/store.hpp>

nano::process_live_dispatcher::process_live_dispatcher (nano::ledger & ledger, nano::election_scheduler & scheduler, nano::vote_cache & inactive_vote_cache, nano::websocket_server & websocket) :
ledger{ ledger },
scheduler{ scheduler },
inactive_vote_cache{ inactive_vote_cache },
websocket{ websocket }
{
}

void nano::process_live_dispatcher::connect (nano::block_processor & block_processor)
{
block_processor.batch_processed.add ([this] (auto const & batch) {
auto const transaction = ledger.store.tx_begin_read ();
for (auto const & [result, block] : batch)
{
debug_assert (block != nullptr);
inspect (result, *block, transaction);
}
});
}

void nano::process_live_dispatcher::inspect (nano::process_return const & result, nano::block const & block, nano::transaction const & transaction)
{
switch (result.code)
{
case nano::process_result::progress:
process_live (block, transaction);
break;
default:
break;
}
}

void nano::process_live_dispatcher::process_live (nano::block const & block, nano::transaction const & transaction)
{
// Start collecting quorum on block
if (ledger.dependents_confirmed (transaction, block))
{
auto account = block.account ().is_zero () ? block.sideband ().account : block.account ();
scheduler.activate (account, transaction);
}

// Notify inactive vote cache about a new live block
inactive_vote_cache.trigger (block.hash ());

if (websocket.server && websocket.server->any_subscriber (nano::websocket::topic::new_unconfirmed_block))
{
websocket.server->broadcast (nano::websocket::message_builder ().new_block_arrived (block));
}
}
31 changes: 31 additions & 0 deletions nano/node/process_live_dispatcher.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

namespace nano
{
class ledger;
class election_scheduler;
class vote_cache;
class websocket_server;
class block_processor;
class process_return;
class block;
class transaction;

// Observes confirmed blocks and dispatches the process_live function.
class process_live_dispatcher
{
public:
process_live_dispatcher (nano::ledger & ledger, nano::election_scheduler & scheduler, nano::vote_cache & inactive_vote_cache, nano::websocket_server & websocket);
void connect (nano::block_processor & block_processor);

private:
// Block_processor observer
void inspect (nano::process_return const & result, nano::block const & block, nano::transaction const & transaction);
void process_live (nano::block const & block, nano::transaction const & transaction);

nano::ledger & ledger;
nano::election_scheduler & scheduler;
nano::vote_cache & inactive_vote_cache;
nano::websocket_server & websocket;
};
}

0 comments on commit 3004649

Please sign in to comment.