Skip to content

Commit

Permalink
compact and remove expired range tombstones from cache on read
Browse files Browse the repository at this point in the history
During a read from the cache, compact away and expire range tombstones.
Also remove expired, empty rows from the cache.

Refs #2252
Fixes #6033

Closes #14463
  • Loading branch information
alezzqz authored and denesb committed Sep 1, 2023
1 parent 3bdbe62 commit 87fa7d0
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 0 deletions.
11 changes: 11 additions & 0 deletions cache_flat_mutation_reader.hh
Expand Up @@ -757,7 +757,9 @@ void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
&& _snp->at_latest_version()
&& _snp->at_oldest_version()) {
deletable_row& row = _next_row.latest_row();
tombstone range_tomb = _next_row.range_tombstone_for_row();
auto t = row.deleted_at();
t.apply(range_tomb);

auto row_tomb_expired = [&](row_tombstone tomb) {
return (tomb && tomb.max_deletion_time() < _gc_before);
Expand All @@ -780,6 +782,15 @@ void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
std::swap(row, row_copy);
});
remove_row = row.empty();

auto tomb_expired = [&](tombstone tomb) {
return (tomb && tomb.deletion_time < _gc_before);
};

auto latests_range_tomb = _next_row.get_iterator_in_latest_version()->range_tombstone();
if (tomb_expired(latests_range_tomb)) {
_next_row.get_iterator_in_latest_version()->set_range_tombstone({});
}
}
}

Expand Down
89 changes: 89 additions & 0 deletions test/boost/row_cache_test.cc
Expand Up @@ -4599,3 +4599,92 @@ SEASTAR_TEST_CASE(test_cache_compacts_expired_tombstones_on_read) {
BOOST_REQUIRE(tracker_stats.rows_compacted_away == 1);
});
}

// Regression test for #6033: expired range tombstones, and rows fully
// shadowed by them, must be compacted away from the cache during a read,
// and must not be resurrected by a subsequent read of the same partition.
SEASTAR_TEST_CASE(test_compact_range_tombstones_on_read) {
    return seastar::async([] {
        simple_schema s;
        tests::reader_concurrency_semaphore_wrapper semaphore;

        auto cache_mt = make_lw_shared<replica::memtable>(s.schema());

        cache_tracker tracker;
        row_cache cache(s.schema(), snapshot_source_from_snapshot(cache_mt->as_data_source()), tracker);

        auto pk = s.make_pkey(0);
        auto pr = dht::partition_range::make_singular(pk);

        auto ck0 = s.make_ckey(0);
        auto ck1 = s.make_ckey(1);
        auto ck2 = s.make_ckey(2);
        auto ck3 = s.make_ckey(3);

        // One deletion time that is still live, one that is already past
        // gc_grace_seconds and therefore purgeable.
        auto dt_noexp = gc_clock::now();
        auto dt_exp = gc_clock::now() - std::chrono::seconds(s.schema()->gc_grace_seconds().count() + 1);

        // rt1 ([2,3], not expired) keeps ck3 covered; rt2 ([1,2], expired)
        // should be dropped on read together with the rows it shadows.
        mutation m(s.schema(), pk);
        auto rt1 = s.make_range_tombstone(s.make_ckey_range(2, 3), dt_noexp);
        auto rt2 = s.make_range_tombstone(s.make_ckey_range(1, 2), dt_exp);
        m.partition().apply_delete(*s.schema(), rt1);
        m.partition().apply_delete(*s.schema(), rt2);
        s.add_row(m, ck0, "v0");
        s.add_row(m, ck1, "v1");
        s.add_row(m, ck2, "v2");
        s.add_row(m, ck3, "v3");
        cache.populate(m);

        tombstone_gc_state gc_state(nullptr);

        cache_entry& entry = cache.lookup(pk);
        auto& cp = entry.partition().version()->partition();

        // Precondition: all four rows are present in the cached partition.
        BOOST_REQUIRE(cp.find_row(*s.schema(), ck0) != nullptr);
        BOOST_REQUIRE(cp.find_row(*s.schema(), ck1) != nullptr);
        BOOST_REQUIRE(cp.find_row(*s.schema(), ck2) != nullptr);
        BOOST_REQUIRE(cp.find_row(*s.schema(), ck3) != nullptr);

        // Workaround: force the row cells to be older than the tombstones so
        // that read-time compaction actually discards them.
        auto set_cells_timestamp_to_min = [&](deletable_row& row) {
            row.cells().for_each_cell([&] (column_id id, atomic_cell_or_collection& cell) {
                // row.cells() iterates the regular-column row, so look the
                // definition up among regular columns (the original used
                // column_kind::clustering_key, which only worked by accident
                // because simple_schema's ck and v columns are both atomic).
                const column_definition& def = s.schema()->column_at(column_kind::regular_column, id);

                auto cell_view = cell.as_mutable_atomic_cell(def);
                cell_view.set_timestamp(api::min_timestamp);
            });
        };

        set_cells_timestamp_to_min(cp.clustered_row(*s.schema(), ck0));
        set_cells_timestamp_to_min(cp.clustered_row(*s.schema(), ck1));
        set_cells_timestamp_to_min(cp.clustered_row(*s.schema(), ck2));
        set_cells_timestamp_to_min(cp.clustered_row(*s.schema(), ck3));

        {
            auto rd1 = cache.make_reader(s.schema(), semaphore.make_permit(), pr, &gc_state);
            auto close_rd1 = deferred_close(rd1);
            rd1.fill_buffer().get();

            // Rows covered by the expired tombstone (ck1, ck2) are compacted
            // away on read; ck0 (uncovered) and ck3 (covered by the live
            // tombstone) survive.
            BOOST_REQUIRE(cp.find_row(*s.schema(), ck0) != nullptr);
            BOOST_REQUIRE(cp.find_row(*s.schema(), ck1) == nullptr);
            BOOST_REQUIRE(cp.find_row(*s.schema(), ck2) == nullptr);
            BOOST_REQUIRE(cp.find_row(*s.schema(), ck3) != nullptr);
        }

        {
            auto rd2 = cache.make_reader(s.schema(), semaphore.make_permit(), pr, &gc_state);
            auto close_rd2 = deferred_close(rd2);
            rd2.fill_buffer().get();

            // A second read must not resurrect the compacted rows.
            BOOST_REQUIRE(cp.find_row(*s.schema(), ck0) != nullptr);
            BOOST_REQUIRE(cp.find_row(*s.schema(), ck1) == nullptr);
            BOOST_REQUIRE(cp.find_row(*s.schema(), ck2) == nullptr);
            BOOST_REQUIRE(cp.find_row(*s.schema(), ck3) != nullptr);
        }

        // Exactly the two shadowed rows were compacted, and both were
        // removed entirely (compacted away).
        auto &tracker_stats = tracker.get_stats();
        BOOST_REQUIRE(tracker_stats.rows_compacted == 2);
        BOOST_REQUIRE(tracker_stats.rows_compacted_away == 2);
    });
}

0 comments on commit 87fa7d0

Please sign in to comment.