Skip to content

Commit

Permalink
resolve minor comments
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrsmrn committed Jan 31, 2024
1 parent 13a65f8 commit c23761f
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 46 deletions.
5 changes: 3 additions & 2 deletions cql3/statements/alter_keyspace_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,10 @@ future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
try {
auto old_ksm = qp.db().find_keyspace(_name).metadata();
const auto& tm = *qp.proxy().get_token_metadata_ptr();
const auto& feat = qp.proxy().features();
const auto& tm = qp.proxy().get_token_metadata_ptr();
auto m = service::prepare_keyspace_update_announcement(qp.db().real_database(), _attrs->as_ks_metadata_update(old_ksm, *tm, feat), ts);

auto m = service::prepare_keyspace_update_announcement(qp.db().real_database(), _attrs->as_ks_metadata_update(old_ksm, tm, feat), ts);

using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/topology-over-raft.md
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ CREATE TABLE system.topology (
unpublished_cdc_generations set<tuple<timestamp, timeuuid>> static,
global_topology_request text static,
new_cdc_generation_data_uuid timeuuid static,
// TODO: extend ks
// TODO: add rf_change column
PRIMARY KEY (key, host_id)
)
```
Expand Down
2 changes: 1 addition & 1 deletion locator/tablets.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ tablet_transition_stage tablet_transition_stage_from_string(const sstring& name)
static const std::unordered_map<tablet_transition_kind, sstring> tablet_transition_kind_to_name = {
{tablet_transition_kind::migration, "migration"},
{tablet_transition_kind::rebuild, "rebuild"},
{tablet_transition_kind::rebuild, "rf_change"},
{tablet_transition_kind::rf_change, "rf_change"},
};

static const std::unordered_map<sstring, tablet_transition_kind> tablet_transition_kind_from_name = std::invoke([] {
Expand Down
3 changes: 0 additions & 3 deletions service/migration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -951,9 +951,6 @@ future<> migration_manager::announce_with_raft(std::vector<mutation> schema, gro
auto schema_features = _feat.cluster_schema_features();
auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(std::move(schema), schema_features);

// we have access to group0 client here, so global_topology_request::keyspace_rf_change
// will be executed under group0 out of the box (?)

auto group0_cmd = _group0_client.prepare_command(
schema_change{
.mutations{adjusted_schema.begin(), adjusted_schema.end()},
Expand Down
77 changes: 38 additions & 39 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3004,44 +3004,6 @@ future<> storage_service::raft_decommission() {
throw std::runtime_error(err);
}
}
}
// TODO: remove. It's currently needed to throw cql3 exception if topology_global_queue_empty() == true
#include "cql3/cql_statement.hh"
namespace service {
future<> storage_service::alter_tablets_keyspace() {
utils::UUID request_id;
if (this_shard_id() != 0) {
co_return;
}

while (true) {
if (!topology_global_queue_empty()) {
auto guard = co_await _group0->client().start_operation(&_group0_as);
request_id = guard.new_group0_state_id();

topology_mutation_builder builder(guard.write_timestamp());
builder.set_global_topology_request(global_topology_request::keyspace_rf_change);
// builder.set_keyspace_rf_change_data(_name, rf_per_dc); // TODO: implement
topology_change change{{builder.build()}};

group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "alter_tablets_keyspace");
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
break;
} catch (group0_concurrent_modification &) {
rtlogger.info("alter_tablets_keyspace: concurrent modification, retrying");
}
} else {
throw make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("alter_tablets_keyspace: topology mutation cannot be performed while other request is ongoing"));
}
}
auto error = co_await wait_for_topology_request_completion(request_id);

if (!error.empty()) {
throw std::runtime_error(fmt::format("alter_tablets_keyspace failed. See earlier errors ({})", error));
}
}

future<> storage_service::decommission() {
return run_with_api_lock(sstring("decommission"), [] (storage_service& ss) {
Expand Down Expand Up @@ -5018,7 +4980,6 @@ future<> storage_service::stream_tablet(locator::global_tablet_id tablet) {
switch (trinfo->transition) {
case locator::tablet_transition_kind::migration: return streaming::stream_reason::tablet_migration;
case locator::tablet_transition_kind::rebuild: return streaming::stream_reason::rebuild;
case locator::tablet_transition_kind::rf_change: return streaming::stream_reason::rf_change;
default:
throw std::runtime_error(format("stream_tablet(): Invalid tablet transition: {}", trinfo->transition));
}
Expand Down Expand Up @@ -5158,6 +5119,44 @@ future<> storage_service::move_tablet(table_id table, dht::token token, locator:
return !tmap.get_tablet_transition_info(tmap.get_tablet_id(token));
});
}
}
// TODO: remove. It's currently needed to throw cql3 exception if topology_global_queue_empty() == true
#include "cql3/cql_statement.hh"
namespace service {
future<> storage_service::alter_tablets_keyspace() {
utils::UUID request_id;
if (this_shard_id() != 0) {
co_return;
}

while (true) {
if (!topology_global_queue_empty()) {
auto guard = co_await _group0->client().start_operation(&_group0_as);
request_id = guard.new_group0_state_id();

topology_mutation_builder builder(guard.write_timestamp());
builder.set_global_topology_request(global_topology_request::keyspace_rf_change);
// builder.set_keyspace_rf_change_data(_name, rf_per_dc); // TODO: implement
topology_change change{{builder.build()}};

group0_command g0_cmd = _group0->client().prepare_command(std::move(change), guard, "alter_tablets_keyspace");
try {
co_await _group0->client().add_entry(std::move(g0_cmd), std::move(guard), &_abort_source);
break;
} catch (group0_concurrent_modification &) {
rtlogger.info("alter_tablets_keyspace: concurrent modification, retrying");
}
} else {
throw make_exception_future<std::tuple<::shared_ptr<::cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception("alter_tablets_keyspace: topology mutation cannot be performed while other request is ongoing"));
}
}
auto error = co_await wait_for_topology_request_completion(request_id);

if (!error.empty()) {
throw std::runtime_error(fmt::format("alter_tablets_keyspace failed with: {}", error));
}
}

future<> storage_service::set_tablet_balancing_enabled(bool enabled) {
auto holder = _async_gate.hold();
Expand Down

0 comments on commit c23761f

Please sign in to comment.