Skip to content

Commit

Permalink
Merge 'task_manager: module: make_task: enter gate when the task is c…
Browse files Browse the repository at this point in the history
…reated' from Benny Halevy

Passing the gate_closed_exception to the task promise
results in an abandoned exception, since no one is waiting
on it.

Instead, enter the gate when the task is made
so it will fail make_task if the gate is already closed.

Fixes #15211

In addition, this series adds a private abort_source for each task_manager module
(chained to the main task_manager::abort_source) and abort is requested on task_manager::module::stop().

Gate holding in compaction_manager is hardened,
and the sstable_compaction_test cases now make sure to stop compaction_manager and task_manager.

Closes #15213

* github.com:scylladb/scylladb:
  compaction_manager: stop: close compaction_state:s gates
  compaction_manager: gracefully handle gate close
  task_manager: task: start: fixup indentation
  task_manager: module: make_task: enter gate when the task is created
  task_manager: module: stop: request abort
  task_manager: task::impl: subscribe to module abort_source
  test: compaction_manager_stop_and_drain_race_test: stop compaction and task managers
  test: simple_backlog_controller_test: stop compaction and task managers
  • Loading branch information
nyh committed Sep 6, 2023
2 parents cfc7081 + cfecb68 commit 5930637
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 41 deletions.
57 changes: 44 additions & 13 deletions compaction/compaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,6 @@ requires (compaction_manager& cm, Args&&... args) {
}
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_compaction(throw_if_stopping do_throw_if_stopping, std::optional<tasks::task_info> parent_info, Args&&... args) {
auto task_executor = seastar::make_shared<TaskExecutor>(*this, std::forward<Args>(args)...);
gate::holder gate_holder = task_executor->_compaction_state.gate.hold();
_tasks.push_back(task_executor);
auto unregister_task = defer([this, task_executor] {
_tasks.remove(task_executor);
Expand All @@ -564,8 +563,22 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_com
co_return co_await perform_task(std::move(task_executor), do_throw_if_stopping);
}

future<> compaction_manager::perform_major_compaction(table_state& t, std::optional<tasks::task_info> info) {
// Try to acquire a holder on the compaction gate of table `t`.
//
// Returns std::nullopt when compaction cannot be started, i.e. when the
// manager is not in the `enabled` state, when `t` has no entry in
// _compaction_state, or when that entry's gate is already closed.
// Otherwise returns a holder on the table's compaction gate.
std::optional<gate::holder> compaction_manager::start_compaction(table_state& t) {
    if (_state == state::enabled) {
        const auto entry = _compaction_state.find(&t);
        const bool usable = entry != _compaction_state.end() && !entry->second.gate.is_closed();
        if (usable) {
            return entry->second.gate.hold();
        }
    }

    return std::nullopt;
}

future<> compaction_manager::perform_major_compaction(table_state& t, std::optional<tasks::task_info> info) {
auto gh = start_compaction(t);
if (!gh) {
co_return;
}

Expand Down Expand Up @@ -621,11 +634,12 @@ class custom_compaction_task_executor : public compaction_task_executor, public
}

future<> compaction_manager::run_custom_job(table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job, std::optional<tasks::task_info> info, throw_if_stopping do_throw_if_stopping) {
if (_state != state::enabled) {
return make_ready_future<>();
auto gh = start_compaction(t);
if (!gh) {
co_return;
}

return perform_compaction<custom_compaction_task_executor>(do_throw_if_stopping, info, &t, info.value_or(tasks::task_info{}).id, type, desc, std::move(job)).discard_result();
co_return co_await perform_compaction<custom_compaction_task_executor>(do_throw_if_stopping, info, &t, info.value_or(tasks::task_info{}).id, type, desc, std::move(job)).discard_result();
}

future<> compaction_manager::update_static_shares(float static_shares) {
Expand Down Expand Up @@ -1038,6 +1052,11 @@ future<> compaction_manager::really_do_stop() {
// Reset the metrics registry
_metrics.clear();
co_await stop_ongoing_compactions("shutdown");
co_await coroutine::parallel_for_each(_compaction_state | boost::adaptors::map_values, [] (compaction_state& cs) -> future<> {
if (!cs.gate.is_closed()) {
co_await cs.gate.close();
}
});
reevaluate_postponed_compactions();
co_await std::move(_waiting_reevalution);
_weight_tracker.clear();
Expand Down Expand Up @@ -1201,13 +1220,18 @@ class regular_compaction_task_executor : public compaction_task_executor, public
}

void compaction_manager::submit(table_state& t) {
if (_state != state::enabled || t.is_auto_compaction_disabled_by_user()) {
if (t.is_auto_compaction_disabled_by_user()) {
return;
}

auto gh = start_compaction(t);
if (!gh) {
return;
}

// OK to drop future.
// waited via compaction_task_executor::compaction_done()
(void)perform_compaction<regular_compaction_task_executor>(throw_if_stopping::no, tasks::task_info{}, t).then_wrapped([] (auto f) { f.ignore_ready_future(); });
(void)perform_compaction<regular_compaction_task_executor>(throw_if_stopping::no, tasks::task_info{}, t).then_wrapped([gh = std::move(gh)] (auto f) { f.ignore_ready_future(); });
}

bool compaction_manager::can_perform_regular_compaction(table_state& t) {
Expand Down Expand Up @@ -1411,7 +1435,8 @@ class offstrategy_compaction_task_executor : public compaction_task_executor, pu
}

future<bool> compaction_manager::perform_offstrategy(table_state& t, std::optional<tasks::task_info> info) {
if (_state != state::enabled) {
auto gh = start_compaction(t);
if (!gh) {
co_return false;
}

Expand Down Expand Up @@ -1507,7 +1532,8 @@ template<typename TaskType, typename... Args>
requires std::derived_from<TaskType, compaction_task_executor> &&
std::derived_from<TaskType, compaction_task_impl>
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task_on_all_files(std::optional<tasks::task_info> info, table_state& t, sstables::compaction_type_options options, owned_ranges_ptr owned_ranges_ptr, get_candidates_func get_func, Args... args) {
if (_state != state::enabled) {
auto gh = start_compaction(t);
if (!gh) {
co_return std::nullopt;
}

Expand Down Expand Up @@ -1602,12 +1628,13 @@ static std::vector<sstables::shared_sstable> get_all_sstables(table_state& t) {
}

future<compaction_manager::compaction_stats_opt> compaction_manager::perform_sstable_scrub_validate_mode(table_state& t, std::optional<tasks::task_info> info) {
if (_state != state::enabled) {
return make_ready_future<compaction_manager::compaction_stats_opt>();
auto gh = start_compaction(t);
if (!gh) {
co_return compaction_stats_opt{};
}
// All sstables must be included, even the ones being compacted, such that everything in table is validated.
auto all_sstables = get_all_sstables(t);
return perform_compaction<validate_sstables_compaction_task_executor>(throw_if_stopping::no, info, &t, info.value_or(tasks::task_info{}).id, std::move(all_sstables));
co_return co_await perform_compaction<validate_sstables_compaction_task_executor>(throw_if_stopping::no, info, &t, info.value_or(tasks::task_info{}).id, std::move(all_sstables));
}

namespace compaction {
Expand Down Expand Up @@ -1936,7 +1963,11 @@ future<> compaction_manager::remove(table_state& t) noexcept {

// Wait for all compaction tasks running under gate to terminate
// and prevent new tasks from entering the gate.
co_await seastar::when_all_succeed(stop_ongoing_compactions("table removal", &t), c_state.gate.close()).discard_result();
if (!c_state.gate.is_closed()) {
auto close_gate = c_state.gate.close();
co_await stop_ongoing_compactions("table removal", &t);
co_await std::move(close_gate);
}

c_state.backlog_tracker.disable();

Expand Down
3 changes: 3 additions & 0 deletions compaction/compaction_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,9 @@ private:
// Requires task->_compaction_state.gate to be held and task to be registered in _tasks.
future<compaction_stats_opt> perform_task(shared_ptr<compaction::compaction_task_executor> task, throw_if_stopping do_throw_if_stopping);

// Return nullopt if compaction cannot be started
std::optional<gate::holder> start_compaction(table_state& t);

// parent_info set to std::nullopt means that task manager should not register this task executor.
// To create a task manager task with no parent, parent_info argument should contain empty task_info.
template<typename TaskExecutor, typename... Args>
Expand Down
31 changes: 19 additions & 12 deletions tasks/task_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

#include <seastar/core/on_internal_error.hh>
#include <seastar/coroutine/parallel_for_each.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/core/gate.hh>

#include "task_manager.hh"
#include "test_module.hh"

Expand All @@ -31,7 +34,7 @@ task_manager::task::impl::impl(module_ptr module, task_id id, uint64_t sequence_
{
// Child tasks do not need to subscribe to abort source because they will be aborted recursively by their parents.
if (!parent_id) {
_shutdown_subscription = module->get_task_manager()._as.subscribe([this] () noexcept {
_shutdown_subscription = module->abort_source().subscribe([this] () noexcept {
(void)abort();
});
}
Expand Down Expand Up @@ -143,7 +146,7 @@ void task_manager::task::impl::finish_failed(std::exception_ptr ex) {
finish_failed(ex, fmt::format("{}", ex));
}

task_manager::task::task(task_impl_ptr&& impl) noexcept : _impl(std::move(impl)) {
task_manager::task::task(task_impl_ptr&& impl, gate::holder gh) noexcept : _impl(std::move(impl)), _gate_holder(std::move(gh)) {
register_task();
}

Expand Down Expand Up @@ -184,13 +187,12 @@ void task_manager::task::start() {
try {
// Background fiber does not capture task ptr, so the task can be unregistered and destroyed independently in the foreground.
// After the ttl expires, the task id will be used to unregister the task if that didn't happen in any other way.
(void)with_gate(_impl->_module->async_gate(), [f = done(), module = _impl->_module, id = id()] () mutable {
return std::move(f).finally([module] {
return sleep_abortable(module->get_task_manager().get_task_ttl(), module->abort_source());
}).then_wrapped([module, id] (auto f) {
f.ignore_ready_future();
module->unregister_task(id);
});
auto module = _impl->_module;
(void)done().finally([module] {
return sleep_abortable(module->get_task_manager().get_task_ttl(), module->abort_source());
}).then_wrapped([module, id = id()] (auto f) {
f.ignore_ready_future();
module->unregister_task(id);
});
_impl->_as.check();
_impl->_status.state = task_manager::task_state::running;
Expand Down Expand Up @@ -248,7 +250,11 @@ bool task_manager::task::is_complete() const noexcept {
return _impl->is_complete();
}

task_manager::module::module(task_manager& tm, std::string name) noexcept : _tm(tm), _name(std::move(name)) {}
// Construct a module called `name` that belongs to task manager `tm`.
//
// The module subscribes to the task manager's abort source; when that source
// fires, the callback requests abort on the source returned by
// module::abort_source(). The subscription is kept in _abort_subscription.
task_manager::module::module(task_manager& tm, std::string name) noexcept : _tm(tm), _name(std::move(name)) {
_abort_subscription = _tm.abort_source().subscribe([this] () noexcept {
// Unqualified call: resolves to this module's abort_source() accessor.
abort_source().request_abort();
});
}

uint64_t task_manager::module::new_sequence_number() noexcept {
return ++_sequence_number;
Expand All @@ -259,7 +265,7 @@ task_manager& task_manager::module::get_task_manager() noexcept {
}

abort_source& task_manager::module::abort_source() noexcept {
return _tm.abort_source();
return _as;
}

gate& task_manager::module::async_gate() noexcept {
Expand Down Expand Up @@ -295,12 +301,13 @@ void task_manager::module::unregister_task(task_id id) noexcept {

// Stop the module.
//
// Logs the shutdown, requests abort on the module's abort source, waits for
// the module gate to close, and finally unregisters the module (by name)
// from the owning task manager.
future<> task_manager::module::stop() noexcept {
tmlogger.info("Stopping module {}", _name);
// Abort is requested before closing the gate.
// NOTE(review): presumably so that work still holding the gate is told to stop
// and _gate.close() can resolve — confirm against the gate holders.
abort_source().request_abort();
co_await _gate.close();
_tm.unregister_module(_name);
}

future<task_manager::task_ptr> task_manager::module::make_task(task::task_impl_ptr task_impl_ptr, task_info parent_d) {
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr));
auto task = make_lw_shared<task_manager::task>(std::move(task_impl_ptr), async_gate().hold());
bool abort = false;
if (parent_d) {
task->get_status().sequence_number = co_await _tm.container().invoke_on(parent_d.shard, [id = parent_d.id, task = make_foreign(task), &abort] (task_manager& tm) mutable {
Expand Down
9 changes: 7 additions & 2 deletions tasks/task_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,10 @@ public:
using task_impl_ptr = shared_ptr<impl>;
protected:
task_impl_ptr _impl;
private:
gate::holder _gate_holder;
public:
task(task_impl_ptr&& impl) noexcept;
task(task_impl_ptr&& impl, gate::holder) noexcept;

task_id id();
std::string type() const;
Expand Down Expand Up @@ -174,13 +176,16 @@ public:
task_map _tasks;
gate _gate;
uint64_t _sequence_number = 0;
private:
abort_source _as;
optimized_optional<abort_source::subscription> _abort_subscription;
public:
module(task_manager& tm, std::string name) noexcept;
virtual ~module() = default;

uint64_t new_sequence_number() noexcept;
task_manager& get_task_manager() noexcept;
virtual seastar::abort_source& abort_source() noexcept;
seastar::abort_source& abort_source() noexcept;
gate& async_gate() noexcept;
const std::string& get_name() const noexcept;
task_manager::task_map& get_tasks() noexcept;
Expand Down
11 changes: 0 additions & 11 deletions tasks/test_module.hh
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,8 @@
namespace tasks {

class test_module : public task_manager::module {
private:
seastar::abort_source _as;
public:
test_module(task_manager& tm) noexcept : module(tm, "test") {}

seastar::abort_source& abort_source() noexcept override {
return _as;
}

future<> stop() noexcept override {
_as.request_abort();
co_await task_manager::module::stop();
}
};

class test_task_impl : public task_manager::task::impl {
Expand Down
10 changes: 7 additions & 3 deletions test/boost/sstable_compaction_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4454,12 +4454,14 @@ SEASTAR_TEST_CASE(simple_backlog_controller_test) {
auto as = abort_source();

auto task_manager = tasks::task_manager({}, as);
auto stop_task_manager = deferred_stop(task_manager);
compaction_manager::config cfg = {
.compaction_sched_group = { default_scheduling_group() },
.maintenance_sched_group = { default_scheduling_group() },
.available_memory = available_memory,
};
auto manager = compaction_manager(std::move(cfg), as, task_manager);
auto stop_manager = deferred_stop(manager);

auto add_sstable = [&env] (table_for_tests& t, uint64_t data_size, int level) {
auto sst = env.make_sstable(t.schema());
Expand Down Expand Up @@ -4800,22 +4802,24 @@ SEASTAR_TEST_CASE(check_table_sstable_set_includes_maintenance_sstables) {
}

// Without commit aba475fe1d24d5c, scylla will fail miserably (either with abort or segfault; depends on the version).
SEASTAR_TEST_CASE(compaction_manager_stop_and_drain_race_test) {
SEASTAR_THREAD_TEST_CASE(compaction_manager_stop_and_drain_race_test) {
abort_source as;

auto cfg = compaction_manager::config{ .available_memory = 1 };
auto task_manager = tasks::task_manager({}, as);
auto stop_task_manager = deferred_stop(task_manager);
auto cm = compaction_manager(cfg, as, task_manager);
auto stop_cm = deferred_stop(cm);
cm.enable();

testlog.info("requesting abort");
as.request_abort();

testlog.info("draining compaction manager");
co_await cm.drain();
cm.drain().get();

testlog.info("stopping compaction manager");
co_await cm.stop();
stop_cm.stop_now();
}

SEASTAR_TEST_CASE(test_print_shared_sstables_vector) {
Expand Down

0 comments on commit 5930637

Please sign in to comment.