Skip to content

Commit

Permalink
compaction: do not swallow compaction_stopped_exception for reshape
Browse files Browse the repository at this point in the history
Loop in shard_reshaping_compaction_task_impl::run relies on whether
sstables::compaction_stopped_exception is thrown from run_custom_job.
The exception is swallowed for each type of compaction
in compaction_manager::perform_task.

Rethrow an exception in perfrom task for reshape compaction.

Fixes: #15058.

(cherry picked from commit e0ce711)

Closes #15122
  • Loading branch information
Deexie authored and denesb committed Aug 23, 2023
1 parent 9a414d4 commit 29e6dc8
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
9 changes: 6 additions & 3 deletions compaction/compaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ compaction_manager::task::task(compaction_manager& mgr, compaction::table_state*
, _description(std::move(desc))
{}

future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_manager::task> task) {
future<compaction_manager::compaction_stats_opt> compaction_manager::perform_task(shared_ptr<compaction_manager::task> task, throw_if_stopping do_throw_if_stopping) {
_tasks.push_back(task);
auto unregister_task = defer([this, task] {
_tasks.remove(task);
Expand All @@ -311,6 +311,9 @@ future<compaction_manager::compaction_stats_opt> compaction_manager::perform_tas
co_return res;
} catch (sstables::compaction_stopped_exception& e) {
cmlog.info("{}: stopped, reason: {}", *task, e.what());
if (do_throw_if_stopping) {
throw;
}
} catch (sstables::compaction_aborted_exception& e) {
cmlog.error("{}: aborted, reason: {}", *task, e.what());
_stats.errors++;
Expand Down Expand Up @@ -475,12 +478,12 @@ class compaction_manager::custom_compaction_task : public compaction_manager::ta
}
};

future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job) {
future<> compaction_manager::run_custom_job(compaction::table_state& t, sstables::compaction_type type, const char* desc, noncopyable_function<future<>(sstables::compaction_data&)> job, throw_if_stopping do_throw_if_stopping) {
if (_state != state::enabled) {
return make_ready_future<>();
}

return perform_task(make_shared<custom_compaction_task>(*this, &t, type, desc, std::move(job))).discard_result();
return perform_task(make_shared<custom_compaction_task>(*this, &t, type, desc, std::move(job)), do_throw_if_stopping).discard_result();
}

future<> compaction_manager::update_static_shares(float static_shares) {
Expand Down
8 changes: 4 additions & 4 deletions compaction/compaction_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public:
boost::icl::interval_map<dht::token, gc_clock::time_point, boost::icl::partial_absorber, std::less, boost::icl::inplace_max> map;
};

using throw_if_stopping = bool_class<struct throw_if_stopping_tag>;

// Compaction manager provides facilities to submit and track compaction jobs on
// behalf of existing tables.
class compaction_manager {
Expand Down Expand Up @@ -152,8 +154,6 @@ public:
protected:
virtual future<compaction_stats_opt> do_run() = 0;

using throw_if_stopping = bool_class<struct throw_if_stopping_tag>;

state switch_state(state new_state);

future<semaphore_units<named_semaphore_exception_factory>> acquire_semaphore(named_semaphore& sem, size_t units = 1);
Expand Down Expand Up @@ -325,7 +325,7 @@ private:
per_table_history_maps _repair_history_maps;
tombstone_gc_state _tombstone_gc_state;
private:
future<compaction_stats_opt> perform_task(shared_ptr<task>);
future<compaction_stats_opt> perform_task(shared_ptr<task>, throw_if_stopping do_throw_if_stopping = throw_if_stopping::no);

future<> stop_tasks(std::vector<shared_ptr<task>> tasks, sstring reason);
future<> update_throughput(uint32_t value_mbs);
Expand Down Expand Up @@ -470,7 +470,7 @@ public:
// parameter type is the compaction type the operation can most closely be
// associated with, use compaction_type::Compaction, if none apply.
// parameter job is a function that will carry the operation
future<> run_custom_job(compaction::table_state& s, sstables::compaction_type type, const char *desc, noncopyable_function<future<>(sstables::compaction_data&)> job);
future<> run_custom_job(compaction::table_state& s, sstables::compaction_type type, const char *desc, noncopyable_function<future<>(sstables::compaction_data&)> job, throw_if_stopping do_throw_if_stopping);

class compaction_reenabler {
compaction_manager& _cm;
Expand Down
4 changes: 2 additions & 2 deletions sstables/sstable_directory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ future<uint64_t> sstable_directory::reshape(compaction_manager& cm, replica::tab
return collect_output_sstables_from_reshaping(std::move(new_sstables));
});
});
}).then_wrapped([&table] (future<> f) {
}, throw_if_stopping::yes).then_wrapped([&table] (future<> f) {
try {
f.get();
} catch (sstables::compaction_stopped_exception& e) {
Expand Down Expand Up @@ -443,7 +443,7 @@ sstable_directory::reshard(sstable_info_vector shared_info, compaction_manager&
// resharding them.
return when_all_succeed(collect_output_sstables_from_resharding(std::move(result.new_sstables)), remove_input_sstables_from_resharding(std::move(sstlist))).discard_result();
});
});
}, throw_if_stopping::no);
});
});
});
Expand Down

0 comments on commit 29e6dc8

Please sign in to comment.