Skip to content

Commit

Permalink
Merge 'Compaction fix stall in perform cleanup' from Asias
Browse files Browse the repository at this point in the history
"
compaction_manager: Avoid stall in perform_cleanup

The following stall was seen during a cleanup operation:

    scylla: Reactor stalled for 16262 ms on shard 4.

    | std::_MakeUniq<locator::tokens_iterator_impl>::__single_object std::make_unique<locator::tokens_iterator_impl, locator::tokens_iterator_impl&>(locator::tokens_iterator_impl&) at /usr/include/fmt/format.h:1158
    |  (inlined by) locator::token_metadata::tokens_iterator::tokens_iterator(locator::token_metadata::tokens_iterator const&) at ./locator/token_metadata.cc:1602
    | locator::simple_strategy::calculate_natural_endpoints(dht::token const&, locator::token_metadata&) const at simple_strategy.cc:?
    |  (inlined by) locator::simple_strategy::calculate_natural_endpoints(dht::token const&, locator::token_metadata&) const at ./locator/simple_strategy.cc:56
    | locator::abstract_replication_strategy::get_ranges(gms::inet_address, locator::token_metadata&) const at /usr/include/fmt/format.h:1158
    | locator::abstract_replication_strategy::get_ranges(gms::inet_address) const at /usr/include/fmt/format.h:1158
    | service::storage_service::get_ranges_for_endpoint(seastar::basic_sstring<char, unsigned int, 15u, true> const&, gms::inet_address const&) const at /usr/include/fmt/format.h:1158
    | service::storage_service::get_local_ranges(seastar::basic_sstring<char, unsigned int, 15u, true> const&) const at /usr/include/fmt/format.h:1158
    |  (inlined by) operator() at ./sstables/compaction_manager.cc:691
    |  (inlined by) _M_invoke at /usr/include/c++/9/bits/std_function.h:286
    | std::function<std::vector<seastar::lw_shared_ptr<sstables::sstable>, std::allocator<seastar::lw_shared_ptr<sstables::sstable> > > (table const&)>::operator()(table const&) const at /usr/include/fmt/format.h:1158
    |  (inlined by) compaction_manager::rewrite_sstables(table*, sstables::compaction_options, std::function<std::vector<seastar::lw_shared_ptr<sstables::sstable>, std::allocator<seastar::lw_shared_ptr<sstables::sstable> > > (table const&)>) at ./sstables/compaction_manager.cc:604
    | compaction_manager::perform_cleanup(table*) at /usr/include/fmt/format.h:1158

To fix, we futurize the function that gets the sstables. If get_local_ranges()
is called inside a thread, get_local_ranges will yield automatically.

Fixes #6662
"

* asias-compaction_fix_stall_in_perform_cleanup:
  compaction_manager: Avoid stall in perform_cleanup
  compaction_manager: Return exception future in perform_cleanup
  abstract_replication_strategy: Add get_ranges_in_thread
  • Loading branch information
