Skip to content

Commit

Permalink
Merge 'raft topology: make CDC_GENERATIONS_V3 single-partition, timeu…
Browse files Browse the repository at this point in the history
…uid-sorted' from Patryk Jędrzejczak

We make the `CDC_GENERATIONS_V3` table single-partition and change the
clustering key from `range_end` to `(id, range_end)`. We also change the
type of `id` to `timeuuid` and ensure that a new generation always has
the highest `id`. These changes allow efficient clearing of obsolete CDC
generation data, which we need to prevent Raft-topology snapshots from
endlessly growing as we introduce new generations over time.

All this code is protected by an experimental feature flag. It includes
the definition of `CDC_GENERATIONS_V3`. The table is not created unless
the feature flag is enabled.

Fixes #15163

Closes #15319

* github.com:scylladb/scylladb:
  system_keyspace: rename cdc_generation_id_v2
  system_keyspace: change id to timeuuid in CDC_GENERATIONS_V3
  cdc: generation: remove topology_description_generator
  cdc: do not create uuid in make_new_generation_data
  system_kayspace: make CDC_GENERATIONS_V3 single-partition
  cdc: generation: introduce get_common_cdc_generation_mutations
  cdc: generation: rename get_cdc_generation_mutations
  • Loading branch information
kbr-scylla committed Sep 13, 2023
2 parents bbb6e4f + 9220999 commit a184b07
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 144 deletions.
166 changes: 81 additions & 85 deletions cdc/generation.cc
Expand Up @@ -68,10 +68,10 @@ static constexpr auto stream_id_index_shift = stream_id_version_shift + stream_i
static constexpr auto stream_id_random_shift = stream_id_index_shift + stream_id_index_bits;

/**
* Responsibilty for encoding stream_id moved from factory method to
* this constructor, to keep knowledge of composition in a single place.
* Note this is private and friended to topology_description_generator,
* because he is the one who defined the "order" we view vnodes etc.
* Responsibility for encoding stream_id moved from the create_stream_ids
* function to this constructor, to keep knowledge of composition in a
* single place. Note the make_new_generation_description function
* defines the "order" in which we view vnodes etc.
*/
stream_id::stream_id(dht::token token, size_t vnode_index)
: _value(bytes::initialized_later(), 2 * sizeof(int64_t))
Expand Down Expand Up @@ -185,76 +185,6 @@ static std::vector<stream_id> create_stream_ids(
return result;
}

class topology_description_generator final {
const std::unordered_set<dht::token>& _bootstrap_tokens;
const locator::token_metadata_ptr _tmptr;
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& _get_sharding_info;

// Compute a set of tokens that split the token ring into vnodes
auto get_tokens() const {
auto tokens = _tmptr->sorted_tokens();
auto it = tokens.insert(
tokens.end(), _bootstrap_tokens.begin(), _bootstrap_tokens.end());
std::sort(it, tokens.end());
std::inplace_merge(tokens.begin(), it, tokens.end());
tokens.erase(std::unique(tokens.begin(), tokens.end()), tokens.end());
return tokens;
}

token_range_description create_description(size_t index, dht::token start, dht::token end) const {
token_range_description desc;

desc.token_range_end = end;

auto [shard_count, ignore_msb] = _get_sharding_info(end);
desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb);
desc.sharding_ignore_msb = ignore_msb;

return desc;
}
public:
topology_description_generator(
const std::unordered_set<dht::token>& bootstrap_tokens,
const locator::token_metadata_ptr tmptr,
// This function must return sharding parameters for a node that owns the vnode ending with
// the given token. Returns <shard_count, ignore_msb> pair.
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info)
: _bootstrap_tokens(bootstrap_tokens)
, _tmptr(std::move(tmptr))
, _get_sharding_info(get_sharding_info)
{}

/*
* Generate a set of CDC stream identifiers such that for each shard
* and vnode pair there exists a stream whose token falls into this vnode
* and is owned by this shard. It is sometimes not possible to generate
* a CDC stream identifier for some (vnode, shard) pair because not all
* shards have to own tokens in a vnode. Small vnode can be totally owned
* by a single shard. In such case, a stream identifier that maps to
* end of the vnode is generated.
*
* Then build a cdc::topology_description which maps tokens to generated
* stream identifiers, such that if token T is owned by shard S in vnode V,
* it gets mapped to the stream identifier generated for (S, V).
*/
// Run in seastar::async context.
topology_description generate() const {
const auto tokens = get_tokens();

std::vector<token_range_description> vnode_descriptions;
vnode_descriptions.reserve(tokens.size());

vnode_descriptions.push_back(
create_description(0, tokens.back(), tokens.front()));
for (size_t idx = 1; idx < tokens.size(); ++idx) {
vnode_descriptions.push_back(
create_description(idx, tokens[idx - 1], tokens[idx]));
}

return {std::move(vnode_descriptions)};
}
};

