Skip to content

Commit

Permalink
tablets: alter ks DRAFT
Browse files Browse the repository at this point in the history
Just a sketch to show architecture/design, please don't review
  • Loading branch information
ptrsmrn committed Jan 13, 2024
1 parent d3139ac commit ecf18cd
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 5 deletions.
1 change: 1 addition & 0 deletions cql3/query_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -998,6 +998,7 @@ query_processor::execute_schema_statement(const statements::schema_altering_stat

cql3::cql_warnings_vec warnings;

// TODO: may require passing guard& and not only timestamp
auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, guard->write_timestamp());
warnings = std::move(cql_warnings);

Expand Down
35 changes: 32 additions & 3 deletions cql3/statements/alter_keyspace_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,42 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
#endif
}

#include "locator/load_sketch.hh"
#include "service/topology_guard.hh"
#include "service/topology_mutation.hh"
#include "locator/tablet_replication_strategy.hh"

future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_processor& qp, api::timestamp_type ts) const {
try {
auto old_ksm = qp.db().find_keyspace(_name).metadata();
const auto& tm = *qp.proxy().get_token_metadata_ptr();

auto m = service::prepare_keyspace_update_announcement(qp.db().real_database(), _attrs->as_ks_metadata_update(old_ksm, tm), ts);
const auto& tm = qp.proxy().get_token_metadata_ptr();
auto m = service::prepare_keyspace_update_announcement(qp.db().real_database(), _attrs->as_ks_metadata_update(old_ksm, *tm), ts);

auto&& replication_strategy = qp.db().find_keyspace(_name).get_replication_strategy();
if (replication_strategy.uses_tablets()) {
// Tablet-based keyspaces only: besides the schema mutations already collected in `m`,
// append topology mutations that register a global keyspace_rf_change request.
// TODO: check if new RF differs by at most 1 from the old RF. Fail the query otherwise

// Check if topology_state_machine handles other global request
// TODO: - #include topology_state_machine to this module
// NOTE(review): `topology_state_machine` is used here as if it were an object, but no such
// object is declared in the visible part of this function.
// NOTE(review): the condition looks inverted - the else branch reports that another request
// is ongoing, so the mutation-building branch presumably should run when `global_request`
// is NOT set. Confirm and negate the check.
if (topology_state_machine._topology.global_request) // TODO: refactor this check into a separate function
{
// TODO: make this function accept group0_guard instead of timestamp_type
// this will require changing signature in the whole class hierarchy
// Placeholder guard: it only exists to obtain a write timestamp until the real guard is passed in.
service::group0_guard guard;
service::topology_mutation_builder builder(guard.write_timestamp());
builder.set_global_topology_request(service::global_topology_request::keyspace_rf_change);
// NOTE(review): the builder method defined in service/topology_mutation.cc is named
// set_new_keyspace_rf_change_data, and `rf_per_dc` is not declared in the visible part of
// this function.
builder.set_keyspace_rf_change_data(_name, rf_per_dc); // TODO: implement
service::topology_change change{{builder.build()}};
// Hand the topology mutations over to the result vector together with the schema mutations.
std::move(change.mutations.begin(), change.mutations.end(), std::back_inserter(m));
}
else {
// TODO: before returning, wait until the current global topology request is done. Blocker: #16374
// Reject the statement with invalid_request_exception instead of queueing the request.
return make_exception_future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>(
exceptions::invalid_request_exception(
"topology mutation cannot be performed while other request is ongoing"));
}
}

using namespace cql_transport;
auto ret = ::make_shared<event::schema_change>(
Expand Down
3 changes: 3 additions & 0 deletions db/system_keyspace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ schema_ptr system_keyspace::topology() {
.with_column("ignore_msb", int32_type)
.with_column("supported_features", set_type_impl::get_instance(utf8_type, true))
.with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column)
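// TODO: add static columns for the keyspace RF change request ("new_keyspace_rf_change_ks_name", "new_keyspace_rf_change_rf_per_dc")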
.with_column("version", long_type, column_kind::static_column)
.with_column("fence_version", long_type, column_kind::static_column)
.with_column("transition_state", utf8_type, column_kind::static_column)
Expand Down Expand Up @@ -2664,6 +2665,8 @@ future<service::topology> system_keyspace::load_topology_state() {
ret.new_cdc_generation_data_uuid = some_row.get_as<utils::UUID>("new_cdc_generation_data_uuid");
}

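// TODO: load the keyspace RF change request data (new_keyspace_rf_change_ks_name, new_keyspace_rf_change_rf_per_dc) into ret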

if (some_row.has("current_cdc_generation_uuid")) {
auto gen_uuid = some_row.get_as<utils::UUID>("current_cdc_generation_uuid");
if (!some_row.has("current_cdc_generation_timestamp")) {
Expand Down
3 changes: 2 additions & 1 deletion docs/dev/topology-over-raft.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,7 @@ CREATE TABLE system.topology (
unpublished_cdc_generations set<tuple<timestamp, timeuuid>> static,
global_topology_request text static,
new_cdc_generation_data_uuid timeuuid static,
// TODO: add static columns describing the pending keyspace RF change (keyspace name, new RF per DC)
PRIMARY KEY (key, host_id)
)
```
Expand Down Expand Up @@ -288,7 +289,7 @@ There are also a few static columns for cluster-global properties:
- `unpublished_cdc_generations` - the IDs of the committed yet unpublished CDC generations
- `global_topology_request` - if set, contains one of the supported global topology requests
- `new_cdc_generation_data_uuid` - used in `commit_cdc_generation` state, the time UUID of the generation to be committed

<!-- TODO: document the static columns added for the keyspace RF change request -->
# Join procedure

In topology on raft mode, new nodes need to go through a new handshake procedure
Expand Down
3 changes: 3 additions & 0 deletions service/migration_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,9 @@ future<> migration_manager::announce_with_raft(std::vector<mutation> schema, gro
auto schema_features = _feat.cluster_schema_features();
auto adjusted_schema = db::schema_tables::adjust_schema_for_schema_features(std::move(schema), schema_features);

// We have access to the group0 client here, so a global_topology_request::keyspace_rf_change
// mutation announced through this path should be executed under group0 out of the box (to be confirmed).

auto group0_cmd = _group0_client.prepare_command(
schema_change{
.mutations{adjusted_schema.begin(), adjusted_schema.end()},
Expand Down
25 changes: 25 additions & 0 deletions service/storage_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1166,9 +1166,34 @@ class topology_coordinator {
auto reason = ::format(
"insert CDC generation data (UUID: {})", gen_uuid);
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
break;
}
case global_topology_request::keyspace_rf_change: {
// DRAFT handler for a global keyspace RF change request: reads the requested keyspace name
// and per-DC RF from the topology state, sets a transition on each tablet, and moves the
// topology into the tablet_migration transition state.
slogger.info("raft topology: new keyspace RF change requested");
auto tmptr = get_token_metadata_ptr();
// NOTE(review): these two fields are not yet declared in struct topology (see its
// "TODO: add new fields") nor loaded by system_keyspace::load_topology_state().
auto ks_name = _topo_sm._topology.new_keyspace_rf_change_ks_name;
auto rf_per_dc = _topo_sm._topology.new_keyspace_rf_change_rf_per_dc;

// NOTE(review): `schema_ptr{}` is value-initialized and then dereferenced - presumably a
// placeholder for the schema of each table in `ks_name`; confirm before use.
// NOTE(review): `tablets` is declared by value, so the transitions set in the loop below
// modify this local object only; the visible code never writes it back.
locator::tablet_map tablets = tmptr->tablets().get_tablet_map(schema_ptr{}->id());
// NOTE(review): `dcs` is not used by the visible code.
const auto& dcs = tmptr->get_topology().get_datacenters();
// NOTE(review): `ks_tablets`, `tablet_id` and `s` are not declared in the visible code.
for (auto tablet : ks_tablets) {
tablet_id = tablet.id();
locator::tablet_replica_set replicas = tablets.get_tablet_info(tablet_id).replicas;
// TODO: should be based on network_topology_strategy::allocate_tablets_for_new_table, basically:
// When new RF < old RF, take the most loaded endpoints from load_sketch and drop extra redundant replicas from these endpoints
// Otherwise add new replicas from the least loaded endpoints
slogger.debug("Allocated tablets for {}.{} ({}): {}", s->ks_name(), s->cf_name(), s->id(), tablets);
// Until the TODO above is implemented, `next` is the unchanged current replica set.
tablet_transition_info transition{ .next = replicas };
tablets.set_tablet_transition_info(tablet_id, transition);
}
topology_mutation_builder builder(guard.write_timestamp());
builder.set_transition_state(topology::transition_state::tablet_migration);
auto reason = ::format(
"insert keyspace RF change data (ks name: {}, new RF per DC: {})", ks_name, rf_per_dc);
// NOTE(review): `mutation` is not declared inside this case block.
co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
break;
}
}
}

// Preconditions:
Expand Down
7 changes: 7 additions & 0 deletions service/topology_mutation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_dat
return apply_atomic("new_cdc_generation_data_uuid", value);
}

// Stores the parameters of a keyspace RF change request in the topology mutation.
//
// ks_name   - written to the "new_keyspace_rf_change_ks_name" cell.
// rf_per_dc - written to the "new_keyspace_rf_change_rf_per_dc" cell.
//
// Returns the builder so that further setters can be chained.
topology_mutation_builder& topology_mutation_builder::set_new_keyspace_rf_change_data(
        const sstring& ks_name, const std::map<sstring, unsigned>& rf_per_dc) {
    apply_atomic("new_keyspace_rf_change_ks_name", ks_name);
    // TODO: transform into a string
    return apply_atomic("new_keyspace_rf_change_rf_per_dc", rf_per_dc);
}

topology_mutation_builder& topology_mutation_builder::set_unpublished_cdc_generations(const std::vector<cdc::generation_id_v2>& values) {
auto dv = values | boost::adaptors::transformed([&] (const auto& v) {
return make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({v.ts, timeuuid_native_type{v.id}}));
Expand Down
3 changes: 2 additions & 1 deletion service/topology_state_machine.cc
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,8 @@ topology_request topology_request_from_string(const sstring& s) {
}

// Lookup table from each global_topology_request enumerator to its textual name.
// Holds exactly one entry per enumerator: a repeated key in the initializer list is
// redundant, because an unordered_map keeps only one element per key.
static std::unordered_map<global_topology_request, sstring> global_topology_request_to_name_map = {
    {global_topology_request::new_cdc_generation, "new_cdc_generation"},
    {global_topology_request::keyspace_rf_change, "keyspace_rf_change"},
};

std::ostream& operator<<(std::ostream& os, const global_topology_request& req) {
Expand Down
3 changes: 3 additions & 0 deletions service/topology_state_machine.hh
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ using request_param = std::variant<join_param, rebuild_param, removenode_param,

// Cluster-global topology requests. When one is pending, it is kept in the
// `global_topology_request` static column of system.topology.
enum class global_topology_request: uint16_t {
// Request concerning a new CDC generation (textual name: "new_cdc_generation").
new_cdc_generation,
// Request to change the replication factor of a keyspace (textual name: "keyspace_rf_change").
// DRAFT: set by ALTER KEYSPACE for keyspaces whose replication strategy uses tablets.
keyspace_rf_change,
};

struct ring_slice {
Expand Down Expand Up @@ -141,6 +142,8 @@ struct topology {
// It's used as the first column of the clustering key in CDC_GENERATIONS_V3 table.
std::optional<utils::UUID> new_cdc_generation_data_uuid;

// TODO: add fields for the keyspace RF change request (new_keyspace_rf_change_ks_name, new_keyspace_rf_change_rf_per_dc)

// The IDs of the committed yet unpublished CDC generations sorted by timestamps.
std::vector<cdc::generation_id_v2> unpublished_cdc_generations;

Expand Down

0 comments on commit ecf18cd

Please sign in to comment.