Skip to content

Commit

Permalink
Merge 'Do not run aborted tasks' from Aleksandra Martyniuk
Browse files Browse the repository at this point in the history
task_manager::task::impl contains an abort source which can
be used to check whether it is aborted and an abort method
which aborts the task (request_abort on abort_source) and all
its descendants recursively.

When the start method is called after the task was aborted,
then its state is set to failed and the task does not run.

Fixes: scylladb#11995

Closes scylladb#11996

* github.com:scylladb/scylladb:
  tasks: do not run tasks that are aborted
  tasks: delete unused variable
  tasks: add abort_source to task_manager::task::impl
  • Loading branch information
avikivity committed Nov 17, 2022
2 parents a396c27 + 4250bd9 commit 2779a17
Showing 1 changed file with 34 additions and 6 deletions.
40 changes: 34 additions & 6 deletions tasks/task_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#pragma once

#include <boost/range/algorithm/transform.hpp>
#include <seastar/core/gate.hh>
#include <seastar/core/sharded.hh>
#include <seastar/core/sleep.hh>
Expand Down Expand Up @@ -95,6 +96,7 @@ public:
foreign_task_vector _children;
shared_promise<> _done;
module_ptr _module;
abort_source _as;
public:
impl(module_ptr module, task_id id, uint64_t sequence_number, std::string keyspace, std::string table, std::string type, std::string entity, task_id parent_id)
: _status({
Expand Down Expand Up @@ -126,7 +128,24 @@ public:
}

virtual future<> abort() noexcept {
return make_ready_future<>();
if (!_as.abort_requested()) {
_as.request_abort();

std::vector<task_info> children_info{_children.size()};
boost::transform(_children, children_info.begin(), [] (const auto& child) {
return task_info{child->id(), child.get_owner_shard()};
});

co_await coroutine::parallel_for_each(children_info, [this] (auto info) {
return smp::submit_to(info.shard, [info, &tm = _module->get_task_manager().container()] {
auto& tasks = tm.local().get_all_tasks();
if (auto it = tasks.find(info.id); it != tasks.end()) {
return it->second->abort();
}
return make_ready_future<>();
});
});
}
}
protected:
virtual future<> run() = 0;
Expand Down Expand Up @@ -202,7 +221,6 @@ public:

void start() {
_impl->_status.start_time = db_clock::now();
_impl->_status.state = task_manager::task_state::running;

try {
// Background fiber does not capture task ptr, so the task can be unregistered and destroyed independently in the foreground.
Expand All @@ -215,6 +233,8 @@ public:
module->unregister_task(id);
});
});
_impl->_as.check();
_impl->_status.state = task_manager::task_state::running;
_impl->run_to_completion();
} catch (...) {
_impl->finish_failed(std::current_exception());
Expand Down Expand Up @@ -245,6 +265,10 @@ public:
return _impl->abort();
}

bool abort_requested() const noexcept {
return _impl->_as.abort_requested();
}

future<> done() const noexcept {
return _impl->_done.get_shared_future();
}
Expand Down Expand Up @@ -336,18 +360,22 @@ public:
// from a parent and set. Otherwise, it must be set by caller.
future<task_ptr> make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d = task_info{}) {
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr));
foreign_task_ptr parent;
bool abort = false;
if (parent_d) {
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task)] (task_manager& tm) mutable {
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable {
const auto& all_tasks = tm.get_all_tasks();
if (auto it = all_tasks.find(id); it != all_tasks.end()) {
it->second->add_child(std::move(task));
return make_ready_future<uint64_t>(it->second->get_sequence_number());
abort = it->second->abort_requested();
return it->second->get_sequence_number();
} else {
return make_exception_future<uint64_t>(task_manager::task_not_found(id));
throw task_manager::task_not_found(id);
}
});
}
if (abort) {
co_await task->abort();
}
co_return task;
}
};
Expand Down

0 comments on commit 2779a17

Please sign in to comment.