Skip to content

Commit

Permalink
Revert "Merge 'Don't calculate hashes for schema versions in Raft mode' from Kamil Braun"
Browse files Browse the repository at this point in the history

This reverts commit 3d4398d, reversing
changes made to 45dfce6. The commit
causes some schema changes to be lost due to incorrect timestamps
in some mutations. More information is available in [1].

Reopens: #7620
Reopens: #13957

Fixes #15530.

[1] #15687
  • Loading branch information
avikivity committed Oct 10, 2023
1 parent 05ede7a commit 35849fc
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 566 deletions.
6 changes: 1 addition & 5 deletions db/schema_features.hh
Expand Up @@ -28,9 +28,6 @@ enum class schema_feature {
// When enabled, schema_mutations::digest() will skip empty mutations (with only tombstones),
// so that the digest remains the same after schema tables are compacted.
TABLE_DIGEST_INSENSITIVE_TO_EXPIRY,

// When enabled we'll add a new column to the `system_schema.scylla_tables` table.
GROUP0_SCHEMA_VERSIONING,
};

using schema_features = enum_set<super_enum<schema_feature,
Expand All @@ -41,8 +38,7 @@ using schema_features = enum_set<super_enum<schema_feature,
schema_feature::PER_TABLE_PARTITIONERS,
schema_feature::SCYLLA_KEYSPACES,
schema_feature::SCYLLA_AGGREGATES,
schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY,
schema_feature::GROUP0_SCHEMA_VERSIONING
schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY
>>;

}
84 changes: 21 additions & 63 deletions db/schema_tables.cc
Expand Up @@ -330,49 +330,31 @@ schema_ptr tables() {

// Holds Scylla-specific table metadata.
schema_ptr scylla_tables(schema_features features) {
static thread_local schema_ptr schemas[2][2][2]{};

bool has_cdc_options = features.contains(schema_feature::CDC_OPTIONS);
bool has_per_table_partitioners = features.contains(schema_feature::PER_TABLE_PARTITIONERS);
bool has_group0_schema_versioning = features.contains(schema_feature::GROUP0_SCHEMA_VERSIONING);

schema_ptr& s = schemas[has_cdc_options][has_per_table_partitioners][has_group0_schema_versioning];
if (!s) {
static auto make = [] (bool has_cdc_options, bool has_per_table_partitioners) -> schema_ptr {
auto id = generate_legacy_id(NAME, SCYLLA_TABLES);
auto sb = schema_builder(NAME, SCYLLA_TABLES, std::make_optional(id))
.with_column("keyspace_name", utf8_type, column_kind::partition_key)
.with_column("table_name", utf8_type, column_kind::clustering_key)
.with_column("version", uuid_type)
.set_gc_grace_seconds(schema_gc_grace);
// Each bit in `offset` denotes a different schema feature,
// so different values of `offset` are used for different combinations of features.
uint16_t offset = 0;
// 0 - false, false
// 1 - true, false
// 2 - false, true
// 3 - true, true
int offset = 0;
if (has_cdc_options) {
sb.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false));
offset |= 0b1;
++offset;
}
if (has_per_table_partitioners) {
sb.with_column("partitioner", utf8_type);
offset |= 0b10;
}
if (has_group0_schema_versioning) {
// If true, this table's latest schema was committed by group 0.
// In this case `version` column is non-null and will be used for `schema::version()` instead of calculating a hash.
//
// If false, this table's latest schema was committed outside group 0 (e.g. during RECOVERY mode).
// In this case `version` is null and `schema::version()` will be a hash.
//
// If null, this is either a system table, or the latest schema was committed
// before the GROUP0_SCHEMA_VERSIONING feature was enabled (either inside or outside group 0).
// In this case, for non-system tables, `version` is null and `schema::version()` will be a hash.
sb.with_column("committed_by_group0", boolean_type);
offset |= 0b100;
offset += 2;
}
sb.with_version(system_keyspace::generate_schema_version(id, offset));
s = sb.build();
}

return s;
return sb.build();
};
static thread_local schema_ptr schemas[2][2] = { {make(false, false), make(false, true)}, {make(true, false), make(true, true)} };
return schemas[features.contains(schema_feature::CDC_OPTIONS)][features.contains(schema_feature::PER_TABLE_PARTITIONERS)];
}

// The "columns" table lists the definitions of all columns in all tables
Expand Down Expand Up @@ -968,23 +950,15 @@ static future<> with_merge_lock(noncopyable_function<future<> ()> func) {
}

static
future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, schema_features features, std::optional<table_schema_version> version_from_group0) {
auto uuid = version_from_group0 ? *version_from_group0 : co_await calculate_schema_digest(proxy, features);
future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, schema_features features) {
auto uuid = co_await calculate_schema_digest(proxy, features);
co_await sys_ks.local().update_schema_version(uuid);
co_await proxy.local().get_db().invoke_on_all([uuid] (replica::database& db) {
db.update_version(uuid);
});
slogger.info("Schema version changed to {}", uuid);
}

