Skip to content

Commit

Permalink
Merge 'schema_mutations, migration_manager: Ignore empty partitions i…
Browse files Browse the repository at this point in the history
…n per-table digest' from Tomasz Grabiec

Schema digest is calculated by querying for mutations of all schema
tables, then compacting them so that all tombstones in them are
dropped. However, even if the mutation becomes empty after compaction,
we still feed its partition key. If the same mutations were compacted
prior to the query, because the tombstones expire, we won't get any
mutation at all and won't feed the partition key. So schema digest
will change once an empty partition of some schema table is compacted
away.

Tombstones expire 7 days after schema change which introduces them. If
one of the nodes is restarted after that, it will compute a different
table schema digest on boot. This may cause performance problems. When
sending a request from coordinator to replica, the replica needs
schema_ptr of exact schema version request by the coordinator. If it
doesn't know that version, it will request it from the coordinator and
perform a full schema merge. This adds latency to every such request.
Schema versions which are not referenced are currently kept in cache
for only 1 second, so if request flow has low-enough rate, this
situation results in perpetual schema pulls.

After ae8d2a5 (5.2.0), it is more liekly to
run into this situation, because table creation generates tombstones
for all schema tables relevant to the table, even the ones which
will be otherwise empty for the new table (e.g. computed_columns).

This change inroduces a cluster feature which when enabled will change
digest calculation to be insensitive to expiry by ignoring empty
partitions in digest calculation. When the feature is enabled,
schema_ptrs are reloaded so that the window of discrepancy during
transition is short and no rolling restart is required.

A similar problem was fixed for per-node digest calculation in
c2ba94dc39e4add9db213751295fb17b95e6b962. Per-table digest calculation
was not fixed at that time because we didn't persist enabled features
and they were not enabled early-enough on boot for us to depend on
them in digest calculation. Now they are enabled before non-system
tables are loaded so digest calculation can rely on cluster features.

Fixes #4485.

Manually tested using ccm on cluster upgrade scenarios and node restarts.

Closes #14441

* github.com:scylladb/scylladb:
  test: schema_change_test: Verify digests also with TABLE_DIGEST_INSENSITIVE_TO_EXPIRY enabled
  schema_mutations, migration_manager: Ignore empty partitions in per-table digest
  migration_manager, schema_tables: Implement migration_manager::reload_schema()
  schema_tables: Avoid crashing when table selector has only one kind of tables
  • Loading branch information
avikivity committed Jul 27, 2023
2 parents 8ee6f6e + 1ecd3c1 commit cf81eef
Show file tree
Hide file tree
Showing 14 changed files with 216 additions and 44 deletions.
2 changes: 2 additions & 0 deletions db/config.cc
Expand Up @@ -886,6 +886,8 @@ db::config::config(std::shared_ptr<db::extensions> exts)
, uuid_sstable_identifiers_enabled(this,
"uuid_sstable_identifiers_enabled", liveness::LiveUpdate, value_status::Used, true, "If set to true, each newly created sstable will have a UUID "
"based generation identifier, and such files are not readable by previous Scylla versions.")
, table_digest_insensitive_to_expiry(this, "table_digest_insensitive_to_expiry", liveness::MustRestart, value_status::Used, true,
"When enabled, per-table schema digest calculation ignores empty partitions.")
, enable_dangerous_direct_import_of_cassandra_counters(this, "enable_dangerous_direct_import_of_cassandra_counters", value_status::Used, false, "Only turn this option on if you want to import tables from Cassandra containing counters, and you are SURE that no counters in that table were created in a version earlier than Cassandra 2.1."
" It is not enough to have ever since upgraded to newer versions of Cassandra. If you EVER used a version earlier than 2.1 in the cluster where these SSTables come from, DO NOT TURN ON THIS OPTION! You will corrupt your data. You have been warned.")
, enable_shard_aware_drivers(this, "enable_shard_aware_drivers", value_status::Used, true, "Enable native transport drivers to use connection-per-shard for better performance")
Expand Down
1 change: 1 addition & 0 deletions db/config.hh
Expand Up @@ -358,6 +358,7 @@ public:
named_value<bool> enable_sstables_md_format;
named_value<sstring> sstable_format;
named_value<bool> uuid_sstable_identifiers_enabled;
named_value<bool> table_digest_insensitive_to_expiry;
named_value<bool> enable_dangerous_direct_import_of_cassandra_counters;
named_value<bool> enable_shard_aware_drivers;
named_value<bool> enable_ipv6_dns_lookup;
Expand Down
7 changes: 6 additions & 1 deletion db/schema_features.hh
Expand Up @@ -24,6 +24,10 @@ enum class schema_feature {
PER_TABLE_PARTITIONERS,
SCYLLA_KEYSPACES,
SCYLLA_AGGREGATES,

// When enabled, schema_mutations::digest() will skip empty mutations (with only tombstones),
// so that the digest remains the same after schema tables are compacted.
TABLE_DIGEST_INSENSITIVE_TO_EXPIRY,
};

