diff --git a/mutation_partition.cc b/mutation_partition.cc index db8d0fd669d7..967143407651 100644 --- a/mutation_partition.cc +++ b/mutation_partition.cc @@ -1534,12 +1534,19 @@ bool row::compact_and_expire(const schema& s, column_kind kind, tombstone tomb, const column_definition& def = s.column_at(kind, id); if (def.is_atomic()) { atomic_cell_view cell = c.as_atomic_cell(); + auto can_erase_cell = [&] { + return cell.deletion_time() < gc_before && can_gc(tombstone(cell.timestamp(), cell.deletion_time())); + }; + if (cell.is_covered_by(tomb, def.is_counter())) { erase = true; } else if (cell.has_expired(query_time)) { - c = atomic_cell::make_dead(cell.timestamp(), cell.deletion_time()); + erase = can_erase_cell(); + if (!erase) { + c = atomic_cell::make_dead(cell.timestamp(), cell.deletion_time()); + } } else if (!cell.is_live()) { - erase = cell.deletion_time() < gc_before && can_gc(tombstone(cell.timestamp(), cell.deletion_time())); + erase = can_erase_cell(); } else { any_live |= true; } diff --git a/tests/sstable_datafile_test.cc b/tests/sstable_datafile_test.cc index 1e496d8a98a1..70186ba00ce5 100644 --- a/tests/sstable_datafile_test.cc +++ b/tests/sstable_datafile_test.cc @@ -2284,6 +2284,13 @@ SEASTAR_TEST_CASE(tombstone_purge_test) { return m; }; + auto make_expiring = [&] (partition_key key, bool ttl) { + mutation m(key, s); + m.set_clustered_cell(clustering_key::make_empty(), bytes("value"), data_value(int32_t(1)), + gc_clock::now().time_since_epoch().count(), gc_clock::duration(ttl)); + return m; + }; + auto make_delete = [&] (partition_key key) { mutation m(key, s); tombstone tomb(next_timestamp(), gc_clock::now()); @@ -2291,6 +2298,25 @@ SEASTAR_TEST_CASE(tombstone_purge_test) { return m; }; + auto assert_that_produces_dead_cell = [&] (auto& sst, partition_key& key) { + auto reader = make_lw_shared(sstable_reader(sst, s)); + (*reader)().then([&key] (auto sm) { + return mutation_from_streamed_mutation(std::move(sm)); + }).then([reader, s, &key] (mutation_opt m) { + BOOST_REQUIRE(m); + BOOST_REQUIRE(m->key().equal(*s, key)); + auto& rows = m->partition().clustered_rows(); + BOOST_REQUIRE_EQUAL(rows.calculate_size(), 1); + auto& row = rows.begin()->row(); + auto& cells = row.cells(); + BOOST_REQUIRE_EQUAL(cells.size(), 1); + BOOST_REQUIRE(!cells.cell_at(s->get_column_definition("value")->id).as_atomic_cell().is_live()); + return (*reader)(); + }).then([reader, s] (streamed_mutation_opt m) { + BOOST_REQUIRE(!m); + }).get(); + }; + auto alpha = partition_key::from_exploded(*s, {to_bytes("alpha")}); auto beta = partition_key::from_exploded(*s, {to_bytes("beta")}); @@ -2369,6 +2395,52 @@ SEASTAR_TEST_CASE(tombstone_purge_test) { .produces(mut3) .produces_end_of_stream(); } + + { + // check that expired cell will not be purged if it will ressurect overwritten data. + auto mut1 = make_insert(alpha); + auto mut2 = make_expiring(alpha, 1); + + auto sst1 = make_sstable_containing(sst_gen, {mut1}); + auto sst2 = make_sstable_containing(sst_gen, {mut2}); + + forward_jump_clocks(std::chrono::seconds(5)); + + auto result = compact({sst1, sst2}, {sst2}); + BOOST_REQUIRE_EQUAL(1, result.size()); + assert_that_produces_dead_cell(result[0], alpha); + + result = compact({sst1, sst2}, {sst1, sst2}); + BOOST_REQUIRE_EQUAL(0, result.size()); + } + { + auto mut1 = make_insert(alpha); + auto mut2 = make_expiring(beta, 1); + + auto sst1 = make_sstable_containing(sst_gen, {mut1}); + auto sst2 = make_sstable_containing(sst_gen, {mut2}); + + forward_jump_clocks(std::chrono::seconds(5)); + + auto result = compact({sst1, sst2}, {sst2}); + BOOST_REQUIRE_EQUAL(0, result.size()); + } + { + auto mut1 = make_insert(alpha); + auto mut2 = make_expiring(alpha, 1); + auto mut3 = make_insert(beta); + + auto sst1 = make_sstable_containing(sst_gen, {mut1}); + auto sst2 = make_sstable_containing(sst_gen, {mut2, mut3}); + + forward_jump_clocks(std::chrono::seconds(5)); + + auto result = compact({sst1, sst2}, {sst1, sst2}); + BOOST_REQUIRE_EQUAL(1, result.size()); + assert_that(sstable_reader(result[0], s)) + .produces(mut3) + .produces_end_of_stream(); + } }); } diff --git a/types.cc b/types.cc index 1b05ed3a76ee..4984b576ca73 100644 --- a/types.cc +++ b/types.cc @@ -2149,14 +2149,20 @@ bool collection_type_impl::mutation::compact_and_expire(tombstone base_tomb, gc_ std::vector> survivors; for (auto&& name_and_cell : cells) { atomic_cell& cell = name_and_cell.second; + auto cannot_erase_cell = [&] { + return cell.deletion_time() >= gc_before || !can_gc(tombstone(cell.timestamp(), cell.deletion_time())); + }; + if (cell.is_covered_by(tomb, false)) { continue; } if (cell.has_expired(query_time)) { - survivors.emplace_back(std::make_pair( - std::move(name_and_cell.first), atomic_cell::make_dead(cell.timestamp(), cell.deletion_time()))); + if (cannot_erase_cell()) { + survivors.emplace_back(std::make_pair( + std::move(name_and_cell.first), atomic_cell::make_dead(cell.timestamp(), cell.deletion_time()))); + } } else if (!cell.is_live()) { - if (cell.deletion_time() >= gc_before || !can_gc(tombstone(cell.timestamp(), cell.deletion_time()))) { + if (cannot_erase_cell()) { survivors.emplace_back(std::move(name_and_cell)); } } else {