
Improve distributed_work stopping with ongoing worker tasks #2369

6 changes: 1 addition & 5 deletions nano/lib/work.cpp
@@ -167,7 +167,7 @@ void nano::work_pool::cancel (nano::root const & root_a)
 		}
 	}
 	pending.remove_if ([&root_a](decltype (pending)::value_type const & item_a) {
-		bool result;
+		bool result{ false };
 		if (item_a.item == root_a)
 		{
 			if (item_a.callback)
@@ -176,10 +176,6 @@ void nano::work_pool::cancel (nano::root const & root_a)
 			}
 			result = true;
 		}
-		else
-		{
-			result = false;
-		}
 		return result;
 	});
 }
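Value-initializing `result` is what makes it safe to drop the explicit `else` branch: without the brace initializer, the non-matching path would return an indeterminate `bool`. A standalone sketch of the two variants (not repo code):

```cpp
bool predicate_without_init (bool match)
{
	bool result; // indeterminate unless every path assigns it
	if (match)
	{
		result = true;
	}
	else
	{
		result = false; // the branch the PR deletes
	}
	return result;
}

bool predicate_with_init (bool match)
{
	bool result{ false }; // value-initialized: well-defined on every path
	if (match)
	{
		result = true;
	}
	return result;
}
```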
42 changes: 31 additions & 11 deletions nano/node/distributed_work.cpp
@@ -30,7 +30,7 @@ elapsed (nano::timer_state::started, "distributed work generation timer")

 nano::distributed_work::~distributed_work ()
 {
-	if (node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::work))
+	if (!node.stopped && node.websocket_server && node.websocket_server->any_subscriber (nano::websocket::topic::work))
 	{
 		nano::websocket::message_builder builder;
 		if (completed)
@@ -384,6 +384,11 @@ node (node_a)
 {
 }

+nano::distributed_work_factory::~distributed_work_factory ()
+{
+	stop ();
+}
+
 void nano::distributed_work_factory::make (nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, std::function<void(boost::optional<uint64_t>)> const & callback_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
 {
 	make (1, root_a, peers_a, callback_a, difficulty_a, account_a);
@@ -409,21 +414,19 @@ void nano::distributed_work_factory::make (unsigned int backoff_a, nano::root co

 void nano::distributed_work_factory::cancel (nano::root const & root_a, bool const local_stop)
 {
+	nano::lock_guard<std::mutex> guard_l (mutex);
+	auto existing_l (items.find (root_a));
+	if (existing_l != items.end ())
 	{
-		nano::lock_guard<std::mutex> guard (mutex);
-		auto existing_l (items.find (root_a));
-		if (existing_l != items.end ())
+		for (auto & distributed_w : existing_l->second)
 		{
-			for (auto & distributed_w : existing_l->second)
+			if (auto distributed_l = distributed_w.lock ())
 			{
-				if (auto distributed_l = distributed_w.lock ())
-				{
-					// Send work_cancel to work peers and stop local work generation
-					distributed_l->cancel_once ();
-				}
+				// Send work_cancel to work peers and stop local work generation
+				distributed_l->cancel_once ();
 			}
-			items.erase (existing_l);
 		}
+		items.erase (existing_l);
 	}
 }
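The factory tracks in-flight work only through `std::weak_ptr`, so cancellation locks each entry and silently skips work that has already completed. A self-contained sketch of that pattern (the `work_item` type is hypothetical, illustration only):

```cpp
#include <iostream>
#include <memory>
#include <vector>

// Hypothetical stand-in for nano::distributed_work, for illustration only
struct work_item
{
	void cancel_once ()
	{
		std::cout << "cancelled\n";
	}
};

int main ()
{
	std::vector<std::weak_ptr<work_item>> items_l;
	auto live_l (std::make_shared<work_item> ());
	items_l.push_back (live_l);
	items_l.push_back (std::weak_ptr<work_item> ()); // already expired
	for (auto & weak_l : items_l)
	{
		// lock () returns nullptr if the work already completed and was
		// destroyed, so cancellation skips finished items
		if (auto item_l = weak_l.lock ())
		{
			item_l->cancel_once ();
		}
	}
}
```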

@@ -448,6 +451,23 @@ void nano::distributed_work_factory::cleanup_finished ()
 	}
 }

+void nano::distributed_work_factory::stop ()
+{
Comment on lines +457 to +458

Contributor: Should items be cleared at the end of this function? Otherwise they can be cancelled again in the destructor.

Contributor Author: Each vector of work is erased in cancel (), but I've added a clear to make sure, and a stopped flag to avoid calling stop () twice or adding new work in make () after stopping.
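The clear and the stopped flag described in that reply are not visible in this hunk. A minimal self-contained sketch of the described shape, assuming a `std::atomic<bool> stopped` member (the real members belong to `nano::distributed_work_factory` and are not shown here):

```cpp
#include <atomic>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Self-contained analogue of the follow-up described above; illustration only
class factory_sketch
{
public:
	void stop ()
	{
		// exchange () returns the previous value, so only the first caller proceeds
		if (!stopped.exchange (true))
		{
			// ... cancel any ongoing work here, as the diff body below does ...
			std::lock_guard<std::mutex> guard_l (mutex);
			items.clear (); // leave nothing for the destructor to cancel again
		}
	}

private:
	std::atomic<bool> stopped{ false };
	std::mutex mutex;
	std::unordered_map<std::string, std::vector<int>> items; // stand-in value types
};
```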

+	// Cancel any ongoing work
+	std::unordered_set<nano::root> roots_l;
+	{
+		nano::lock_guard<std::mutex> guard_l (mutex);
+		for (auto const & item_l : items)
+		{
+			roots_l.insert (item_l.first);
+		}
+	}
+	for (auto const & root_l : roots_l)
+	{
+		cancel (root_l, true);
+	}
+}
+
 namespace nano
 {
 std::unique_ptr<seq_con_info_component> collect_seq_con_info (distributed_work_factory & distributed_work, const std::string & name)
2 changes: 2 additions & 0 deletions nano/node/distributed_work.hpp
@@ -77,10 +77,12 @@ class distributed_work_factory final
 {
 public:
 	distributed_work_factory (nano::node &);
+	~distributed_work_factory ();
 	void make (nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
 	void make (unsigned int, nano::root const &, std::vector<std::pair<std::string, uint16_t>> const &, std::function<void(boost::optional<uint64_t>)> const &, uint64_t, boost::optional<nano::account> const & = boost::none);
 	void cancel (nano::root const &, bool const local_stop = false);
 	void cleanup_finished ();
+	void stop ();

 	nano::node & node;
 	std::unordered_map<nano::root, std::vector<std::weak_ptr<nano::distributed_work>>> items;
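A hedged usage sketch of the factory lifecycle after this change, using the declarations above (the caller is assumed to supply the node, root, peers and difficulty; this is not code from the PR):

```cpp
#include <boost/optional.hpp>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

#include <nano/node/distributed_work.hpp>

// Illustrative lifecycle: make work, cancel a single root, then stop everything
void lifecycle_sketch (nano::node & node_a, nano::root const & root_a, std::vector<std::pair<std::string, uint16_t>> const & peers_a, uint64_t difficulty_a)
{
	nano::distributed_work_factory factory_l (node_a);
	factory_l.make (root_a, peers_a, [](boost::optional<uint64_t> work_a) {
		// an empty optional means generation failed or was cancelled
	}, difficulty_a);
	factory_l.cancel (root_a); // cancel one root
	factory_l.stop (); // now also invoked by the destructor
}
```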
9 changes: 5 additions & 4 deletions nano/node/node.cpp
@@ -692,6 +692,7 @@ void nano::node::stop ()
 {
 	logger.always_log ("Node stopping");
 	write_database_queue.stop ();
+	distributed_work.stop ();
 	block_processor.stop ();
 	if (block_processor_thread.joinable ())
 	{
@@ -1009,11 +1010,11 @@ boost::optional<uint64_t> nano::node::work_generate_blocking (nano::root const &

 boost::optional<uint64_t> nano::node::work_generate_blocking (nano::root const & root_a, uint64_t difficulty_a, boost::optional<nano::account> const & account_a)
 {
-	std::promise<uint64_t> promise;
-	std::future<uint64_t> future = promise.get_future ();
+	std::promise<boost::optional<uint64_t>> promise;
+	auto future (promise.get_future ());
 	// clang-format off
-	work_generate (root_a, [&promise](boost::optional<uint64_t> work_a) {
-		promise.set_value (work_a.value_or (0));
+	work_generate (root_a, [&promise](boost::optional<uint64_t> opt_work_a) {
+		promise.set_value (opt_work_a);
 	},
 	difficulty_a, account_a);
 	// clang-format on
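Returning the whole optional through the promise lets callers tell a failed or cancelled generation apart from a real work value; previously `value_or (0)` collapsed failure into a work value of 0. A hedged caller-side sketch (`handle_work` is illustrative, not from the codebase):

```cpp
#include <boost/optional.hpp>
#include <cstdint>
#include <iostream>

// opt_work_a is what the new promise type delivers to the blocking caller
void handle_work (boost::optional<uint64_t> opt_work_a)
{
	if (opt_work_a.is_initialized ())
	{
		std::cout << "work generated: " << *opt_work_a << std::endl;
	}
	else
	{
		// generation failed or the node is stopping; with the old
		// value_or (0), this case looked like a work value of 0
		std::cout << "work generation failed" << std::endl;
	}
}
```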
2 changes: 1 addition & 1 deletion nano/node/wallet.cpp
@@ -1376,7 +1376,7 @@ void nano::wallet::work_cache_blocking (nano::account const & account_a, nano::r
 			work_update (transaction_l, account_a, root_a, *opt_work_l);
 		}
 	}
-	else
+	else if (!wallets.node.stopped)
 	{
 		wallets.node.logger.try_log (boost::str (boost::format ("Could not precache work for root %1% due to work generation failure") % root_a.to_string ()));
 	}