From 35849fc901cb2d70f6aa18a590c0963ded17d1b7 Mon Sep 17 00:00:00 2001 From: Avi Kivity Date: Wed, 11 Oct 2023 00:32:05 +0300 Subject: [PATCH] Revert "Merge 'Don't calculate hashes for schema versions in Raft mode' from Kamil Braun" This reverts commit 3d4398d1b2b9bc2139718e49a806214171ee081f, reversing changes made to 45dfce66323049941c09fcff9a0a7420f2b02018. The commit causes some schema changes to be lost due to incorrect timestamps in some mutations. More information is available in [1]. Reopens: scylladb/scylladb#7620 Reopens: scylladb/scylladb#13957 Fixes scylladb/scylladb#15530. [1] https://github.com/scylladb/scylladb/pull/15687 --- db/schema_features.hh | 6 +- db/schema_tables.cc | 84 +--- db/schema_tables.hh | 2 +- db/system_keyspace.cc | 17 - db/system_keyspace.hh | 8 +- gms/feature_service.cc | 2 - gms/feature_service.hh | 8 - replica/database.cc | 2 +- service/migration_manager.cc | 78 +--- test/boost/schema_change_test.cc | 34 +- test/boost/schema_registry_test.cc | 2 +- test/pylib/log_browsing.py | 2 +- test/topology_custom/suite.yaml | 2 - .../test_group0_schema_versioning.py | 367 ------------------ 14 files changed, 48 insertions(+), 566 deletions(-) delete mode 100644 test/topology_custom/test_group0_schema_versioning.py diff --git a/db/schema_features.hh b/db/schema_features.hh index 9d98721c5a1a..0c48ad260d92 100644 --- a/db/schema_features.hh +++ b/db/schema_features.hh @@ -28,9 +28,6 @@ enum class schema_feature { // When enabled, schema_mutations::digest() will skip empty mutations (with only tombstones), // so that the digest remains the same after schema tables are compacted. TABLE_DIGEST_INSENSITIVE_TO_EXPIRY, - - // When enabled we'll add a new column to the `system_schema.scylla_tables` table. - GROUP0_SCHEMA_VERSIONING, }; using schema_features = enum_set>; } diff --git a/db/schema_tables.cc b/db/schema_tables.cc index b22faa1ec976..60ae3214976f 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -330,49 +330,31 @@ schema_ptr tables() { // Holds Scylla-specific table metadata. schema_ptr scylla_tables(schema_features features) { - static thread_local schema_ptr schemas[2][2][2]{}; - - bool has_cdc_options = features.contains(schema_feature::CDC_OPTIONS); - bool has_per_table_partitioners = features.contains(schema_feature::PER_TABLE_PARTITIONERS); - bool has_group0_schema_versioning = features.contains(schema_feature::GROUP0_SCHEMA_VERSIONING); - - schema_ptr& s = schemas[has_cdc_options][has_per_table_partitioners][has_group0_schema_versioning]; - if (!s) { + static auto make = [] (bool has_cdc_options, bool has_per_table_partitioners) -> schema_ptr { auto id = generate_legacy_id(NAME, SCYLLA_TABLES); auto sb = schema_builder(NAME, SCYLLA_TABLES, std::make_optional(id)) .with_column("keyspace_name", utf8_type, column_kind::partition_key) .with_column("table_name", utf8_type, column_kind::clustering_key) .with_column("version", uuid_type) .set_gc_grace_seconds(schema_gc_grace); - // Each bit in `offset` denotes a different schema feature, - // so different values of `offset` are used for different combinations of features. 
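
// Illustrative sketch, not part of the patch: both the bitmask encoding being
// removed below and the enumerated offsets being restored serve the same goal:
// every combination of cluster schema features must map to a distinct,
// deterministic offset, so that generate_schema_version(id, offset) produces a
// different but cluster-wide reproducible schema version per combination.
// Standalone model; generate_version() is a hypothetical stand-in for the real
// system_keyspace::generate_schema_version().
#include <cstdint>
#include <cstdio>

static std::uint64_t generate_version(std::uint64_t table_id, std::uint16_t offset) {
    // Any deterministic mixing that separates the offsets works for the sketch.
    return table_id ^ (0x9e3779b97f4a7c15ULL * (offset + 1));
}

int main() {
    const std::uint64_t scylla_tables_id = 0x1234; // placeholder table id
    for (int cdc = 0; cdc <= 1; ++cdc) {
        for (int ptp = 0; ptp <= 1; ++ptp) {
            std::uint16_t offset = 0;
            if (cdc) { offset |= 0b01; }  // CDC_OPTIONS
            if (ptp) { offset |= 0b10; }  // PER_TABLE_PARTITIONERS
            std::printf("cdc=%d partitioner=%d -> offset=%u version=%016llx\n",
                        cdc, ptp, (unsigned) offset,
                        (unsigned long long) generate_version(scylla_tables_id, offset));
        }
    }
    return 0;
}
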
-        uint16_t offset = 0;
+        // 0 - false, false
+        // 1 - true, false
+        // 2 - false, true
+        // 3 - true, true
+        int offset = 0;
         if (has_cdc_options) {
             sb.with_column("cdc", map_type_impl::get_instance(utf8_type, utf8_type, false));
-            offset |= 0b1;
+            ++offset;
         }
         if (has_per_table_partitioners) {
             sb.with_column("partitioner", utf8_type);
-            offset |= 0b10;
-        }
-        if (has_group0_schema_versioning) {
-            // If true, this table's latest schema was committed by group 0.
-            // In this case `version` column is non-null and will be used for `schema::version()` instead of calculating a hash.
-            //
-            // If false, this table's latest schema was committed outside group 0 (e.g. during RECOVERY mode).
-            // In this case `version` is null and `schema::version()` will be a hash.
-            //
-            // If null, this is either a system table, or the latest schema was committed
-            // before the GROUP0_SCHEMA_VERSIONING feature was enabled (either inside or outside group 0).
-            // In this case, for non-system tables, `version` is null and `schema::version()` will be a hash.
-            sb.with_column("committed_by_group0", boolean_type);
-            offset |= 0b100;
+            offset += 2;
         }
         sb.with_version(system_keyspace::generate_schema_version(id, offset));
-        s = sb.build();
-    }
-
-    return s;
+        return sb.build();
+    };
+    static thread_local schema_ptr schemas[2][2] = { {make(false, false), make(false, true)}, {make(true, false), make(true, true)} };
+    return schemas[features.contains(schema_feature::CDC_OPTIONS)][features.contains(schema_feature::PER_TABLE_PARTITIONERS)];
 }
 
 // The "columns" table lists the definitions of all columns in all tables
@@ -968,8 +950,8 @@ static future<> with_merge_lock(noncopyable_function<future<> ()> func) {
 }
 
 static
-future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, schema_features features, std::optional<table_schema_version> version_from_group0) {
-    auto uuid = version_from_group0 ? *version_from_group0 : co_await calculate_schema_digest(proxy, features);
+future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, schema_features features) {
+    auto uuid = co_await calculate_schema_digest(proxy, features);
     co_await sys_ks.local().update_schema_version(uuid);
     co_await proxy.local().get_db().invoke_on_all([uuid] (replica::database& db) {
         db.update_version(uuid);
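
// Illustrative sketch, not part of the patch: the parameter removed above made
// the announced version "persisted group 0 version if present, digest
// otherwise"; after the revert the digest is always used. A simplified model
// of the removed selection (the types are stand-ins for table_schema_version
// and calculate_schema_digest()):
#include <cstdint>
#include <optional>

using version_t = std::uint64_t; // stand-in for table_schema_version

version_t calculate_digest() {
    return 0xfeedfacecafef00dULL; // placeholder: a hash over all schema mutations
}

// Pre-revert: adopt the version committed through group 0 when one exists;
// fall back to the digest (e.g. after schema changes made in RECOVERY mode).
version_t pick_version(std::optional<version_t> version_from_group0) {
    return version_from_group0 ? *version_from_group0 : calculate_digest();
}

// Post-revert: every node recomputes the digest locally.
version_t pick_version_reverted() {
    return calculate_digest();
}
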
@@ -977,14 +959,6 @@ future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks
     slogger.info("Schema version changed to {}", uuid);
 }
 
-static future<std::optional<table_schema_version>> get_group0_schema_version(db::system_keyspace& sys_ks) {
-    auto version = co_await sys_ks.get_scylla_local_param_as<utils::UUID>("group0_schema_version");
-    if (!version) {
-        co_return std::nullopt;
-    }
-    co_return table_schema_version{*version};
-}
-
 /**
  * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
  * (which also involves fs operations on add/drop ks/cf)
@@ -999,22 +973,20 @@ future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service
 {
     if (this_shard_id() != 0) {
         co_await smp::submit_to(0, [&, fmuts = freeze(mutations)] () mutable -> future<> {
-            return merge_schema(sys_ks, proxy, feat, unfreeze(fmuts), reload);
+            return merge_schema(sys_ks, proxy, feat, unfreeze(fmuts));
         });
         co_return;
     }
     co_await with_merge_lock([&] () mutable -> future<> {
         bool flush_schema = proxy.local().get_db().local().get_config().flush_schema_tables_after_modification();
         co_await do_merge_schema(proxy, sys_ks, std::move(mutations), flush_schema, reload);
-        auto version_from_group0 = co_await get_group0_schema_version(sys_ks.local());
-        co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features(), version_from_group0);
+        co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
     });
 }
 
 future<> recalculate_schema_version(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat) {
     co_await with_merge_lock([&] () -> future<> {
-        auto version_from_group0 = co_await get_group0_schema_version(sys_ks.local());
-        co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features(), version_from_group0);
+        co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
     });
 }
 
@@ -1126,27 +1098,14 @@ void feed_hash_for_schema_digest(hasher& h, const mutation& m, schema_features f
     }
 }
 
-// Applies deletion of the "version" column to system_schema.scylla_tables mutation rows
-// which weren't committed by group 0.
-static void maybe_delete_schema_version(mutation& m) {
+// Applies deletion of the "version" column to a system_schema.scylla_tables mutation.
+static void delete_schema_version(mutation& m) {
     if (m.column_family_id() != scylla_tables()->id()) {
         return;
    }
-    const column_definition& origin_col = *m.schema()->get_column_definition(to_bytes("committed_by_group0"));
     const column_definition& version_col = *m.schema()->get_column_definition(to_bytes("version"));
     for (auto&& row : m.partition().clustered_rows()) {
         auto&& cells = row.row().cells();
-        if (auto&& origin_cell = cells.find_cell(origin_col.id); origin_cell) {
-            auto&& ac = origin_cell->as_atomic_cell(origin_col);
-            if (ac.is_live()) {
-                auto dv = origin_col.type->deserialize(managed_bytes_view(ac.value()));
-                auto committed_by_group0 = value_cast<bool>(dv);
-                if (committed_by_group0) {
-                    // Don't delete "version" for this entry.
-                    continue;
-                }
-            }
-        }
         auto&& cell = cells.find_cell(version_col.id);
         api::timestamp_type t = api::new_timestamp();
         if (cell) {
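
// Illustrative sketch, not part of the patch: the cell deletion performed by
// delete_schema_version() only shadows a live "version" cell if the tombstone's
// timestamp is strictly greater than the cell's write timestamp. The `if (cell)`
// branch cut off by the diff context above presumably bumps the timestamp for
// that reason, along these lines (assumed shape, standalone types):
#include <algorithm>
#include <cstdint>

using timestamp_type = std::int64_t;

// now: the api::new_timestamp() equivalent; cell_ts: the live cell's write
// timestamp, if any. Returns a timestamp guaranteed to sort after the cell.
timestamp_type deletion_timestamp(timestamp_type now, const timestamp_type* cell_ts) {
    timestamp_type t = now;
    if (cell_ts != nullptr) {
        t = std::max(t, *cell_ts + 1);
    }
    return t;
}
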
@@ -1285,9 +1244,8 @@ static future<> do_merge_schema(distributed<service::storage_proxy>& proxy, shar
         keyspaces.emplace(std::move(keyspace_name));
         column_families.emplace(mutation.column_family_id());
         // We must force recalculation of schema version after the merge, since the resulting
-        // schema may be a mix of the old and new schemas, with the exception of entries
-        // that originate from group 0.
-        maybe_delete_schema_version(mutation);
+        // schema may be a mix of the old and new schemas.
+        delete_schema_version(mutation);
     }
 
     if (reload) {
diff --git a/db/schema_tables.hh b/db/schema_tables.hh
index b57bb21c8f02..196d7b0e24f3 100644
--- a/db/schema_tables.hh
+++ b/db/schema_tables.hh
@@ -194,7 +194,7 @@ future<schema_mutations> read_keyspace_mutation(distributed<service::storage_proxy>&, co
 // Must be called on shard 0.
 future<semaphore_units<>> hold_merge_lock() noexcept;
 
-future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload);
+future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations, bool reload = false);
 
 // Recalculates the local schema version.
 //
diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc
index e3a8936f9cdf..74c65b8b3e29 100644
--- a/db/system_keyspace.cc
+++ b/db/system_keyspace.cc
@@ -2404,23 +2404,6 @@ future<mutation> system_keyspace::get_group0_history(distributed
 
-future<mutation> system_keyspace::get_group0_schema_version() {
-    auto s = _db.find_schema(db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL);
-
-    partition_key pk = partition_key::from_singular(*s, "group0_schema_version");
-    dht::partition_range pr = dht::partition_range::make_singular(dht::decorate_key(*s, pk));
-
-    auto rs = co_await replica::query_mutations(_db.container(), s, pr, s->full_slice(), db::no_timeout);
-    assert(rs);
-    auto& ps = rs->partitions();
-    for (auto& p: ps) {
-        auto mut = p.mut().unfreeze(s);
-        co_return std::move(mut);
-    }
-
-    co_return mutation(s, pk);
-}
-
 static constexpr auto GROUP0_UPGRADE_STATE_KEY = "group0_upgrade_state";
 
 future<std::optional<sstring>> system_keyspace::load_group0_upgrade_state() {
diff --git a/db/system_keyspace.hh b/db/system_keyspace.hh
index d7c068fd0bea..a729220af8d2 100644
--- a/db/system_keyspace.hh
+++ b/db/system_keyspace.hh
@@ -275,6 +275,7 @@ public:
     future<> set_scylla_local_param(const sstring& key, const sstring& value, bool visible_before_cl_replay);
     future<std::optional<sstring>> get_scylla_local_param(const sstring& key);
 
+private:
     // Saves the key-value pair into system.scylla_local table.
     // Pass visible_before_cl_replay = true iff the data should be available before
     // schema commitlog replay. We do table.flush in this case, so it's rather slow and heavyweight.
@@ -283,6 +284,7 @@
     template <typename T>
     future<std::optional<T>> get_scylla_local_param_as(const sstring& key);
 
+public:
     static std::vector<schema_ptr> all_tables(const db::config& cfg);
     future<> make(
         locator::effective_replication_map_factory&,
@@ -500,13 +502,9 @@ public:
         utils::UUID state_id, std::optional<gc_clock::duration> gc_older_than, std::string_view description);
 
     // Obtain the contents of the group 0 history table in mutation form.
-    // Assumes that the history table exists, i.e. Raft feature is enabled.
+    // Assumes that the history table exists, i.e. Raft experimental feature is enabled.
     static future<mutation> get_group0_history(distributed<replica::database>&);
 
-    // If the `group0_schema_version` key in `system.scylla_local` is present (either live or tombstone),
-    // returns the corresponding mutation.
Otherwise returns an empty mutation for that key. - future get_group0_schema_version(); - future<> sstables_registry_create_entry(sstring location, utils::UUID uuid, sstring status, sstables::entry_descriptor desc); future sstables_registry_lookup_entry(sstring location, sstables::generation_type gen); future<> sstables_registry_update_entry_status(sstring location, sstables::generation_type gen, sstring status); diff --git a/gms/feature_service.cc b/gms/feature_service.cc index 1f2b03650f07..75a2f9dfa951 100644 --- a/gms/feature_service.cc +++ b/gms/feature_service.cc @@ -81,7 +81,6 @@ feature_config feature_config_from_db_config(const db::config& cfg, std::set(keyspace_storage_options); f.set_if(aggregate_storage_options); f.set_if(table_digest_insensitive_to_expiry); - f.set_if(group0_schema_versioning); return f; } diff --git a/gms/feature_service.hh b/gms/feature_service.hh index 3df6e92190c3..f753da36a098 100644 --- a/gms/feature_service.hh +++ b/gms/feature_service.hh @@ -130,14 +130,6 @@ public: gms::feature tablets { *this, "TABLETS"sv }; gms::feature uuid_sstable_identifiers { *this, "UUID_SSTABLE_IDENTIFIERS"sv }; gms::feature table_digest_insensitive_to_expiry { *this, "TABLE_DIGEST_INSENSITIVE_TO_EXPIRY"sv }; - // If this feature is enabled, schema versions are persisted by the group 0 command - // that modifies schema instead of being calculated as a digest (hash) by each node separately. - // The feature controls both the 'global' schema version (the one gossiped as application_state::SCHEMA) - // and the per-table schema versions (schema::version()). - // The feature affects non-Raft mode as well (e.g. during RECOVERY), where we send additional - // tombstones and flags to schema tables when performing schema changes, allowing us to - // revert to the digest method when necessary (if we must perform a schema change during RECOVERY). - gms::feature group0_schema_versioning { *this, "GROUP0_SCHEMA_VERSIONING"sv }; // A feature just for use in tests. It must not be advertised unless // the "features_enable_test_feature" injection is enabled. diff --git a/replica/database.cc b/replica/database.cc index d4f3b35a2458..83a8ef65dce7 100644 --- a/replica/database.cc +++ b/replica/database.cc @@ -860,7 +860,7 @@ future<> database::parse_system_tables(distributed& prox v_to_add = fixed_v; auto&& keyspace = find_keyspace(v->ks_name()).metadata(); auto mutations = db::schema_tables::make_update_view_mutations(keyspace, view_ptr(v), fixed_v, api::new_timestamp(), true); - co_await db::schema_tables::merge_schema(sys_ks, proxy, _feat, std::move(mutations), false); + co_await db::schema_tables::merge_schema(sys_ks, proxy, _feat, std::move(mutations)); } }); })); diff --git a/service/migration_manager.cc b/service/migration_manager.cc index e289ff7ce656..bad089fafd99 100644 --- a/service/migration_manager.cc +++ b/service/migration_manager.cc @@ -146,17 +146,18 @@ void migration_manager::init_messaging_service() [] (migration_manager& self, const rpc::client_info& cinfo, rpc::optional options) -> future, std::vector>> { const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval; - if (!cm_retval_supported) { - // Canonical mutations support was added way back in scylla-3.2 and we don't support - // skipping versions during upgrades (certainly not a 3.2 -> 5.4 upgrade). 
- on_internal_error(mlogger, "canonical mutations not supported by remote node"); - } auto features = self._feat.cluster_schema_features(); auto& proxy = self._storage_proxy.container(); auto& db = proxy.local().get_db(); auto cm = co_await db::schema_tables::convert_schema_to_mutations(proxy, features); if (options->group0_snapshot_transfer) { + // if `group0_snapshot_transfer` is `true`, the sender must also understand canonical mutations + // (`group0_snapshot_transfer` was added more recently). + if (!cm_retval_supported) { + on_internal_error(mlogger, + "migration request handler: group0 snapshot transfer requested, but canonical mutations not supported"); + } cm.emplace_back(co_await db::system_keyspace::get_group0_history(db)); if (proxy.local().local_db().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) { for (auto&& m: co_await replica::read_tablet_mutations(db)) { @@ -164,16 +165,13 @@ void migration_manager::init_messaging_service() } } } - - // If the schema we're returning was last modified in group 0 mode, we also need to return - // the persisted schema version so the pulling node uses it instead of calculating a schema digest. - // - // If it was modified in RECOVERY mode, we still need to return the mutation as it may contain a tombstone - // that will force the pulling node to revert to digest calculation instead of using a version that it - // could've persisted earlier. - cm.emplace_back(co_await self._sys_ks.local().get_group0_schema_version()); - - co_return rpc::tuple(std::vector{}, std::move(cm)); + if (cm_retval_supported) { + co_return rpc::tuple(std::vector{}, std::move(cm)); + } + auto fm = boost::copy_range>(cm | boost::adaptors::transformed([&db = db.local()] (const canonical_mutation& cm) { + return cm.to_mutation(db.find_column_family(cm.column_family_id()).schema()); + })); + co_return rpc::tuple(std::move(fm), std::move(cm)); }, std::ref(*this))); _messaging.register_schema_check([this] { return make_ready_future(_storage_proxy.get_db().local().get_version()); @@ -387,7 +385,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr return make_exception_future<>(std::make_exception_ptr( std::runtime_error(fmt::format("Error while applying schema mutations: {}", e)))); } - return db::schema_tables::merge_schema(_sys_ks, proxy.container(), _feat, std::move(mutations), false); + return db::schema_tables::merge_schema(_sys_ks, proxy.container(), _feat, std::move(mutations)); } future<> migration_manager::reload_schema() { @@ -413,7 +411,7 @@ future<> migration_manager::merge_schema_from(netw::messaging_service::msg_addr all.emplace_back(std::move(m)); return std::move(all); }).then([this](std::vector schema) { - return db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _feat, std::move(schema), false); + return db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _feat, std::move(schema)); }); } @@ -937,7 +935,7 @@ future<> migration_manager::push_schema_mutation(const gms::inet_address& endpoi future<> migration_manager::announce_with_raft(std::vector schema, group0_guard guard, std::string_view description) { assert(this_shard_id() == 0); auto schema_features = _feat.cluster_schema_features(); - auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(std::move(schema), schema_features); + auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(schema, schema_features); auto group0_cmd = _group0_client.prepare_command( 
schema_change{ @@ -949,7 +947,7 @@ future<> migration_manager::announce_with_raft(std::vector schema, gro } future<> migration_manager::announce_without_raft(std::vector schema, group0_guard guard) { - auto f = db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _feat, schema, false); + auto f = db::schema_tables::merge_schema(_sys_ks, _storage_proxy.container(), _feat, schema); try { using namespace std::placeholders; @@ -969,50 +967,8 @@ future<> migration_manager::announce_without_raft(std::vector schema, co_return co_await std::move(f); } -static mutation make_group0_schema_version_mutation(const data_dictionary::database db, const group0_guard& guard) { - auto s = db.find_schema(db::system_keyspace::NAME, db::system_keyspace::SCYLLA_LOCAL); - auto* cdef = s->get_column_definition("value"); - assert(cdef); - - mutation m(s, partition_key::from_singular(*s, "group0_schema_version")); - auto cell = guard.with_raft() - ? atomic_cell::make_live(*cdef->type, guard.write_timestamp(), - cdef->type->decompose(guard.new_group0_state_id().to_sstring())) - : atomic_cell::make_dead(guard.write_timestamp(), gc_clock::now()); - m.set_clustered_cell(clustering_key::make_empty(), *cdef, std::move(cell)); - return m; -} - -// Precondition: GROUP0_SCHEMA_VERSIONING feature is enabled in the cluster. -// -// See the description of this column in db/schema_tables.cc. -static void add_committed_by_group0_flag(std::vector& schema, const group0_guard& guard) { - auto committed_by_group0 = guard.with_raft(); - auto timestamp = guard.write_timestamp(); - - for (auto& mut: schema) { - if (mut.schema()->cf_name() != db::schema_tables::v3::SCYLLA_TABLES) { - continue; - } - - auto& scylla_tables_schema = *mut.schema(); - auto cdef = scylla_tables_schema.get_column_definition("committed_by_group0"); - assert(cdef); - - for (auto& cr: mut.partition().clustered_rows()) { - cr.row().cells().apply(*cdef, atomic_cell::make_live( - *cdef->type, timestamp, cdef->type->decompose(committed_by_group0))); - } - } -} - // Returns a future on the local application of the schema future<> migration_manager::announce(std::vector schema, group0_guard guard, std::string_view description) { - if (_feat.group0_schema_versioning) { - schema.push_back(make_group0_schema_version_mutation(_storage_proxy.data_dictionary(), guard)); - add_committed_by_group0_flag(schema, guard); - } - if (guard.with_raft()) { return announce_with_raft(std::move(schema), std::move(guard), std::move(description)); } else { diff --git a/test/boost/schema_change_test.cc b/test/boost/schema_change_test.cc index a3a286e0255b..c74c89b72e0b 100644 --- a/test/boost/schema_change_test.cc +++ b/test/boost/schema_change_test.cc @@ -31,12 +31,6 @@ #include "cdc/cdc_extension.hh" #include "utils/UUID_gen.hh" -static cql_test_config disable_raft_schema_config() { - cql_test_config c; - c.db_config->consistent_cluster_management(false); - return c; -} - SEASTAR_TEST_CASE(test_new_schema_with_no_structural_change_is_propagated) { return do_with_cql_env([](cql_test_env& e) { return seastar::async([&] { @@ -145,23 +139,9 @@ SEASTAR_TEST_CASE(test_tombstones_are_ignored_in_version_calculation) { auto new_node_version = e.db().local().get_version(); BOOST_REQUIRE_EQUAL(new_table_version, old_table_version); - - // With group 0 schema changes and GROUP0_SCHEMA_VERSIONING, this check wouldn't pass, - // because the version after the first schema change is not a digest, but taken - // to be the version sent in the schema change mutations; in this case, - // 
`prepare_new_column_family_announcement` took `table_schema->version()` - // - // On the other hand, the second schema change mutations do not contain - // the ususal `system_schema.scylla_tables` mutation (which would contain the version); - // they are 'incomplete' schema mutations created by the above piece of code, - // not by the usual `prepare_..._announcement` functions. This causes - // a digest to be calculated when applying the schema change, and the digest - // will be different than the first version sent. - // - // Hence we use `disable_raft_schema_config()` in this test. BOOST_REQUIRE_EQUAL(new_node_version, old_node_version); }); - }, disable_raft_schema_config()); + }); } SEASTAR_TEST_CASE(test_concurrent_column_addition) { @@ -211,19 +191,9 @@ SEASTAR_TEST_CASE(test_concurrent_column_addition) { BOOST_REQUIRE(new_schema->get_column_definition(to_bytes("v3")) != nullptr); BOOST_REQUIRE(new_schema->version() != old_version); - - // With group 0 schema changes and GROUP0_SCHEMA_VERSIONING, this check wouldn't pass, - // because the version resulting after schema change is not a digest, but taken to be - // the version sent in the schema change mutations; in this case, `make_update_table_mutations` - // takes `s2->version()`. - // - // This is fine with group 0 where all schema changes are linearized, so this scenario - // of merging concurrent schema changes doesn't happen. - // - // Hence we use `disable_raft_schema_config()` in this test. BOOST_REQUIRE(new_schema->version() != s2->version()); }); - }, disable_raft_schema_config()); + }); } SEASTAR_TEST_CASE(test_sort_type_in_update) { diff --git a/test/boost/schema_registry_test.cc b/test/boost/schema_registry_test.cc index 505be0db2eed..862430258b13 100644 --- a/test/boost/schema_registry_test.cc +++ b/test/boost/schema_registry_test.cc @@ -166,7 +166,7 @@ SEASTAR_THREAD_TEST_CASE(test_table_is_attached) { std::vector muts; sm0.copy_to(muts); db::schema_tables::merge_schema(e.get_system_keyspace(), e.get_storage_proxy(), - e.get_feature_service().local(), muts, false).get(); + e.get_feature_service().local(), muts).get(); } // This should attach the table diff --git a/test/pylib/log_browsing.py b/test/pylib/log_browsing.py index 1fa9bf0d9d27..d195892c983b 100644 --- a/test/pylib/log_browsing.py +++ b/test/pylib/log_browsing.py @@ -75,7 +75,7 @@ async def wait_for(self, pattern: str | re.Pattern, from_mark: Optional[int] = N await asyncio.sleep(0.01) async def grep(self, expr: str | re.Pattern, filter_expr: Optional[str | re.Pattern] = None, - from_mark: Optional[int] = None) -> list[tuple[str, re.Match[str]]]: + from_mark: Optional[int] = None) -> list[(str, re.Match[str])]: """ Returns a list of lines matching the regular expression in the Scylla log. 
The list contains tuples of (line, match), where line is the full line diff --git a/test/topology_custom/suite.yaml b/test/topology_custom/suite.yaml index 4823454d0b6b..762e9e01d6d9 100644 --- a/test/topology_custom/suite.yaml +++ b/test/topology_custom/suite.yaml @@ -10,10 +10,8 @@ skip_in_release: - test_replace_ignore_nodes - test_old_ip_notification_repro - test_different_group0_ids - - test_group0_schema_versioning skip_in_debug: - test_shutdown_hang - test_replace_ignore_nodes - test_old_ip_notification_repro - test_different_group0_ids - - test_group0_schema_versioning diff --git a/test/topology_custom/test_group0_schema_versioning.py b/test/topology_custom/test_group0_schema_versioning.py deleted file mode 100644 index 93aad83db962..000000000000 --- a/test/topology_custom/test_group0_schema_versioning.py +++ /dev/null @@ -1,367 +0,0 @@ -# -# Copyright (C) 2023-present ScyllaDB -# -# SPDX-License-Identifier: AGPL-3.0-or-later -# -import asyncio -import time -import pytest -import logging -import re -from uuid import UUID - -from cassandra.cluster import Session, ConsistencyLevel # type: ignore -from cassandra.query import SimpleStatement # type: ignore -from cassandra.pool import Host # type: ignore - -from test.pylib.manager_client import ManagerClient, ServerInfo -from test.pylib.util import wait_for_cql_and_get_hosts -from test.pylib.log_browsing import ScyllaLogFile -from test.topology.util import wait_for_token_ring_and_group0_consistency, reconnect_driver -from test.topology_raft_disabled.util import delete_raft_data, wait_until_upgrade_finishes, enable_raft_and_restart - - -logger = logging.getLogger(__name__) - - -async def get_local_schema_version(cql: Session, h: Host) -> UUID: - rs = await cql.run_async("select schema_version from system.local where key = 'local'", host=h) - assert(rs) - return rs[0].schema_version - - -async def get_group0_schema_version(cql: Session, h: Host) -> UUID | None: - rs = await cql.run_async("select value from system.scylla_local where key = 'group0_schema_version'", host=h) - if rs: - return UUID(rs[0].value) - return None - - -async def get_scylla_tables_versions(cql: Session, h: Host) -> list[tuple[str, str, UUID | None]]: - rs = await cql.run_async("select keyspace_name, table_name, version from system_schema.scylla_tables", host=h) - return [(r.keyspace_name, r.table_name, r.version) for r in rs] - - -async def get_scylla_tables_version(cql: Session, h: Host, keyspace_name: str, table_name: str) -> UUID | None: - rs = await cql.run_async( - f"select version from system_schema.scylla_tables" - f" where keyspace_name = '{keyspace_name}' and table_name = '{table_name}'", - host=h) - if not rs: - pytest.fail(f"No scylla_tables row found for {keyspace_name}.{table_name}") - return rs[0].version - - -async def verify_local_schema_verions_synced(cql: Session, hs: list[Host]) -> None: - versions = {h: await get_local_schema_version(cql, h) for h in hs} - logger.info(f"system.local schema_versions: {versions}") - h1, v1 = next(iter(versions.items())) - for h, v in versions.items(): - if v != v1: - pytest.fail(f"{h1}'s system.local schema_version {v1} is different than {h}'s version {v}") - - -async def verify_group0_schema_versions_synced(cql: Session, hs: list[Host]) -> None: - versions = {h: await get_group0_schema_version(cql, h) for h in hs} - logger.info(f"system.scylla_local group0_schema_versions: {versions}") - h1, v1 = next(iter(versions.items())) - for h, v in versions.items(): - if v != v1: - pytest.fail(f"{h1}'s system.scylla_local 
group0_schema_version {v1} is different than {h}'s version {v}") - - -async def verify_scylla_tables_versions_synced(cql: Session, hs: list[Host], ignore_system_tables: bool) -> None: - versions = {h: set(await get_scylla_tables_versions(cql, h)) for h in hs} - logger.info(f"system_schema.scylla_tables: {versions}") - h1, v1 = next(iter(versions.items())) - for h, v in versions.items(): - diff = v.symmetric_difference(v1) - if ignore_system_tables: - diff = {(k, t, v) for k, t, v in diff if k != "system"} - if diff: - pytest.fail(f"{h1}'s system_schema.scylla_tables contents is different than {h}'s, symmetric diff: {diff}") - - -async def verify_table_versions_synced(cql: Session, hs: list[Host], ignore_system_tables: bool = False) -> None: - logger.info("Verifying that versions stored in tables are in sync") - await verify_local_schema_verions_synced(cql, hs) - await verify_group0_schema_versions_synced(cql, hs) - await verify_scylla_tables_versions_synced(cql, hs, ignore_system_tables) - - -async def verify_in_memory_table_versions(srvs: list[ServerInfo], logs: list[ScyllaLogFile], marks: list[int]): - """ - Assumes that `logs` are log files of servers `srvs`, correspondingly in order. - Assumes that `marks` are log markers (obtained by `ScyllaLogFile.mark()`) corresponding to `logs` in order. - Assumes that an 'alter table ks.t ...' statement was performed after obtaining `marks`. - Checks that every server printed the same version in `Altering ks.t...' log message. - """ - logger.info("Verifying that in-memory table schema versions are in sync") - matches = [await log.grep("Altering ks.t.*version=(.*)", from_mark=mark) for log, mark in zip(logs, marks)] - - def get_version(srv: ServerInfo, matches: list[tuple[str, re.Match[str]]]): - if not matches: - pytest.fail(f"Server {srv} didn't log 'Altering' message") - _, match = matches[0] - return UUID(match.group(1)) - - versions = {srv: get_version(srv, m) for srv, m in zip(srvs, matches)} - logger.info(f"In-memory table versions: {versions}") - - s1, v1 = next(iter(versions.items())) - for s, v in versions.items(): - if v != v1: - pytest.fail(f"{s1}'s in-memory table version {v1} is different than {s}'s version {v}") - - -@pytest.mark.asyncio -async def test_schema_versioning_with_recovery(manager: ManagerClient): - """ - Perform schema changes while mixing nodes in RECOVERY mode with nodes in group 0 mode. - Schema changes originating from RECOVERY node use digest-based schema versioning. - Schema changes originating from group 0 nodes use persisted versions committed through group 0. - - Verify that schema versions are in sync after each schema change. 
- """ - cfg = {'enable_user_defined_functions': False, - 'experimental_features': list[str]()} - logger.info("Booting cluster") - servers = [await manager.server_add(config=cfg) for _ in range(3)] - await wait_for_token_ring_and_group0_consistency(manager, time.time() + 60) - cql = manager.get_cql() - - logger.info("Creating keyspace and table") - await cql.run_async("create keyspace ks with replication = " - "{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") - await cql.run_async("create table ks.t (pk int primary key)") - - logger.info("Waiting for driver") - hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - await verify_table_versions_synced(cql, hosts) - ks_t_version = await get_scylla_tables_version(cql, hosts[0], 'ks', 't') - assert ks_t_version - - logs = [await manager.server_open_log(srv.server_id) for srv in servers] - marks = [await log.mark() for log in logs] - - logger.info("Altering table") - await cql.run_async("alter table ks.t with comment = ''") - - await verify_table_versions_synced(cql, hosts) - await verify_in_memory_table_versions(servers, logs, marks) - - new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], 'ks', 't') - assert new_ks_t_version - assert new_ks_t_version != ks_t_version - ks_t_version = new_ks_t_version - - # We still have a group 0 majority, don't do this at home. - srv1 = servers[0] - logger.info(f"Rebooting {srv1} in RECOVERY mode") - h1 = next(h for h in hosts if h.address == srv1.ip_addr) - await cql.run_async("update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h1) - await manager.server_restart(srv1.server_id) - - cql = await reconnect_driver(manager) - logger.info(f"Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - await verify_table_versions_synced(cql, hosts) - - # We're doing a schema change on RECOVERY node while we have two nodes running in group 0 mode. - # Don't do this at home. - # - # Now, the two nodes are not doing any schema changes right now, so this doesn't actually break anything: - # the RECOVERY node is operating using the old schema change procedure, which means - # that it pushes the schema mutations to other nodes directly with RPC, modifying - # the group 0 state machine on other two nodes. - # - # There is one problem with this however. If the RECOVERY node considers some other node - # as DOWN, it will silently *not* push the schema change, completing the operation - # "successfully" nevertheless (it will return to the driver without error). - # Usually in this case we rely on eventual convergence of schema through gossip, - # which will not happen here, because the group 0 nodes are not doing schema pulls! - # So we need to make sure that the RECOVERY node sees the other nodes as UP before - # we perform the schema change, so it pushes the mutations to them. 
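
# Illustrative sketch, not part of the patch: the wait below matters because a
# RECOVERY-mode node pushes schema mutations only to peers it currently sees as
# UP, and the group 0 peers will not converge through gossip-triggered pulls.
# The generic shape of such a readiness wait (wait_predicate is an assumed
# helper name, not a test.pylib API; the test itself uses
# manager.server_sees_others):
import asyncio
import time

async def wait_predicate(pred, timeout_s: float = 60.0, period_s: float = 0.1) -> None:
    """Poll an async predicate until it holds or the deadline passes."""
    deadline = time.time() + timeout_s
    while not await pred():
        if time.time() > deadline:
            raise TimeoutError("predicate still false after timeout")
        await asyncio.sleep(period_s)
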
- logger.info(f"Waiting until RECOVERY node ({srv1}) sees other servers as UP") - await manager.server_sees_others(srv1.server_id, 2) - - marks = [await log.mark() for log in logs] - logger.info(f"Altering table on RECOVERY node ({srv1})") - await cql.run_async("alter table ks.t with comment = ''", host=h1) - - await verify_table_versions_synced(cql, hosts) - await verify_in_memory_table_versions(servers, logs, marks) - - new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], 'ks', 't') - assert not new_ks_t_version - ks_t_version = new_ks_t_version - - logger.info(f"Stopping {srv1} gracefully") - await manager.server_stop_gracefully(srv1.server_id) - - srv2 = servers[1] - logger.info(f"Waiting until {srv2} sees {srv1} as dead") - await manager.server_not_sees_other_server(srv2.ip_addr, srv1.ip_addr) - - # Now we modify schema through group 0 while the RECOVERY node is dead. - # Don't do this at home. - marks = [await log.mark() for log in logs] - h2 = next(h for h in hosts if h.address == srv2.ip_addr) - logger.info(f"Altering table on group 0 node {srv2}") - await cql.run_async("alter table ks.t with comment = ''", host=h2) - - await manager.server_start(srv1.server_id) - cql = await reconnect_driver(manager) - logger.info(f"Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - logger.info(f"Waiting until {srv2} sees {srv1} as UP") - await manager.server_sees_other_server(srv2.ip_addr, srv1.ip_addr) - - # The RECOVERY node will pull schema when it gets a write. - # The other group 0 node will do a barrier so it will also sync schema before the write returns. - logger.info("Forcing schema sync through CL=ALL INSERT") - await cql.run_async(SimpleStatement("insert into ks.t (pk) values (0)", consistency_level=ConsistencyLevel.ALL), - host=h2) - - await verify_table_versions_synced(cql, hosts) - await verify_in_memory_table_versions(servers, logs, marks) - - new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], 'ks', 't') - assert new_ks_t_version - ks_t_version = new_ks_t_version - - srv3 = servers[2] - h3 = next(h for h in hosts if h.address == srv3.ip_addr) - logger.info("Finishing recovery") - for h in [h2, h3]: - await cql.run_async( - "update system.scylla_local set value = 'recovery' where key = 'group0_upgrade_state'", host=h) - await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in [srv2, srv3])) - - cql = await reconnect_driver(manager) - logger.info("Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - for h in [h1, h2, h3]: - await delete_raft_data(cql, h) - await cql.run_async("delete from system.scylla_local where key = 'group0_upgrade_state'", host=h) - - logger.info("Restarting servers") - await asyncio.gather(*(manager.server_restart(srv.server_id) for srv in servers)) - - cql = await reconnect_driver(manager) - logger.info("Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - logging.info(f"Waiting until upgrade finishes") - for h in [h1, h2, h3]: - await wait_until_upgrade_finishes(cql, h, time.time() + 60) - - await verify_table_versions_synced(cql, hosts) - - for change in [ - "alter table ks.t with comment = ''", - "alter table ks.t add v int", - "alter table ks.t alter v type blob"]: - - marks = [await log.mark() for log in logs] - logger.info(f"Altering table with \"{change}\"") - await cql.run_async(change) - - new_ks_t_version = await get_scylla_tables_version(cql, hosts[0], 'ks', 't') - assert new_ks_t_version - 
assert new_ks_t_version != ks_t_version - ks_t_version = new_ks_t_version - - await verify_table_versions_synced(cql, hosts) - await verify_in_memory_table_versions(servers, logs, marks) - - await cql.run_async("drop keyspace ks") - -@pytest.mark.asyncio -async def test_upgrade(manager: ManagerClient): - """ - While Raft is disabled, we use digest-based schema versioning. - Once Raft upgrade is complete, we use persisted versions committed through group 0. - """ - cfg = {'enable_user_defined_functions': False, - 'experimental_features': list[str](), - 'consistent_cluster_management': False} - logger.info("Booting cluster") - servers = [await manager.server_add(config=cfg) for _ in range(2)] - cql = manager.get_cql() - - logger.info("Creating keyspace and table") - await cql.run_async("create keyspace ks with replication = " - "{'class': 'NetworkTopologyStrategy', 'replication_factor': 3}") - await cql.run_async("create table ks.t (pk int primary key)") - - logger.info("Waiting for driver") - hosts = await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - logger.info(f"Upgrading {servers[0]}") - await enable_raft_and_restart(manager, servers[0]) - cql = await reconnect_driver(manager) - - logger.info("Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - # We ignore system tables in this partially upgraded state. - # Indeed, the upgraded node already has some new tables that we only - # create when `consistent_cluster_management` is enabled. - await verify_table_versions_synced(cql, hosts, ignore_system_tables=True) - - logs = [await manager.server_open_log(srv.server_id) for srv in servers] - - marks = [await log.mark() for log in logs] - logger.info("Altering table") - await cql.run_async("alter table ks.t with comment = ''") - - await verify_table_versions_synced(cql, hosts, ignore_system_tables=True) - await verify_in_memory_table_versions(servers, logs, marks) - - # `group0_schema_version` key in system.scylla_local should be absent, - # version column in `system_schema.scylla_tables` for distributed tables should be null. - for h in hosts: - logger.info(f"Checking that `group0_schema_version` is missing on {h}") - assert (await get_group0_schema_version(cql, h)) is None - - for h in hosts: - logger.info(f"Checking that `version` column for `ks.t` is null on {h}") - versions = await get_scylla_tables_versions(cql, h) - for ks, _, v in versions: - if ks in ["system_distributed", "system_distributed_everywhere", "ks"]: - assert v is None - - logging.info(f"Upgrading {servers[1]}") - await enable_raft_and_restart(manager, servers[1]) - cql = await reconnect_driver(manager) - - logger.info("Waiting for driver") - await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60) - - logging.info(f"Waiting until Raft upgrade procedure finishes") - await asyncio.gather(*(wait_until_upgrade_finishes(cql, h, time.time() + 60) for h in hosts)) - - marks = [await log.mark() for log in logs] - logger.info("Altering table") - await cql.run_async("alter table ks.t with comment = ''") - - await verify_table_versions_synced(cql, hosts) - await verify_in_memory_table_versions(servers, logs, marks) - - # `group0_schema_version` should be present - # and the version column for `ks.t` should be non-null. 
- for h in hosts: - logger.info(f"Checking that `group0_schema_version` is set on {h}") - assert (await get_group0_schema_version(cql, h)) is not None - - for h in hosts: - logger.info(f"Checking that `version` column for `ks.t` is set on {h}") - versions = await get_scylla_tables_versions(cql, h) - for ks, _, v in versions: - if ks == "ks": - assert v is not None
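
// Illustrative sketch, not part of the patch: after this revert, every node
// derives both the global and the per-table schema versions from a digest of
// its schema tables instead of adopting a version persisted through group 0,
// so equal schema content yields equal versions on every node with nothing
// extra to persist or replicate. Simplified standalone model: std::hash stands
// in for the real MD5-based hasher, and only_tombstones models the mutations
// that TABLE_DIGEST_INSENSITIVE_TO_EXPIRY makes the digest skip.
#include <cstdint>
#include <functional>
#include <string>
#include <vector>

struct schema_mutation_model {
    std::string canonical_bytes; // canonical serialization of the mutation
    bool only_tombstones;        // nothing live remains (e.g. everything expired)
};

std::uint64_t schema_digest(const std::vector<schema_mutation_model>& muts) {
    std::uint64_t h = 0;
    for (const auto& m : muts) {
        if (m.only_tombstones) {
            continue; // skipping empties keeps the digest stable across compaction
        }
        // boost::hash_combine-style mixing; the real implementation feeds the
        // mutations in a canonical order, which this loop assumes as well.
        h ^= std::hash<std::string>{}(m.canonical_bytes) + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2);
    }
    return h;
}
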