Skip to content

Commit

Permalink
Merge 'schema_tables: unfreeze frozen_mutation:s gently' from Avi Kivity
Browse files Browse the repository at this point in the history
With large schemas, unfreezing can stall, especially as it requires
a lot of memory. Switch to a gentle version that will not stall.

As a preparation step, we add unfreeze_gently() for a span of mutations.

Fixes #17841

Closes #17842

* github.com:scylladb/scylladb:
  schema_tables: unfreeze frozen_mutation:s gently
  frozen_mutation: add unfreeze_gently(span<frozen_mutation>)
  • Loading branch information
nyh committed Mar 18, 2024
2 parents fe28aac + 731b5c5 commit 680e37c
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
12 changes: 6 additions & 6 deletions db/schema_tables.cc
Expand Up @@ -792,7 +792,7 @@ future<table_schema_version> calculate_schema_digest(distributed<service::storag
auto s = db.local().find_schema(NAME, table);
std::vector<mutation> mutations;
for (auto&& p : rs->partitions()) {
auto mut = p.mut().unfreeze(s);
auto mut = co_await p.mut().unfreeze_gently(s);
auto partition_key = value_cast<sstring>(utf8_type->deserialize(mut.key().get_component(*s, 0)));
if (!accept_keyspace(partition_key)) {
continue;
Expand Down Expand Up @@ -838,7 +838,7 @@ future<std::vector<canonical_mutation>> convert_schema_to_mutations(distributed<
auto s = db.local().find_schema(NAME, table);
std::vector<canonical_mutation> results;
for (auto&& p : rs->partitions()) {
auto mut = p.mut().unfreeze(s);
auto mut = co_await p.mut().unfreeze_gently(s);
auto partition_key = value_cast<sstring>(utf8_type->deserialize(mut.key().get_component(*s, 0)));
if (is_system_keyspace(partition_key)) {
continue;
Expand Down Expand Up @@ -890,7 +890,7 @@ future<mutation> query_partition_mutation(service::storage_proxy& proxy,
if (partitions.size() == 0) {
co_return mutation(s, std::move(dk));
} else if (partitions.size() == 1) {
co_return partitions[0].mut().unfreeze(s);
co_return co_await partitions[0].mut().unfreeze_gently(s);
} else {
auto&& ex = std::make_exception_ptr(std::invalid_argument("Results must have at most one partition"));
co_return coroutine::exception(std::move(ex));
Expand Down Expand Up @@ -1001,9 +1001,9 @@ future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service:
{
if (this_shard_id() != 0) {
// mutations must be applied on the owning shard (0).
co_await smp::submit_to(0, [&, fmuts = freeze(mutations)] () mutable -> future<> {
return merge_schema(sys_ks, proxy, feat, unfreeze(fmuts), reload);
});
co_await smp::submit_to(0, coroutine::lambda([&, fmuts = freeze(mutations)] () mutable -> future<> {
co_await merge_schema(sys_ks, proxy, feat, co_await unfreeze_gently(fmuts), reload);
}));
co_return;
}
co_await with_merge_lock([&] () mutable -> future<> {
Expand Down
10 changes: 10 additions & 0 deletions mutation/frozen_mutation.cc
Expand Up @@ -143,6 +143,16 @@ std::vector<mutation> unfreeze(const std::vector<frozen_mutation>& muts) {
}));
}


future<std::vector<mutation>> unfreeze_gently(std::span<frozen_mutation> muts) {
std::vector<mutation> result;
result.reserve(muts.size());
for (auto& fm : muts) {
result.push_back(co_await fm.unfreeze_gently(local_schema_registry().get(fm.schema_version())));
}
co_return result;
}

mutation_partition_view frozen_mutation::partition() const {
return mutation_partition_view::from_view(mutation_view().partition());
}
Expand Down
4 changes: 4 additions & 0 deletions mutation/frozen_mutation.hh
Expand Up @@ -17,6 +17,8 @@
#include "range_tombstone_change_generator.hh"
#include "schema/schema.hh"

#include <span>

class mutation;
class flat_mutation_reader_v2;

Expand Down Expand Up @@ -231,6 +233,8 @@ public:
frozen_mutation freeze(const mutation& m);
std::vector<frozen_mutation> freeze(const std::vector<mutation>&);
std::vector<mutation> unfreeze(const std::vector<frozen_mutation>&);
// Caller is responsible for keeping the argument stable in memory
future<std::vector<mutation>> unfreeze_gently(std::span<frozen_mutation>);

struct frozen_mutation_and_schema {
frozen_mutation fm;
Expand Down

0 comments on commit 680e37c

Please sign in to comment.