Skip to content

Commit

Permalink
Merge 'db: config: move consistent-topology-changes out of experiment…
Browse files Browse the repository at this point in the history
…al and make it the default for new clusters' from Patryk Jędrzejczak

We move consistent cluster management out of experimental and
make it the default for new clusters in 6.0. In code, we make the
`consistent-topology-changes` flag unused and assumed to be true.

In 6.0, the topology upgrade procedure will be manual and
voluntary, so some clusters will still be using the gossip-based
topology even though they support the raft-based topology.
Therefore, we need to continue testing the gossip-based topology.
This is possible by using the `force-gossip-topology-changes` flag
introduced in #18284.

Ref #17802

Closes #18285

* github.com:scylladb/scylladb:
  docs: raft.rst: update after removing consistent-topology-changes
  treewide: fix indentation after the previous patch
  db: config: make consistent-topology-changes unused
  test: lib: single_node_cql_env: restart a node in noninitial run_in_thread calls
  test: test_read_required_hosts: run with force-gossip-topology-changes
  storage_service: join_cluster: replace force_gossip_based_join with force-gossip-topology-changes
  storage_service: join_token_ring: fix finish_setup_after_join calls
  • Loading branch information
kbr-scylla committed Apr 26, 2024
2 parents b96f283 + 55b0119 commit d8313dd
Show file tree
Hide file tree
Showing 46 changed files with 209 additions and 292 deletions.
29 changes: 11 additions & 18 deletions cdc/generation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -473,12 +473,11 @@ static std::optional<cdc::generation_id> get_generation_id_for(const gms::inet_a

static future<std::optional<cdc::topology_description>> retrieve_generation_data_v2(
cdc::generation_id_v2 id,
bool raft_experimental_topology,
db::system_keyspace& sys_ks,
db::system_distributed_keyspace& sys_dist_ks) {
auto cdc_gen = co_await sys_dist_ks.read_cdc_generation(id.id);

if (raft_experimental_topology && !cdc_gen) {
if (!cdc_gen) {
// If we entered legacy mode due to recovery, we (or some other node)
// might gossip about a generation that was previously propagated
// through raft. If that's the case, it will sit in
Expand All @@ -491,7 +490,6 @@ static future<std::optional<cdc::topology_description>> retrieve_generation_data

static future<std::optional<cdc::topology_description>> retrieve_generation_data(
cdc::generation_id gen_id,
bool raft_experimental_topology,
db::system_keyspace& sys_ks,
db::system_distributed_keyspace& sys_dist_ks,
db::system_distributed_keyspace::context ctx) {
Expand All @@ -500,14 +498,13 @@ static future<std::optional<cdc::topology_description>> retrieve_generation_data
return sys_dist_ks.read_cdc_topology_description(id, ctx);
},
[&] (const cdc::generation_id_v2& id) {
return retrieve_generation_data_v2(id, raft_experimental_topology, sys_ks, sys_dist_ks);
return retrieve_generation_data_v2(id, sys_ks, sys_dist_ks);
}
), gen_id);
}

static future<> do_update_streams_description(
cdc::generation_id gen_id,
bool raft_experimental_topology,
db::system_keyspace& sys_ks,
db::system_distributed_keyspace& sys_dist_ks,
db::system_distributed_keyspace::context ctx) {
Expand All @@ -518,7 +515,7 @@ static future<> do_update_streams_description(

// We might race with another node also inserting the description, but that's ok. It's an idempotent operation.

auto topo = co_await retrieve_generation_data(gen_id, raft_experimental_topology, sys_ks, sys_dist_ks, ctx);
auto topo = co_await retrieve_generation_data(gen_id, sys_ks, sys_dist_ks, ctx);
if (!topo) {
throw no_generation_data_exception(gen_id);
}
Expand All @@ -537,21 +534,19 @@ static future<> do_update_streams_description(
*/
static future<> update_streams_description(
cdc::generation_id gen_id,
bool raft_experimental_topology,
db::system_keyspace& sys_ks,
shared_ptr<db::system_distributed_keyspace> sys_dist_ks,
noncopyable_function<unsigned()> get_num_token_owners,
abort_source& abort_src) {
try {
co_await do_update_streams_description(gen_id, raft_experimental_topology, sys_ks, *sys_dist_ks, { get_num_token_owners() });
co_await do_update_streams_description(gen_id, sys_ks, *sys_dist_ks, { get_num_token_owners() });
} catch (...) {
cdc_log.warn(
"Could not update CDC description table with generation {}: {}. Will retry in the background.",
gen_id, std::current_exception());

// It is safe to discard this future: we keep system distributed keyspace alive.
(void)(([] (cdc::generation_id gen_id,
bool raft_experimental_topology,
db::system_keyspace& sys_ks,
shared_ptr<db::system_distributed_keyspace> sys_dist_ks,
noncopyable_function<unsigned()> get_num_token_owners,
Expand All @@ -564,15 +559,15 @@ static future<> update_streams_description(
co_return;
}
try {
co_await do_update_streams_description(gen_id, raft_experimental_topology, sys_ks, *sys_dist_ks, { get_num_token_owners() });
co_await do_update_streams_description(gen_id, sys_ks, *sys_dist_ks, { get_num_token_owners() });
co_return;
} catch (...) {
cdc_log.warn(
"Could not update CDC description table with generation {}: {}. Will try again.",
gen_id, std::current_exception());
}
}
})(gen_id, raft_experimental_topology, sys_ks, std::move(sys_dist_ks), std::move(get_num_token_owners), abort_src));
})(gen_id, sys_ks, std::move(sys_dist_ks), std::move(get_num_token_owners), abort_src));
}
}

