Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure propagation and removal for the work watcher #2709

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 112 additions & 37 deletions nano/core_test/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ TEST (wallet, deterministic_restore)
ASSERT_TRUE (wallet->exists (pub));
}

TEST (wallet, work_watcher_update)
TEST (work_watcher, update)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
Expand Down Expand Up @@ -1206,70 +1206,145 @@ TEST (wallet, work_watcher_update)
ASSERT_GT (updated_multiplier2, multiplier2);
}

TEST (wallet, work_watcher_generation_disabled)
TEST (work_watcher, propagate)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.enable_voting = false;
node_config.work_watcher_period = 1s;
node_config.work_threads = 0;
auto & node = *system.add_node (node_config);
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
nano::genesis genesis;
nano::node_flags node_flags;
node_flags.disable_request_loop = true;
auto & node = *system.add_node (node_config, node_flags);
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
node_config.peering_port = nano::get_available_port ();
auto & node_passive = *system.add_node (node_config);
nano::keypair key;
auto block (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Mxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *pool.generate (genesis.hash ())));
auto difficulty (block->difficulty ());
node.wallets.watcher->add (block);
ASSERT_FALSE (node.process_local (block).code != nano::process_result::progress);
ASSERT_TRUE (node.wallets.watcher->is_watched (block->qualified_root ()));
auto multiplier = nano::normalized_multiplier (nano::difficulty::to_multiplier (difficulty, nano::work_threshold (block->work_version (), nano::block_details (nano::epoch::epoch_0, true, false, false))), node.network_params.network.publish_thresholds.epoch_1);
double updated_multiplier{ multiplier };
auto const block (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
system.deadline_set (5s);
while (!node_passive.ledger.block_exists (block->hash ()))
{
ASSERT_NO_ERROR (system.poll ());
}
auto const multiplier (nano::normalized_multiplier (nano::difficulty::to_multiplier (block->difficulty (), nano::work_threshold (block->work_version (), nano::block_details (nano::epoch::epoch_0, false, false, false))), node.network_params.network.publish_thresholds.epoch_1));
auto updated_multiplier{ multiplier };
auto propagated_multiplier{ multiplier };
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
node.active.trended_active_multiplier = multiplier * 1.001;
}
bool updated{ false };
bool propagated{ false };
system.deadline_set (10s);
while (!(updated && propagated))
{
nano::unique_lock<std::mutex> lock (node.active.mutex);
// Prevent active difficulty repopulating multipliers
node.network_params.network.request_interval_ms = 10000;
//fill multipliers_cb and update active difficulty;
for (auto i (0); i < node.active.multipliers_cb.size (); i++)
{
node.active.multipliers_cb.push_back (multiplier * (1.5 + i / 100.));
nano::lock_guard<std::mutex> guard (node.active.mutex);
{
auto const existing (node.active.roots.find (block->qualified_root ()));
ASSERT_NE (existing, node.active.roots.end ());
updated_multiplier = existing->multiplier;
}
}
node.active.update_active_multiplier (lock);
{
nano::lock_guard<std::mutex> guard (node_passive.active.mutex);
{
auto const existing (node_passive.active.roots.find (block->qualified_root ()));
ASSERT_NE (existing, node_passive.active.roots.end ());
propagated_multiplier = existing->multiplier;
}
}
updated = updated_multiplier != multiplier;
propagated = propagated_multiplier != multiplier;
ASSERT_NO_ERROR (system.poll ());
}
std::this_thread::sleep_for (5s);
ASSERT_GT (updated_multiplier, multiplier);
ASSERT_EQ (propagated_multiplier, updated_multiplier);
}

