Skip to content

Commit

Permalink
task_manager: module: make_task: enter gate when the task is created
Browse files Browse the repository at this point in the history
Passing the gate_closed_exception to the task promise in start()
ends up with abandoned exception since no-one is waiting
for it.

Instead, enter the gate when the task is made
so it will fail make_task if the gate is already closed.

Fixes #15211

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit f9a7635)
  • Loading branch information
bhalevy authored and avikivity committed Nov 30, 2023
1 parent bfeadae commit 1592a84
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
13 changes: 7 additions & 6 deletions tasks/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include <seastar/core/on_internal_error.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/gate.hh>

#include "task_manager.hh"
#include "test_module.hh"

Expand Down Expand Up @@ -103,7 +105,7 @@ void task_manager::task::impl::finish_failed(std::exception_ptr ex) {
finish_failed(ex, fmt::format("{}", ex));
}

task_manager::task::task(task_impl_ptr&& impl) noexcept : _impl(std::move(impl)) {
task_manager::task::task(task_impl_ptr&& impl, gate::holder gh) noexcept : _impl(std::move(impl)), _gate_holder(std::move(gh)) {
register_task();
}

Expand Down Expand Up @@ -144,14 +146,13 @@ void task_manager::task::start() {
try {
// Background fiber does not capture task ptr, so the task can be unregistered and destroyed independently in the foreground.
// After the ttl expires, the task id will be used to unregister the task if that didn't happen in any other way.
(void)with_gate(_impl->_module->async_gate(), [f = done(), module = _impl->_module, id = id()] () mutable {
return std::move(f).finally([module, id] {
auto module = _impl->_module;
(void)done().finally([module] {
return sleep_abortable(module->get_task_manager().get_task_ttl(), module->abort_source());
}).then_wrapped([module, id] (auto f) {
}).then_wrapped([module, id = id()] (auto f) {
f.ignore_ready_future();
module->unregister_task(id);
});
});
_impl->_as.check();
_impl->_status.state = task_manager::task_state::running;
_impl->run_to_completion();
Expand Down Expand Up @@ -260,7 +261,7 @@ future<> task_manager::module::stop() noexcept {
}

future<task_manager::task_ptr> task_manager::module::make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d) {
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr));
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr), async_gate().hold());
bool abort = false;
if (parent_d) {
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable {
Expand Down
4 changes: 3 additions & 1 deletion tasks/task_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,10 @@ public:
using task_impl_ptr = std::unique_ptr<impl>;
protected:
task_impl_ptr _impl;
private:
gate::holder _gate_holder;
public:
task(task_impl_ptr&& impl) noexcept;
task(task_impl_ptr&& impl, gate::holder) noexcept;

task_id id();
std::string type() const;
Expand Down

0 comments on commit 1592a84

Please sign in to comment.