Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tablets: alter keyspace #16723

Merged
merged 18 commits into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
65deddd
tablet_allocator: make load_balancer_stats_manager configurable by name
KrzaQ Mar 24, 2024
c888945
tablets: tests for adding/removing replicas
KrzaQ Mar 24, 2024
cb40f13
Add storage service to query processor
ptrsmrn May 6, 2024
0600897
New raft cmd for both schema & topo changes
ptrsmrn May 6, 2024
c174eee
Introduce new global topo `keyspace_rf_change` req
ptrsmrn May 6, 2024
6fd0a49
Allow query_processor to check if global topo queue is empty
ptrsmrn Apr 19, 2024
59d3fd6
Extend system.topology with 3 new columns to store data required to p…
ptrsmrn Apr 19, 2024
80ed442
Introduce TABLET_KEYSPACE event to differentiate processing path of a…
ptrsmrn May 6, 2024
7081215
Parameterize migration_manager::announce by type to allow executing d…
ptrsmrn Apr 20, 2024
fbd75c5
Implement ALTER tablets KEYSPACE statement support
ptrsmrn Apr 20, 2024
b875151
Reject ALTER with 'replication_factor' tag
ptrsmrn May 14, 2024
951915e
cql3/alter_keyspace_statement: Do not allow for change of RF by more …
dawmd Jan 15, 2024
ec5708b
cql-pytest: Verify RF is changes by at most 1 when tablets on
dawmd Jan 22, 2024
39181c4
Return response only when tablets are reallocated
ptrsmrn Apr 30, 2024
1b913dd
docs: document ALTER KEYSPACE with tablets
ptrsmrn May 14, 2024
2567e30
test/tablets: Unmark RF-changing test with xfail
xemul May 13, 2024
6e0e267
test: Fix the way tablets RF-change test parses mutation_fragments
xemul May 27, 2024
66f6001
test: Do not check tablets mutations on nodes that don't have them
xemul May 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions cql3/query_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
#include <seastar/coroutine/parallel_for_each.hh>

#include "service/storage_proxy.hh"
#include "service/topology_mutation.hh"
#include "service/migration_manager.hh"
#include "service/forward_service.hh"
#include "service/raft/raft_group0_client.hh"
#include "service/storage_service.hh"
ptrsmrn marked this conversation as resolved.
Show resolved Hide resolved
#include "cql3/CqlParser.hpp"
#include "cql3/statements/batch_statement.hh"
#include "cql3/statements/modification_statement.hh"
Expand All @@ -42,16 +44,22 @@ const sstring query_processor::CQL_VERSION = "3.3.1";
const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono::minutes(60);

struct query_processor::remote {
remote(service::migration_manager& mm, service::forward_service& fwd, service::raft_group0_client& group0_client)
: mm(mm), forwarder(fwd), group0_client(group0_client) {}
remote(service::migration_manager& mm, service::forward_service& fwd,
service::storage_service& ss, service::raft_group0_client& group0_client)
: mm(mm), forwarder(fwd), ss(ss), group0_client(group0_client) {}

service::migration_manager& mm;
service::forward_service& forwarder;
service::storage_service& ss;
service::raft_group0_client& group0_client;

seastar::gate gate;
};

bool query_processor::topology_global_queue_empty() {
ptrsmrn marked this conversation as resolved.
Show resolved Hide resolved
return remote().first.get().ss.topology_global_queue_empty();
}

static service::query_state query_state_for_internal_call() {
return {service::client_state::for_internal_calls(), empty_service_permit()};
}
Expand Down Expand Up @@ -498,8 +506,8 @@ query_processor::~query_processor() {
}

void query_processor::start_remote(service::migration_manager& mm, service::forward_service& forwarder,
service::raft_group0_client& group0_client) {
_remote = std::make_unique<struct remote>(mm, forwarder, group0_client);
service::storage_service& ss, service::raft_group0_client& group0_client) {
_remote = std::make_unique<struct remote>(mm, forwarder, ss, group0_client);
}