Expand Down Expand Up @@ -610,7 +605,6 @@ struct time_and_ttl {
*/
static future<std::optional<cdc::generation_id_v1>> rewrite_streams_descriptions(
std::vector<time_and_ttl> times_and_ttls,
bool raft_experimental_topology,
db::system_keyspace& sys_ks,
shared_ptr<db::system_distributed_keyspace> sys_dist_ks,
noncopyable_function<unsigned()> get_num_token_owners,
Expand Down Expand Up @@ -655,7 +649,7 @@ static future<std::optional<cdc::generation_id_v1>> rewrite_streams_descriptions
co_await max_concurrent_for_each(first, tss.end(), 10, [&] (db_clock::time_point ts) -> future<> {
while (true) {
try {
co_return co_await do_update_streams_description(cdc::generation_id_v1{ts}, raft_experimental_topology, sys_ks, *sys_dist_ks, { get_num_token_owners() });
co_return co_await do_update_streams_description(cdc::generation_id_v1{ts}, sys_ks, *sys_dist_ks, { get_num_token_owners() });
} catch (const no_generation_data_exception& e) {
cdc_log.error("Failed to rewrite streams for generation {}: {}. Giving up.", ts, e);
each_success = false;
Expand Down Expand Up @@ -731,7 +725,6 @@ future<> generation_service::maybe_rewrite_streams_descriptions() {
cdc_log.info("Rewriting stream tables in the background...");
auto last_rewritten = co_await rewrite_streams_descriptions(
std::move(times_and_ttls),
_cfg.raft_experimental_topology,
_sys_ks.local(),
_sys_dist_ks.local_shared(),
std::move(get_num_token_owners),
Expand Down Expand Up @@ -907,7 +900,7 @@ future<> generation_service::check_and_repair_cdc_streams() {

std::optional<topology_description> gen;
try {
gen = co_await retrieve_generation_data(*latest, _cfg.raft_experimental_topology, _sys_ks.local(), *sys_dist_ks, { tmptr->count_normal_token_owners() });
gen = co_await retrieve_generation_data(*latest, _sys_ks.local(), *sys_dist_ks, { tmptr->count_normal_token_owners() });
} catch (exceptions::request_timeout_exception& e) {
cdc_log.error("{}: \"{}\". {}.", timeout_msg, e.what(), exception_translating_msg);
throw exceptions::request_execution_exception(exceptions::exception_code::READ_TIMEOUT,
Expand Down Expand Up @@ -1023,7 +1016,7 @@ future<> generation_service::legacy_handle_cdc_generation(std::optional<cdc::gen

if (using_this_gen) {
cdc_log.info("Starting to use generation {}", *gen_id);
co_await update_streams_description(*gen_id, _cfg.raft_experimental_topology, _sys_ks.local(), get_sys_dist_ks(),
co_await update_streams_description(*gen_id, _sys_ks.local(), get_sys_dist_ks(),
[tmptr = _token_metadata.get()] { return tmptr->count_normal_token_owners(); },
_abort_src);
}
Expand All @@ -1040,7 +1033,7 @@ void generation_service::legacy_async_handle_cdc_generation(cdc::generation_id g
bool using_this_gen = co_await svc->legacy_do_handle_cdc_generation_intercept_nonfatal_errors(gen_id);
if (using_this_gen) {
cdc_log.info("Starting to use generation {}", gen_id);
co_await update_streams_description(gen_id, svc->_cfg.raft_experimental_topology, svc->_sys_ks.local(), svc->get_sys_dist_ks(),
co_await update_streams_description(gen_id, svc->_sys_ks.local(), svc->get_sys_dist_ks(),
[tmptr = svc->_token_metadata.get()] { return tmptr->count_normal_token_owners(); },
svc->_abort_src);
}
Expand Down Expand Up @@ -1109,7 +1102,7 @@ future<bool> generation_service::legacy_do_handle_cdc_generation(cdc::generation
assert_shard_zero(__PRETTY_FUNCTION__);

auto sys_dist_ks = get_sys_dist_ks();
auto gen = co_await retrieve_generation_data(gen_id, _cfg.raft_experimental_topology, _sys_ks.local(), *sys_dist_ks, { _token_metadata.get()->count_normal_token_owners() });
auto gen = co_await retrieve_generation_data(gen_id, _sys_ks.local(), *sys_dist_ks, { _token_metadata.get()->count_normal_token_owners() });
if (!gen) {
throw std::runtime_error(format(
"Could not find CDC generation {} in distributed system tables (current time: {}),"
Expand Down
1 change: 0 additions & 1 deletion cdc/generation_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ public:
unsigned ignore_msb_bits;
std::chrono::milliseconds ring_delay;
bool dont_rewrite_streams = false;
bool raft_experimental_topology = false;
};

private:
Expand Down
1 change: 0 additions & 1 deletion conf/scylla.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ batch_size_fail_threshold_in_kb: 1024
# experimental_features:
# - udf
# - alternator-streams
# - consistent-topology-changes
# - broadcast-tables
# - keyspace-storage-options
# - tablets
Expand Down
2 changes: 1 addition & 1 deletion db/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1345,7 +1345,7 @@ std::map<sstring, db::experimental_features_t::feature> db::experimental_feature
{"cdc", feature::UNUSED},
{"alternator-streams", feature::ALTERNATOR_STREAMS},
{"alternator-ttl", feature::UNUSED },
{"consistent-topology-changes", feature::CONSISTENT_TOPOLOGY_CHANGES},
{"consistent-topology-changes", feature::UNUSED},
{"broadcast-tables", feature::BROADCAST_TABLES},
{"keyspace-storage-options", feature::KEYSPACE_STORAGE_OPTIONS},
{"tablets", feature::TABLETS},
Expand Down
1 change: 0 additions & 1 deletion db/config.hh
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ struct experimental_features_t {
UNUSED,
UDF,
ALTERNATOR_STREAMS,
CONSISTENT_TOPOLOGY_CHANGES,
BROADCAST_TABLES,
KEYSPACE_STORAGE_OPTIONS,
TABLETS,
Expand Down
5 changes: 1 addition & 4 deletions db/schema_tables.cc
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,7 @@ future<> save_system_schema(cql3::query_processor& qp) {
co_await save_system_schema_to_keyspace(qp, schema_tables::NAME);
// #2514 - make sure "system" is written to system_schema.keyspaces.
co_await save_system_schema_to_keyspace(qp, system_keyspace::NAME);
if (qp.db().get_config().check_experimental(
db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES)) {
co_await save_system_schema_to_keyspace(qp, system_auth_keyspace::NAME);
}
co_await save_system_schema_to_keyspace(qp, system_auth_keyspace::NAME);
}

namespace v3 {
Expand Down
14 changes: 4 additions & 10 deletions db/system_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2144,18 +2144,12 @@ std::vector<schema_ptr> system_keyspace::all_tables(const db::config& cfg) {
v3::truncated(),
v3::commitlog_cleanups(),
v3::cdc_local(),
raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery(),
topology(), cdc_generations_v3(), topology_requests(), service_levels_v2(),
});

r.insert(r.end(), {raft(), raft_snapshots(), raft_snapshot_config(), group0_history(), discovery()});

if (cfg.check_experimental(db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES)) {
r.insert(r.end(), {topology(), cdc_generations_v3(), topology_requests(), service_levels_v2()});
}

if (cfg.check_experimental(db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES)) {
auto auth_tables = db::system_auth_keyspace::all_tables();
std::copy(auth_tables.begin(), auth_tables.end(), std::back_inserter(r));
}
auto auth_tables = db::system_auth_keyspace::all_tables();
std::copy(auth_tables.begin(), auth_tables.end(), std::back_inserter(r));

if (cfg.check_experimental(db::experimental_features_t::feature::BROADCAST_TABLES)) {
r.insert(r.end(), {broadcast_kv_store()});
Expand Down
20 changes: 6 additions & 14 deletions docs/architecture/raft.rst
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ In summary, Raft makes schema changes safe, but it requires that a quorum of nod

.. only:: opensource

Consistent Topology with Raft :label-caution:`Experimental`
Consistent Topology with Raft
-----------------------------------------------------------------

ScyllaDB can use Raft to manage cluster topology. With Raft-managed topology
Expand All @@ -250,19 +250,11 @@ In summary, Raft makes schema changes safe, but it requires that a quorum of nod
be bootstrapped concurrently, which couldn't be done with the old
gossip-based topology.

Support for Raft-managed topology is experimental and must be explicitly
enabled in the ``scylla.yaml`` configuration file by specifying
the ``consistent-topology-changes`` option:

.. code::
experimental_features:
- consistent-topology-changes
As with other experimental features in ScyllaDB, you should not enable this
feature in production clusters due to insufficient stability. The feature
is undergoing backward-incompatible changes that may prevent upgrading
the cluster.
Raft-managed topology is enabled by default in new cluster installations
starting from 6.0. After upgrading from 5.4 to 6.0, the cluster will
continue using the old gossip-based topology. You must perform manual action
to enable the Raft-based topology. See :doc:`the guide for enabling consistent topology changes</upgrade/upgrade-opensource/upgrade-guide-from-5.4-to-6.0/enable-consistent-topology>`
for more details.

.. _raft-handling-failures:

Expand Down
18 changes: 5 additions & 13 deletions gms/feature_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ feature_config feature_config_from_db_config(const db::config& cfg, std::set<sst
if (!cfg.check_experimental(db::experimental_features_t::feature::KEYSPACE_STORAGE_OPTIONS)) {
fcfg._disabled_features.insert("KEYSPACE_STORAGE_OPTIONS"s);
}
if (!cfg.check_experimental(db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES)) {
fcfg._disabled_features.insert("SUPPORTS_CONSISTENT_TOPOLOGY_CHANGES"s);
}
if (!cfg.check_experimental(db::experimental_features_t::feature::TABLETS)) {
fcfg._disabled_features.insert("TABLETS"s);
}
Expand Down Expand Up @@ -257,16 +254,11 @@ future<> feature_service::enable_features_on_startup(db::system_keyspace& sys_ks
std::set<sstring> persisted_features;
std::set<sstring> persisted_unsafe_to_disable_features;

bool fall_back_to_legacy = true;
if (_config.use_raft_cluster_features) {
auto topo_features = co_await sys_ks.load_topology_features_state();
if (topo_features) {
persisted_unsafe_to_disable_features = topo_features->calculate_not_yet_enabled_features();
persisted_features = std::move(topo_features->enabled_features);
fall_back_to_legacy = false;
}
}
if (fall_back_to_legacy) {
auto topo_features = co_await sys_ks.load_topology_features_state();
if (topo_features) {
persisted_unsafe_to_disable_features = topo_features->calculate_not_yet_enabled_features();
persisted_features = std::move(topo_features->enabled_features);
} else {
persisted_features = co_await sys_ks.load_local_enabled_features();
}

Expand Down
1 change: 0 additions & 1 deletion gms/feature_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ class feature_service;
class i_endpoint_state_change_subscriber;

struct feature_config {
bool use_raft_cluster_features = false;
private:
std::set<sstring> _disabled_features;
feature_config();
Expand Down
11 changes: 1 addition & 10 deletions main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -777,11 +777,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
throw bad_configuration_error();
}
}
const bool raft_topology_change_enabled =
cfg->check_experimental(db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES);

gms::feature_config fcfg = gms::feature_config_from_db_config(*cfg);
fcfg.use_raft_cluster_features = raft_topology_change_enabled;

debug::the_feature_service = &feature_service;
feature_service.start(fcfg).get();
Expand Down Expand Up @@ -1388,11 +1385,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
service::tablet_allocator::config tacfg;
tacfg.initial_tablets_scale = cfg->tablets_initial_scale_factor();
distributed<service::tablet_allocator> tablet_allocator;
if (cfg->check_experimental(db::experimental_features_t::feature::TABLETS) &&
!cfg->check_experimental(db::experimental_features_t::feature::CONSISTENT_TOPOLOGY_CHANGES)) {
startlog.error("Bad configuration: The consistent-topology-changes feature has to be enabled if tablets feature is enabled");
throw bad_configuration_error();
}
tablet_allocator.start(tacfg, std::ref(mm_notifier), std::ref(db)).get();
auto stop_tablet_allocator = defer_verbose_shutdown("tablet allocator", [&tablet_allocator] {
tablet_allocator.stop().get();
Expand Down Expand Up @@ -1636,7 +1628,6 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
cdc_config.ignore_msb_bits = cfg->murmur3_partitioner_ignore_msb_bits();
cdc_config.ring_delay = std::chrono::milliseconds(cfg->ring_delay_ms());
cdc_config.dont_rewrite_streams = cfg->cdc_dont_rewrite_streams();
cdc_config.raft_experimental_topology = raft_topology_change_enabled;
cdc_generation_service.start(std::move(cdc_config), std::ref(gossiper), std::ref(sys_dist_ks), std::ref(sys_ks),
std::ref(stop_signal.as_sharded_abort_source()), std::ref(token_metadata), std::ref(feature_service), std::ref(db),
[&ss] () -> bool { return ss.local().raft_topology_change_enabled(); }).get();
Expand Down Expand Up @@ -1808,7 +1799,7 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
});

// Set up group0 service earlier since it is needed by group0 setup just below
ss.local().set_group0(group0_service, raft_topology_change_enabled);
ss.local().set_group0(group0_service);

const auto generation_number = gms::generation_type(sys_ks.local().increment_and_get_generation().get());

Expand Down
Loading

0 comments on commit d8313dd

Please sign in to comment.