storage_service: raft_check_and_repair_cdc_streams: don't create a new generation if current one is optimal

We add a CDC generation optimality check to
storage_service::raft_check_and_repair_cdc_streams so that it does
not create a new generation when the current one is already optimal.
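
For illustration only, here is a minimal Python sketch of the new control
flow. It assumes, purely for the example, that a generation counts as
"optimal" when it was computed for the same token set as the current ring;
the real check is cdc::is_cdc_generation_optimal applied to the node's
token_metadata, and every name in the sketch is a hypothetical stand-in.

    # Illustrative sketch only -- not Scylla's implementation. Assumes a
    # generation is "optimal" when it was computed for the current ring's tokens.
    from dataclasses import dataclass
    from typing import Optional

    @dataclass(frozen=True)
    class CdcGeneration:
        id: int
        tokens: frozenset     # tokens the generation's streams were derived from

    def is_generation_optimal(gen: CdcGeneration, ring_tokens: frozenset) -> bool:
        # Hypothetical stand-in for cdc::is_cdc_generation_optimal(gen, get_token_metadata()).
        return gen.tokens == ring_tokens

    def needs_new_generation(curr_gen: Optional[CdcGeneration], ring_tokens: frozenset) -> bool:
        if curr_gen is None:
            return True       # no current generation at all -> request one
        if is_generation_optimal(curr_gen, ring_tokens):
            return False      # already optimal -> nothing to repair
        return True           # stale -> request a new one

    ring = frozenset({100, 200, 300})
    assert needs_new_generation(None, ring)
    assert not needs_new_generation(CdcGeneration(1, ring), ring)
    assert needs_new_generation(CdcGeneration(1, frozenset({100, 200})), ring)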

Additionally, we modify the test_topology_ops.py test so that it
verifies the new behaviour: we call
storage_service::raft_check_and_repair_cdc_streams multiple times
concurrently and verify that exactly one new generation is created.
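
The property the test relies on can be illustrated with a toy asyncio
model (none of this is the real ManagerClient or REST API; all names are
made up): once the first call installs a new, optimal generation, the
remaining concurrent calls observe an optimal generation and return
without requesting another.

    # Toy model of the test's expectation -- not the real cluster or its API.
    import asyncio

    class FakeCluster:
        def __init__(self) -> None:
            self.generations = [1]       # one pre-existing, non-optimal generation
            self.optimal = False
            self.lock = asyncio.Lock()   # stands in for the coordinator serializing requests

        async def check_and_repair(self) -> None:
            async with self.lock:
                if self.optimal:
                    return               # current generation is optimal -> do nothing
                await asyncio.sleep(0)   # yield, as the real call performs I/O
                self.generations.append(self.generations[-1] + 1)
                self.optimal = True      # the freshly created generation is optimal

    async def main() -> None:
        cluster = FakeCluster()
        # Mirror the test: fire the repair request many times concurrently.
        await asyncio.gather(*[cluster.check_and_repair() for _ in range(10)])
        # Exactly one new generation should have been created.
        assert len(cluster.generations) == 2, cluster.generations

    asyncio.run(main())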
patjed41 committed Jul 28, 2023
1 parent b11f429 commit 3f29c98
Showing 2 changed files with 14 additions and 4 deletions.
13 changes: 10 additions & 3 deletions service/storage_service.cc
@@ -4661,8 +4661,16 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
         }

         curr_gen = _topology_state_machine._topology.current_cdc_generation_id;
-
-        // FIXME: check if the current generation is optimal, don't request new one if it isn't
+        if (!curr_gen) {
+            slogger.error("check_and_repair_cdc_streams: no current CDC generation, requesting a new one.");
+        } else {
+            auto gen = co_await _sys_ks.local().read_cdc_generation(curr_gen->id);
+            if (cdc::is_cdc_generation_optimal(gen, get_token_metadata())) {
+                cdc_log.info("CDC generation {} does not need repair", curr_gen);
+                co_return;
+            }
+            cdc_log.info("CDC generation {} needs repair, requesting a new one", curr_gen);
+        }

         topology_mutation_builder builder(guard.write_timestamp());
         builder.set_global_topology_request(global_topology_request::new_cdc_generation);
@@ -4679,7 +4687,6 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
     }

     // Wait until the current CDC generation changes.
-    // This might happen due to a different reason than our request but we don't care.
     co_await _topology_state_machine.event.when([this, &curr_gen] {
         return curr_gen != _topology_state_machine._topology.current_cdc_generation_id;
     });
5 changes: 4 additions & 1 deletion test/topology_experimental_raft/test_topology_ops.py
@@ -14,6 +14,7 @@
 import pytest
 import logging
 import time
+import asyncio
 from datetime import datetime
 from typing import Optional

@@ -62,14 +63,16 @@ async def test_topology_ops(request, manager: ManagerClient):
     await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
     gen_timestamps = {r.time for r in await manager.get_cql().run_async(query)}
     logger.info(f"Timestamps before check_and_repair: {gen_timestamps}")
-    await manager.api.client.post("/storage_service/cdc_streams_check_and_repair", servers[1].ip_addr)
+    await asyncio.gather(*[manager.api.client.post("/storage_service/cdc_streams_check_and_repair", servers[i % 2].ip_addr)
+                           for i in range(10)])
     async def new_gen_appeared() -> Optional[set[datetime]]:
         new_gen_timestamps = {r.time for r in await manager.get_cql().run_async(query)}
         assert(gen_timestamps <= new_gen_timestamps)
         if gen_timestamps < new_gen_timestamps:
             return new_gen_timestamps
         return None
     new_gen_timestamps = await wait_for(new_gen_appeared, time.time() + 60)
+    assert(len(gen_timestamps) + 1 == len(new_gen_timestamps))
     logger.info(f"Timestamps after check_and_repair: {new_gen_timestamps}")

     logger.info(f"Decommissioning node {servers[0]}")
