Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve distributed_work stopping with ongoing worker tasks #2369

28 changes: 13 additions & 15 deletions nano/core_test/distributed_work.cpp
Expand Up @@ -5,6 +5,13 @@

using namespace std::chrono_literals;

// Checks that a stopped distributed work factory refuses new requests:
// make () is expected to return true (its error indication) after stop ().
TEST (distributed_work, stopped)
{
nano::system system (24000, 1);
// Stop the factory before any work has been requested
system.nodes[0]->distributed_work.stop ();
// Default root, empty peer list and empty callback are passed; only the return value is checked
ASSERT_TRUE (system.nodes[0]->distributed_work.make (nano::block_hash (), {}, {}, nano::network_constants::publish_test_threshold));
}

TEST (distributed_work, no_peers)
{
nano::system system (24000, 1);
Expand All @@ -17,7 +24,7 @@ TEST (distributed_work, no_peers)
work = work_a;
done = true;
};
node->distributed_work.make (hash, node->config.work_peers, callback, node->network_params.network.publish_threshold, nano::account ());
ASSERT_FALSE (node->distributed_work.make (hash, node->config.work_peers, callback, node->network_params.network.publish_threshold, nano::account ()));
system.deadline_set (5s);
while (!done)
{
Expand All @@ -39,16 +46,7 @@ TEST (distributed_work, no_peers_disabled)
nano::node_config node_config (24000, system.logging);
node_config.work_threads = 0;
auto & node = *system.add_node (node_config);
bool done{ false };
auto callback_failure = [&done](boost::optional<uint64_t> work_a) {
ASSERT_FALSE (work_a.is_initialized ());
done = true;
};
node.distributed_work.make (nano::block_hash (), node.config.work_peers, callback_failure, nano::network_constants::publish_test_threshold);
while (!done)
{
ASSERT_NO_ERROR (system.poll ());
}
ASSERT_TRUE (node.distributed_work.make (nano::block_hash (), node.config.work_peers, {}, nano::network_constants::publish_test_threshold));
}

TEST (distributed_work, no_peers_cancel)
Expand All @@ -64,7 +62,7 @@ TEST (distributed_work, no_peers_cancel)
ASSERT_FALSE (work_a.is_initialized ());
done = true;
};
node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold));
ASSERT_FALSE (node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1e6, node.network_params.network.publish_threshold)));
ASSERT_EQ (1, node.distributed_work.items.size ());
// cleanup should not cancel or remove an ongoing work
node.distributed_work.cleanup_finished ();
Expand All @@ -80,7 +78,7 @@ TEST (distributed_work, no_peers_cancel)

// now using observer
done = false;
node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold));
ASSERT_FALSE (node.distributed_work.make (hash, node.config.work_peers, callback_to_cancel, nano::difficulty::from_multiplier (1000000, node.network_params.network.publish_threshold)));
ASSERT_EQ (1, node.distributed_work.items.size ());
node.observers.work_cancel.notify (hash);
system.deadline_set (20s);
Expand All @@ -104,7 +102,7 @@ TEST (distributed_work, no_peers_multi)
// Test many works for the same root
for (unsigned i{ 0 }; i < total; ++i)
{
node->distributed_work.make (hash, node->config.work_peers, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold));
ASSERT_FALSE (node->distributed_work.make (hash, node->config.work_peers, callback, nano::difficulty::from_multiplier (10, node->network_params.network.publish_threshold)));
}
// 1 root, and _total_ requests for that root are expected, but some may have already finished
ASSERT_EQ (1, node->distributed_work.items.size ());
Expand All @@ -129,7 +127,7 @@ TEST (distributed_work, no_peers_multi)
for (unsigned i{ 0 }; i < total; ++i)
{
nano::block_hash hash_i (i + 1);
node->distributed_work.make (hash_i, node->config.work_peers, callback, node->network_params.network.publish_threshold);
ASSERT_FALSE (node->distributed_work.make (hash_i, node->config.work_peers, callback, node->network_params.network.publish_threshold));
}
// 10 roots expected with 1 work each, but some may have completed so test for some
ASSERT_GT (node->distributed_work.items.size (), 5);
Expand Down
6 changes: 1 addition & 5 deletions nano/lib/work.cpp
Expand Up @@ -167,7 +167,7 @@ void nano::work_pool::cancel (nano::root const & root_a)
}
}
pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) {
bool result;
bool result{ false };
if (item_a.item == root_a)
{
if (item_a.callback)
Expand All @@ -176,10 +176,6 @@ void nano::work_pool::cancel (nano::root const & root_a)
}
result = true;
}
else
{
result = false;
}
return result;
});
}
Expand Down
79 changes: 53 additions & 26 deletions nano/node/distributed_work.cpp
Expand Up @@ -30,7 +30,7 @@ elapsed (nano::timer_state::started, "distributed work generation timer")

