diff --git a/cdc/generation.cc b/cdc/generation.cc
index b0d31d1e62cc..35420a3dcc76 100644
--- a/cdc/generation.cc
+++ b/cdc/generation.cc
@@ -263,6 +263,26 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
     });
 }
 
+bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm) {
+    if (tm.sorted_tokens().size() != gen.entries().size()) {
+        // We probably have garbage streams from old generations
+        cdc_log.info("Generation size does not match the token ring");
+        return false;
+    } else {
+        std::unordered_set<dht::token> gen_ends;
+        for (const auto& entry : gen.entries()) {
+            gen_ends.insert(entry.token_range_end);
+        }
+        for (const auto& metadata_token : tm.sorted_tokens()) {
+            if (!gen_ends.contains(metadata_token)) {
+                cdc_log.warn("CDC generation missing token {}", metadata_token);
+                return false;
+            }
+        }
+        return true;
+    }
+}
+
 future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
         schema_ptr s,
         utils::UUID id,
@@ -875,24 +895,9 @@ future<> generation_service::check_and_repair_cdc_streams() {
                     " even though some node gossiped about it.",
                     latest, db_clock::now());
             should_regenerate = true;
-        } else {
-            if (tmptr->sorted_tokens().size() != gen->entries().size()) {
-                // We probably have garbage streams from old generations
-                cdc_log.info("Generation size does not match the token ring, regenerating");
-                should_regenerate = true;
-            } else {
-                std::unordered_set<dht::token> gen_ends;
-                for (const auto& entry : gen->entries()) {
-                    gen_ends.insert(entry.token_range_end);
-                }
-                for (const auto& metadata_token : tmptr->sorted_tokens()) {
-                    if (!gen_ends.contains(metadata_token)) {
-                        cdc_log.warn("CDC generation {} missing token {}. Regenerating.", latest, metadata_token);
-                        should_regenerate = true;
-                        break;
-                    }
-                }
-            }
+        } else if (!is_cdc_generation_optimal(*gen, *tmptr)) {
+            should_regenerate = true;
+            cdc_log.info("CDC generation {} needs repair, regenerating", latest);
         }
     }
 
diff --git a/cdc/generation.hh b/cdc/generation.hh
index 6bd9f2bcb854..882b166e665b 100644
--- a/cdc/generation.hh
+++ b/cdc/generation.hh
@@ -133,6 +133,12 @@ public:
  */
 bool should_propose_first_generation(const gms::inet_address& me, const gms::gossiper&);
 
