Fix potential data resurrection when another compaction type does cleanup work

Since commit f1bbf70, many compaction types can do cleanup work, but it turns out
we forgot to invalidate the cache on their completion.

So if a node regains ownership of a token whose partition was deleted on its previous
owner (and the tombstone is already gone), that data can be resurrected.

Tablets are not affected, as tablet migration explicitly invalidates the cache during
its cleanup stage.

Scylla 5.4 is affected.

Fixes #17501.
Fixes #17452.

Signed-off-by: Raphael S. Carvalho <raphaelsc@scylladb.com>

Closes #17502
raphaelsc authored and nyh committed Feb 25, 2024
1 parent b4cef63 commit f07c233
Showing 4 changed files with 102 additions and 24 deletions.
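To make the failure mode above concrete, here is a toy read-through-cache model in Python. It is purely illustrative (ToyNode and its methods are invented for this example, not Scylla code): cleanup drops the sstable data for tokens the node no longer owns, and if the row cache is not invalidated at the same time, a later read serves the stale cached row back.

class ToyNode:
    """Toy model only, not Scylla code: one node with sstables and a row cache."""
    def __init__(self):
        self.sstables = {}   # key -> value, the on-disk data
        self.cache = {}      # key -> value, filled on reads (read-through)

    def write(self, key, value):
        self.sstables[key] = value

    def read(self, key):
        if key in self.cache:            # a cache hit never consults sstables
            return self.cache[key]
        value = self.sstables.get(key)
        if value is not None:
            self.cache[key] = value
        return value

    def cleanup(self, owned_keys, invalidate_cache):
        # Drop data for keys this node no longer owns.
        self.sstables = {k: v for k, v in self.sstables.items() if k in owned_keys}
        if invalidate_cache:
            self.cache = {k: v for k, v in self.cache.items() if k in owned_keys}

node = ToyNode()
node.write(1, "a")
assert node.read(1) == "a"                                # read populates the cache
node.cleanup(owned_keys=set(), invalidate_cache=False)    # the pre-fix behaviour
assert node.read(1) == "a"                                # stale cached row "resurrects"
node.cleanup(owned_keys=set(), invalidate_cache=True)     # what the fix guarantees
assert node.read(1) is None

The actual fix below corresponds to the invalidate_cache=True path of this toy: every compaction that does cleanup work now hands its non-owned ranges to the cache for invalidation on completion.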
47 changes: 23 additions & 24 deletions compaction/compaction.cc
@@ -499,6 +499,26 @@ class compaction {
         cdata.compaction_fan_in = descriptor.fan_in();
         return cdata;
     }
+
+    // Called in a seastar thread
+    dht::partition_range_vector
+    get_ranges_for_invalidation(const std::vector<shared_sstable>& sstables) {
+        // If owned ranges is disengaged, it means no cleanup work was done and
+        // so nothing needs to be invalidated.
+        if (!_owned_ranges) {
+            return dht::partition_range_vector{};
+        }
+        auto owned_ranges = dht::to_partition_ranges(*_owned_ranges, utils::can_yield::yes);
+
+        auto non_owned_ranges = boost::copy_range<dht::partition_range_vector>(sstables
+                | boost::adaptors::transformed([] (const shared_sstable& sst) {
+            seastar::thread::maybe_yield();
+            return dht::partition_range::make({sst->get_first_decorated_key(), true},
+                                              {sst->get_last_decorated_key(), true});
+        }));
+
+        return dht::subtract_ranges(*_schema, non_owned_ranges, std::move(owned_ranges)).get();
+    }
 protected:
     compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor, use_backlog_tracker use_backlog_tracker)
         : _cdata(init_compaction_data(cdata, descriptor))
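A rough sketch of what the new get_ranges_for_invalidation computes, using plain integer intervals instead of Scylla's dht types (subtract_one and ranges_for_invalidation below are invented helpers for illustration): each input sstable contributes its [first_key, last_key] span, the ranges the node still owns are subtracted, and whatever remains is what the row cache must drop.

from typing import List, Tuple

Range = Tuple[int, int]  # closed interval [start, end]

def subtract_one(r: Range, owned: Range) -> List[Range]:
    """Remove the overlap of `owned` from `r`, keeping what falls outside it."""
    (s, e), (os_, oe) = r, owned
    if oe < s or os_ > e:          # no overlap: keep the whole range
        return [r]
    out = []
    if os_ > s:
        out.append((s, os_ - 1))   # piece before the owned range
    if oe < e:
        out.append((oe + 1, e))    # piece after the owned range
    return out

def ranges_for_invalidation(sstable_spans: List[Range], owned_ranges: List[Range]) -> List[Range]:
    """Per-sstable span minus all owned ranges = ranges to invalidate in the cache."""
    result: List[Range] = []
    for span in sstable_spans:
        pieces = [span]
        for owned in owned_ranges:
            pieces = [p for piece in pieces for p in subtract_one(piece, owned)]
        result.extend(pieces)
    return result

# An sstable spanning tokens 0..100 on a node that now owns only 40..60:
print(ranges_for_invalidation([(0, 100)], [(40, 60)]))  # [(0, 39), (61, 100)]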
@@ -587,9 +607,10 @@ class compaction {
         return _stats_collector.get();
     }
 
-    virtual compaction_completion_desc
+    compaction_completion_desc
     get_compaction_completion_desc(std::vector<shared_sstable> input_sstables, std::vector<shared_sstable> output_sstables) {
-        return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables)};
+        auto ranges_for_for_invalidation = get_ranges_for_invalidation(input_sstables);
+        return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables), std::move(ranges_for_for_invalidation)};
     }
 
     // Tombstone expiration is enabled based on the presence of sstable set.
@@ -1280,28 +1301,6 @@ class reshape_compaction : public regular_compaction {
 };
 
 class cleanup_compaction final : public regular_compaction {
-private:
-    // Called in a seastar thread
-    dht::partition_range_vector
-    get_ranges_for_invalidation(const std::vector<shared_sstable>& sstables) {
-        auto owned_ranges = dht::to_partition_ranges(*_owned_ranges, utils::can_yield::yes);
-
-        auto non_owned_ranges = boost::copy_range<dht::partition_range_vector>(sstables
-                | boost::adaptors::transformed([] (const shared_sstable& sst) {
-            seastar::thread::maybe_yield();
-            return dht::partition_range::make({sst->get_first_decorated_key(), true},
-                                              {sst->get_last_decorated_key(), true});
-        }));
-
-        return dht::subtract_ranges(*_schema, non_owned_ranges, std::move(owned_ranges)).get();
-    }
-protected:
-    virtual compaction_completion_desc
-    get_compaction_completion_desc(std::vector<shared_sstable> input_sstables, std::vector<shared_sstable> output_sstables) override {
-        auto ranges_for_for_invalidation = get_ranges_for_invalidation(input_sstables);
-        return compaction_completion_desc{std::move(input_sstables), std::move(output_sstables), std::move(ranges_for_for_invalidation)};
-    }
-
 public:
     cleanup_compaction(table_state& table_s, compaction_descriptor descriptor, compaction_data& cdata, compaction_progress_monitor& progress_monitor)
         : regular_compaction(table_s, std::move(descriptor), cdata, progress_monitor)
3 changes: 3 additions & 0 deletions compaction/compaction_manager.cc
@@ -1963,6 +1963,9 @@ future<> compaction_manager::try_perform_cleanup(owned_ranges_ptr sorted_owned_r
     if (found_maintenance_sstables) {
         co_await perform_offstrategy(t, info);
     }
+    if (utils::get_local_injector().enter("major_compaction_before_cleanup")) {
+        co_await perform_major_compaction(t, info);
+    }
 
     // Called with compaction_disabled
     auto get_sstables = [this, &t] () -> future<std::vector<sstables::shared_sstable>> {
4 changes: 4 additions & 0 deletions test/pylib/rest_client.py
@@ -264,6 +264,10 @@ async def flush_keyspace(self, node_ip: str, ks: str) -> None:
         """Flush keyspace"""
         await self.client.post(f"/storage_service/keyspace_flush/{ks}", host=node_ip)
 
+    async def cleanup_keyspace(self, node_ip: str, ks: str) -> None:
+        """Cleanup keyspace"""
+        await self.client.post(f"/storage_service/keyspace_cleanup/{ks}", host=node_ip)
+
     async def load_new_sstables(self, node_ip: str, keyspace: str, table: str) -> None:
         """Load sstables from upload directory"""
         await self.client.post(f"/storage_service/sstables/{keyspace}?cf={table}", host=node_ip)
72 changes: 72 additions & 0 deletions test/topology_custom/test_data_resurrection_after_cleanup.py
@@ -0,0 +1,72 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#

from test.pylib.manager_client import ManagerClient
from test.pylib.rest_client import inject_error_one_shot
from test.topology.conftest import skip_mode
from test.topology.util import check_token_ring_and_group0_consistency

import pytest
import asyncio
import logging
import time

logger = logging.getLogger(__name__)

@pytest.mark.asyncio
@skip_mode('release', 'error injections are not supported in release mode')
async def test_data_resurrection_after_cleanup(manager: ManagerClient):
    logger.info("Bootstrapping cluster")
    cmdline = [
        '--logger-log-level', 'table=debug',
        '--smp', '1',
    ]
    servers = [await manager.server_add(cmdline=cmdline)]

    cql = manager.get_cql()

    await cql.run_async("CREATE KEYSPACE test WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1};")
    await cql.run_async("CREATE TABLE test.test (pk int PRIMARY KEY, c int) WITH gc_grace_seconds=0;")

    keys = range(256)
    await asyncio.gather(*[cql.run_async(f"INSERT INTO test.test (pk, c) VALUES ({k}, {k});") for k in keys])

    async def check(expected_keys):
        logger.info("Checking table")
        cql = manager.get_cql()
        rows = await cql.run_async("SELECT * FROM test.test;")
        assert len(rows) == len(expected_keys)
        for r in rows:
            assert r.c == r.pk

    await manager.api.flush_keyspace(servers[0].ip_addr, "test")

    await check(keys)

    logger.info("Adding new server")
    servers.append(await manager.server_add(cmdline=cmdline))

    time.sleep(1)
    await check(keys)

    await inject_error_one_shot(manager.api, servers[0].ip_addr, "major_compaction_before_cleanup")
    await manager.api.cleanup_keyspace(servers[0].ip_addr, "test")

    deleted_keys = range(128)
    await asyncio.gather(*[cql.run_async(f"DELETE FROM test.test WHERE pk={k};") for k in deleted_keys])
    # Make sure tombstones are gone
    await manager.api.flush_keyspace(servers[1].ip_addr, "test")
    time.sleep(1)
    await manager.api.keyspace_compaction(servers[1].ip_addr, "test")

    # The remaining node regains ownership of the deleted data

    logger.info(f"Decommissioning node {servers[1]}")
    await manager.decommission_node(servers[1].server_id)
    await check_token_ring_and_group0_consistency(manager)

    time.sleep(1)
    await check(range(128))