nano::distributed_work::~distributed_work ()
{
if (node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::work))
if (!node.stopped && node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::work))
{
nano::websocket::message_builder builder;
if (completed)
Expand Down Expand Up @@ -348,11 +348,12 @@ void nano::distributed_work::handle_failure (bool const last_a)
auto next_backoff (std::min (backoff * 2, (unsigned int)60 * 5));
// clang-format off
node.alarm.add (now + std::chrono::seconds (backoff), [ node_w, root_l = root, peers_l = peers, callback_l = callback, next_backoff, difficulty = difficulty, account_l = account ] {
bool error_l {true};
if (auto node_l = node_w.lock ())
{
node_l->distributed_work.make (next_backoff, root_l, peers_l, callback_l, difficulty, account_l);
error_l = node_l->distributed_work.make (next_backoff, root_l, peers_l, callback_l, difficulty, account_l);
}
else if (callback_l)
if (error_l && callback_l)
{
callback_l (boost::none);
}
Expand Down Expand Up @@ -384,46 +385,51 @@ node (node_a)
{
}

void nano::distributed_work_factory::make (nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
nano::distributed_work_factory::~distributed_work_factory ()
{
make (1, root_a, peers_a, callback_a, difficulty_a, account_a);
stop ();
}

void nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
bool nano::distributed_work_factory::make (nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
{
cleanup_finished ();
if (node.work_generation_enabled ())
return make (1, root_a, peers_a, callback_a, difficulty_a, account_a);
}

bool nano::distributed_work_factory::make (unsigned int backoff_a, nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
{
bool error_l{ true };
if (!stopped)
{
auto distributed (std::make_shared<nano::distributed_work> (node, root_a, peers_a, backoff_a, callback_a, difficulty_a, account_a));
cleanup_finished ();
if (node.work_generation_enabled ())
{
nano::lock_guard<std::mutex> guard (mutex);
items[root_a].emplace_back (distributed);
auto distributed (std::make_shared<nano::distributed_work> (node, root_a, peers_a, backoff_a, callback_a, difficulty_a, account_a));
{
nano::lock_guard<std::mutex> guard (mutex);
items[root_a].emplace_back (distributed);
}
distributed->start ();
error_l = false;
}
distributed->start ();
}
else if (callback_a)
{
callback_a (boost::none);
}
return error_l;
}

void nano::distributed_work_factory::cancel (nano::root const & root_a, bool const local_stop)
{
nano::lock_guard<std::mutex> guard_l (mutex);
auto existing_l (items.find (root_a));
if (existing_l != items.end ())
{
nano::lock_guard<std::mutex> guard (mutex);
auto existing_l (items.find (root_a));
if (existing_l != items.end ())
for (auto & distributed_w : existing_l->second)
{
for (auto & distributed_w : existing_l->second)
if (auto distributed_l = distributed_w.lock ())
{
if (auto distributed_l = distributed_w.lock ())
{
// Send work_cancel to work peers and stop local work generation
distributed_l->cancel_once ();
}
// Send work_cancel to work peers and stop local work generation
distributed_l->cancel_once ();
}
items.erase (existing_l);
}
items.erase (existing_l);
}
}

Expand All @@ -448,6 +454,27 @@ void nano::distributed_work_factory::cleanup_finished ()
}
}

void nano::distributed_work_factory::stop ()
{
Comment on lines +457 to +458
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should items be cleared at the end of this function? Otherwise they can be cancelled again in the destructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Each vector of work is erased in cancel(), but I've added a clear to make sure. I've also added a stopped flag to avoid calling stop() twice or adding new work in make() after stopping.

if (!stopped.exchange (true))
{
// Cancel any ongoing work
std::unordered_set<nano::root> roots_l;
nano::unique_lock<std::mutex> lock_l (mutex);
for (auto const & item_l : items)
{
roots_l.insert (item_l.first);
}
lock_l.unlock ();
for (auto const & root_l : roots_l)
{
cancel (root_l, true);
}
lock_l.lock ();
items.clear ();
}
cryptocode marked this conversation as resolved.
Show resolved Hide resolved
}

namespace nano
{
std::unique_ptr<seq_con_info_component> collect_seq_con_info (distributed_work_factory & distributed_work, const std::string & name)
Expand Down
7 changes: 5 additions & 2 deletions nano/node/distributed_work.hpp
Expand Up @@ -77,14 +77,17 @@ class distributed_work_factory final
{
public:
distributed_work_factory (nano::node &);
void make (nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
void make (unsigned int, nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
~distributed_work_factory ();
bool make (nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
bool make (unsigned int, nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
void cancel (nano::root const &, bool const local_stop = false);
void cleanup_finished ();
void stop ();

nano::node & node;
std::unordered_map<nano::root, std::vector<std::weak_ptr<nano::distributed_work>>> items;
std::mutex mutex;
std::atomic<bool> stopped{ false };
};

class seq_con_info_component;
Expand Down
16 changes: 10 additions & 6 deletions nano/node/node.cpp
Expand Up @@ -712,6 +712,7 @@ void nano::node::stop ()
wallets.stop ();
stats.stop ();
worker.stop ();
distributed_work.stop ();
// work pool is not stopped on purpose due to testing setup
}
}
Expand Down Expand Up @@ -999,7 +1000,11 @@ void nano::node::work_generate (nano::root const & root_a, std::function<void(bo
void nano::node::work_generate (nano::root const & root_a, std::function<void(boost::optional<uint64_t>)> callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a, bool secondary_work_peers_a)
{
auto const & peers_l (secondary_work_peers_a ? config.secondary_work_peers : config.work_peers);
distributed_work.make (root_a, peers_l, callback_a, difficulty_a, account_a);
if (distributed_work.make (root_a, peers_l, callback_a, difficulty_a, account_a))
{
// Error in creating the job (either stopped or work generation is not possible)
callback_a (boost::none);
}
}

boost::optional<uint64_t> nano::node::work_generate_blocking (nano::root const & root_a, boost::optional<nano::account> const & account_a)
Expand All @@ -1009,15 +1014,14 @@ boost::optional<uint64_t> nano::node::work_generate_blocking (nano::root const &

boost::optional<uint64_t> nano::node::work_generate_blocking (nano::root const & root_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
{
std::promise<uint64_t> promise;
std::future<uint64_t> future = promise.get_future ();
std::promise<boost::optional<uint64_t>> promise;
// clang-format off
work_generate (root_a, [&promise](boost::optional<uint64_t> work_a) {
promise.set_value (work_a.value_or (0));
work_generate (root_a, [&promise](boost::optional<uint64_t> opt_work_a) {
promise.set_value (opt_work_a);
},
difficulty_a, account_a);
// clang-format on
return future.get ();
return promise.get_future ().get ();
}

void nano::node::add_initial_peers ()
Expand Down
2 changes: 1 addition & 1 deletion nano/node/wallet.cpp
Expand Up @@ -1376,7 +1376,7 @@ void nano::wallet::work_cache_blocking (nano::account const & account_a, nano::r
work_update (transaction_l, account_a, root_a, *opt_work_l);
}
}
else
else if (!wallets.node.stopped)
{
wallets.node.logger.try_log (boost::str (boost::format ("Could not precache work for root %1 due to work generation failure") % root_a.to_string ()));
}
Expand Down