// Looks up the "group0_schema_version" entry through
// system_keyspace::get_scylla_local_param_as<utils::UUID>().
//
// Resolves to std::nullopt when the lookup yields no value; otherwise resolves
// to the stored UUID wrapped in a table_schema_version.
static future<std::optional<table_schema_version>> get_group0_schema_version(db::system_keyspace& sys_ks) {
    auto stored_uuid = co_await sys_ks.get_scylla_local_param_as<utils::UUID>("group0_schema_version");
    if (stored_uuid) {
        co_return table_schema_version{*stored_uuid};
    }
    co_return std::nullopt;
}

/**
* Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
* (which also involves fs operations on add/drop ks/cf)
Expand All @@ -999,22 +973,20 @@ future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service:
if (this_shard_id() != 0) {
// mutations must be applied on the owning shard (0).
co_await smp::submit_to(0, [&, fmuts = freeze(mutations)] () mutable -> future<> {
return merge_schema(sys_ks, proxy, feat, unfreeze(fmuts), reload);
return merge_schema(sys_ks, proxy, feat, unfreeze(fmuts));
});
co_return;
}
co_await with_merge_lock([&] () mutable -> future<> {
bool flush_schema = proxy.local().get_db().local().get_config().flush_schema_tables_after_modification();
co_await do_merge_schema(proxy, sys_ks, std::move(mutations), flush_schema, reload);
auto version_from_group0 = co_await get_group0_schema_version(sys_ks.local());
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features(), version_from_group0);
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
});
}

future<> recalculate_schema_version(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat) {
co_await with_merge_lock([&] () -> future<> {
auto version_from_group0 = co_await get_group0_schema_version(sys_ks.local());
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features(), version_from_group0);
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
});
}

Expand Down Expand Up @@ -1126,27 +1098,14 @@ void feed_hash_for_schema_digest(hasher& h, const mutation& m, schema_features f
}
}

// Applies deletion of the "version" column to system_schema.scylla_tables mutation rows
// which weren't committed by group 0.
static void maybe_delete_schema_version(mutation& m) {
// Applies deletion of the "version" column to a system_schema.scylla_tables mutation.
static void delete_schema_version(mutation& m) {
if (m.column_family_id() != scylla_tables()->id()) {
return;
}
const column_definition& origin_col = *m.schema()->get_column_definition(to_bytes("committed_by_group0"));
const column_definition& version_col = *m.schema()->get_column_definition(to_bytes("version"));
for (auto&& row : m.partition().clustered_rows()) {
auto&& cells = row.row().cells();
if (auto&& origin_cell = cells.find_cell(origin_col.id); origin_cell) {
auto&& ac = origin_cell->as_atomic_cell(origin_col);
if (ac.is_live()) {
auto dv = origin_col.type->deserialize(managed_bytes_view(ac.value()));
auto committed_by_group0 = value_cast<bool>(dv);
if (committed_by_group0) {
// Don't delete "version" for this entry.
continue;
}
}
}
auto&& cell = cells.find_cell(version_col.id);
api::timestamp_type t = api::new_timestamp();
if (cell) {
Expand Down Expand Up @@ -1285,9 +1244,8 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, shar
keyspaces.emplace(std::move(keyspace_name));
column_families.emplace(mutation.column_family_id());
// We must force recalculation of schema version after the merge, since the resulting
// schema may be a mix of the old and new schemas, with the exception of entries
// that originate from group 0.
maybe_delete_schema_version(mutation);
// schema may be a mix of the old and new schemas.
delete_schema_version(mutation);
}

if (reload) {
Expand Down
2 changes: 1 addition & 1 deletion db/schema_tables.hh
Expand Up @@ -194,7 +194,7 @@ future<mutation> read_keyspace_mutation(distributed<service::storage_proxy>&, co
// Must be called on shard 0.
future<semaphore_units<>> hold_merge_lock() noexcept;

future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload);
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload = false);

// Recalculates the local schema version.
//
Expand Down
17 changes: 0 additions & 17 deletions db/system_keyspace.cc
Expand Up @@ -2404,23 +2404,6 @@ future<mutation> system_keyspace::get_group0_history(distributed<replica::databa
co_return mutation(s, partition_key::from_singular(*s, GROUP0_HISTORY_KEY));
}

// Reads the partition keyed by "group0_schema_version" from the table that
// find_schema() resolves for (system_keyspace::NAME, system_keyspace::SCYLLA_LOCAL)
// and returns it in mutation form.
//
// If the query yields a partition, the first one is unfrozen and returned.
// If it yields none, an empty mutation for the same partition key is returned.
future<mutation> system_keyspace::get_group0_schema_version() {
    auto local_schema = _db.find_schema(db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);

    partition_key version_key = partition_key::from_singular(*local_schema, "group0_schema_version");
    dht::partition_range version_range = dht::partition_range::make_singular(dht::decorate_key(*local_schema, version_key));

    auto query_result = co_await replica::query_mutations(
            _db.container(), local_schema, version_range, local_schema->full_slice(), db::no_timeout);
    assert(query_result);

    // Only the first partition is ever consumed: the loop body returns immediately.
    for (auto& part : query_result->partitions()) {
        co_return part.mut().unfreeze(local_schema);
    }

    // Nothing stored under the key: hand back an empty mutation for it.
    co_return mutation(local_schema, version_key);
}

static constexpr auto GROUP0_UPGRADE_STATE_KEY = "group0_upgrade_state";

future<std::optional<sstring>> system_keyspace::load_group0_upgrade_state() {
Expand Down
8 changes: 3 additions & 5 deletions db/system_keyspace.hh
Expand Up @@ -275,6 +275,7 @@ public:
future<> set_scylla_local_param(const sstring& key, const sstring& value, bool visible_before_cl_replay);
future<std::optional<sstring>> get_scylla_local_param(const sstring& key);

private:
// Saves the key-value pair into system.scylla_local table.
// Pass visible_before_cl_replay = true iff the data should be available before
// schema commitlog replay. We do table.flush in this case, so it's rather slow and heavyweight.
Expand All @@ -283,6 +284,7 @@ public:
template <typename T>
future<std::optional<T>> get_scylla_local_param_as(const sstring& key);

public:
static std::vector<schema_ptr> all_tables(const db::config& cfg);
future<> make(
locator::effective_replication_map_factory&,
Expand Down Expand Up @@ -500,13 +502,9 @@ public:
utils::UUID state_id, std::optional<gc_clock::duration> gc_older_than, std::string_view description);

// Obtain the contents of the group 0 history table in mutation form.
// Assumes that the history table exists, i.e. Raft feature is enabled.
// Assumes that the history table exists, i.e. Raft experimental feature is enabled.
static future<mutation> get_group0_history(distributed<replica::database>&);

// If the `group0_schema_version` key in `system.scylla_local` is present (either live or tombstone),
// returns the corresponding mutation. Otherwise returns an empty mutation for that key.
future<mutation> get_group0_schema_version();

future<> sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc);
future<utils::UUID> sstables_registry_lookup_entry(sstring location, sstables::generation_type gen);
future<> sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status);
Expand Down
2 changes: 0 additions & 2 deletions gms/feature_service.cc
Expand Up @@ -81,7 +81,6 @@ feature_config feature_config_from_db_config(const db::config& cfg, std::set<sst
}
if (!cfg.consistent_cluster_management()) {
fcfg._disabled_features.insert("SUPPORTS_RAFT_CLUSTER_MANAGEMENT"s);
fcfg._disabled_features.insert("GROUP0_SCHEMA_VERSIONING"s);
}
if (!cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
fcfg._disabled_features.insert("KEYSPACE_STORAGE_OPTIONS"s);
Expand Down Expand Up @@ -202,7 +201,6 @@ db::schema_features feature_service::cluster_schema_features() const {
f.set_if<db::schema_feature::SCYLLA_KEYSPACES>(keyspace_storage_options);
f.set_if<db::schema_feature::SCYLLA_AGGREGATES>(aggregate_storage_options);
f.set_if<db::schema_feature::TABLE_DIGEST_INSENSITIVE_TO_EXPIRY>(table_digest_insensitive_to_expiry);
f.set_if<db::schema_feature::GROUP0_SCHEMA_VERSIONING>(group0_schema_versioning);
return f;
}

Expand Down
8 changes: 0 additions & 8 deletions gms/feature_service.hh
Expand Up @@ -130,14 +130,6 @@ public:
gms::feature tablets { *this, "TABLETS"sv };
gms::feature uuid_sstable_identifiers { *this, "UUID_SSTABLE_IDENTIFIERS"sv };
gms::feature table_digest_insensitive_to_expiry { *this, "TABLE_DIGEST_INSENSITIVE_TO_EXPIRY"sv };
// If this feature is enabled, schema versions are persisted by the group 0 command
// that modifies schema instead of being calculated as a digest (hash) by each node separately.
// The feature controls both the 'global' schema version (the one gossiped as application_state::SCHEMA)
// and the per-table schema versions (schema::version()).
// The feature affects non-Raft mode as well (e.g. during RECOVERY), where we send additional
// tombstones and flags to schema tables when performing schema changes, allowing us to
// revert to the digest method when necessary (if we must perform a schema change during RECOVERY).
gms::feature group0_schema_versioning { *this, "GROUP0_SCHEMA_VERSIONING"sv };

// A feature just for use in tests. It must not be advertised unless
// the "features_enable_test_feature" injection is enabled.
Expand Down
2 changes: 1 addition & 1 deletion replica/database.cc
Expand Up @@ -860,7 +860,7 @@ future<> database::parse_system_tables(distributed<service::storage_proxy>& prox
v_to_add = fixed_v;
auto&& keyspace = find_keyspace(v->ks_name()).metadata();
auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(v), fixed_v, api::new_timestamp(), true);
co_await db::schema_tables::merge_schema(sys_ks, proxy, _feat, std::move(mutations), false);
co_await db::schema_tables::merge_schema(sys_ks, proxy, _feat, std::move(mutations));
}
});
}));
Expand Down

0 comments on commit 35849fc

Please sign in to comment.