future<> query_processor::stop_remote() {
Expand Down Expand Up @@ -1018,16 +1026,29 @@ query_processor::execute_schema_statement(const statements::schema_altering_stat

cql3::cql_warnings_vec warnings;

auto request_id = guard->new_group0_state_id();
stmt.global_req_id = request_id;

auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, options, guard->write_timestamp());
warnings = std::move(cql_warnings);

ce = std::move(ret);
if (!m.empty()) {
auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement);
co_await remote_.get().mm.announce(std::move(m), std::move(*guard), description);
if (ce && ce->target == cql_transport::event::schema_change::target_type::TABLET_KEYSPACE) {
co_await remote_.get().mm.announce<service::topology_change>(std::move(m), std::move(*guard), description);
ptrsmrn marked this conversation as resolved.
Show resolved Hide resolved
// TODO: eliminate timeout from alter ks statement on the cqlsh/driver side
auto error = co_await remote_.get().ss.wait_for_topology_request_completion(request_id);
ptrsmrn marked this conversation as resolved.
Show resolved Hide resolved
co_await remote_.get().ss.wait_for_topology_not_busy();
ptrsmrn marked this conversation as resolved.
Show resolved Hide resolved
if (!error.empty()) {
log.error("CQL statement \"{}\" with topology request_id \"{}\" failed with error: \"{}\"", stmt.raw_cql_statement, request_id, error);
throw exceptions::request_execution_exception(exceptions::exception_code::INVALID, error);
}
} else {
co_await remote_.get().mm.announce<service::schema_change>(std::move(m), std::move(*guard), description);
}
}

ce = std::move(ret);

// If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing
// extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600)
::shared_ptr<messages::result_message> result;
Expand Down
5 changes: 4 additions & 1 deletion cql3/query_processor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public:

~query_processor();

void start_remote(service::migration_manager&, service::forward_service&, service::raft_group0_client&);
void start_remote(service::migration_manager&, service::forward_service&,
service::storage_service& ss, service::raft_group0_client&);
future<> stop_remote();

data_dictionary::database db() {
Expand Down Expand Up @@ -460,6 +461,8 @@ public:

void reset_cache();

bool topology_global_queue_empty();
ptrsmrn marked this conversation as resolved.
Show resolved Hide resolved

private:
// Keep the holder until you stop using the `remote` services.
std::pair<std::reference_wrapper<remote>, gate::holder> remote();
Expand Down
87 changes: 80 additions & 7 deletions cql3/statements/alter_keyspace_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
* SPDX-License-Identifier: (AGPL-3.0-or-later and Apache-2.0)
*/

#include <boost/range/algorithm.hpp>
#include <fmt/format.h>
#include <seastar/core/coroutine.hh>
#include <stdexcept>
#include "alter_keyspace_statement.hh"
#include "prepared_statement.hh"
#include "service/migration_manager.hh"
#include "service/storage_proxy.hh"
#include "service/topology_mutation.hh"
#include "db/system_keyspace.hh"
#include "data_dictionary/data_dictionary.hh"
#include "data_dictionary/keyspace_metadata.hh"
Expand All @@ -21,6 +25,8 @@
#include "create_keyspace_statement.hh"
#include "gms/feature_service.hh"

static logging::logger mylogger("alter_keyspace");

bool is_system_keyspace(std::string_view keyspace);

cql3::statements::alter_keyspace_statement::alter_keyspace_statement(sstring name, ::shared_ptr<ks_prop_defs> attrs)
Expand All @@ -36,6 +42,20 @@ future<> cql3::statements::alter_keyspace_statement::check_access(query_processo
return state.has_keyspace_access(_name, auth::permission::ALTER);
}

/// Tells whether a single DC's replication factor changes by at most one.
///
/// Both arguments are expected to hold decimal integers. Text that cannot be
/// parsed at all counts as 0 instead of leaving the value indeterminate.
///
/// @param curr_rf  replication factor currently in effect, as text
/// @param new_rf   requested replication factor, as text
/// @return true when the two values differ by no more than 1
static bool validate_rf_difference(const std::string_view curr_rf, const std::string_view new_rf) {
    auto to_number = [] (const std::string_view rf) {
        // Zero-initialized on purpose: from_chars leaves its output untouched
        // on failure, and reading an uninitialized int is undefined behavior.
        int result = 0;
        // from_chars takes `const char*` bounds; string_view iterators are not
        // guaranteed to be raw pointers, so go through data()/size().
        // The error code is ignored: unparsable input simply stays at 0.
        (void) std::from_chars(rf.data(), rf.data() + rf.size(), result);
        return result;
    };

    // We want to ensure that each DC's RF is going to change by at most 1
    // because in that case the old and new quorums must overlap.
    // The subtraction is done in a wider type so extreme inputs cannot
    // overflow `int`.
    const long long delta = static_cast<long long>(to_number(curr_rf)) - to_number(new_rf);
    return delta >= -1 && delta <= 1;
}

void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, const service::client_state& state) const {
auto tmp = _name;
std::transform(tmp.begin(), tmp.end(), tmp.begin(), ::tolower);
Expand All @@ -61,6 +81,17 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
}

