Skip to content

Commit

Permalink
Merge 'tablets: alter keyspace' from Piotr Smaron
Browse files Browse the repository at this point in the history
This change supports changing replication factor in tablets-enabled keyspaces.
This covers both increasing and decreasing the number of tablets replicas through
first building topology mutations (`alter_keyspace_statement.cc`) and then
tablets/topology/schema mutations (`topology_coordinator.cc`).
For the limitations of the current solution, please see the docs changes attached to this PR.

Fixes: #16129

Closes #16723

* github.com:scylladb/scylladb:
  test: Do not check tablets mutations on nodes that don't have them
  test: Fix the way tablets RF-change test parses mutation_fragments
  test/tablets: Unmark RF-changing test with xfail
  docs: document ALTER KEYSPACE with tablets
  Return response only when tablets are reallocated
  cql-pytest: Verify RF is changes by at most 1 when tablets on
  cql3/alter_keyspace_statement: Do not allow for change of RF by more than 1
  Reject ALTER with 'replication_factor' tag
  Implement ALTER tablets KEYSPACE statement support
  Parameterize migration_manager::announce by type to allow executing different raft commands
  Introduce TABLET_KEYSPACE event to differentiate processing path of a vnode vs tablets ks
  Extend system.topology with 3 new columns to store data required to process alter ks global topo req
  Allow query_processor to check if global topo queue is empty
  Introduce new global topo `keyspace_rf_change` req
  New raft cmd for both schema & topo changes
  Add storage service to query processor
  tablets: tests for adding/removing replicas
  tablet_allocator: make load_balancer_stats_manager configurable by name
  • Loading branch information
xemul committed May 29, 2024
2 parents 1b1bc6f + 66f6001 commit e74a4b0
Show file tree
Hide file tree
Showing 40 changed files with 852 additions and 70 deletions.
35 changes: 28 additions & 7 deletions cql3/query_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
#include <seastar/coroutine/parallel_for_each.hh>

#include "service/storage_proxy.hh"
#include "service/topology_mutation.hh"
#include "service/migration_manager.hh"
#include "service/forward_service.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/storage_service.hh"
#include "cql3/CqlParser.hpp"
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/modification_statement.hh"
Expand All @@ -42,16 +44,22 @@ const sstring query_processor::CQL_VERSION = "3.3.1";
const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono::minutes(60);

struct query_processor::remote {
remote(service::migration_manager& mm, service::forward_service& fwd, service::raft_group0_client& group0_client)
: mm(mm), forwarder(fwd), group0_client(group0_client) {}
remote(service::migration_manager& mm, service::forward_service& fwd,
service::storage_service& ss, service::raft_group0_client& group0_client)
: mm(mm), forwarder(fwd), ss(ss), group0_client(group0_client) {}

service::migration_manager& mm;
service::forward_service& forwarder;
service::storage_service& ss;
service::raft_group0_client& group0_client;

seastar::gate gate;
};

bool query_processor::topology_global_queue_empty() {
return remote().first.get().ss.topology_global_queue_empty();
}

static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}
Expand Down Expand Up @@ -498,8 +506,8 @@ query_processor::~query_processor() {
}

void query_processor::start_remote(service::migration_manager& mm, service::forward_service& forwarder,
service::raft_group0_client& group0_client) {
_remote = std::make_unique<struct remote>(mm, forwarder, group0_client);
service::storage_service& ss, service::raft_group0_client& group0_client) {
_remote = std::make_unique<struct remote>(mm, forwarder, ss, group0_client);
}

future<> query_processor::stop_remote() {
Expand Down Expand Up @@ -1018,16 +1026,29 @@ query_processor::execute_schema_statement(const statements::schema_altering_stat

cql3::cql_warnings_vec warnings;

auto request_id = guard->new_group0_state_id();
stmt.global_req_id = request_id;

auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, options, guard->write_timestamp());
warnings = std::move(cql_warnings);

ce = std::move(ret);
if (!m.empty()) {
auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement);
co_await remote_.get().mm.announce(std::move(m), std::move(*guard), description);
if (ce && ce->target == cql_transport::event::schema_change::target_type::TABLET_KEYSPACE) {
co_await remote_.get().mm.announce<service::topology_change>(std::move(m), std::move(*guard), description);
// TODO: eliminate timeout from alter ks statement on the cqlsh/driver side
auto error = co_await remote_.get().ss.wait_for_topology_request_completion(request_id);
co_await remote_.get().ss.wait_for_topology_not_busy();
if (!error.empty()) {
log.error("CQL statement \"{}\" with topology request_id \"{}\" failed with error: \"{}\"", stmt.raw_cql_statement, request_id, error);
throw exceptions::request_execution_exception(exceptions::exception_code::INVALID, error);
}
} else {
co_await remote_.get().mm.announce<service::schema_change>(std::move(m), std::move(*guard), description);
}
}