bool should_propose_first_generation(const gms::inet_address& me, const gms::gossiper& g) {
auto my_host_id = g.get_host_id(me);
return g.for_each_endpoint_state_until([&] (const gms::inet_address& node, const gms::endpoint_state& eps) {
Expand Down Expand Up @@ -282,21 +212,21 @@ bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locat
}
}

future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
static future<utils::chunked_vector<mutation>> get_common_cdc_generation_mutations(
schema_ptr s,
utils::UUID id,
const partition_key& pkey,
noncopyable_function<clustering_key (dht::token)>&& get_ckey_from_range_end,
const cdc::topology_description& desc,
size_t mutation_size_threshold,
api::timestamp_type ts) {
utils::chunked_vector<mutation> res;
res.emplace_back(s, partition_key::from_singular(*s, id));
res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts);
res.emplace_back(s, pkey);
size_t size_estimate = 0;
size_t total_size_estimate = 0;
for (auto& e : desc.entries()) {
if (size_estimate >= mutation_size_threshold) {
total_size_estimate += size_estimate;
res.emplace_back(s, partition_key::from_singular(*s, id));
res.emplace_back(s, pkey);
size_estimate = 0;
}

Expand All @@ -307,7 +237,7 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
}

size_estimate += e.streams.size() * 20;
auto ckey = clustering_key::from_singular(*s, dht::token::to_int64(e.token_range_end));
auto ckey = get_ckey_from_range_end(e.token_range_end);
res.back().set_cell(ckey, to_bytes("streams"), make_set_value(db::cdc_streams_set_type, std::move(streams)), ts);
res.back().set_cell(ckey, to_bytes("ignore_msb"), int8_t(e.sharding_ignore_msb), ts);

Expand All @@ -331,6 +261,36 @@ future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
co_return res;
}

future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v2(
schema_ptr s,
utils::UUID id,
const cdc::topology_description& desc,
size_t mutation_size_threshold,
api::timestamp_type ts) {
auto pkey = partition_key::from_singular(*s, id);
auto get_ckey = [s] (dht::token range_end) {
return clustering_key::from_singular(*s, dht::token::to_int64(range_end));
};

auto res = co_await get_common_cdc_generation_mutations(s, pkey, std::move(get_ckey), desc, mutation_size_threshold, ts);
res.back().set_static_cell(to_bytes("num_ranges"), int32_t(desc.entries().size()), ts);
co_return res;
}

future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v3(
schema_ptr s,
utils::UUID id,
const cdc::topology_description& desc,
size_t mutation_size_threshold,
api::timestamp_type ts) {
auto pkey = partition_key::from_singular(*s, CDC_GENERATIONS_V3_KEY);
auto get_ckey = [&] (dht::token range_end) {
return clustering_key::from_exploded(*s, {timeuuid_type->decompose(id), long_type->decompose(dht::token::to_int64(range_end))}) ;
};

co_return co_await get_common_cdc_generation_mutations(s, pkey, std::move(get_ckey), desc, mutation_size_threshold, ts);
}

// non-static for testing
size_t limit_of_streams_in_topology_description() {
// Each stream takes 16B and we don't want to exceed 4MB so we can have
Expand Down Expand Up @@ -363,13 +323,47 @@ topology_description limit_number_of_streams_if_needed(topology_description&& de
return topology_description(std::move(entries));
}

std::pair<utils::UUID, cdc::topology_description> make_new_generation_data(
// Compute a set of tokens that split the token ring into vnodes.
static auto get_tokens(const std::unordered_set<dht::token>& bootstrap_tokens, const locator::token_metadata_ptr tmptr) {
auto tokens = tmptr->sorted_tokens();
auto it = tokens.insert(tokens.end(), bootstrap_tokens.begin(), bootstrap_tokens.end());
std::sort(it, tokens.end());
std::inplace_merge(tokens.begin(), it, tokens.end());
tokens.erase(std::unique(tokens.begin(), tokens.end()), tokens.end());
return tokens;
}

static token_range_description create_token_range_description(
size_t index,
dht::token start,
dht::token end,
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info) {
token_range_description desc;

desc.token_range_end = end;

auto [shard_count, ignore_msb] = get_sharding_info(end);
desc.streams = create_stream_ids(index, start, end, shard_count, ignore_msb);
desc.sharding_ignore_msb = ignore_msb;

return desc;
}

cdc::topology_description make_new_generation_description(
const std::unordered_set<dht::token>& bootstrap_tokens,
const noncopyable_function<std::pair<size_t, uint8_t>(dht::token)>& get_sharding_info,
const locator::token_metadata_ptr tmptr) {
auto gen = topology_description_generator(bootstrap_tokens, tmptr, get_sharding_info).generate();
auto uuid = utils::make_random_uuid();
return {uuid, std::move(gen)};
const auto tokens = get_tokens(bootstrap_tokens, tmptr);

std::vector<token_range_description> vnode_descriptions;
vnode_descriptions.reserve(tokens.size());

vnode_descriptions.push_back(create_token_range_description(0, tokens.back(), tokens.front(), get_sharding_info));
for (size_t idx = 1; idx < tokens.size(); ++idx) {
vnode_descriptions.push_back(create_token_range_description(idx, tokens[idx - 1], tokens[idx], get_sharding_info));
}

return {std::move(vnode_descriptions)};
}

db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milliseconds ring_delay) {
Expand Down Expand Up @@ -401,7 +395,9 @@ future<cdc::generation_id> generation_service::legacy_make_new_generation(const
return {sc > 0 ? sc : 1, get_sharding_ignore_msb(*endpoint, _gossiper)};
}
};
auto [uuid, gen] = make_new_generation_data(bootstrap_tokens, get_sharding_info, tmptr);

auto uuid = utils::make_random_uuid();
auto gen = make_new_generation_description(bootstrap_tokens, get_sharding_info, tmptr);

// Our caller should ensure that there are normal tokens in the token ring.
auto normal_token_owners = tmptr->count_normal_token_owners();
Expand Down
34 changes: 30 additions & 4 deletions cdc/generation.hh
Expand Up @@ -139,7 +139,22 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
*/
bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm);