auto new_ks = _attrs->as_ks_metadata_update(ks.metadata(), *qp.proxy().get_token_metadata_ptr(), qp.proxy().features());

if (ks.get_replication_strategy().uses_tablets()) {
const std::map<sstring, sstring>& current_rfs = ks.metadata()->strategy_options();
for (const auto& [new_dc, new_rf] : _attrs->get_replication_options()) {
auto it = current_rfs.find(new_dc);
if (it != current_rfs.end() && !validate_rf_difference(it->second, new_rf)) {
throw exceptions::invalid_request_exception("Cannot modify replication factor of any DC by more than 1 at a time.");
}
}
}

locator::replication_strategy_params params(new_ks->strategy_options(), new_ks->initial_tablets());
auto new_rs = locator::abstract_replication_strategy::create_replication_strategy(new_ks->strategy_name(), params);
if (new_rs->is_per_table() != ks.get_replication_strategy().is_per_table()) {
Expand All @@ -83,20 +114,63 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c

future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, const query_options&, api::timestamp_type ts) const {
using namespace cql_transport;
try {
auto old_ksm = qp.db().find_keyspace(_name).metadata();
event::schema_change::target_type target_type = event::schema_change::target_type::KEYSPACE;
auto ks = qp.db().find_keyspace(_name);
auto ks_md = ks.metadata();
const auto& tm = *qp.proxy().get_token_metadata_ptr();
const auto& feat = qp.proxy().features();
auto ks_md_update = _attrs->as_ks_metadata_update(ks_md, tm, feat);
std::vector<mutation> muts;
std::vector<sstring> warnings;
auto ks_options = _attrs->get_all_options_flattened(feat);

// we only want to run the tablets path if there are actually any tablets changes, not only schema changes
if (ks.get_replication_strategy().uses_tablets() && !_attrs->get_replication_options().empty()) {
if (!qp.topology_global_queue_empty()) {
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("Another global topology request is ongoing, please retry."));
}
if (_attrs->get_replication_options().contains(ks_prop_defs::REPLICATION_FACTOR_KEY)) {
return make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("'replication_factor' tag is not allowed when executing ALTER KEYSPACE with tablets, please list the DCs explicitly"));
}
qp.db().real_database().validate_keyspace_update(*ks_md_update);

service::topology_mutation_builder builder(ts);
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
builder.set_global_topology_request_id(this->global_req_id);
builder.set_new_keyspace_rf_change_data(_name, ks_options);
service::topology_change change{{builder.build()}};

auto topo_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY);
boost::transform(change.mutations, std::back_inserter(muts), [topo_schema] (const canonical_mutation& cm) {
return cm.to_mutation(topo_schema);
});

service::topology_request_tracking_mutation_builder rtbuilder{utils::UUID{this->global_req_id}};
rtbuilder.set("done", false)
.set("start_time", db_clock::now());
service::topology_change req_change{{rtbuilder.build()}};

auto topo_req_schema = qp.db().find_schema(db::system_keyspace::NAME, db::system_keyspace::TOPOLOGY_REQUESTS);
boost::transform(req_change.mutations, std::back_inserter(muts), [topo_req_schema] (const canonical_mutation& cm) {
return cm.to_mutation(topo_req_schema);
});

target_type = event::schema_change::target_type::TABLET_KEYSPACE;
} else {
auto schema_mutations = service::prepare_keyspace_update_announcement(qp.db().real_database(), ks_md_update, ts);
muts.insert(muts.begin(), schema_mutations.begin(), schema_mutations.end());
}