ce = std::move(ret);

// If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing
// extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600)
::shared_ptr<messages::result_message> result;
Expand Down
5 changes: 4 additions & 1 deletion cql3/query_processor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public:

~query_processor();

void start_remote(service::migration_manager&, service::forward_service&, service::raft_group0_client&);
void start_remote(service::migration_manager&, service::forward_service&,
service::storage_service& ss, service::raft_group0_client&);
future<> stop_remote();

data_dictionary::database db() {
Expand Down Expand Up @@ -460,6 +461,8 @@ public:

void reset_cache();

bool topology_global_queue_empty();

private:
// Keep the holder until you stop using the `remote` services.
std::pair<std::reference_wrapper<remote>, gate::holder> remote();
Expand Down
87 changes: 80 additions & 7 deletions cql3/statements/alter_keyspace_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/

#include <boost/range/algorithm.hpp>
#include <fmt/format.h>
#include <seastar/core/coroutine.hh>
#include <stdexcept>
#include "alter_keyspace_statement.hh"
#include "prepared_statement.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "service/topology_mutation.hh"
#include "db/system_keyspace.hh"
#include "data_dictionary/data_dictionary.hh"
#include "data_dictionary/keyspace_metadata.hh"
Expand All @@ -21,6 +25,8 @@
#include "create_keyspace_statement.hh"
#include "gms/feature_service.hh"

static logging::logger mylogger("alter_keyspace");

bool is_system_keyspace(std::string_view keyspace);

cql3::statements::alter_keyspace_statement::alter_keyspace_statement(sstring name, ::shared_ptr<ks_prop_defs> attrs)
Expand All @@ -36,6 +42,20 @@ future<> cql3::statements::alter_keyspace_statement::check_access(query_processo
return state.has_keyspace_access(_name, auth::permission::ALTER);
}

static bool validate_rf_difference(const std::string_view curr_rf, const std::string_view new_rf) {
auto to_number = [] (const std::string_view rf) {
int result;
// We assume the passed string view represents a valid decimal number,
// so we don't need the error code.
(void) std::from_chars(rf.begin(), rf.end(), result);
return result;
};

// We want to ensure that each DC's RF is going to change by at most 1
// because in that case the old and new quorums must overlap.
return std::abs(to_number(curr_rf) - to_number(new_rf)) <= 1;
}

void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, const service::client_state& state) const {
auto tmp = _name;
std::transform(tmp.begin(), tmp.end(), tmp.begin(), ::tolower);
Expand All @@ -61,6 +81,17 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
}

auto new_ks = _attrs->as_ks_metadata_update(ks.metadata(), *qp.proxy().get_token_metadata_ptr(), qp.proxy().features());

if (ks.get_replication_strategy().uses_tablets()) {
const std::map<sstring, sstring>& current_rfs = ks.metadata()->strategy_options();
for (const auto& [new_dc, new_rf] : _attrs->get_replication_options()) {
auto it = current_rfs.find(new_dc);
if (it != current_rfs.end() && !validate_rf_difference(it->second, new_rf)) {
throw exceptions::invalid_request_exception("Cannot modify replication factor of any DC by more than 1 at a time.");
}
}
}