nano::lock_guard<std::mutex> guard (node.active.mutex);
TEST (work_watcher, removed_after_win)
{
nano::system system (1);
auto & node (*system.nodes[0]);
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;
ASSERT_EQ (0, wallet.wallets.watcher->size ());
auto const block1 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
ASSERT_EQ (1, wallet.wallets.watcher->size ());
system.deadline_set (5s);
while (node.wallets.watcher->is_watched (block1->qualified_root ()))
{
auto const existing (node.active.roots.find (block->qualified_root ()));
//if existing is junk the block has been confirmed already
ASSERT_NE (existing, node.active.roots.end ());
updated_multiplier = existing->multiplier;
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (updated_multiplier, multiplier);
ASSERT_TRUE (node.distributed_work.items.empty ());
ASSERT_EQ (0, node.wallets.watcher->size ());
}

TEST (wallet, work_watcher_removed)
TEST (work_watcher, removed_after_lose)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.enable_voting = false;
node_config.work_watcher_period = 1s;
auto & node = *system.add_node (node_config);
(void)node;
auto & wallet (*system.wallet (0));
wallet.insert_adhoc (nano::test_genesis_key.prv);
nano::keypair key;
ASSERT_EQ (0, wallet.wallets.watcher->size ());
auto const block (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
ASSERT_EQ (1, wallet.wallets.watcher->size ());
auto transaction (wallet.wallets.tx_begin_write ());
system.deadline_set (3s);
while (0 == wallet.wallets.watcher->size ())
auto const block1 (wallet.send_action (nano::test_genesis_key.pub, key.pub, 100));
ASSERT_TRUE (node.wallets.watcher->is_watched (block1->qualified_root ()));
nano::genesis genesis;
auto fork1 (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::xrb_ratio, nano::test_genesis_key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *system.work.generate (genesis.hash ())));
node.process_active (fork1);
node.block_processor.flush ();
auto vote (std::make_shared<nano::vote> (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, fork1));
nano::confirm_ack message (vote);
node.network.process_message (message, nullptr);
system.deadline_set (5s);
while (node.wallets.watcher->is_watched (block1->qualified_root ()))
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_EQ (0, node.wallets.watcher->size ());
}

TEST (work_watcher, generation_disabled)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
node_config.enable_voting = false;
node_config.work_watcher_period = 1s;
node_config.work_threads = 0;
nano::node_flags node_flags;
node_flags.disable_request_loop = true;
auto & node = *system.add_node (node_config);
ASSERT_FALSE (node.work_generation_enabled ());
nano::work_pool pool (std::numeric_limits<unsigned>::max ());
nano::genesis genesis;
nano::keypair key;
auto block (std::make_shared<nano::state_block> (nano::test_genesis_key.pub, genesis.hash (), nano::test_genesis_key.pub, nano::genesis_amount - nano::Mxrb_ratio, key.pub, nano::test_genesis_key.prv, nano::test_genesis_key.pub, *pool.generate (genesis.hash ())));
auto difficulty (block->difficulty ());
node.wallets.watcher->add (block);
ASSERT_FALSE (node.process_local (block).code != nano::process_result::progress);
ASSERT_TRUE (node.wallets.watcher->is_watched (block->qualified_root ()));
auto multiplier = nano::normalized_multiplier (nano::difficulty::to_multiplier (difficulty, nano::work_threshold (block->work_version (), nano::block_details (nano::epoch::epoch_0, true, false, false))), node.network_params.network.publish_thresholds.epoch_1);
double updated_multiplier{ multiplier };
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
node.active.trended_active_multiplier = multiplier * 10;
}
std::this_thread::sleep_for (2s);
ASSERT_TRUE (node.wallets.watcher->is_watched (block->qualified_root ()));
{
nano::lock_guard<std::mutex> guard (node.active.mutex);
auto const existing (node.active.roots.find (block->qualified_root ()));
ASSERT_NE (existing, node.active.roots.end ());
updated_multiplier = existing->multiplier;
}
ASSERT_EQ (updated_multiplier, multiplier);
ASSERT_TRUE (node.distributed_work.items.empty ());
}