std::pair<utils::UUID, cdc::topology_description> make_new_generation_data(
/*
* Generate a set of CDC stream identifiers such that for each shard
* and vnode pair there exists a stream whose token falls into this vnode
* and is owned by this shard. It is sometimes not possible to generate
* a CDC stream identifier for some (vnode, shard) pair because not all
* shards have to own tokens in a vnode. Small vnode can be totally owned
* by a single shard. In such case, a stream identifier that maps to
* end of the vnode is generated.
*
* Then build a cdc::topology_description which maps tokens to generated
* stream identifiers, such that if token T is owned by shard S in vnode V,
* it gets mapped to the stream identifier generated for (S, V).
*
* Run in seastar::async context.
*/
cdc::topology_description make_new_generation_description(
const std::unordered_set<dht::token>& bootstrap_tokens,
const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info,
const locator::token_metadata_ptr);
Expand All @@ -150,9 +165,20 @@ db_clock::time_point new_generation_timestamp(bool add_delay, std::chrono::milli
// using `mutation_size_threshold` to decide on the mutation sizes. The partition key of each mutation
// is given by `gen_uuid`. The timestamp of each cell in each mutation is given by `mutation_timestamp`.
//
// Works for only specific schemas: CDC_GENERATIONS_V2 (in system_distributed_keyspace)
// and CDC_GENERATIONS_V3 (in system_keyspace).
future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
// Works only for the CDC_GENERATIONS_V2 schema (in system_distributed keyspace).
future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v2(
schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&,
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);

// The partition key of all rows in the single-partition CDC_GENERATIONS_V3 schema (in system keyspace).
static constexpr auto CDC_GENERATIONS_V3_KEY = "cdc_generations";

// Translates the CDC generation data given by a `cdc::topology_description` into a vector of mutations,
// using `mutation_size_threshold` to decide on the mutation sizes. The first clustering key column is
// given by `gen_uuid`. The timestamp of each cell in each mutation is given by `mutation_timestamp`.
//
// Works only for the CDC_GENERATIONS_V3 schema (in system keyspace).
future<utils::chunked_vector<mutation>> get_cdc_generation_mutations_v3(
schema_ptr, utils::UUID gen_uuid, const cdc::topology_description&,
size_t mutation_size_threshold, api::timestamp_type mutation_timestamp);

Expand Down
2 changes: 1 addition & 1 deletion db/system_distributed_keyspace.cc
Expand Up @@ -536,7 +536,7 @@ system_distributed_keyspace::insert_cdc_generation(

auto s = _qp.db().real_database().find_schema(
system_distributed_keyspace::NAME_EVERYWHERE, system_distributed_keyspace::CDC_GENERATIONS_V2);
auto ms = co_await cdc::get_cdc_generation_mutations(s, id, desc, mutation_size_threshold, api::new_timestamp());
auto ms = co_await cdc::get_cdc_generation_mutations_v2(s, id, desc, mutation_size_threshold, api::new_timestamp());
co_await max_concurrent_for_each(ms, concurrency, [&] (mutation& m) -> future<> {
co_await _sp.mutate(
{ std::move(m) },
Expand Down

0 comments on commit a184b07

Please sign in to comment.