auto m = service::prepare_keyspace_update_announcement(qp.db().real_database(), _attrs->as_ks_metadata_update(old_ksm, tm, feat), ts);

using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,
event::schema_change::target_type::KEYSPACE,
target_type,
keyspace());

return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), std::move(m), std::vector<sstring>()));
return make_ready_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(std::make_tuple(std::move(ret), std::move(muts), warnings));
} catch (data_dictionary::no_such_keyspace& e) {
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(exceptions::invalid_request_exception("Unknown keyspace " + _name));
}
Expand All @@ -107,7 +181,6 @@ cql3::statements::alter_keyspace_statement::prepare(data_dictionary::database db
return std::make_unique<prepared_statement>(make_shared<alter_keyspace_statement>(*this));
}

static logging::logger mylogger("alter_keyspace");

future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::alter_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
Expand Down
49 changes: 49 additions & 0 deletions cql3/statements/ks_prop_defs.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,35 @@ static std::map<sstring, sstring> prepare_options(
return options;
}

// Builds the property set from a flat option map whose keys start with one of
// the KW_DURABLE_WRITES / KW_REPLICATION / KW_TABLETS / KW_STORAGE tags.
// For every matching entry the tag plus one following separator character is
// cut off the key, and the remainder is stored in that tag's group.
// Entries that match none of the tags are ignored.
// NOTE(review): presumably the input is what get_all_options_flattened()
// produces ("<tag>:<name>") - confirm against callers.
ks_prop_defs::ks_prop_defs(std::map<sstring, sstring> options) {
    std::map<sstring, sstring> replication, storage, tablets, durable_writes;

    for (const auto& entry : options) {
        const sstring& key = entry.first;
        const sstring& val = entry.second;

        // Files the entry under `group` when its key starts with `tag`;
        // reports whether it did so.
        auto take_if_tagged = [&key, &val] (const auto& tag, std::map<sstring, sstring>& group) {
            if (!key.starts_with(tag)) {
                return false;
            }
            // +1 skips the single separator character that follows the tag.
            group[key.substr(sstring(tag).size() + 1)] = val;
            return true;
        };

        // Tags are tried in a fixed order and the first match wins.
        (void) (take_if_tagged(KW_DURABLE_WRITES, durable_writes)
                || take_if_tagged(KW_REPLICATION, replication)
                || take_if_tagged(KW_TABLETS, tablets)
                || take_if_tagged(KW_STORAGE, storage));
    }

    // Only groups that actually received entries become properties.
    if (!replication.empty()) {
        add_property(KW_REPLICATION, replication);
    }
    if (!storage.empty()) {
        add_property(KW_STORAGE, storage);
    }
    if (!tablets.empty()) {
        add_property(KW_TABLETS, tablets);
    }
    if (!durable_writes.empty()) {
        // Stored as a scalar: only the value of the first entry is used.
        add_property(KW_DURABLE_WRITES, durable_writes.begin()->second);
    }
}