TEST (wallet, work_watcher_cancel)
TEST (work_watcher, cancel)
{
nano::system system;
nano::node_config node_config (nano::get_available_port (), system.logging);
Expand Down
2 changes: 1 addition & 1 deletion nano/node/blockprocessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ void nano::block_processor::process_batch (nano::unique_lock<std::mutex> & lock_
for (auto & i : rollback_list)
{
node.votes_cache.remove (i->hash ());
node.wallets.watcher->remove (i);
node.wallets.watcher->remove (*i);
// Stop all rolled back active transactions except initial
if (i->hash () != successor->hash ())
{
Expand Down
2 changes: 1 addition & 1 deletion nano/node/testing.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ void nano::system::generate_rollback (nano::node & node_a, std::vector<nano::acc
debug_assert (!error);
for (auto & i : rollback_list)
{
node_a.wallets.watcher->remove (i);
node_a.wallets.watcher->remove (*i);
node_a.active.erase (*i);
}
}
Expand Down
78 changes: 33 additions & 45 deletions nano/node/wallet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1420,7 +1420,7 @@ node (node_a),
stopped (false)
{
node.observers.blocks.add ([this](nano::election_status const & status_a, nano::account const & account_a, nano::amount const & amount_a, bool is_state_send_a) {
this->remove (status_a.winner);
this->remove (*status_a.winner);
});
}

Expand Down Expand Up @@ -1460,65 +1460,53 @@ void nano::work_watcher::watching (nano::qualified_root const & root_a, std::sha
std::weak_ptr<nano::work_watcher> watcher_w (shared_from_this ());
node.alarm.add (std::chrono::steady_clock::now () + node.config.work_watcher_period, [block_a, root_a, watcher_w]() {
auto watcher_l = watcher_w.lock ();
if (watcher_l && !watcher_l->stopped && block_a != nullptr)
if (watcher_l && !watcher_l->stopped && watcher_l->is_watched (root_a))
{
nano::unique_lock<std::mutex> lock (watcher_l->mutex);
if (watcher_l->watched.find (root_a) != watcher_l->watched.end ()) // not yet confirmed or cancelled
auto active_difficulty (watcher_l->node.active.limited_active_difficulty (*block_a));
/*
* Work watcher should still watch blocks even without work generation, although no rework is done
* Functionality may be added in the future that does not require updating work
*/
if (active_difficulty > block_a->difficulty () && watcher_l->node.work_generation_enabled ())
{
lock.unlock ();
auto active_difficulty (watcher_l->node.active.limited_active_difficulty (*block_a));
/*
* Work watcher should still watch blocks even without work generation, although no rework is done
* Functionality may be added in the future that does not require updating work
*/
if (active_difficulty > block_a->difficulty () && watcher_l->node.work_generation_enabled ())
{
watcher_l->node.work_generate (
block_a->work_version (), block_a->root (), active_difficulty, [watcher_l, block_a, root_a](boost::optional<uint64_t> work_a) {
if (block_a != nullptr && watcher_l != nullptr && !watcher_l->stopped)
watcher_l->node.work_generate (
block_a->work_version (), block_a->root (), active_difficulty, [watcher_l, block_a, root_a](boost::optional<uint64_t> work_a) {
if (watcher_l->is_watched (root_a))
{
if (work_a.is_initialized ())
{
bool updated_l{ false };
if (work_a.is_initialized ())
{
nano::state_block_builder builder;
std::error_code ec;
std::shared_ptr<nano::state_block> block (builder.from (*block_a).work (*work_a).build (ec));

if (!ec)
{
watcher_l->node.network.flood_block_initial (block);
watcher_l->node.active.update_difficulty (block);
watcher_l->update (root_a, block);
updated_l = true;
watcher_l->watching (root_a, block);
}
}
if (!updated_l)
debug_assert (nano::work_difficulty (block_a->work_version (), block_a->root (), *work_a) > block_a->difficulty ());
nano::state_block_builder builder;
std::error_code ec;
std::shared_ptr<nano::state_block> block (builder.from (*block_a).work (*work_a).build (ec));
if (!ec)
{
watcher_l->watching (root_a, block_a);
watcher_l->node.network.flood_block_initial (block);
watcher_l->node.active.update_difficulty (block);
watcher_l->update (root_a, block);
}
}
},
block_a->account ());
}
else
{
watcher_l->watching (root_a, block_a);
}
watcher_l->watching (root_a, block_a);
}
},
block_a->account ());
}
else
{
watcher_l->watching (root_a, block_a);
}
}
});
}

void nano::work_watcher::remove (std::shared_ptr<nano::block> block_a)
void nano::work_watcher::remove (nano::block const & block_a)
{
auto root_l (block_a->qualified_root ());
nano::lock_guard<std::mutex> lock (mutex);
auto existing (watched.find (root_l));
if (existing != watched.end () && existing->second->hash () == block_a->hash ())
auto existing (watched.find (block_a.qualified_root ()));
if (existing != watched.end ())
{
watched.erase (existing);
node.observers.work_cancel.notify (block_a->root ());
node.observers.work_cancel.notify (block_a.root ());
}
}

Expand Down
2 changes: 1 addition & 1 deletion nano/node/wallet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ class work_watcher final : public std::enable_shared_from_this<nano::work_watche
void add (std::shared_ptr<nano::block>);
void update (nano::qualified_root const &, std::shared_ptr<nano::state_block>);
void watching (nano::qualified_root const &, std::shared_ptr<nano::state_block>);
void remove (std::shared_ptr<nano::block>);
void remove (nano::block const &);
bool is_watched (nano::qualified_root const &);
size_t size ();
std::mutex mutex;
Expand Down