Skip to content

Commit

Permalink
Merge pull request #4167 from clemahieu/block_origin_remove
Browse files Browse the repository at this point in the history
Add batch block processing result observer to block_processor
  • Loading branch information
clemahieu committed Mar 5, 2023
2 parents 97abf64 + 8f9536b commit b7816ba
Show file tree
Hide file tree
Showing 18 changed files with 364 additions and 144 deletions.
4 changes: 2 additions & 2 deletions nano/core_test/active_transactions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,14 +1064,14 @@ TEST (active_transactions, conflicting_block_vote_existing_election)
.build_shared ();
auto vote_fork (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector<nano::block_hash>{ fork->hash () }));

ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code);
ASSERT_TIMELY_EQ (5s, 1, node.active.size ());

// Vote for conflicting block, but the block does not yet exist in the ledger
node.active.vote (vote_fork);

// Block now gets processed
ASSERT_EQ (nano::process_result::fork, node.process_local (fork).code);
ASSERT_EQ (nano::process_result::fork, node.process_local (fork).value ().code);

// Election must be confirmed
auto election (node.active.election (fork->qualified_root ()));
Expand Down
5 changes: 2 additions & 3 deletions nano/core_test/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ TEST (node, fork_publish_inactive)
std::shared_ptr<nano::election> election;
ASSERT_TIMELY (5s, election = node.active.election (send1->qualified_root ()));

ASSERT_EQ (nano::process_result::fork, node.process_local (send2).code);
ASSERT_EQ (nano::process_result::fork, node.process_local (send2).value ().code);

auto blocks = election->blocks ();
ASSERT_TIMELY_EQ (5s, blocks.size (), 2);
Expand Down Expand Up @@ -3436,7 +3436,6 @@ TEST (node, aggressive_flooding)
nano::test::system system;
nano::node_flags node_flags;
node_flags.disable_request_loop = true;
node_flags.disable_block_processor_republishing = true;
node_flags.disable_bootstrap_bulk_push_client = true;
node_flags.disable_bootstrap_bulk_pull_server = true;
node_flags.disable_bootstrap_listener = true;
Expand Down Expand Up @@ -3521,7 +3520,7 @@ TEST (node, aggressive_flooding)
.build ();
}
// Processing locally goes through the aggressive block flooding path
ASSERT_EQ (nano::process_result::progress, node1.process_local (block).code);
ASSERT_EQ (nano::process_result::progress, node1.process_local (block).value ().code);

auto all_have_block = [&nodes_wallets] (nano::block_hash const & hash_a) {
return std::all_of (nodes_wallets.begin (), nodes_wallets.end (), [hash = hash_a] (auto const & node_wallet) {
Expand Down
6 changes: 3 additions & 3 deletions nano/core_test/vote_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ TEST (vote_processor, no_broadcast_local)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code);
ASSERT_TIMELY (10s, !node.active.empty ());
ASSERT_EQ (2 * node.config.vote_minimum.number (), node.weight (nano::dev::genesis_key.pub));
// Insert account in wallet. Votes on node are not enabled.
Expand Down Expand Up @@ -251,7 +251,7 @@ TEST (vote_processor, local_broadcast_without_a_representative)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code);
ASSERT_TIMELY (10s, !node.active.empty ());
ASSERT_EQ (node.config.vote_minimum, node.weight (nano::dev::genesis_key.pub));
node.block_confirm (send);
Expand Down Expand Up @@ -299,7 +299,7 @@ TEST (vote_processor, no_broadcast_local_with_a_principal_representative)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build (ec);
ASSERT_FALSE (ec);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).code);
ASSERT_EQ (nano::process_result::progress, node.process_local (send).value ().code);
ASSERT_TIMELY (10s, !node.active.empty ());
ASSERT_EQ (nano::dev::constants.genesis_amount - 2 * node.config.vote_minimum.number (), node.weight (nano::dev::genesis_key.pub));
// Insert account in wallet. Votes on node are not enabled.
Expand Down
2 changes: 1 addition & 1 deletion nano/core_test/websocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1062,7 +1062,7 @@ TEST (websocket, new_unconfirmed_block)
.work (*system.work.generate (nano::dev::genesis->hash ()))
.build_shared ();

ASSERT_EQ (nano::process_result::progress, node1->process_local (send1).code);
ASSERT_EQ (nano::process_result::progress, node1->process_local (send1).value ().code);

ASSERT_TIMELY (5s, future.wait_for (0s) == std::future_status::ready);

Expand Down
4 changes: 3 additions & 1 deletion nano/lib/errors.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ std::string nano::error_rpc_messages::message (int ev) const
return "Signing by block hash is disabled";
case nano::error_rpc::source_not_found:
return "Source not found";
case nano::error_rpc::stopped:
return "Stopped";
}

return "Invalid error code";
Expand Down Expand Up @@ -503,4 +505,4 @@ std::error_code make_error_code (boost::system::errc::errc_t const & e)
return std::error_code (static_cast<int> (e), ::nano::error_conversion::generic_category ());
}
}
#endif
#endif
3 changes: 2 additions & 1 deletion nano/lib/errors.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ enum class error_rpc
requires_port_and_address,
rpc_control_disabled,
sign_hash_disabled,
source_not_found
source_not_found,
stopped
};