+/*
+ * Checks if the CDC generation is optimal, which is true if its `topology_description` is consistent
+ * with `token_metadata`.
+ */
+bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm);
+
 std::pair<utils::UUID, cdc::topology_description> make_new_generation_data(
     const std::unordered_set<dht::token>& bootstrap_tokens,
     const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info,
diff --git a/service/raft/group0_state_machine.cc b/service/raft/group0_state_machine.cc
index 92d90710606c..2223317cc61a 100644
--- a/service/raft/group0_state_machine.cc
+++ b/service/raft/group0_state_machine.cc
@@ -171,7 +171,7 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
     // memory and thus needs to be protected with apply mutex
     auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex();
     co_await _ss.topology_state_load(_cdc_gen_svc);
-    _ss._topology_state_machine.event.signal();
+    _ss._topology_state_machine.event.broadcast();
 }
 
 future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) {
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 9f2d02126138..c75aa3c0baf1 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -487,7 +487,7 @@ future<> storage_service::topology_transition(cdc::generation_service& cdc_gen_s
     assert(this_shard_id() == 0);
     co_await topology_state_load(cdc_gen_svc); // reload new state
 
-    _topology_state_machine.event.signal();
+    _topology_state_machine.event.broadcast();
 }
 
 future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
@@ -1141,9 +1141,13 @@ class topology_coordinator {
         guard = std::move(guard_);
 
         topology_mutation_builder builder(guard.write_timestamp());
+        // We don't delete the request now, but only after the generation is committed. If we deleted
+        // the request now and received another new_cdc_generation request later, but before committing
+        // the new generation, the second request would also create a new generation. Deleting requests
+        // after the generation is committed prevents this from happening. The second request would have
+        // no effect - it would just overwrite the first request.
         builder.set_transition_state(topology::transition_state::commit_cdc_generation)
-               .set_new_cdc_generation_data_uuid(gen_uuid)
-               .del_global_topology_request();
+               .set_new_cdc_generation_data_uuid(gen_uuid);
         auto reason = ::format(
             "insert CDC generation data (UUID: {})", gen_uuid);
         co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
@@ -1486,6 +1490,9 @@ class topology_coordinator {
         builder.set_transition_state(topology::transition_state::publish_cdc_generation)
                .set_current_cdc_generation_id(cdc_gen_id)
                .set_version(_topo_sm._topology.version + 1);
+        if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation) {
+            builder.del_global_topology_request();
+        }
         auto str = ::format("committed new CDC generation, ID: {}", cdc_gen_id);
         co_await update_topology_state(std::move(guard), {builder.build()}, std::move(str));
     }
@@ -1870,7 +1877,7 @@ future<> topology_coordinator::run() {
     slogger.info("raft topology: start topology coordinator fiber");
 
     auto abort = _as.subscribe([this] () noexcept {
-        _topo_sm.event.signal();
+        _topo_sm.event.broadcast();
     });
 
     while (!_as.abort_requested()) {
@@ -4929,8 +4936,16 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
         }
 
         curr_gen = _topology_state_machine._topology.current_cdc_generation_id;
-
-        // FIXME: check if the current generation is optimal, don't request new one if it isn't
+        if (!curr_gen) {
+            slogger.error("check_and_repair_cdc_streams: no current CDC generation, requesting a new one.");
+        } else {
+            auto gen = co_await _sys_ks.local().read_cdc_generation(curr_gen->id);
+            if (cdc::is_cdc_generation_optimal(gen, get_token_metadata())) {
+                cdc_log.info("CDC generation {} does not need repair", curr_gen);
+                co_return;
+            }
+            cdc_log.info("CDC generation {} needs repair, requesting a new one", curr_gen);
+        }
 
         topology_mutation_builder builder(guard.write_timestamp());
         builder.set_global_topology_request(global_topology_request::new_cdc_generation);
@@ -4947,7 +4962,6 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
     }
 
     // Wait until the current CDC generation changes.
-    // This might happen due to a different reason than our request but we don't care.
     co_await _topology_state_machine.event.when([this, &curr_gen] {
         return curr_gen != _topology_state_machine._topology.current_cdc_generation_id;
     });
diff --git a/test/topology_experimental_raft/test_topology_ops.py b/test/topology_experimental_raft/test_topology_ops.py
index b6adac53b9d0..dd3e67ec2133 100644
--- a/test/topology_experimental_raft/test_topology_ops.py
+++ b/test/topology_experimental_raft/test_topology_ops.py
@@ -14,6 +14,7 @@
 import pytest
 import logging
 import time
+import asyncio
 
 from datetime import datetime
 from typing import Optional
@@ -62,7 +63,8 @@ async def test_topology_ops(request, manager: ManagerClient):
     await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
     gen_timestamps = {r.time for r in await manager.get_cql().run_async(query)}
     logger.info(f"Timestamps before check_and_repair: {gen_timestamps}")
-    await manager.api.client.post("/storage_service/cdc_streams_check_and_repair", servers[1].ip_addr)
+    await asyncio.gather(*[manager.api.client.post("/storage_service/cdc_streams_check_and_repair", servers[i % 2].ip_addr)
+                           for i in range(10)])
     async def new_gen_appeared() -> Optional[set[datetime]]:
         new_gen_timestamps = {r.time for r in await manager.get_cql().run_async(query)}
         assert(gen_timestamps <= new_gen_timestamps)
@@ -70,6 +72,7 @@ async def new_gen_appeared() -> Optional[set[datetime]]:
             return new_gen_timestamps
         return None
     new_gen_timestamps = await wait_for(new_gen_appeared, time.time() + 60)
+    assert(len(gen_timestamps) + 1 == len(new_gen_timestamps))
     logger.info(f"Timestamps after check_and_repair: {new_gen_timestamps}")
 
     logger.info(f"Decommissioning node {servers[0]}")