void ks_prop_defs::validate() {
// Skip validation if the strategy class is already set as it means we've already
// prepared (and redoing it would set strategyClass back to null, which we don't want)
Expand Down Expand Up @@ -159,6 +188,26 @@ std::optional<sstring> ks_prop_defs::get_replication_strategy_class() const {
return _strategy_class;
}

// Reads the KW_DURABLE_WRITES property as a boolean, passing `true` to
// get_boolean() as the fallback value.
bool ks_prop_defs::get_durable_writes() const {
    constexpr bool fallback = true;
    return get_boolean(KW_DURABLE_WRITES, fallback);
}

// Collects the replication, storage, tablets and durable-writes options into
// a single map whose keys have the form "<tag>:<option name>".
// `feat` is not read by this function.
std::map<sstring, sstring> ks_prop_defs::get_all_options_flattened(const gms::feature_service& feat) const {
    std::map<sstring, sstring> flattened;

    // Copies each entry of `group` into `flattened` under "<tag>:<name>".
    auto add_group = [&flattened] (const std::map<sstring, sstring>& group, const sstring& tag) {
        for (const auto& [name, value] : group) {
            flattened[tag + ":" + name] = value;
        }
    };

    add_group(get_replication_options(), KW_REPLICATION);
    add_group(get_storage_options().to_map(), KW_STORAGE);
    add_group(get_map(KW_TABLETS).value_or(std::map<sstring, sstring>{}), KW_TABLETS);

    // durable_writes is a single value, so it is wrapped in a one-entry group
    // whose option name is the tag itself.
    const std::map<sstring, sstring> durable_writes_group{
        {sstring(KW_DURABLE_WRITES), to_sstring(get_boolean(KW_DURABLE_WRITES, true))}};
    add_group(durable_writes_group, KW_DURABLE_WRITES);

    return flattened;
}

lw_shared_ptr<data_dictionary::keyspace_metadata> ks_prop_defs::as_ks_metadata(sstring ks_name, const locator::token_metadata& tm, const gms::feature_service& feat) {
auto sc = get_replication_strategy_class().value();
std::optional<unsigned> initial_tablets = get_initial_tablets(sc, feat.tablets);
Expand Down
5 changes: 5 additions & 0 deletions cql3/statements/ks_prop_defs.hh
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@ public:
private:
std::optional<sstring> _strategy_class;
public:
ks_prop_defs() = default;
explicit ks_prop_defs(std::map<sstring, sstring> options);

void validate();
std::map<sstring, sstring> get_replication_options() const;
std::optional<sstring> get_replication_strategy_class() const;
std::optional<unsigned> get_initial_tablets(const sstring& strategy_class, bool enabled_by_default) const;
data_dictionary::storage_options get_storage_options() const;
bool get_durable_writes() const;
std::map<sstring, sstring> get_all_options_flattened(const gms::feature_service& feat) const;
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata(sstring ks_name, const locator::token_metadata&, const gms::feature_service&);
lw_shared_ptr<data_dictionary::keyspace_metadata> as_ks_metadata_update(lw_shared_ptr<data_dictionary::keyspace_metadata> old, const locator::token_metadata&, const gms::feature_service&);
};
Expand Down
1 change: 1 addition & 0 deletions cql3/statements/schema_altering_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ protected:

public:
virtual future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, const query_options& options, api::timestamp_type) const = 0;
mutable utils::UUID global_req_id;
};

}
Expand Down
12 changes: 12 additions & 0 deletions db/system_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,15 @@ schema_ptr system_keyspace::topology() {
.with_column("request_id", timeuuid_type)
.with_column("ignore_nodes", set_type_impl::get_instance(uuid_type, true), column_kind::static_column)
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column)
.with_column("new_keyspace_rf_change_data", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column)
.with_column("version", long_type, column_kind::static_column)
.with_column("fence_version", long_type, column_kind::static_column)
.with_column("transition_state", utf8_type, column_kind::static_column)
.with_column("committed_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
.with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column)
.with_column("global_topology_request", utf8_type, column_kind::static_column)
.with_column("global_topology_request_id", timeuuid_type, column_kind::static_column)
.with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column)
.with_column("session", uuid_type, column_kind::static_column)
.with_column("tablet_balancing_enabled", boolean_type, column_kind::static_column)
Expand Down Expand Up @@ -3070,6 +3073,11 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
ret.committed_cdc_generations = decode_cdc_generations_ids(deserialize_set_column(*topology(), some_row, "committed_cdc_generations"));
}

if (some_row.has("new_keyspace_rf_change_data")) {
ret.new_keyspace_rf_change_ks_name = some_row.get_as<sstring>("new_keyspace_rf_change_ks_name");
ret.new_keyspace_rf_change_data = some_row.get_map<sstring,sstring>("new_keyspace_rf_change_data");
}

if (!ret.committed_cdc_generations.empty()) {
// Sanity check for CDC generation data consistency.
auto gen_id = ret.committed_cdc_generations.back();
Expand Down Expand Up @@ -3101,6 +3109,10 @@ future<service::topology> system_keyspace::load_topology_state(const std::unorde
ret.global_request.emplace(req);
}

if (some_row.has("global_topology_request_id")) {
ret.global_request_id = some_row.get_as<utils::UUID>("global_topology_request_id");
}

if (some_row.has("enabled_features")) {
ret.enabled_features = decode_features(deserialize_set_column(*topology(), some_row, "enabled_features"));
}
Expand Down
Loading
Loading