/** process_result related errors */
Expand Down
4 changes: 4 additions & 0 deletions nano/node/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ add_library(
bandwidth_limiter.cpp
block_arrival.hpp
block_arrival.cpp
block_broadcast.cpp
block_broadcast.hpp
blocking_observer.cpp
blocking_observer.hpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap/block_deserializer.hpp
Expand Down
68 changes: 68 additions & 0 deletions nano/node/block_broadcast.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#include <nano/node/block_arrival.hpp>
#include <nano/node/block_broadcast.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/network.hpp>

nano::block_broadcast::block_broadcast (nano::network & network, nano::block_arrival & block_arrival) :
network{ network },
block_arrival{ block_arrival }
{
}

void nano::block_broadcast::connect (nano::block_processor & block_processor, bool enabled)
{
if (!enabled)
{
return;
}
block_processor.processed.add ([this] (auto const & result, auto const & block) {
switch (result.code)
{
case nano::process_result::progress:
observe (block);
break;
default:
break;
}
erase (block);
});
}

void nano::block_broadcast::observe (std::shared_ptr<nano::block> block)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto existing = local.find (block);
auto local_l = existing != local.end ();
lock.unlock ();
if (local_l)
{
// Block created on this node
// Perform more agressive initial flooding
network.flood_block_initial (block);
}
else
{
if (block_arrival.recent (block->hash ()))
{
// Block arrived from realtime traffic, do normal gossip.
network.flood_block (block, nano::transport::buffer_drop_policy::limiter);
}
else
{
// Block arrived from bootstrap
// Don't broadcast blocks we're bootstrapping
}
}
}

void nano::block_broadcast::set_local (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
local.insert (block);
}

void nano::block_broadcast::erase (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
local.erase (block);
}
32 changes: 32 additions & 0 deletions nano/node/block_broadcast.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include <nano/lib/blocks.hpp>

#include <memory>
#include <unordered_set>

namespace nano
{
class block_arrival;
class block_processor;
class network;
// This class tracks blocks that originated from this node.
class block_broadcast
{
public:
block_broadcast (nano::network & network, nano::block_arrival & block_arrival);
// Add batch_processed observer to block_processor if enabled
void connect (nano::block_processor & block_processor, bool enabled);
// Block_processor observer
void observe (std::shared_ptr<nano::block> block);
// Mark a block as originating locally
void set_local (std::shared_ptr<nano::block> block);
void erase (std::shared_ptr<nano::block> block);

private:
nano::network & network;
nano::block_arrival & block_arrival;
std::unordered_set<std::shared_ptr<nano::block>> local; // Blocks originated on this node
nano::mutex mutex;
};
}
52 changes: 52 additions & 0 deletions nano/node/blocking_observer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include <nano/node/blocking_observer.hpp>
#include <nano/node/blockprocessor.hpp>

void nano::blocking_observer::connect (nano::block_processor & block_processor)
{
block_processor.processed.add ([this] (auto const & result, auto const & block) {
observe (result, block);
});
}

void nano::blocking_observer::stop ()
{
nano::unique_lock<nano::mutex> lock{ mutex };
stopped = true;
auto discard = std::move (blocking);
// Signal broken promises outside lock
lock.unlock ();
discard.clear (); // ~promise future_error
}

void nano::blocking_observer::observe (nano::process_return const & result, std::shared_ptr<nano::block> block)
{
nano::unique_lock<nano::mutex> lock{ mutex };
auto existing = blocking.find (block);
if (existing != blocking.end ())
{
auto promise = std::move (existing->second);
blocking.erase (existing);
// Signal promise outside of lock
lock.unlock ();
promise.set_value (result);
}
}

std::future<nano::process_return> nano::blocking_observer::insert (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
if (stopped)
{
std::promise<nano::process_return> promise;
return promise.get_future (); // ~promise future_error
}
auto iterator = blocking.emplace (block, std::promise<nano::process_return>{});
return iterator->second.get_future ();
}

bool nano::blocking_observer::exists (std::shared_ptr<nano::block> block)
{
nano::lock_guard<nano::mutex> lock{ mutex };
auto existing = blocking.find (block);
return existing != blocking.end ();
}
31 changes: 31 additions & 0 deletions nano/node/blocking_observer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
#pragma once

#include <nano/lib/locks.hpp>
#include <nano/secure/common.hpp>

#include <future>
#include <memory>
#include <unordered_map>

namespace nano
{
class block;
class block_processor;
// Observer that facilitates a blocking call to block processing which is done asynchronosly by the block_processor
class blocking_observer
{
public:
void connect (nano::block_processor & block_processor);
// Stop the observer and trigger broken promise exceptions
void stop ();
// Block processor observer
void observe (nano::process_return const & result, std::shared_ptr<nano::block> block);
[[nodiscard]] std::future<nano::process_return> insert (std::shared_ptr<nano::block> block);
bool exists (std::shared_ptr<nano::block> block);

private:
std::unordered_multimap<std::shared_ptr<nano::block>, std::promise<nano::process_return>> blocking;
bool stopped{ false };
nano::mutex mutex;
};
}

0 comments on commit b7816ba

Please sign in to comment.