Skip to content

Commit

Permalink
commit topology_change directly from alter_ks_stmt
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrsmrn committed Jan 24, 2024
1 parent ea6f965 commit 8f00651
Showing 1 changed file with 49 additions and 34 deletions.
83 changes: 49 additions & 34 deletions cql3/statements/alter_keyspace_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,33 +96,6 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce
const auto& tm = qp.proxy().get_token_metadata_ptr();
auto m = service::prepare_keyspace_update_announcement(qp.db().real_database(), _attrs->as_ks_metadata_update(old_ksm, *tm, feat), ts);

auto&& replication_strategy = qp.db().find_keyspace(_name).get_replication_strategy();
if (replication_strategy.uses_tablets()) {
// TODO: check if new RF differs by at most 1 from the old RF. Fail the query otherwise
// Check if topology_state_machine handles other global request
// TODO: - #include topology_state_machine to this module
// TOOD: change coordinator changes can only be applied from shard 0

if (!qp.topology_global_queue_empty())
{
auto topology = qp.proxy().get_token_metadata_ptr()->get_topology_change_info();
// TODO: make this function accept group0_guard instead of timestamp_type
// this will require changing signature in the whole class hierarchy
// service::group0_guard guard;
// service::topology_mutation_builder builder(guard.write_timestamp());
// builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
// builder.set_keyspace_rf_change_data(_name, rf_per_dc); // TODO: implement
// service::topology_change change{{builder.build()}};
// std::move(change.mutations.begin(), change.mutations.end(), std::back_inserter(m));
}
// else {
// TODO: before returning, wait until the current global topology request is done. Blocker: #16374
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception(
"topology mutation cannot be performed while other request is ongoing"));
// }
}

using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(
event::schema_change::change_type::UPDATED,
Expand All @@ -142,14 +115,56 @@ cql3::statements::alter_keyspace_statement::prepare(data_dictionary::database db

static logging::logger mylogger("alter_keyspace");

future<> alter_tablets_keyspace(cql3::query_processor& qp, service::group0_guard& guard) {
// TODO: check if new RF differs by at most 1 from the old RF. Fail the query otherwise
if (this_shard_id() != 0) {
// change coordinator changes can only be applied from shard 0
co_return;
}

if (!qp.topology_global_queue_empty()) {
auto topology = qp.proxy().get_token_metadata_ptr()->get_topology_change_info();
service::topology_mutation_builder builder(guard.write_timestamp());
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
builder.set_keyspace_rf_change_data(_name, rf_per_dc); // TODO: implement
service::topology_change change{{builder.build()}};
auto& abort_source = guard.get_abort_source();
sstring reason = format("TBD Alter tablets ks");
// TODO: get group0 client from qp.remote().ss._group0->client()
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, reason);
while (true) {
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &abort_source);
break;
} catch (group0_concurrent_modification &) {
mylogger.debug("alter tablets ks: concurrent modification, retrying");

This comment has been minimized.

Copy link
@tgrabiec

tgrabiec Jan 26, 2024

Contributor

You need to reobtain the guard after this.

}
}
}
else {
// TODO: before returning, wait until the current global topology request is done. Blocker: #16374
co_return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception(
"topology mutation cannot be performed while other request is ongoing"));
}
}

future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::alter_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
std::vector<sstring> warnings = check_against_restricted_replication_strategies(qp, keyspace(), *_attrs, qp.get_cql_stats());
return schema_altering_statement::execute(qp, state, options, std::move(guard)).then([warnings = std::move(warnings)] (::shared_ptr<messages::result_message> msg) {
for (const auto& warning : warnings) {
msg->add_warning(warning);
mylogger.warn("{}", warning);
}
return msg;
});

auto&& replication_strategy = qp.db().find_keyspace(_name).get_replication_strategy();
if (replication_strategy.uses_tablets()) {
alter_tablets_keyspace(qp, *guard); // TODO guard may be null
}
else {
return schema_altering_statement::execute(qp, state, options, std::move(guard)).then(
[warnings = std::move(warnings)](::shared_ptr<messages::result_message> msg) {
for (const auto &warning: warnings) {
msg->add_warning(warning);
mylogger.warn("{}", warning);
}
return msg;
});
}
}

0 comments on commit 8f00651

Please sign in to comment.