locator::replication_strategy_params params(new_ks->strategy_options(), new_ks->initial_tablets());
auto new_rs = locator::abstract_replication_strategy::create_replication_strategy(new_ks->strategy_name(), params);
if (new_rs->is_per_table() != ks.get_replication_strategy().is_per_table()) {
Expand All @@ -83,20 +114,63 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c

future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, const query_options&, api::timestamp_type ts) const {
using namespace cql_transport;
try {
auto old_ksm = qp.db().find_keyspace(_name).metadata();
event::schema_change::target_type target_type = event::schema_change::target_type::KEYSPACE;
auto ks = qp.db().find_keyspace(_name);
auto ks_md = ks.metadata();
const auto& tm = *qp.proxy().get_token_metadata_ptr();
const auto& feat = qp.proxy().features();
auto ks_md_update = _attrs->as_ks_metadata_update(ks_md, tm, feat);
std::vector<mutation> muts;
std::vector<sstring> warnings;
auto ks_options = _attrs->get_all_options_flattened(feat);

// we only want to run the tablets path if there are actually any tablets changes, not only schema changes
if (ks.get_replication_strategy().uses_tablets() && !_attrs->get_replication_options().empty()) {
if (!qp.topology_global_queue_empty()) {
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
}
if (_attrs->get_replication_options().contains(ks_prop_defs::REPLICATION_FACTOR_KEY)) {
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("'replication_factor' tag is not allowed when executing ALTER KEYSPACE with tablets, please list the DCs explicitly"));
}
qp.db().real_database().validate_keyspace_update(*ks_md_update);

service::topology_mutation_builder builder(ts);
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
builder.set_global_topology_request_id(this->global_req_id);
builder.set_new_keyspace_rf_change_data(_name, ks_options);
service::topology_change change{{builder.build()}};

auto topo_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
boost::transform(change.mutations, std::back_inserter(muts), [topo_schema] (const canonical_mutation& cm) {
return cm.to_mutation(topo_schema);
});

service::topology_request_tracking_mutation_builder rtbuilder{utils::UUID{this->global_req_id}};
rtbuilder.set("done", false)
.set("start_time", db_clock::now());
service::topology_change req_change{{rtbuilder.build()}};

auto topo_req_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
boost::transform(req_change.mutations, std::back_inserter(muts), [topo_req_schema] (const canonical_mutation& cm) {
return cm.to_mutation(topo_req_schema);
});

target_type = event::schema_change::target_type::TABLET_KEYSPACE;
} else {
auto schema_mutations = service::prepare_keyspace_update_announcement(qp.db().real_database(), ks_md_update, ts);
muts.insert(muts.begin(), schema_mutations.begin(), schema_mutations.end());
}

auto m = service::prepare_keyspace_update_announcement(qp.db().real_database(), _attrs->as_ks_metadata_update(old_ksm, tm, feat), ts);

using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,
event::schema_change::target_type::KEYSPACE,
target_type,
keyspace());

return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), std::move(m), std::vector<sstring>()));
return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), std::move(muts), warnings));
} catch (data_dictionary::no_such_keyspace& e) {
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(exceptions::invalid_request_exception("Unknown keyspace " + _name));
}
Expand All @@ -107,7 +181,6 @@ cql3::statements::alter_keyspace_statement::prepare(data_dictionary::database db
return std::make_unique<prepared_statement>(make_shared<alter_keyspace_statement>(*this));
}

static logging::logger mylogger("alter_keyspace");

future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::alter_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
Expand Down
49 changes: 49 additions & 0 deletions cql3/statements/ks_prop_defs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,35 @@ static std::map<sstring, sstring> prepare_options(
return options;
}

ks_prop_defs::ks_prop_defs(std::map<sstring, sstring> options) {
std::map<sstring, sstring> replication_opts, storage_opts, tablets_opts, durable_writes_opts;

auto read_property_into = [] (auto& map, const sstring& name, const sstring& value, const sstring& tag) {
map[name.substr(sstring(tag).size() + 1)] = value;
};

for (const auto& [name, value] : options) {
if (name.starts_with(KW_DURABLE_WRITES)) {
read_property_into(durable_writes_opts, name, value, KW_DURABLE_WRITES);
} else if (name.starts_with(KW_REPLICATION)) {
read_property_into(replication_opts, name, value, KW_REPLICATION);
} else if (name.starts_with(KW_TABLETS)) {
read_property_into(tablets_opts, name, value, KW_TABLETS);
} else if (name.starts_with(KW_STORAGE)) {
read_property_into(storage_opts, name, value, KW_STORAGE);
}
}

if (!replication_opts.empty())
add_property(KW_REPLICATION, replication_opts);
if (!storage_opts.empty())
add_property(KW_STORAGE, storage_opts);
if (!tablets_opts.empty())
add_property(KW_TABLETS, tablets_opts);
if (!durable_writes_opts.empty())
add_property(KW_DURABLE_WRITES, durable_writes_opts.begin()->second);
}