avikivity committed Jul 1, 2020
2 parents 7e9a3b0 + 07e2535 commit c84217a
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 11 deletions.
4 changes: 2 additions & 2 deletions api/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ void set_storage_service(http_context& ctx, routes& r) {
for (auto cf : column_families) {
column_families_vec.push_back(&db.find_column_family(keyspace, cf));
}
return parallel_for_each(column_families_vec, [&cm] (column_family* cf) {
return cm.perform_cleanup(cf);
return parallel_for_each(column_families_vec, [&cm, &db] (column_family* cf) {
return cm.perform_cleanup(db, cf);
});
}).then([]{
return make_ready_future<json::json_return_type>(0);
Expand Down
20 changes: 19 additions & 1 deletion locator/abstract_replication_strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -168,15 +168,33 @@ insert_token_range_to_sorted_container_while_unwrapping(

// Returns the token ranges that do_get_ranges() computes for endpoint `ep`,
// using this strategy's own _token_metadata. Non-yielding variant: the call
// to do_get_ranges() passes can_yield = false.
dht::token_range_vector
abstract_replication_strategy::get_ranges(inet_address ep) const {
// NOTE(review): two return statements appear back to back here. This looks
// like the removed and the added line of the diff printed together;
// presumably only the do_get_ranges() call is meant to remain -- confirm
// against the real file.
return get_ranges(ep, _token_metadata);
return do_get_ranges(ep, _token_metadata, false);
}

// Yielding counterpart of get_ranges(ep): forwards to do_get_ranges() with
// this strategy's own _token_metadata and can_yield = true, which enables the
// seastar::thread::maybe_yield() call inside do_get_ranges()'s endpoint loop.
// NOTE(review): presumably must be called from within a seastar::thread
// (e.g. inside seastar::async) -- confirm.
dht::token_range_vector
abstract_replication_strategy::get_ranges_in_thread(inet_address ep) const {
return do_get_ranges(ep, _token_metadata, true);
}

// Same as get_ranges(ep), but computes the ranges against the caller-provided
// token_metadata `tm` instead of _token_metadata. Non-yielding variant:
// forwards to do_get_ranges() with can_yield = false.
dht::token_range_vector
abstract_replication_strategy::get_ranges(inet_address ep, token_metadata& tm) const {
return do_get_ranges(ep, tm, false);
}

// Yielding counterpart of get_ranges(ep, tm): forwards to do_get_ranges() with
// the caller-provided token_metadata `tm` and can_yield = true, which enables
// the seastar::thread::maybe_yield() call inside do_get_ranges()'s loop.
// NOTE(review): presumably must be called from within a seastar::thread
// (e.g. inside seastar::async) -- confirm.
dht::token_range_vector
abstract_replication_strategy::get_ranges_in_thread(inet_address ep, token_metadata& tm) const {
return do_get_ranges(ep, tm, true);
}

dht::token_range_vector
abstract_replication_strategy::do_get_ranges(inet_address ep, token_metadata& tm, bool can_yield) const {
dht::token_range_vector ret;
auto prev_tok = tm.sorted_tokens().back();
for (auto tok : tm.sorted_tokens()) {
for (inet_address a : calculate_natural_endpoints(tok, tm)) {
if (can_yield) {
seastar::thread::maybe_yield();
}
if (a == ep) {
insert_token_range_to_sorted_container_while_unwrapping(prev_tok, tok, ret);
break;
Expand Down
5 changes: 5 additions & 0 deletions locator/abstract_replication_strategy.hh
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,15 @@ public:
// It is the analogue of Origin's getAddressRanges().get(endpoint).
// This function is not efficient, and not meant for the fast path.
dht::token_range_vector get_ranges(inet_address ep) const;
dht::token_range_vector get_ranges_in_thread(inet_address ep) const;

// Use the token_metadata provided by the caller instead of _token_metadata
dht::token_range_vector get_ranges(inet_address ep, token_metadata& tm) const;
dht::token_range_vector get_ranges_in_thread(inet_address ep, token_metadata& tm) const;
private:
dht::token_range_vector do_get_ranges(inet_address ep, token_metadata& tm, bool can_yield) const;

public:
// get_primary_ranges() returns the list of "primary ranges" for the given
// endpoint. "Primary ranges" are the ranges that the node is responsible
// for storing replica primarily, which means this is the first node
Expand Down
19 changes: 12 additions & 7 deletions sstables/compaction_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,20 +732,25 @@ bool needs_cleanup(const sstables::shared_sstable& sst,
return true;
}

// Runs a cleanup compaction on `cf`.
//
// NOTE(review): this hunk prints removed and added diff lines interleaved
// (two signatures, throw vs. make_exception_future, old vs. new lambda
// bodies), so it is not compilable exactly as shown.
//
// Behaviour visible in the added lines:
//  - if check_for_cleanup(cf) is true, the "ongoing cleanup" error is
//    reported through a failed future rather than thrown;
//  - the owned ranges are obtained inside seastar::async by calling
//    get_ranges_in_thread() on the keyspace's replication strategy, with
//    utils::fb_utilities::get_broadcast_address() as the endpoint;
//  - cf->candidates_for_compaction() is filtered: an sstable is kept when
//    sorted_owned_ranges is empty or needs_cleanup() returns true for it;
//  - the resulting vector is handed to rewrite_sstables() together with
//    sstables::compaction_options::make_cleanup().
future<> compaction_manager::perform_cleanup(column_family* cf) {
future<> compaction_manager::perform_cleanup(database& db, column_family* cf) {
if (check_for_cleanup(cf)) {
throw std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
cf->schema()->ks_name(), cf->schema()->cf_name()));
return make_exception_future<>(std::runtime_error(format("cleanup request failed: there is an ongoing cleanup on {}.{}",
cf->schema()->ks_name(), cf->schema()->cf_name())));
}
return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(), [this] (const table& table) {
auto schema = table.schema();
auto sorted_owned_ranges = service::get_local_storage_service().get_local_ranges(schema->ks_name());
return seastar::async([this, cf, &db] {
auto schema = cf->schema();
auto& rs = db.find_keyspace(schema->ks_name()).get_replication_strategy();
auto sorted_owned_ranges = rs.get_ranges_in_thread(utils::fb_utilities::get_broadcast_address());
auto sstables = std::vector<sstables::shared_sstable>{};
const auto candidates = table.candidates_for_compaction();
const auto candidates = cf->candidates_for_compaction();
std::copy_if(candidates.begin(), candidates.end(), std::back_inserter(sstables), [&sorted_owned_ranges, schema] (const sstables::shared_sstable& sst) {
// Yield point between candidates; per the commit message the goal of
// this change is to avoid reactor stalls during cleanup.
seastar::thread::maybe_yield();
return sorted_owned_ranges.empty() || needs_cleanup(sst, sorted_owned_ranges, schema);
});
return sstables;
}).then([this, cf] (std::vector<sstables::shared_sstable> sstables) {
return rewrite_sstables(cf, sstables::compaction_options::make_cleanup(),
[sstables = std::move(sstables)] (const table&) { return sstables; });
});
}

Expand Down
2 changes: 1 addition & 1 deletion sstables/compaction_manager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public:
// Cleanup is about discarding keys that are no longer relevant for a
// given sstable, e.g. after node loses part of its token range because
// of a newly added node.
future<> perform_cleanup(column_family* cf);
future<> perform_cleanup(database& db, column_family* cf);

// Submit a column family to be upgraded and wait for its termination.
future<> perform_sstable_upgrade(column_family* cf, bool exclude_current_version);
Expand Down

0 comments on commit c84217a

Please sign in to comment.