using schema_features = enum_set<super_enum<schema_feature,
Expand All @@ -33,7 +37,8 @@ using schema_features = enum_set<super_enum<schema_feature,
schema_feature::CDC_OPTIONS,
schema_feature::PER_TABLE_PARTITIONERS,
schema_feature::SCYLLA_KEYSPACES,
schema_feature::SCYLLA_AGGREGATES
schema_feature::SCYLLA_AGGREGATES,
schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY
>>;

}
50 changes: 37 additions & 13 deletions db/schema_tables.cc
Expand Up @@ -102,16 +102,18 @@ namespace {
});
}

schema_ctxt::schema_ctxt(const db::config& cfg, std::shared_ptr<data_dictionary::user_types_storage> uts, replica::database* db)
schema_ctxt::schema_ctxt(const db::config& cfg, std::shared_ptr<data_dictionary::user_types_storage> uts,
const gms::feature_service& features, replica::database* db)
: _db(db)
, _features(features)
, _extensions(cfg.extensions())
, _murmur3_partitioner_ignore_msb_bits(cfg.murmur3_partitioner_ignore_msb_bits())
, _schema_registry_grace_period(cfg.schema_registry_grace_period())
, _user_types(std::move(uts))
{}

schema_ctxt::schema_ctxt(replica::database& db)
: schema_ctxt(db.get_config(), db.as_user_types_storage(), &db)
: schema_ctxt(db.get_config(), db.as_user_types_storage(), db.features(), &db)
{}

schema_ctxt::schema_ctxt(distributed<replica::database>& db)
Expand Down Expand Up @@ -152,7 +154,8 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
std::map<table_id, schema_mutations>&& tables_before,
std::map<table_id, schema_mutations>&& tables_after,
std::map<table_id, schema_mutations>&& views_before,
std::map<table_id, schema_mutations>&& views_after);
std::map<table_id, schema_mutations>&& views_after,
bool reload);

