mv: handle different ERMs for base and view table
When calculating the base-view mapping while the topology
is changing, we may encounter a situation where the base
table has already noticed the change in its effective replication
map (ERM) while the view table hasn't, or vice versa. This can
happen because the ERM update may be performed during a preemption
between taking the base ERM and the view ERM, or, due to f2ff701,
the update may have been performed only partially at the moment
we take the ERMs.

Until now, we assumed that the ERMs were synchronized when finding
the base-view endpoint mapping, so in particular we used the
topology from the base's ERM to check the datacenters of all
endpoints. Now that the ERMs are more likely to differ, we may try
to get the datacenter of a view endpoint that doesn't exist in the
base's topology, causing a crash.

This patch fixes the problem by using the view table's topology for
endpoints coming from the view ERM. The resulting mapping may now
temporarily pair endpoints from different topologies, but it still
maps base and view replicas 1-to-1.
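
To illustrate the idea, here is a minimal, self-contained sketch with
simplified stand-in types (topology_t, erm_t, pair_view_replica are
hypothetical names, not Scylla's locator API): each replica list is
resolved against the topology of the ERM it was taken from, so a view
endpoint unknown to the base topology no longer triggers a failed lookup.

// Hypothetical, simplified types for illustration only -- not the real code.
#include <cstddef>
#include <map>
#include <optional>
#include <string>
#include <vector>

struct topology_t {
    std::map<std::string, std::string> dc_of;   // endpoint -> datacenter
    std::optional<std::string> datacenter(const std::string& ep) const {
        auto it = dc_of.find(ep);
        if (it == dc_of.end()) {
            return std::nullopt;                 // endpoint unknown to this topology
        }
        return it->second;
    }
};

struct erm_t {
    topology_t topo;                             // topology this ERM was built from
    std::vector<std::string> replicas;           // replicas for some token
};

// Pair the i-th base replica with the i-th view replica within one DC.
// The key point: base endpoints are resolved with base.topo and view
// endpoints with view.topo, so the two ERMs may temporarily disagree
// during a topology change without causing a failed lookup.
std::optional<std::string> pair_view_replica(const erm_t& base, const erm_t& view,
                                             const std::string& me, const std::string& my_dc) {
    std::vector<std::string> base_dc, view_dc;
    for (const auto& ep : base.replicas) {
        if (base.topo.datacenter(ep) == my_dc) {
            base_dc.push_back(ep);
        }
    }
    for (const auto& ep : view.replicas) {
        // ep may not exist in base.topo while a topology change is in flight.
        if (view.topo.datacenter(ep) == my_dc) {
            view_dc.push_back(ep);
        }
    }
    for (std::size_t i = 0; i < base_dc.size() && i < view_dc.size(); ++i) {
        if (base_dc[i] == me) {
            return view_dc[i];
        }
    }
    return std::nullopt;                         // this node is not a base replica in this DC
}

The real get_view_natural_endpoint() pairing is more involved (legacy
self-pairing, removal of shared endpoints), but the datacenter lookups
follow the same split, as the diff below shows.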

Fixes: #17786
Fixes: #18709
wmitros committed May 26, 2024
1 parent 86b988a commit 83b51b7
Showing 3 changed files with 98 additions and 6 deletions.
14 changes: 8 additions & 6 deletions db/view/view.cc
@@ -1625,25 +1625,26 @@ get_view_natural_endpoint(
         }
     }
 
+    auto& view_topology = view_erm->get_token_metadata_ptr()->get_topology();
     for (auto&& view_endpoint : view_erm->get_replicas(view_token)) {
         if (use_legacy_self_pairing) {
+            auto it = std::find(base_endpoints.begin(), base_endpoints.end(),
+                view_endpoint);
             // If this base replica is also one of the view replicas, we use
             // ourselves as the view replica.
-            if (view_endpoint == me) {
+            if (view_endpoint == me && it != base_endpoints.end()) {
                 return topology.my_address();
             }
             // We have to remove any endpoint which is shared between the base
             // and the view, as it will select itself and throw off the counts
             // otherwise.
-            auto it = std::find(base_endpoints.begin(), base_endpoints.end(),
-                view_endpoint);
             if (it != base_endpoints.end()) {
                 base_endpoints.erase(it);
-            } else if (!network_topology || topology.get_datacenter(view_endpoint) == my_datacenter) {
+            } else if (!network_topology || view_topology.get_datacenter(view_endpoint) == my_datacenter) {
                 view_endpoints.push_back(view_endpoint);
             }
         } else {
-            if (!network_topology || topology.get_datacenter(view_endpoint) == my_datacenter) {
+            if (!network_topology || view_topology.get_datacenter(view_endpoint) == my_datacenter) {
                 view_endpoints.push_back(view_endpoint);
             }
         }
@@ -1658,7 +1659,7 @@ get_view_natural_endpoint(
         return {};
     }
     auto replica = view_endpoints[base_it - base_endpoints.begin()];
-    return topology.get_node(replica).endpoint();
+    return view_topology.get_node(replica).endpoint();
 }
 
 static future<> apply_to_remote_endpoints(service::storage_proxy& proxy, locator::effective_replication_map_ptr ermp,
@@ -1715,6 +1716,7 @@ future<> view_update_generator::mutate_MV(
 {
     auto base_ermp = base->table().get_effective_replication_map();
     static constexpr size_t max_concurrent_updates = 128;
+    co_await utils::get_local_injector().inject("delay_before_get_view_natural_endpoint", 8000ms);
     co_await max_concurrent_for_each(view_updates, max_concurrent_updates,
             [this, base_token, &stats, &cf_stats, tr_state, &pending_view_updates, allow_hints, wait_for_all, base_ermp] (frozen_mutation_and_schema mut) mutable -> future<> {
         auto view_token = dht::get_token(*mut.s, mut.fm.key());
11 changes: 11 additions & 0 deletions service/storage_service.cc
@@ -3037,6 +3037,17 @@ future<> storage_service::replicate_to_all_cores(mutable_token_metadata_ptr tmpt
     for (auto it = table_erms.begin(); it != table_erms.end(); ) {
         auto& cf = db.find_column_family(it->first);
         co_await cf.update_effective_replication_map(std::move(it->second));
+        co_await utils::get_local_injector().inject("delay_after_erm_update", [&cf, this] (auto& handler) -> future<> {
+            const auto ks_name = handler.get("ks_name");
+            const auto cf_name = handler.get("cf_name");
+            assert(ks_name);
+            assert(cf_name);
+            if (cf.schema()->ks_name() != ks_name || cf.schema()->cf_name() != *cf_name) {
+                co_return;
+            }
+
+            co_await sleep_abortable(std::chrono::seconds{5}, _abort_source);
+        });
         if (cf.uses_tablets()) {
             register_tablet_split_candidate(it->first);
         }
79 changes: 79 additions & 0 deletions test/topology_custom/test_mv_topology_change.py
@@ -0,0 +1,79 @@
+#
+# Copyright (C) 2024-present ScyllaDB
+#
+# SPDX-License-Identifier: AGPL-3.0-or-later
+#
+import asyncio
+import pytest
+import time
+import logging
+
+from cassandra.cluster import ConnectionException, NoHostAvailable  # type: ignore
+
+from test.pylib.manager_client import ManagerClient
+from test.topology.conftest import skip_mode
+
+
+logger = logging.getLogger(__name__)
+
+# This test reproduces issues #17786 and #18709
+# In the test, we create a keyspace with a table and a materialized view.
+# We then start writing to the table, causing the materialized view to be updated.
+# While the writes are in progress, we add then decommission a node in the cluster.
+# The test verifies that no node crashes as a result of the topology change combined
+# with the writes.
+@pytest.mark.asyncio
+@skip_mode('release', 'error injections are not supported in release mode')
+async def test_mv_topology_change(manager: ManagerClient):
+    cfg = {'force_gossip_topology_changes': True, 'error_injections_at_startup': ['view_update_generator_max_concurrent_updates']}
+
+    servers = [await manager.server_add(config=cfg, timeout=60) for _ in range(3)]
+
+    cql = manager.get_cql()
+    await cql.run_async("CREATE KEYSPACE ks WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 3};")
+    await cql.run_async("CREATE TABLE ks.t (pk int primary key, v int)")
+    await cql.run_async("CREATE materialized view ks.t_view AS select pk, v from ks.t where v is not null primary key (v, pk)")
+
+    stop_event = asyncio.Event()
+    concurrency = 10
+    async def do_writes(start_it, repeat) -> int:
+        iteration = start_it
+        while not stop_event.is_set():
+            start_time = time.time()
+            try:
+                await cql.run_async(f"insert into ks.t (pk, v) values ({iteration}, {iteration})")
+            except NoHostAvailable as e:
+                for _, err in e.errors.items():
+                    # ConnectionException can be raised when the node is shutting down.
+                    if not isinstance(err, ConnectionException):
+                        logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
+                        raise
+            except Exception as e:
+                logger.error(f"Write started {time.time() - start_time}s ago failed: {e}")
+                raise
+            iteration += concurrency
+            if not repeat:
+                break
+            await asyncio.sleep(0.01)
+        return iteration
+
+
+    # to hit the issue #18709 it's enough to start one batch of writes, the effective
+    # replication maps for base and view will change after the writes start but before they finish
+    tasks = [asyncio.create_task(do_writes(i, repeat=False)) for i in range(concurrency)]
+
+    server = await manager.server_add(config=cfg)
+
+    await asyncio.gather(*tasks)
+
+    [await manager.api.disable_injection(s.ip_addr, "delay_before_get_view_natural_endpoint") for s in servers]
+    [await manager.api.enable_injection(s.ip_addr, "delay_after_erm_update", False, parameters={'ks_name': 'ks', 'cf_name': 't'}) for s in servers]
+
+    # to hit the issue #17786 we need to run multiple batches of writes, so that some write is processed while the
+    # effective replication maps for base and view are different
+    tasks = [asyncio.create_task(do_writes(i, repeat=True)) for i in range(concurrency)]
+    await manager.decommission_node(server.server_id)
+
+    stop_event.set()
+    await asyncio.gather(*tasks)
+