diff --git a/cdc/generation.cc b/cdc/generation.cc index 5dd1c2fcdf72..34de370ea89e 100644 --- a/cdc/generation.cc +++ b/cdc/generation.cc @@ -68,10 +68,10 @@ static constexpr auto stream_id_index_shift = stream_id_version_shift + stream_i static constexpr auto stream_id_random_shift = stream_id_index_shift + stream_id_index_bits; /** - * Responsibilty for encoding stream_id moved from factory method to - * this constructor, to keep knowledge of composition in a single place. - * Note this is private and friended to topology_description_generator, - * because he is the one who defined the "order" we view vnodes etc. + * Responsibility for encoding stream_id moved from the create_stream_ids + * function to this constructor, to keep knowledge of composition in a + * single place. Note the make_new_generation_description function + * defines the "order" in which we view vnodes etc. */ stream_id::stream_id(dht::token token, size_t vnode_index) : _value(bytes::initialized_later(), 2 * sizeof(int64_t)) @@ -185,76 +185,6 @@ static std::vector create_stream_ids( return result; } -class topology_description_generator final { - const std::unordered_set& _bootstrap_tokens; - const locator::token_metadata_ptr _tmptr; - const noncopyable_function (dht::token)>& _get_sharding_info; - - // Compute a set of tokens that split the token ring into vnodes - auto get_tokens() const { - auto tokens = _tmptr->sorted_tokens(); - auto it = tokens.insert( - tokens.end(), _bootstrap_tokens.begin(), _bootstrap_tokens.end()); - std::sort(it, tokens.end()); - std::inplace_merge(tokens.begin(), it, tokens.end()); - tokens.erase(std::unique(tokens.begin(), tokens.end()), tokens.end()); - return tokens; - } - - token_range_description create_description(size_t index, dht::token start, dht::token end) const { - token_range_description desc; - - desc.token_range_end = end; - - auto [shard_count, ignore_msb] = _get_sharding_info(end); - desc.streams = create_stream_ids(index, start, end, shard_count, 
ignore_msb); - desc.sharding_ignore_msb = ignore_msb; - - return desc; - } -public: - topology_description_generator( - const std::unordered_set& bootstrap_tokens, - const locator::token_metadata_ptr tmptr, - // This function must return sharding parameters for a node that owns the vnode ending with - // the given token. Returns pair. - const noncopyable_function (dht::token)>& get_sharding_info) - : _bootstrap_tokens(bootstrap_tokens) - , _tmptr(std::move(tmptr)) - , _get_sharding_info(get_sharding_info) - {} - - /* - * Generate a set of CDC stream identifiers such that for each shard - * and vnode pair there exists a stream whose token falls into this vnode - * and is owned by this shard. It is sometimes not possible to generate - * a CDC stream identifier for some (vnode, shard) pair because not all - * shards have to own tokens in a vnode. Small vnode can be totally owned - * by a single shard. In such case, a stream identifier that maps to - * end of the vnode is generated. - * - * Then build a cdc::topology_description which maps tokens to generated - * stream identifiers, such that if token T is owned by shard S in vnode V, - * it gets mapped to the stream identifier generated for (S, V). - */ - // Run in seastar::async context. 
- topology_description generate() const { - const auto tokens = get_tokens(); - - std::vector vnode_descriptions; - vnode_descriptions.reserve(tokens.size()); - - vnode_descriptions.push_back( - create_description(0, tokens.back(), tokens.front())); - for (size_t idx = 1; idx < tokens.size(); ++idx) { - vnode_descriptions.push_back( - create_description(idx, tokens[idx - 1], tokens[idx])); - } - - return {std::move(vnode_descriptions)}; - } -}; - bool should_propose_first_generation(const gms::inet_address& me, const gms::gossiper& g) { auto my_host_id = g.get_host_id(me); return g.for_each_endpoint_state_until([&] (const gms::inet_address& node, const gms::endpoint_state& eps) { @@ -282,21 +212,21 @@ bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locat } } -future> get_cdc_generation_mutations( +static future> get_common_cdc_generation_mutations( schema_ptr s, - utils::UUID id, + const partition_key& pkey, + noncopyable_function&& get_ckey_from_range_end, const cdc::topology_description& desc, size_t mutation_size_threshold, api::timestamp_type ts) { utils::chunked_vector res; - res.emplace_back(s, partition_key::from_singular(*s, id)); - res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts); + res.emplace_back(s, pkey); size_t size_estimate = 0; size_t total_size_estimate = 0; for (auto& e : desc.entries()) { if (size_estimate >= mutation_size_threshold) { total_size_estimate += size_estimate; - res.emplace_back(s, partition_key::from_singular(*s, id)); + res.emplace_back(s, pkey); size_estimate = 0; } @@ -307,7 +237,7 @@ future> get_cdc_generation_mutations( } size_estimate += e.streams.size() * 20; - auto ckey = clustering_key::from_singular(*s, dht::token::to_int64(e.token_range_end)); + auto ckey = get_ckey_from_range_end(e.token_range_end); res.back().set_cell(ckey, to_bytes("streams"), make_set_value(db::cdc_streams_set_type, std::move(streams)), ts); res.back().set_cell(ckey, 
to_bytes("ignore_msb"), int8_t(e.sharding_ignore_msb), ts); @@ -331,6 +261,36 @@ future> get_cdc_generation_mutations( co_return res; } +future> get_cdc_generation_mutations_v2( + schema_ptr s, + utils::UUID id, + const cdc::topology_description& desc, + size_t mutation_size_threshold, + api::timestamp_type ts) { + auto pkey = partition_key::from_singular(*s, id); + auto get_ckey = [s] (dht::token range_end) { + return clustering_key::from_singular(*s, dht::token::to_int64(range_end)); + }; + + auto res = co_await get_common_cdc_generation_mutations(s, pkey, std::move(get_ckey), desc, mutation_size_threshold, ts); + res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts); + co_return res; +} + +future> get_cdc_generation_mutations_v3( + schema_ptr s, + utils::UUID id, + const cdc::topology_description& desc, + size_t mutation_size_threshold, + api::timestamp_type ts) { + auto pkey = partition_key::from_singular(*s, CDC_GENERATIONS_V3_KEY); + auto get_ckey = [&] (dht::token range_end) { + return clustering_key::from_exploded(*s, {timeuuid_type->decompose(id), long_type->decompose(dht::token::to_int64(range_end))}) ; + }; + + co_return co_await get_common_cdc_generation_mutations(s, pkey, std::move(get_ckey), desc, mutation_size_threshold, ts); +} + // non-static for testing size_t limit_of_streams_in_topology_description() { // Each stream takes 16B and we don't want to exceed 4MB so we can have @@ -363,13 +323,47 @@ topology_description limit_number_of_streams_if_needed(topology_description&& de return topology_description(std::move(entries)); } -std::pair make_new_generation_data( +// Compute a set of tokens that split the token ring into vnodes. 
+static auto get_tokens(const std::unordered_set& bootstrap_tokens, const locator::token_metadata_ptr tmptr) { + auto tokens = tmptr->sorted_tokens(); + auto it = tokens.insert(tokens.end(), bootstrap_tokens.begin(), bootstrap_tokens.end()); + std::sort(it, tokens.end()); + std::inplace_merge(tokens.begin(), it, tokens.end()); + tokens.erase(std::unique(tokens.begin(), tokens.end()), tokens.end()); + return tokens; +} + +static token_range_description create_token_range_description( + size_t index, + dht::token start, + dht::token end, + const noncopyable_function (dht::token)>& get_sharding_info) { + token_range_description desc; + + desc.token_range_end = end; + + auto [shard_count, ignore_msb] = get_sharding_info(end); + desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb); + desc.sharding_ignore_msb = ignore_msb; + + return desc; +} + +cdc::topology_description make_new_generation_description( const std::unordered_set& bootstrap_tokens, const noncopyable_function(dht::token)>& get_sharding_info, const locator::token_metadata_ptr tmptr) { - auto gen = topology_description_generator(bootstrap_tokens, tmptr, get_sharding_info).generate(); - auto uuid = utils::make_random_uuid(); - return {uuid, std::move(gen)}; + const auto tokens = get_tokens(bootstrap_tokens, tmptr); + + std::vector vnode_descriptions; + vnode_descriptions.reserve(tokens.size()); + + vnode_descriptions.push_back(create_token_range_description(0, tokens.back(), tokens.front(), get_sharding_info)); + for (size_t idx = 1; idx < tokens.size(); ++idx) { + vnode_descriptions.push_back(create_token_range_description(idx, tokens[idx - 1], tokens[idx], get_sharding_info)); + } + + return {std::move(vnode_descriptions)}; } db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milliseconds ring_delay) { @@ -401,7 +395,9 @@ future generation_service::legacy_make_new_generation(const return {sc > 0 ? 
sc : 1, get_sharding_ignore_msb(*endpoint, _gossiper)}; } }; - auto [uuid, gen] = make_new_generation_data(bootstrap_tokens, get_sharding_info, tmptr); + + auto uuid = utils::make_random_uuid(); + auto gen = make_new_generation_description(bootstrap_tokens, get_sharding_info, tmptr); // Our caller should ensure that there are normal tokens in the token ring. auto normal_token_owners = tmptr->count_normal_token_owners(); diff --git a/cdc/generation.hh b/cdc/generation.hh index 882b166e665b..fa89b3337231 100644 --- a/cdc/generation.hh +++ b/cdc/generation.hh @@ -139,7 +139,22 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos */ bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm); -std::pair make_new_generation_data( +/* + * Generate a set of CDC stream identifiers such that for each shard + * and vnode pair there exists a stream whose token falls into this vnode + * and is owned by this shard. It is sometimes not possible to generate + * a CDC stream identifier for some (vnode, shard) pair because not all + * shards have to own tokens in a vnode. Small vnode can be totally owned + * by a single shard. In such case, a stream identifier that maps to + * end of the vnode is generated. + * + * Then build a cdc::topology_description which maps tokens to generated + * stream identifiers, such that if token T is owned by shard S in vnode V, + * it gets mapped to the stream identifier generated for (S, V). + * + * Run in seastar::async context. + */ +cdc::topology_description make_new_generation_description( const std::unordered_set& bootstrap_tokens, const noncopyable_function (dht::token)>& get_sharding_info, const locator::token_metadata_ptr); @@ -150,9 +165,20 @@ db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milli // using `mutation_size_threshold` to decide on the mutation sizes. The partition key of each mutation // is given by `gen_uuid`. 
The timestamp of each cell in each mutation is given by `mutation_timestamp`. // -// Works for only specific schemas: CDC_GENERATIONS_V2 (in system_distributed_keyspace) -// and CDC_GENERATIONS_V3 (in system_keyspace). -future> get_cdc_generation_mutations( +// Works only for the CDC_GENERATIONS_V2 schema (in system_distributed keyspace). +future> get_cdc_generation_mutations_v2( + schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&, + size_t mutation_size_threshold, api::timestamp_type mutation_timestamp); + +// The partition key of all rows in the single-partition CDC_GENERATIONS_V3 schema (in system keyspace). +static constexpr auto CDC_GENERATIONS_V3_KEY = "cdc_generations"; + +// Translates the CDC generation data given by a `cdc::topology_description` into a vector of mutations, +// using `mutation_size_threshold` to decide on the mutation sizes. The first clustering key column is +// given by `gen_uuid`. The timestamp of each cell in each mutation is given by `mutation_timestamp`. +// +// Works only for the CDC_GENERATIONS_V3 schema (in system keyspace). 
+future> get_cdc_generation_mutations_v3( schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&, size_t mutation_size_threshold, api::timestamp_type mutation_timestamp); diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 8206d7db1edb..09cad02b87e0 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -536,7 +536,7 @@ system_distributed_keyspace::insert_cdc_generation( auto s = _qp.db().real_database().find_schema( system_distributed_keyspace::NAME_EVERYWHERE, system_distributed_keyspace::CDC_GENERATIONS_V2); - auto ms = co_await cdc::get_cdc_generation_mutations(s, id, desc, mutation_size_threshold, api::new_timestamp()); + auto ms = co_await cdc::get_cdc_generation_mutations_v2(s, id, desc, mutation_size_threshold, api::new_timestamp()); co_await max_concurrent_for_each(ms, concurrency, [&] (mutation& m) -> future<> { co_await _sp.mutate( { std::move(m) }, diff --git a/db/system_keyspace.cc b/db/system_keyspace.cc index 89c3b9f36b11..ad16db34c61e 100644 --- a/db/system_keyspace.cc +++ b/db/system_keyspace.cc @@ -213,7 +213,7 @@ schema_ptr system_keyspace::batchlog() { return paxos; } -thread_local data_type cdc_generation_id_v2_type = tuple_type_impl::get_instance({timestamp_type, uuid_type}); +thread_local data_type cdc_generation_ts_id_type = tuple_type_impl::get_instance({timestamp_type, timeuuid_type}); schema_ptr system_keyspace::topology() { static thread_local auto schema = [] { @@ -234,12 +234,12 @@ schema_ptr system_keyspace::topology() { .with_column("shard_count", int32_type) .with_column("ignore_msb", int32_type) .with_column("supported_features", set_type_impl::get_instance(utf8_type, true)) - .with_column("new_cdc_generation_data_uuid", uuid_type, column_kind::static_column) + .with_column("new_cdc_generation_data_uuid", timeuuid_type, column_kind::static_column) .with_column("version", long_type, column_kind::static_column) .with_column("transition_state", 
utf8_type, column_kind::static_column) - .with_column("current_cdc_generation_uuid", uuid_type, column_kind::static_column) + .with_column("current_cdc_generation_uuid", timeuuid_type, column_kind::static_column) .with_column("current_cdc_generation_timestamp", timestamp_type, column_kind::static_column) - .with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_id_v2_type, true), column_kind::static_column) + .with_column("unpublished_cdc_generations", set_type_impl::get_instance(cdc_generation_ts_id_type, true), column_kind::static_column) .with_column("global_topology_request", utf8_type, column_kind::static_column) .with_column("enabled_features", set_type_impl::get_instance(utf8_type, true), column_kind::static_column) .set_comment("Current state of topology change machine") @@ -257,15 +257,18 @@ schema_ptr system_keyspace::cdc_generations_v3() { thread_local auto schema = [] { auto id = generate_legacy_id(NAME, CDC_GENERATIONS_V3); return schema_builder(NAME, CDC_GENERATIONS_V3, {id}) + /* This is a single-partition table with key 'cdc_generations'. */ + .with_column("key", utf8_type, column_kind::partition_key) /* The unique identifier of this generation. */ - .with_column("id", uuid_type, column_kind::partition_key) + .with_column("id", timeuuid_type, column_kind::clustering_key) /* The generation describes a mapping from all tokens in the token ring to a set of stream IDs. * This mapping is built from a bunch of smaller mappings, each describing how tokens in a * subrange of the token ring are mapped to stream IDs; these subranges together cover the entire - * token ring. Each such range-local mapping is represented by a row of this table. The - * clustering key of the row is the end of the range being described by this row. The start of - * this range is the range_end of the previous row (in the clustering order, which is the integer - * order) or of the last row of this partition if this is the first the first row. 
*/ + * token ring. Each such range-local mapping is represented by a row of this table. The second + * column of the clustering key of the row is the end of the range being described by this row. + * The start of this range is the range_end of the previous row (in the clustering order, which + * is the integer order) or of the last row with the same id value if this is the first row with + * such id. */ .with_column("range_end", long_type, column_kind::clustering_key) /* The set of streams mapped to in this range. The number of streams mapped to a single range in * a CDC generation is bounded from above by the number of shards on the owner of that range in @@ -278,10 +281,6 @@ schema_ptr system_keyspace::cdc_generations_v3() { * range when the generation was first created. Together with the set of streams above it fully * describes the mapping for this particular range. */ .with_column("ignore_msb", byte_type) - /* Column used for sanity checking. For a given generation it's equal to the number of ranges in - * this generation; thus, after the generation is fully inserted, it must be equal to the number - * of rows in the partition. */ - .with_column("num_ranges", int32_type, column_kind::static_column) .with_version(system_keyspace::generate_schema_version(id)) .build(); }(); @@ -2620,7 +2619,7 @@ future system_keyspace::load_topology_state() { auto gen_uuid = some_row.get_as("current_cdc_generation_uuid"); if (!some_row.has("current_cdc_generation_timestamp")) { on_internal_error(slogger, format( - "load_topology_state: current CDC generation UUID ({}) present, but timestamp missing", gen_uuid)); + "load_topology_state: current CDC generation time UUID ({}) present, but timestamp missing", gen_uuid)); } auto gen_ts = some_row.get_as("current_cdc_generation_timestamp"); ret.current_cdc_generation_id = cdc::generation_id_v2 { @@ -2631,22 +2630,16 @@ future system_keyspace::load_topology_state() { // Sanity check for CDC generation data consistency. 
{ auto gen_rows = co_await execute_cql( - format("SELECT count(range_end) as cnt, num_ranges FROM system.{} WHERE id = ?", - CDC_GENERATIONS_V3), + format("SELECT count(range_end) as cnt FROM {}.{} WHERE key = '{}' AND id = ?", + NAME, CDC_GENERATIONS_V3, cdc::CDC_GENERATIONS_V3_KEY), gen_uuid); assert(gen_rows); if (gen_rows->empty()) { on_internal_error(slogger, format( - "load_topology_state: current CDC generation UUID ({}) present, but data missing", gen_uuid)); - } - auto& row = gen_rows->one(); - auto counted_ranges = row.get_as("cnt"); - auto num_ranges = row.get_as("num_ranges"); - if (counted_ranges != num_ranges) { - on_internal_error(slogger, format( - "load_topology_state: inconsistency in CDC generation data (UUID {}):" - " counted {} ranges, should be {}", gen_uuid, counted_ranges, num_ranges)); + "load_topology_state: current CDC generation time UUID ({}) present, but data missing", gen_uuid)); } + auto cnt = gen_rows->one().get_as("cnt"); + slogger.debug("load_topology_state: current CDC generation time UUID ({}), loaded {} ranges", gen_uuid, cnt); } } else { if (!ret.normal_nodes.empty()) { @@ -2714,10 +2707,9 @@ future<> system_keyspace::update_topology_fence_version(int64_t value) { future system_keyspace::read_cdc_generation(utils::UUID id) { std::vector entries; - size_t num_ranges = 0; co_await _qp.query_internal( - format("SELECT range_end, streams, ignore_msb, num_ranges FROM {}.{} WHERE id = ?", - NAME, CDC_GENERATIONS_V3), + format("SELECT range_end, streams, ignore_msb FROM {}.{} WHERE key = '{}' AND id = ?", + NAME, CDC_GENERATIONS_V3, cdc::CDC_GENERATIONS_V3_KEY), db::consistency_level::ONE, { id }, 1000, // for ~1KB rows, ~1MB page size @@ -2728,7 +2720,6 @@ system_keyspace::read_cdc_generation(utils::UUID id) { dht::token::from_int64(row.get_as("range_end")), std::move(streams), uint8_t(row.get_as("ignore_msb"))}); - num_ranges = row.get_as("num_ranges"); return make_ready_future(stop_iteration::no); }); @@ -2738,12 +2729,6 @@ 
system_keyspace::read_cdc_generation(utils::UUID id) { "read_cdc_generation: data for CDC generation {} not present", id)); } - if (entries.size() != num_ranges) { - throw std::runtime_error(format( - "read_cdc_generation: wrong number of rows. The `num_ranges` column claimed {} rows," - " but reading the partition returned {}.", num_ranges, entries.size())); - } - co_return cdc::topology_description{std::move(entries)}; } diff --git a/docs/dev/cdc.md b/docs/dev/cdc.md index 42f7307dfbc4..5b8c580b86aa 100644 --- a/docs/dev/cdc.md +++ b/docs/dev/cdc.md @@ -109,7 +109,7 @@ Having different generations operating at different points in time is necessary #### Gossiper-based topology changes -The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::topology_description_generator` class. It then inserts the generation description into an internal distributed table `cdc_generation_descriptions_v2` in the `system_distributed_everywhere` keyspace. The table is defined as follows: +The joining node learns about the current vnodes, chooses tokens which will split them into smaller vnodes and creates a new `cdc::topology_description` which refines those smaller vnodes. This is done in the `cdc::make_new_generation_description` function. It then inserts the generation description into an internal distributed table `cdc_generation_descriptions_v2` in the `system_distributed_everywhere` keyspace. The table is defined as follows: ``` CREATE TABLE system_distributed_everywhere.cdc_generation_descriptions_v2 ( id uuid, @@ -160,23 +160,23 @@ Thus we give up availability for safety. This likely won't happen if the adminis #### Raft group 0 based topology changes (WIP) -When a node requests the cluster to join, the topology coordinator chooses tokens for the new node. This splits vnodes in the token ring into smaller vnodes. 
The coordinator then creates a new `cdc::topology_description` which refines those smaller vnodes. This is node using the `cdc::topology_description_generator` class. +When a node requests the cluster to join, the topology coordinator chooses tokens for the new node. This splits vnodes in the token ring into smaller vnodes. The coordinator then creates a new `cdc::topology_description` which refines those smaller vnodes. This is done using the `cdc::make_new_generation_description` function. The generation data described by `cdc::topology_description` is then translated into mutations and committed to group 0 using Raft commands. When a node applies these commands (every node in the cluster eventually does that, being a member of group 0), it writes the data into a local table `system.cdc_generations_v3`. The table has the following schema: ``` CREATE TABLE system.cdc_generations_v3 ( - id uuid, + key text, + id timeuuid, range_end bigint, ignore_msb tinyint, - num_ranges int static, streams frozen>, - PRIMARY KEY (id, range_end) + PRIMARY KEY (key, id, range_end) ) ... ``` -The table's partition key is the `id uuid` column. The UUID used to insert a new generation into this table is randomly generated by the coordinator. +The table is single-partition where `key` always equals "cdc_generations". The time UUID used to insert a new generation into this table is generated by the coordinator. -The committed commands also update the `system.topology` table, storing the UUID in the `new_cdc_generation_data_uuid` column in the row which describes the joining node. Thanks to this, if the coordinator manages to insert the data but then fails, the next coordinator can resume from where the previous coordinator left off - using `new_cdc_generation_data_uuid` to continue with the generation switch. +The committed commands also update the `system.topology` table, storing the time UUID in the `new_cdc_generation_data_uuid` column in the row which describes the joining node. 
Thanks to this, if the coordinator manages to insert the data but then fails, the next coordinator can resume from where the previous coordinator left off - using `new_cdc_generation_data_uuid` to continue with the generation switch. Note that the `cdc::topology_description` contains the stream IDs of the generation and describes the generation's mapping, so constructing and inserting it into this table does not require knowing the generation's timestamp. diff --git a/docs/dev/topology-over-raft.md b/docs/dev/topology-over-raft.md index ae453247932f..909eb4a44cf8 100644 --- a/docs/dev/topology-over-raft.md +++ b/docs/dev/topology-over-raft.md @@ -187,10 +187,10 @@ CREATE TABLE system.topology ( topology_request text, transition_state text static, current_cdc_generation_timestamp timestamp static, - current_cdc_generation_uuid uuid static, - unpublished_cdc_generations set> static, + current_cdc_generation_uuid timeuuid static, + unpublished_cdc_generations set> static, global_topology_request text static, - new_cdc_generation_data_uuid uuid static, + new_cdc_generation_data_uuid timeuuid static, PRIMARY KEY (key, host_id) ) ``` @@ -214,7 +214,7 @@ Each node has a clustering row in the table where its `host_id` is the clusterin There are also a few static columns for cluster-global properties: - `transition_state` - the transitioning state of the cluster (as described earlier), may be null - `current_cdc_generation_timestamp` - the timestamp of the last introduced CDC generation -- `current_cdc_generation_uuid` - the UUID of the last introduced CDC generation (used to access its data) +- `current_cdc_generation_uuid` - the time UUID of the last introduced CDC generation (used to access its data) - `unpublished_cdc_generations` - the IDs of the committed yet unpublished CDC generations - `global_topology_request` - if set, contains one of the supported global topology requests -- `new_cdc_generation_data_uuid` - used in `commit_cdc_generation` state, the UUID of the 
generation to be committed +- `new_cdc_generation_data_uuid` - used in `commit_cdc_generation` state, the time UUID of the generation to be committed diff --git a/service/storage_service.cc b/service/storage_service.cc index 0e176304c4b4..dd93164f2655 100644 --- a/service/storage_service.cc +++ b/service/storage_service.cc @@ -96,7 +96,7 @@ using inet_address = gms::inet_address; extern logging::logger cdc_log; namespace db { - extern thread_local data_type cdc_generation_id_v2_type; + extern thread_local data_type cdc_generation_ts_id_type; } namespace service { @@ -773,7 +773,7 @@ topology_mutation_builder& topology_mutation_builder::set_new_cdc_generation_dat topology_mutation_builder& topology_mutation_builder::set_unpublished_cdc_generations(const std::vector& values) { auto dv = values | boost::adaptors::transformed([&] (const auto& v) { - return make_tuple_value(db::cdc_generation_id_v2_type, tuple_type_impl::native_type({v.ts, v.id})); + return make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({v.ts, timeuuid_native_type{v.id}})); }); return apply_set("unpublished_cdc_generations", collection_apply_mode::overwrite, std::move(dv)); } @@ -789,7 +789,7 @@ topology_mutation_builder& topology_mutation_builder::add_enabled_features(const } topology_mutation_builder& topology_mutation_builder::add_unpublished_cdc_generation(const cdc::generation_id_v2& value) { - auto dv = make_tuple_value(db::cdc_generation_id_v2_type, tuple_type_impl::native_type({value.ts, value.id})); + auto dv = make_tuple_value(db::cdc_generation_ts_id_type, tuple_type_impl::native_type({value.ts, timeuuid_native_type{value.id}})); return apply_set("unpublished_cdc_generations", collection_apply_mode::update, std::vector{std::move(dv)}); } @@ -1135,14 +1135,15 @@ class topology_coordinator { } }; - auto [gen_uuid, gen_desc] = cdc::make_new_generation_data( + auto gen_uuid = guard.new_group0_state_id(); + auto gen_desc = cdc::make_new_generation_description( binfo ? 
binfo->bootstrap_tokens : std::unordered_set{}, get_sharding_info, tmptr); auto gen_table_schema = _db.find_schema( db::system_keyspace::NAME, db::system_keyspace::CDC_GENERATIONS_V3); const size_t max_command_size = _raft.max_command_size(); const size_t mutation_size_threshold = max_command_size / 2; - auto gen_mutations = co_await cdc::get_cdc_generation_mutations( + auto gen_mutations = co_await cdc::get_cdc_generation_mutations_v3( gen_table_schema, gen_uuid, gen_desc, mutation_size_threshold, guard.write_timestamp()); co_return std::pair{gen_uuid, std::move(gen_mutations)}; diff --git a/service/topology_state_machine.hh b/service/topology_state_machine.hh index 31b216aa7293..e2c06f957562 100644 --- a/service/topology_state_machine.hh +++ b/service/topology_state_machine.hh @@ -131,9 +131,9 @@ struct topology { // The ID of the last introduced CDC generation. std::optional current_cdc_generation_id; - // This is the UUID used to access the data of a new CDC generation introduced + // This is the time UUID used to access the data of a new CDC generation introduced // e.g. when a new node bootstraps, needed in `commit_cdc_generation` transition state. - // It's used as partition key in CDC_GENERATIONS_V3 table. + // It's used as the first column of the clustering key in CDC_GENERATIONS_V3 table. std::optional new_cdc_generation_data_uuid; // The IDs of the commited yet unpublished CDC generations sorted by timestamps.