struct [[nodiscard]] user_types_to_drop final {
seastar::noncopyable_function<future<> ()> drop;
Expand All @@ -165,7 +168,7 @@ static future<user_types_to_drop> merge_types(distributed<service::storage_proxy
static future<> merge_functions(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after);
static future<> merge_aggregates(distributed<service::storage_proxy>& proxy, schema_result before, schema_result after, schema_result scylla_before, schema_result scylla_after);

static future<> do_merge_schema(distributed<service::storage_proxy>&, sharded<db::system_keyspace>& sys_ks, std::vector<mutation>, bool do_flush);
static future<> do_merge_schema(distributed<service::storage_proxy>&, sharded<db::system_keyspace>& sys_ks, std::vector<mutation>, bool do_flush, bool reload);

using computed_columns_map = std::unordered_map<bytes, column_computation_ptr>;
static computed_columns_map get_computed_columns(const schema_mutations& sm);
Expand Down Expand Up @@ -970,7 +973,7 @@ future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks
* @throws ConfigurationException If one of metadata attributes has invalid value
* @throws IOException If data was corrupted during transportation or failed to apply fs operations
*/
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations)
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload)
{
if (this_shard_id() != 0) {
// mutations must be applied on the owning shard (0).
Expand All @@ -981,7 +984,7 @@ future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service:
}
co_await with_merge_lock([&] () mutable -> future<> {
bool flush_schema = proxy.local().get_db().local().get_config().flush_schema_tables_after_modification();
co_await do_merge_schema(proxy, sys_ks, std::move(mutations), flush_schema);
co_await do_merge_schema(proxy, sys_ks, std::move(mutations), flush_schema, reload);
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
});
}
Expand Down Expand Up @@ -1068,6 +1071,9 @@ read_tables_for_keyspaces(distributed<service::storage_proxy>& proxy, const std:
{
std::map<table_id, schema_mutations> result;
for (auto&& [keyspace_name, sel] : tables_per_keyspace) {
if (!sel.tables.contains(kind)) {
continue;
}
for (auto&& table_name : sel.tables.find(kind)->second) {
auto qn = qualified_name(keyspace_name, table_name);
auto muts = co_await read_table_mutations(proxy, qn, get_table_holder(kind));
Expand Down Expand Up @@ -1220,7 +1226,7 @@ table_selector get_affected_tables(const sstring& keyspace_name, const mutation&
return result;
}

static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, sharded<db::system_keyspace>& sys_ks, std::vector<mutation> mutations, bool do_flush)
static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, sharded<db::system_keyspace>& sys_ks, std::vector<mutation> mutations, bool do_flush, bool reload)
{
slogger.trace("do_merge_schema: {}", mutations);
schema_ptr s = keyspaces();
Expand All @@ -1247,6 +1253,15 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, shar
delete_schema_version(mutation);
}

if (reload) {
for (auto&& ks : proxy.local().get_db().local().get_non_system_keyspaces()) {
keyspaces.emplace(ks);
table_selector sel;
sel.all_in_keyspace = true;
affected_tables[ks] = sel;
}
}

// Resolve sel.all_in_keyspace == true to the actual list of tables and views.
for (auto&& [keyspace_name, sel] : affected_tables) {
if (sel.all_in_keyspace) {
Expand Down Expand Up @@ -1303,7 +1318,7 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, shar
auto types_to_drop = co_await merge_types(proxy, std::move(old_types), std::move(new_types));
co_await merge_tables_and_views(proxy, sys_ks,
std::move(old_column_families), std::move(new_column_families),
std::move(old_views), std::move(new_views));
std::move(old_views), std::move(new_views), reload);
co_await merge_functions(proxy, std::move(old_functions), std::move(new_functions));
co_await merge_aggregates(proxy, std::move(old_aggregates), std::move(new_aggregates), std::move(old_scylla_aggregates), std::move(new_scylla_aggregates));
co_await types_to_drop.drop();
Expand Down Expand Up @@ -1407,6 +1422,7 @@ enum class schema_diff_side {
static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy,
std::map<table_id, schema_mutations>&& before,
std::map<table_id, schema_mutations>&& after,
bool reload,
noncopyable_function<schema_ptr (schema_mutations sm, schema_diff_side)> create_schema)
{
schema_diff d;
Expand All @@ -1427,6 +1443,13 @@ static schema_diff diff_table_or_view(distributed<service::storage_proxy>& proxy
slogger.info("Altering {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
d.altered.emplace_back(schema_diff::altered_schema{s_before, s});
}
if (reload) {
for (auto&& key: diff.entries_in_common) {
auto s = create_schema(std::move(after.at(key)), schema_diff_side::right);
slogger.info("Reloading {}.{} id={} version={}", s->ks_name(), s->cf_name(), s->id(), s->version());
d.altered.emplace_back(schema_diff::altered_schema {s, s});
}
}
return d;
}

Expand All @@ -1440,12 +1463,13 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
std::map<table_id, schema_mutations>&& tables_before,
std::map<table_id, schema_mutations>&& tables_after,
std::map<table_id, schema_mutations>&& views_before,
std::map<table_id, schema_mutations>&& views_after)
std::map<table_id, schema_mutations>&& views_after,
bool reload)
{
auto tables_diff = diff_table_or_view(proxy, std::move(tables_before), std::move(tables_after), [&] (schema_mutations sm, schema_diff_side) {
auto tables_diff = diff_table_or_view(proxy, std::move(tables_before), std::move(tables_after), reload, [&] (schema_mutations sm, schema_diff_side) {
return create_table_from_mutations(proxy, std::move(sm));
});
auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), [&] (schema_mutations sm, schema_diff_side side) {
auto views_diff = diff_table_or_view(proxy, std::move(views_before), std::move(views_after), reload, [&] (schema_mutations sm, schema_diff_side side) {
// The view schema mutation should be created with reference to the base table schema because we definitely know it by now.
// If we don't do it we are leaving a window where write commands to this schema are illegal.
// There are 3 possibilities:
Expand Down Expand Up @@ -3107,7 +3131,7 @@ schema_ptr create_table_from_mutations(const schema_ctxt& ctxt, schema_mutations
if (version) {
builder.with_version(*version);
} else {
builder.with_version(sm.digest());
builder.with_version(sm.digest(ctxt.features().cluster_schema_features()));
}

if (auto partitioner = sm.partitioner()) {
Expand Down Expand Up @@ -3327,7 +3351,7 @@ view_ptr create_view_from_mutations(const schema_ctxt& ctxt, schema_mutations sm
if (version) {
builder.with_version(*version);
} else {
builder.with_version(sm.digest());
builder.with_version(sm.digest(ctxt.features().cluster_schema_features()));
}

auto base_id = table_id(row.get_nonnull<utils::UUID>("base_table_id"));
Expand Down
13 changes: 10 additions & 3 deletions db/schema_tables.hh
Expand Up @@ -14,6 +14,7 @@
#include "schema/schema_fwd.hh"
#include "schema_features.hh"
#include "utils/hashing.hh"
#include "gms/feature_service.hh"
#include "schema_mutations.hh"
#include "types/map.hh"
#include "query-result-set.hh"
Expand Down Expand Up @@ -66,7 +67,8 @@ class config;

class schema_ctxt {
public:
schema_ctxt(const config&, std::shared_ptr<data_dictionary::user_types_storage> uts, replica::database* = nullptr);
schema_ctxt(const config&, std::shared_ptr<data_dictionary::user_types_storage> uts, const gms::feature_service&,
replica::database* = nullptr);
schema_ctxt(replica::database&);
schema_ctxt(distributed<replica::database>&);
schema_ctxt(distributed<service::storage_proxy>&);
Expand All @@ -87,11 +89,16 @@ public:
return *_user_types;
}

replica::database* get_db() {
const gms::feature_service& features() const {
return _features;
}

replica::database* get_db() const {
return _db;
}
private:
replica::database* _db;
const gms::feature_service& _features;
const db::extensions& _extensions;
const unsigned _murmur3_partitioner_ignore_msb_bits;
const uint32_t _schema_registry_grace_period;
Expand Down Expand Up @@ -189,7 +196,7 @@ future<mutation> read_keyspace_mutation(distributed<service::storage_proxy>&, co
// Must be called on shard 0.
future<semaphore_units<>> hold_merge_lock() noexcept;

future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations);
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload = false);

// Recalculates the local schema version.
//
Expand Down
4 changes: 4 additions & 0 deletions gms/feature_service.cc
Expand Up @@ -92,6 +92,9 @@ feature_config feature_config_from_db_config(const db::config& cfg, std::set<sst
if (!cfg.uuid_sstable_identifiers_enabled()) {
fcfg._disabled_features.insert("UUID_SSTABLE_IDENTIFIERS"s);
}
if (!cfg.table_digest_insensitive_to_expiry()) {
fcfg._disabled_features.insert("TABLE_DIGEST_INSENSITIVE_TO_EXPIRY"s);
}

if (!is_test_only_feature_enabled()) {
fcfg._disabled_features.insert("TEST_ONLY_FEATURE"s);
Expand Down Expand Up @@ -198,6 +201,7 @@ db::schema_features feature_service::cluster_schema_features() const {
f.set_if<db::schema_feature::PER_TABLE_PARTITIONERS>(per_table_partitioners);
f.set_if<db::schema_feature::SCYLLA_KEYSPACES>(keyspace_storage_options);
f.set_if<db::schema_feature::SCYLLA_AGGREGATES>(aggregate_storage_options);
f.set_if<db::schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY>(table_digest_insensitive_to_expiry);
return f;
}

Expand Down
1 change: 1 addition & 0 deletions gms/feature_service.hh
Expand Up @@ -118,6 +118,7 @@ public:
gms::feature secondary_indexes_on_static_columns { *this, "SECONDARY_INDEXES_ON_STATIC_COLUMNS"sv };
gms::feature tablets { *this, "TABLETS"sv };
gms::feature uuid_sstable_identifiers { *this, "UUID_SSTABLE_IDENTIFIERS"sv };
gms::feature table_digest_insensitive_to_expiry { *this, "TABLE_DIGEST_INSENSITIVE_TO_EXPIRY"sv };

// A feature just for use in tests. It must not be advertised unless
// the "features_enable_test_feature" injection is enabled.
Expand Down
25 changes: 14 additions & 11 deletions schema_mutations.cc
Expand Up @@ -49,7 +49,7 @@ void schema_mutations::copy_to(std::vector<mutation>& dst) const {
}
}

table_schema_version schema_mutations::digest() const {
table_schema_version schema_mutations::digest(db::schema_features sf) const {
if (_scylla_tables) {
auto rs = query::result_set(*_scylla_tables);
if (!rs.empty()) {
Expand All @@ -62,16 +62,19 @@ table_schema_version schema_mutations::digest() const {
}

md5_hasher h;
db::schema_features sf = db::schema_features::full();

// Disable this feature so that the digest remains compactible with Scylla
// versions prior to this feature.
// This digest affects the table schema version calculation and it's important
// that all nodes arrive at the same table schema version to avoid needless schema version
// pulls. Table schema versions are calculated on boot when we don't yet
// know all the cluster features, so we could get different table versions after reboot
// in an already upgraded cluster.
sf.remove<db::schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>();

if (!sf.contains<db::schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY>()) {
// Disable this feature so that the digest remains compactible with Scylla
// versions prior to this feature.
// This digest affects the table schema version calculation and it's important
// that all nodes arrive at the same table schema version to avoid needless schema version
// pulls. It used to be the case that when table schema versions were calculated on boot we
// didn't yet know all the cluster features, so we could get different table versions after reboot
// in an already upgraded cluster. However, they are now available, and if
// TABLE_DIGEST_INSENSITIVE_TO_EXPIRY is enabled, we can compute with DIGEST_INSENSITIVE_TO_EXPIRY
// enabled.
sf.remove<db::schema_feature::DIGEST_INSENSITIVE_TO_EXPIRY>();
}

db::schema_tables::feed_hash_for_schema_digest(h, _columnfamilies, sf);
db::schema_tables::feed_hash_for_schema_digest(h, _columns, sf);
Expand Down
3 changes: 2 additions & 1 deletion schema_mutations.hh
Expand Up @@ -12,6 +12,7 @@
#include "mutation/mutation.hh"
#include "schema/schema_fwd.hh"
#include "mutation/canonical_mutation.hh"
#include "db/schema_features.hh"

// Commutative representation of table schema
// Equality ignores tombstones.
Expand Down Expand Up @@ -124,7 +125,7 @@ public:

bool is_view() const;

table_schema_version digest() const;
table_schema_version digest(db::schema_features) const;
std::optional<sstring> partitioner() const;

bool operator==(const schema_mutations&) const;
Expand Down
14 changes: 14 additions & 0 deletions service/migration_manager.cc
Expand Up @@ -109,6 +109,14 @@ void migration_manager::init_messaging_service()
_feature_listeners.push_back(_feat.cdc.when_enabled(update_schema));
_feature_listeners.push_back(_feat.per_table_partitioners.when_enabled(update_schema));
_feature_listeners.push_back(_feat.computed_columns.when_enabled(update_schema));

if (!_feat.table_digest_insensitive_to_expiry) {
_feature_listeners.push_back(_feat.table_digest_insensitive_to_expiry.when_enabled([this] {
(void) with_gate(_background_tasks, [this] {
return reload_schema();
});
}));
}
}

_messaging.register_definitions_update([this] (const rpc::client_info& cinfo, std::vector<frozen_mutation> fm, rpc::optional<std::vector<canonical_mutation>> cm) {
Expand Down Expand Up @@ -379,6 +387,12 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr
return db::schema_tables::merge_schema(_sys_ks, proxy.container(), _feat, std::move(mutations));
}

future<> migration_manager::reload_schema() {
mlogger.info("Reloading schema");
std::vector<mutation> mutations;
return db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _feat, std::move(mutations), true);
}

future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr src, const std::vector<frozen_mutation>& mutations)
{
if (_as.abort_requested()) {
Expand Down
1 change: 1 addition & 0 deletions service/migration_manager.hh
Expand Up @@ -112,6 +112,7 @@ public:
// Coalesces requests.
future<> merge_schema_from(netw::msg_addr);
future<> do_merge_schema_from(netw::msg_addr);
future<> reload_schema();

// Merge mutations received from src.
// Keep mutations alive around whole async operation.
Expand Down

0 comments on commit cf81eef

Please sign in to comment.