Merge 'storage_service: raft_check_and_repair_cdc_streams: don't create a new generation if current one is optimal' from Patryk Jędrzejczak

We add the CDC generation optimality check in
`storage_service::raft_check_and_repair_cdc_streams` so that it doesn't
create new generations unnecessarily. Since
`generation_service::check_and_repair_cdc_streams` already has this
check, we extract it into a new `is_cdc_generation_optimal` function to
avoid duplicating the code.

After this change, multiple tasks could wait for a single generation
change. Calling `signal` on `topology_state_machine.event` wouldn't wake
them all. Moreover, we must ensure the topology coordinator wakes up when
its logic expects it. Therefore, we change all `signal` calls on
`topology_state_machine.event` to `broadcast`.
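
For illustration only (not part of this patch): the distinction is the same as notify_one vs notify_all on a standard condition variable. A minimal, self-contained sketch follows; with three waiters blocked on one event, only a broadcast-style notification is guaranteed to wake them all. The `generation_changed` flag is a made-up stand-in for the topology state that real waiters re-check.

// Illustrative sketch only: std::condition_variable analogue of seastar's signal/broadcast.
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

int main() {
    std::mutex m;
    std::condition_variable event;
    bool generation_changed = false; // hypothetical stand-in for "current CDC generation changed"

    std::vector<std::thread> waiters;
    for (int i = 0; i < 3; ++i) {
        waiters.emplace_back([&, i] {
            std::unique_lock lock(m);
            event.wait(lock, [&] { return generation_changed; });
            std::printf("waiter %d woke up\n", i);
        });
    }

    {
        std::lock_guard lock(m);
        generation_changed = true;
    }
    // event.notify_one();  // like signal(): wakes at most one waiter; the others stay blocked
    event.notify_all();     // like broadcast(): wakes every waiter, each re-checks its predicate

    for (auto& t : waiters) {
        t.join();
    }
}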

We delay the deletion of the `new_cdc_generation` request to the moment
when the topology transition reaches the `publish_cdc_generation` state.
We need this change to ensure that the CDC generation optimality check
added in the next commit has the intended effect. Without it, one task
could make the `new_cdc_generation` request, and then, after that request
was removed but before the new generation was committed, another task
could make the `new_cdc_generation` request as well. In such a scenario,
two generations would be created, although only one should be. After
delaying the deletion of `new_cdc_generation` requests, the second
request has no effect.
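
Again purely illustrative (a toy, single-threaded model with made-up names, not Scylla code): the sketch below contrasts deleting the request as soon as the coordinator starts handling it with deleting it only at the `publish_cdc_generation` step, for the interleaving described above.

// Toy model of the single global new_cdc_generation request slot.
#include <cstdio>
#include <optional>
#include <string>

// Returns how many generations end up being created for two overlapping requests.
static int simulate(bool delete_request_early) {
    std::optional<std::string> request; // the global new_cdc_generation request slot
    int generations = 0;

    request = "from task A";            // task A asks for a new generation

    // The coordinator picks up A's request and starts building a generation.
    if (delete_request_early) {
        request.reset();                // old behaviour: the slot is emptied before the commit
    }

    request = "from task B";            // task B asks before A's generation is committed;
                                        // with deferred deletion this merely overwrites A's request

    generations += 1;                   // commit_cdc_generation for the generation being built
    if (!delete_request_early) {
        request.reset();                // new behaviour: delete the request at publish_cdc_generation
    }

    if (request) {                      // next coordinator round: is a request still pending?
        generations += 1;               // only with early deletion does B trigger a second generation
    }
    return generations;
}

int main() {
    std::printf("delete early:      %d generations\n", simulate(true));   // prints 2
    std::printf("delete at publish: %d generations\n", simulate(false));  // prints 1
}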

Additionally, we modify the `test_topology_ops.py` test in a way that
verifies the new changes. We call
`storage_service::raft_check_and_repair_cdc_streams` multiple times
concurrently and verify that exactly one generation has been created.

Fixes #14055

Closes #14789

* github.com:scylladb/scylladb:
  storage_service: raft_check_and_repair_cdc_streams: don't create a new generation if current one is optimal
  storage_service: delay deletion of the new_cdc_generation request
  raft topology: broadcast on topology_state_machine.event instead of signal
  cdc: implement the is_cdc_generation_optimal function
kbr-scylla committed Aug 1, 2023
2 parents 84bb75e + 3f29c98 commit 8bb3732
Showing 5 changed files with 55 additions and 27 deletions.
41 changes: 23 additions & 18 deletions cdc/generation.cc
@@ -263,6 +263,26 @@ bool should_propose_first_generation(const gms::inet_address& me, const gms::gos
    });
}

bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm) {
    if (tm.sorted_tokens().size() != gen.entries().size()) {
        // We probably have garbage streams from old generations
        cdc_log.info("Generation size does not match the token ring");
        return false;
    } else {
        std::unordered_set<dht::token> gen_ends;
        for (const auto& entry : gen.entries()) {
            gen_ends.insert(entry.token_range_end);
        }
        for (const auto& metadata_token : tm.sorted_tokens()) {
            if (!gen_ends.contains(metadata_token)) {
                cdc_log.warn("CDC generation missing token {}", metadata_token);
                return false;
            }
        }
        return true;
    }
}

future<utils::chunked_vector<mutation>> get_cdc_generation_mutations(
        schema_ptr s,
        utils::UUID id,
@@ -875,24 +895,9 @@ future<> generation_service::check_and_repair_cdc_streams() {
" even though some node gossiped about it.",
latest, db_clock::now());
should_regenerate = true;
} else {
if (tmptr->sorted_tokens().size() != gen->entries().size()) {
// We probably have garbage streams from old generations
cdc_log.info("Generation size does not match the token ring, regenerating");
should_regenerate = true;
} else {
std::unordered_set<dht::token> gen_ends;
for (const auto& entry : gen->entries()) {
gen_ends.insert(entry.token_range_end);
}
for (const auto& metadata_token : tmptr->sorted_tokens()) {
if (!gen_ends.contains(metadata_token)) {
cdc_log.warn("CDC generation {} missing token {}. Regenerating.", latest, metadata_token);
should_regenerate = true;
break;
}
}
}
} else if (!is_cdc_generation_optimal(*gen, *tmptr)) {
should_regenerate = true;
cdc_log.info("CDC generation {} needs repair, regenerating", latest);
}
}

6 changes: 6 additions & 0 deletions cdc/generation.hh
@@ -133,6 +133,12 @@ public:
*/
bool should_propose_first_generation(const gms::inet_address& me, const gms::gossiper&);

/*
 * Checks if the CDC generation is optimal, which is true if its `topology_description` is consistent
 * with `token_metadata`.
 */
bool is_cdc_generation_optimal(const cdc::topology_description& gen, const locator::token_metadata& tm);

std::pair<utils::UUID, cdc::topology_description> make_new_generation_data(
    const std::unordered_set<dht::token>& bootstrap_tokens,
    const noncopyable_function<std::pair<size_t, uint8_t> (dht::token)>& get_sharding_info,
2 changes: 1 addition & 1 deletion service/raft/group0_state_machine.cc
@@ -171,7 +171,7 @@ future<> group0_state_machine::load_snapshot(raft::snapshot_id id) {
    // memory and thus needs to be protected with apply mutex
    auto read_apply_mutex_holder = co_await _client.hold_read_apply_mutex();
    co_await _ss.topology_state_load(_cdc_gen_svc);
    _ss._topology_state_machine.event.signal();
    _ss._topology_state_machine.event.broadcast();
}

future<> group0_state_machine::transfer_snapshot(gms::inet_address from, raft::snapshot_descriptor snp) {
28 changes: 21 additions & 7 deletions service/storage_service.cc
@@ -487,7 +487,7 @@ future<> storage_service::topology_transition(cdc::generation_service& cdc_gen_s
    assert(this_shard_id() == 0);
    co_await topology_state_load(cdc_gen_svc); // reload new state

    _topology_state_machine.event.signal();
    _topology_state_machine.event.broadcast();
}

future<> storage_service::merge_topology_snapshot(raft_topology_snapshot snp) {
@@ -1141,9 +1141,13 @@ class topology_coordinator {
        guard = std::move(guard_);

        topology_mutation_builder builder(guard.write_timestamp());
        // We don't delete the request now, but only after the generation is committed. If we deleted
        // the request now and received another new_cdc_generation request later, but before committing
        // the new generation, the second request would also create a new generation. Deleting requests
        // after the generation is committed prevents this from happening. The second request would have
        // no effect - it would just overwrite the first request.
        builder.set_transition_state(topology::transition_state::commit_cdc_generation)
            .set_new_cdc_generation_data_uuid(gen_uuid)
            .del_global_topology_request();
            .set_new_cdc_generation_data_uuid(gen_uuid);
        auto reason = ::format(
            "insert CDC generation data (UUID: {})", gen_uuid);
        co_await update_topology_state(std::move(guard), {std::move(mutation), builder.build()}, reason);
@@ -1486,6 +1490,9 @@ class topology_coordinator {
        builder.set_transition_state(topology::transition_state::publish_cdc_generation)
            .set_current_cdc_generation_id(cdc_gen_id)
            .set_version(_topo_sm._topology.version + 1);
        if (_topo_sm._topology.global_request == global_topology_request::new_cdc_generation) {
            builder.del_global_topology_request();
        }
        auto str = ::format("committed new CDC generation, ID: {}", cdc_gen_id);
        co_await update_topology_state(std::move(guard), {builder.build()}, std::move(str));
    }
@@ -1870,7 +1877,7 @@ future<> topology_coordinator::run() {
    slogger.info("raft topology: start topology coordinator fiber");

    auto abort = _as.subscribe([this] () noexcept {
        _topo_sm.event.signal();
        _topo_sm.event.broadcast();
    });

    while (!_as.abort_requested()) {
@@ -4929,8 +4936,16 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
        }

        curr_gen = _topology_state_machine._topology.current_cdc_generation_id;

        // FIXME: check if the current generation is optimal, don't request new one if it isn't
        if (!curr_gen) {
            slogger.error("check_and_repair_cdc_streams: no current CDC generation, requesting a new one.");
        } else {
            auto gen = co_await _sys_ks.local().read_cdc_generation(curr_gen->id);
            if (cdc::is_cdc_generation_optimal(gen, get_token_metadata())) {
                cdc_log.info("CDC generation {} does not need repair", curr_gen);
                co_return;
            }
            cdc_log.info("CDC generation {} needs repair, requesting a new one", curr_gen);
        }

        topology_mutation_builder builder(guard.write_timestamp());
        builder.set_global_topology_request(global_topology_request::new_cdc_generation);
@@ -4947,7 +4962,6 @@ future<> storage_service::raft_check_and_repair_cdc_streams() {
    }

    // Wait until the current CDC generation changes.
    // This might happen due to a different reason than our request but we don't care.
    co_await _topology_state_machine.event.when([this, &curr_gen] {
        return curr_gen != _topology_state_machine._topology.current_cdc_generation_id;
    });
5 changes: 4 additions & 1 deletion test/topology_experimental_raft/test_topology_ops.py
@@ -14,6 +14,7 @@
import pytest
import logging
import time
import asyncio
from datetime import datetime
from typing import Optional

@@ -62,14 +63,16 @@ async def test_topology_ops(request, manager: ManagerClient):
    await wait_for_cql_and_get_hosts(cql, servers, time.time() + 60)
    gen_timestamps = {r.time for r in await manager.get_cql().run_async(query)}
    logger.info(f"Timestamps before check_and_repair: {gen_timestamps}")
    await manager.api.client.post("/storage_service/cdc_streams_check_and_repair", servers[1].ip_addr)
    await asyncio.gather(*[manager.api.client.post("/storage_service/cdc_streams_check_and_repair", servers[i % 2].ip_addr)
                           for i in range(10)])
    async def new_gen_appeared() -> Optional[set[datetime]]:
        new_gen_timestamps = {r.time for r in await manager.get_cql().run_async(query)}
        assert(gen_timestamps <= new_gen_timestamps)
        if gen_timestamps < new_gen_timestamps:
            return new_gen_timestamps
        return None
    new_gen_timestamps = await wait_for(new_gen_appeared, time.time() + 60)
    assert(len(gen_timestamps) + 1 == len(new_gen_timestamps))
    logger.info(f"Timestamps after check_and_repair: {new_gen_timestamps}")

    logger.info(f"Decommissioning node {servers[0]}")
Expand Down