void ks_prop_defs::validate() {
// Skip validation if the strategy class is already set as it means we've already
// prepared (and redoing it would set strategyClass back to null, which we don't want)
Expand Down Expand Up @@ -158,6 +187,26 @@ std::optional<sstring> ks_prop_defs::get_replication_strategy_class() const {
return _strategy_class;
}

bool ks_prop_defs::get_durable_writes() const {
return get_boolean(KW_DURABLE_WRITES, true);
}

std::map<sstring, sstring> ks_prop_defs::get_all_options_flattened(const gms::feature_service& feat) const {
std::map<sstring, sstring> all_options;

auto ingest_flattened_options = [&all_options](const std::map<sstring, sstring>& options, const sstring& prefix) {
for (auto& option: options) {
all_options[prefix + ":" + option.first] = option.second;
}
};
ingest_flattened_options(get_replication_options(), KW_REPLICATION);
ingest_flattened_options(get_storage_options().to_map(), KW_STORAGE);
ingest_flattened_options(get_map(KW_TABLETS).value_or(std::map<sstring, sstring>{}), KW_TABLETS);
ingest_flattened_options({{sstring(KW_DURABLE_WRITES), to_sstring(get_boolean(KW_DURABLE_WRITES, true))}}, KW_DURABLE_WRITES);

return all_options;
}

lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata& tm, const gms::feature_service& feat) {
auto sc = get_replication_strategy_class().value();
std::optional<unsigned> initial_tablets = get_initial_tablets(sc, feat.tablets);
Expand Down
5 changes: 5 additions & 0 deletions cql3/statements/ks_prop_defs.hh
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@ public:
private:
std::optional<sstring> _strategy_class;
public:
ks_prop_defs() = default;
explicit ks_prop_defs(std::map<sstring, sstring> options);

void validate();
std::map<sstring, sstring> get_replication_options() const;
std::optional<sstring> get_replication_strategy_class() const;
std::optional<unsigned> get_initial_tablets(const sstring& strategy_class, bool enabled_by_default) const;
data_dictionary::storage_options get_storage_options() const;
bool get_durable_writes() const;
std::map<sstring, sstring> get_all_options_flattened(const gms::feature_service& feat) const;
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata(sstring ks_name, const locator::token_metadata&, const gms::feature_service&);
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata&, const gms::feature_service&);
};
Expand Down
1 change: 1 addition & 0 deletions cql3/statements/schema_altering_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ protected:

public:
virtual future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, const query_options& options, api::timestamp_type) const = 0;
mutable utils::UUID global_req_id;
};

}
Expand Down
12 changes: 12 additions & 0 deletions db/system_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,15 @@ schema_ptr system_keyspace::topology() {
.with_column("request_id", timeuuid_type)
.with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column)
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column)
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column)
.with_column("version", long_type, column_kind::static_column)
.with_column("fence_version", long_type, column_kind::static_column)
.with_column("transition_state", utf8_type, column_kind::static_column)
.with_column("committed_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
.with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
.with_column("global_topology_request", utf8_type, column_kind::static_column)
.with_column("global_topology_request_id", timeuuid_type, column_kind::static_column)
.with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column)
.with_column("session", uuid_type, column_kind::static_column)
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
Expand Down Expand Up @@ -3070,6 +3073,11 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
ret.committed_cdc_generations = decode_cdc_generations_ids(deserialize_set_column(*topology(), some_row, "committed_cdc_generations"));
}

if (some_row.has("new_keyspace_rf_change_data")) {
ret.new_keyspace_rf_change_ks_name = some_row.get_as<sstring>("new_keyspace_rf_change_ks_name");
ret.new_keyspace_rf_change_data = some_row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
}

if (!ret.committed_cdc_generations.empty()) {
// Sanity check for CDC generation data consistency.
auto gen_id = ret.committed_cdc_generations.back();
Expand Down Expand Up @@ -3101,6 +3109,10 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
ret.global_request.emplace(req);
}

if (some_row.has("global_topology_request_id")) {
ret.global_request_id = some_row.get_as<utils::UUID>("global_topology_request_id");
}

if (some_row.has("enabled_features")) {
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
}
Expand Down
Loading

0 comments on commit e74a4b0

Please sign in to comment.