Skip to content

Commit

Permalink
table: coroutinize update_effective_replication_map
Browse files Browse the repository at this point in the history
It's better to wait on deregistering the
old main compaction_groups in handle_tablet_split_completion
rather than leaving work in the background,
especially since their respective storage_groups
are being destroyed by handle_tablet_split_completion.

handle_tablet_split_completion keeps a continuation chain
for all non-ready compaction_group stop fibers
and returns it so that update_effective_replication_map
can await it, leaving no cleanup work in the background.

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
  • Loading branch information
bhalevy committed Mar 6, 2024
1 parent 0a7854e commit f2ff701
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 13 deletions.
4 changes: 2 additions & 2 deletions replica/database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -593,7 +593,7 @@ private:
// Called when coordinator executes tablet splitting, i.e. commit the new tablet map with
// each tablet split into two, so this replica will remap all of its compaction groups
// that were previously split.
void handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap);
future<> handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap);

sstables::compaction_type_options::split split_compaction_options() const noexcept;

Expand Down Expand Up @@ -846,7 +846,7 @@ public:
void set_schema(schema_ptr);
db::commitlog* commitlog() const;
const locator::effective_replication_map_ptr& get_effective_replication_map() const { return _erm; }
void update_effective_replication_map(locator::effective_replication_map_ptr);
future<> update_effective_replication_map(locator::effective_replication_map_ptr);
[[gnu::always_inline]] bool uses_tablets() const;
future<> cleanup_tablet(database&, db::system_keyspace&, locator::tablet_id);
future<const_mutation_partition_ptr> find_partition(schema_ptr, reader_permit permit, const dht::decorated_key& key) const;
Expand Down
25 changes: 16 additions & 9 deletions replica/table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1903,7 +1903,7 @@ locator::table_load_stats table::table_load_stats(std::function<bool(locator::gl
return stats;
}

void table::handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap) {
future<> table::handle_tablet_split_completion(size_t old_tablet_count, const locator::tablet_map& new_tmap) {
auto table_id = _schema->id();
storage_group_vector new_storage_groups;
new_storage_groups.resize(new_tmap.tablet_count());
Expand All @@ -1924,6 +1924,8 @@ void table::handle_tablet_split_completion(size_t old_tablet_count, const locato
table_id, new_tmap.tablet_count(), old_tablet_count*split_size));
}

// Stop the released main compaction groups asynchronously
future<> stop_fut = make_ready_future<>();
for (unsigned id = 0; id < _storage_groups.size(); id++) {
auto& sg = _storage_groups[id];
if (!sg) {
Expand All @@ -1934,6 +1936,16 @@ void table::handle_tablet_split_completion(size_t old_tablet_count, const locato
"therefore groups cannot be remapped with the new tablet count.",
id, table_id));
}
// Remove old main groups, they're unused, but they need to be deregistered properly
auto cg_ptr = std::move(sg->main_compaction_group());
auto f = cg_ptr->stop();
if (!f.available() || f.failed()) [[unlikely]] {
stop_fut = stop_fut.then([f = std::move(f), cg_ptr = std::move(cg_ptr)] () mutable {
return std::move(f).handle_exception([cg_ptr = std::move(cg_ptr)] (std::exception_ptr ex) {
tlogger.warn("Failed to stop compaction group: {}. Ignored", std::move(ex));
});
});
}
unsigned first_new_id = id << growth_factor;
auto split_ready_groups = std::move(*sg).split_ready_compaction_groups();
if (split_ready_groups.size() != split_size) {
Expand All @@ -1951,15 +1963,10 @@ void table::handle_tablet_split_completion(size_t old_tablet_count, const locato

auto old_groups = std::exchange(_storage_groups, std::move(new_storage_groups));

// Remove old main groups in background, they're unused, but they need to be deregistered properly
(void) do_with(std::move(old_groups), _async_gate.hold(), [] (storage_group_vector& groups, gate::holder&) {
return do_for_each(groups, [] (std::unique_ptr<storage_group>& sg) {
return sg->main_compaction_group()->stop();
});
});
return stop_fut;
}

void table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
future<> table::update_effective_replication_map(locator::effective_replication_map_ptr erm) {
auto old_erm = std::exchange(_erm, std::move(erm));

if (uses_tablets()) {
Expand All @@ -1974,7 +1981,7 @@ void table::update_effective_replication_map(locator::effective_replication_map_
if (new_tablet_count > old_tablet_count) {
tlogger.info0("Detected tablet split for table {}.{}, increasing from {} to {} tablets",
_schema->ks_name(), _schema->cf_name(), old_tablet_count, new_tablet_count);
handle_tablet_split_completion(old_tablet_count, _erm->get_token_metadata().tablets().get_tablet_map(table_id));
co_await handle_tablet_split_completion(old_tablet_count, _erm->get_token_metadata().tablets().get_tablet_map(table_id));
}
}
if (old_erm) {
Expand Down
4 changes: 2 additions & 2 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2864,7 +2864,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt

// Apply changes on all shards
try {
co_await container().invoke_on_all([&] (storage_service& ss) {
co_await container().invoke_on_all([&] (storage_service& ss) -> future<> {
ss._shared_token_metadata.set(std::move(pending_token_metadata_ptr[this_shard_id()]));
auto& db = ss._db.local();

Expand All @@ -2878,7 +2878,7 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
auto& table_erms = pending_table_erms[this_shard_id()];
for (auto it = table_erms.begin(); it != table_erms.end(); ) {
auto& cf = db.find_column_family(it->first);
cf.update_effective_replication_map(std::move(it->second));
co_await cf.update_effective_replication_map(std::move(it->second));
if (cf.uses_tablets()) {
register_tablet_split_candidate(it->first);
}
Expand Down

0 comments on commit f2ff701

Please sign in to comment.