Skip to content

Commit

Permalink
various small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrsmrn committed Feb 19, 2024
1 parent 6ce11dd commit cf47b4b
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 20 deletions.
5 changes: 3 additions & 2 deletions cql3/query_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ struct query_processor::remote {
seastar::gate gate;
};

future<> query_processor::alter_tablets_keyspace(sstring ks_name, std::map<sstring, sstring> replication_options) {
return remote().first.get().ss.alter_tablets_keyspace(ks_name, replication_options);
future<> query_processor::alter_tablets_keyspace(sstring ks_name, std::map<sstring, sstring> replication_options,
std::optional<service::group0_guard>& guard) {
return remote().first.get().ss.alter_tablets_keyspace(ks_name, replication_options, guard);
}

static service::query_state query_state_for_internal_call() {
Expand Down
3 changes: 2 additions & 1 deletion cql3/query_processor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,8 @@ public:

void reset_cache();

future<> alter_tablets_keyspace(sstring ks_name, std::map<sstring, sstring> replication_options);
future<> alter_tablets_keyspace(sstring ks_name, std::map<sstring, sstring> replication_options,
std::optional<service::group0_guard>& guard);

private:
// Keep the holder until you stop using the `remote` services.
Expand Down
9 changes: 7 additions & 2 deletions cql3/statements/alter_keyspace_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,17 @@ future<::shared_ptr<cql_transport::messages::result_message>>
cql3::statements::alter_keyspace_statement::execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
std::vector<sstring> warnings = check_against_restricted_replication_strategies(qp, keyspace(), *_attrs, qp.get_cql_stats());

mylogger.warn("entering");
auto&& replication_strategy = qp.db().find_keyspace(_name).get_replication_strategy();
if (replication_strategy.uses_tablets()) {
// TODO: check if new RF differs by at most 1 from the old RF. Fail the query otherwise
qp.alter_tablets_keyspace(_name, _attrs->get_replication_options()).get();
// always bounce to shard 0?
// w ctorze alter statement wziac needs_guard
mylogger.warn("if (replication_strategy.uses_tablets()): {}, {}", _name, _attrs->get_replication_options());
co_await qp.alter_tablets_keyspace(_name, _attrs->get_replication_options(), guard);
}
return schema_altering_statement::execute(qp, state, options, std::move(guard)).then(
mylogger.warn("leaving");
co_return co_await schema_altering_statement::execute(qp, state, options, std::move(guard)).then(
[warnings = std::move(warnings)](::shared_ptr<messages::result_message> msg) {
for (const auto &warning: warnings) {
msg->add_warning(warning);
Expand Down
9 changes: 7 additions & 2 deletions db/system_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ schema_ptr system_keyspace::topology() {
.with_column("supported_features", set_type_impl::get_instance(utf8_type, true))
.with_column("request_id", timeuuid_type)
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
// TODO: add new columns
.with_column("new_keyspace_rf_change_ks_name", utf8_type, column_kind::static_column)
.with_column("new_keyspace_rf_change_rf_per_dc", map_type_impl::get_instance(utf8_type, utf8_type, false), column_kind::static_column)
.with_column("version", long_type, column_kind::static_column)
.with_column("fence_version", long_type, column_kind::static_column)
.with_column("transition_state", utf8_type, column_kind::static_column)
Expand Down Expand Up @@ -2841,7 +2842,11 @@ future<service::topology> system_keyspace::load_topology_state() {
ret.new_cdc_generation_data_uuid = some_row.get_as<utils::UUID>("new_cdc_generation_data_uuid");
}

// TODO: load new data
if (some_row.has("new_keyspace_rf_change_rf_per_dc")) {
ret.new_keyspace_rf_change_ks_name = some_row.get_as<sstring>("new_keyspace_rf_change_ks_name");
ret.new_keyspace_rf_change_rf_per_dc = some_row.get_map<sstring,sstring>("new_keyspace_rf_change_rf_per_dc");
slogger.info("if (some_row.has(\"new_keyspace_rf_change_rf_per_dc\")) {} {}", ret.new_keyspace_rf_change_ks_name, ret.new_keyspace_rf_change_rf_per_dc);
}

if (some_row.has("current_cdc_generation_uuid")) {
auto gen_uuid = some_row.get_as<utils::UUID>("current_cdc_generation_uuid");
Expand Down
28 changes: 22 additions & 6 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5541,35 +5541,51 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator:
// TODO: remove this header. It's currently needed to throw cql3 exception if topology_global_queue_empty() == true
#include "cql3/cql_statement.hh"
namespace service {
future<> storage_service::alter_tablets_keyspace(sstring ks_name, std::map<sstring, sstring> replication_options) {
future<> storage_service::alter_tablets_keyspace(sstring ks_name, std::map<sstring, sstring> replication_options,
std::optional<service::group0_guard>& guard) {
utils::UUID request_id;
if (this_shard_id() != 0) {
rtlogger.info("if (this_shard_id() != 0)");
co_return;
}

bool had_guard = guard.has_value();

while (true) {
rtlogger.info("while (true)");
if (topology_global_queue_empty()) {
auto guard = co_await _group0->client().start_operation(&_group0_as);
request_id = guard.new_group0_state_id();
rtlogger.info("if (topology_global_queue_empty())");
if (not guard.has_value())
guard = co_await _group0->client().start_operation(&_group0_as);
rtlogger.info("acquiring guard done");

topology_mutation_builder builder(guard.write_timestamp());
topology_mutation_builder builder(guard->write_timestamp());
builder.set_global_topology_request(global_topology_request::keyspace_rf_change);
builder.set_new_keyspace_rf_change_data(ks_name, replication_options);
topology_change change{{builder.build()}};
rtlogger.info("building topology_change done");

group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "alter_tablets_keyspace");
group0_command g0_cmd = _group0->client().prepare_command(std::move(change), *guard, "alter_tablets_keyspace");
request_id = guard->new_group0_state_id();
rtlogger.info("prepare_command done");
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(*guard), &_abort_source);
rtlogger.info("co_await _group0->client().add_entry done");
if (had_guard)
guard = co_await _group0->client().start_operation(&_group0_as);
break;
} catch (group0_concurrent_modification &) {
rtlogger.info("alter_tablets_keyspace: concurrent modification, retrying");
}
} else {
rtlogger.info("else");
throw make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("alter_tablets_keyspace: topology mutation cannot be performed while other request is ongoing"));
}
}
rtlogger.info("before co_await wait_for_topology_request_completion");
auto error = co_await wait_for_topology_request_completion(request_id);
rtlogger.info("co_await wait_for_topology_request_completion done");

if (!error.empty()) {
throw std::runtime_error(fmt::format("alter_tablets_keyspace failed with: {}", error));
Expand Down
3 changes: 2 additions & 1 deletion service/storage_service.hh
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,8 @@ public:
future<> topology_transition();

future<> do_cluster_cleanup();
future<> alter_tablets_keyspace(sstring ks_name, std::map<sstring, sstring> replication_options);
future<> alter_tablets_keyspace(sstring ks_name, std::map<sstring, sstring> replication_options,
std::optional<service::group0_guard>& guard);

// Starts the upgrade procedure to topology on raft.
// Must be called on shard 0.
Expand Down
12 changes: 7 additions & 5 deletions service/topology_coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
}

future<locator::tablet_map> reallocate_tablets_for_new_rf(schema_ptr s, locator::token_metadata_ptr tm,
std::map<sstring, sstring> new_rf_per_dc) {
std::unordered_map<sstring, sstring> new_rf_per_dc) {
// TODO: include https://github.com/scylladb/scylladb/pull/17116
co_return locator::tablet_map{8};
}
Expand Down Expand Up @@ -721,11 +721,12 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
rtlogger.info("raft topology: new keyspace RF change requested");
auto tmptr = get_token_metadata_ptr();
sstring ks_name = *_topo_sm._topology.new_keyspace_rf_change_ks_name;
std::map<sstring, sstring> new_rf_per_dc = *_topo_sm._topology.new_keyspace_rf_change_rf_per_dc;
std::unordered_map<sstring, sstring> new_rf_per_dc = *_topo_sm._topology.new_keyspace_rf_change_rf_per_dc;
rtlogger.info("global_topology_request::keyspace_rf_change {} {}", ks_name, new_rf_per_dc);
std::vector<canonical_mutation> updates;
for (const auto& table : _db.find_keyspace(ks_name).metadata()->tables()) {
auto new_tablet_map = co_await reallocate_tablets_for_new_rf(table, tmptr, new_rf_per_dc);
rtlogger.debug("Updating tablet map for {}.{}", ks_name, table->cf_name());
rtlogger.info("Updating tablet map for {}.{}", ks_name, table->cf_name());
auto tablet_id = new_tablet_map.first_tablet();
while (true) {
auto& tablet_info = new_tablet_map.get_tablet_info(tablet_id);
Expand All @@ -746,17 +747,18 @@ class topology_coordinator : public endpoint_lifecycle_subscriber {
updates.push_back(canonical_mutation(topology_mutation_builder(guard.write_timestamp())
.set_transition_state(topology::transition_state::tablet_migration)
.set_version(_topo_sm._topology.version + 1)
.del_global_topology_request()
.build()));
sstring reason = format("TODO Provide exhaustive reason");
rtlogger.info("{}", reason);
rtlogger.trace("do update {} reason {}", updates, reason);
rtlogger.info("do update {} reason {}", updates, reason);
topology_change change{std::move(updates)};
group0_command g0_cmd = _group0.client().prepare_command(std::move(change), guard, reason);
try {
co_await _group0.client().add_entry(std::move(g0_cmd), std::move(guard), &_as);
break;
} catch (group0_concurrent_modification&) {
rtlogger.debug("move_tablet(): concurrent modification, retrying");
rtlogger.info("move_tablet(): concurrent modification, retrying");
}
}
break;
Expand Down
2 changes: 1 addition & 1 deletion service/topology_state_machine.hh
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ struct topology {

// TODO: provide descriptions
std::optional<sstring> new_keyspace_rf_change_ks_name;
std::optional<std::map<sstring, sstring>> new_keyspace_rf_change_rf_per_dc;
std::optional<std::unordered_map<sstring, sstring>> new_keyspace_rf_change_rf_per_dc;

// The IDs of the committed yet unpublished CDC generations sorted by timestamps.
std::vector<cdc::generation_id_v2> unpublished_cdc_generations;
Expand Down

0 comments on commit cf47b4b

Please sign in to comment.