
Commit

compact and remove expired rows from cache on read
when reading from cache, compact and expire row tombstones
remove expired empty rows from cache
do not expire range tombstones in this patch

Refs #2252, #6033

Closes #12917
alezzqz authored and tgrabiec committed Jun 26, 2023
1 parent b233619 commit ca4e7f9
Showing 8 changed files with 168 additions and 10 deletions.
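
Before the file-by-file diff, a minimal sketch of the expiry rule this patch applies while copying a row from the cache to the reader buffer. Everything below uses simplified stand-in types for illustration only (the tombstone, marker and clock types are reduced to plain structs, and gc_before stands for the per-key value obtained from tombstone_gc_state); the actual logic is in cache_flat_mutation_reader.hh below. Range tombstones are deliberately not covered by this check, as stated in the commit message.

#include <chrono>

using time_point = std::chrono::system_clock::time_point;

// Simplified stand-in for a row tombstone.
struct simple_row_tombstone {
    bool present = false;           // is there any tombstone on the row?
    time_point max_deletion_time{};
};

// Simplified stand-in for the row marker (row liveness information).
struct simple_row_marker {
    bool missing = true;            // no marker at all
    bool dead = false;              // marker deleted or its TTL has expired
    time_point deletion_time{};
};

// A cached row can be compacted on read when either its row tombstone or its
// dead marker falls entirely before gc_before, i.e. gc_grace_seconds has
// already elapsed and the tombstone no longer needs to be preserved.
bool row_expired_for_read(const simple_row_tombstone& t,
                          const simple_row_marker& m,
                          time_point gc_before) {
    bool tombstone_expired = t.present && t.max_deletion_time < gc_before;
    bool marker_expired = !m.missing && m.dead && m.deletion_time < gc_before;
    return tombstone_expired || marker_expired;
}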
81 changes: 77 additions & 4 deletions cache_flat_mutation_reader.hh
@@ -110,6 +110,9 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
flat_mutation_reader_v2* _underlying = nullptr;
flat_mutation_reader_v2_opt _underlying_holder;

gc_clock::time_point _read_time;
gc_clock::time_point _gc_before;

future<> do_fill_buffer();
future<> ensure_underlying();
void copy_from_cache_to_buffer();
@@ -178,6 +181,20 @@ class cache_flat_mutation_reader final : public flat_mutation_reader_v2::impl {
const schema& table_schema() {
return *_snp->schema();
}

gc_clock::time_point get_read_time() {
return _read_context.tombstone_gc_state() ? gc_clock::now() : gc_clock::time_point::min();
}

gc_clock::time_point get_gc_before(const schema& schema, dht::decorated_key dk, const gc_clock::time_point query_time) {
auto gc_state = _read_context.tombstone_gc_state();
if (gc_state) {
return gc_state->get_gc_before_for_key(schema.shared_from_this(), dk, query_time);
}

return gc_clock::time_point::min();
}

public:
cache_flat_mutation_reader(schema_ptr s,
dht::decorated_key dk,
@@ -196,6 +213,8 @@ public:
, _read_context_holder()
, _read_context(ctx) // ctx is owned by the caller, who's responsible for closing it.
, _next_row(*_schema, *_snp, false, _read_context.is_reversed())
, _read_time(get_read_time())
, _gc_before(get_gc_before(*_schema, dk, _read_time))
{
clogger.trace("csm {}: table={}.{}, reversed={}, snap={}", fmt::ptr(this), _schema->ks_name(), _schema->cf_name(), _read_context.is_reversed(),
fmt::ptr(&*_snp));
@@ -730,9 +749,40 @@ void cache_flat_mutation_reader::copy_from_cache_to_buffer() {
}
}

// We add the row to the buffer even when it's full.
// This simplifies the code. For more info see #3139.
if (_next_row_in_range) {
bool remove_row = false;

if (_read_context.tombstone_gc_state() // do not compact rows when tombstone_gc_state is not set (used in some unit tests)
&& !_next_row.dummy()
&& _snp->at_latest_version()
&& _snp->at_oldest_version()) {
deletable_row& row = _next_row.latest_row();
auto t = row.deleted_at();

auto row_tomb_expired = [&](row_tombstone tomb) {
return (tomb && tomb.max_deletion_time() < _gc_before);
};

auto is_row_dead = [&](const deletable_row& row) {
auto& m = row.marker();
return (!m.is_missing() && m.is_dead(_read_time) && m.deletion_time() < _gc_before);
};

if (row_tomb_expired(t) || is_row_dead(row)) {
can_gc_fn always_gc = [&](tombstone) { return true; };
const schema& row_schema = _next_row.latest_row_schema();

_read_context.cache()._tracker.on_row_compacted();

with_allocator(_snp->region().allocator(), [&] {
deletable_row row_copy(row_schema, row);
row_copy.compact_and_expire(row_schema, t.tomb(), _read_time, always_gc, _gc_before, nullptr);
std::swap(row, row_copy);
});
remove_row = row.empty();
}
}

if (_next_row.range_tombstone_for_row() != _current_tombstone) [[unlikely]] {
auto tomb = _next_row.range_tombstone_for_row();
auto new_lower_bound = position_in_partition::before_key(_next_row.position());
@@ -742,8 +792,31 @@
_current_tombstone = tomb;
_read_context.cache()._tracker.on_range_tombstone_read();
}
add_to_buffer(_next_row);
move_to_next_entry();

if (remove_row) {
_read_context.cache()._tracker.on_row_compacted_away();

_lower_bound = position_in_partition::after_key(*_schema, _next_row.position());

partition_snapshot_row_weakref row_ref(_next_row);
move_to_next_entry();

with_allocator(_snp->region().allocator(), [&] {
cache_tracker& tracker = _read_context.cache()._tracker;
if (row_ref->is_linked()) {
tracker.get_lru().remove(*row_ref);
}
row_ref->on_evicted(tracker);
});

_snp->region().allocator().invalidate_references();
_next_row.force_valid();
} else {
// We add the row to the buffer even when it's full.
// This simplifies the code. For more info see #3139.
add_to_buffer(_next_row);
move_to_next_entry();
}
} else {
move_to_next_range();
}
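A rough, self-contained illustration of the pattern used at the end of the hunk above: compact a copy of the row, swap the copy in, and drop the row from the cache if compaction left it empty. Plain STL containers and invented names stand in for the snapshot region allocator, the LRU and on_evicted(); this is a sketch of the idea, not the Scylla API.

#include <list>
#include <optional>
#include <utility>

// Simplified stand-in for a cached row; 'expired' plays the role of the
// gc_before comparison performed by the real reader.
struct cached_row {
    int key = 0;
    std::optional<int> value;
    bool expired = false;
};

// Stand-in for deletable_row::compact_and_expire(): drop expired content.
void compact_and_expire(cached_row& r) {
    if (r.expired) {
        r.value.reset();
    }
}

// Compact a copy, swap it in, and erase the row when nothing is left,
// loosely mirroring the copy made under the region allocator, the swap,
// and the LRU unlink / eviction step in the patch.
void compact_row_on_read(std::list<cached_row>& cache,
                         std::list<cached_row>::iterator it) {
    cached_row copy = *it;
    compact_and_expire(copy);
    std::swap(*it, copy);
    if (!it->value) {
        cache.erase(it);
    }
}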
4 changes: 4 additions & 0 deletions db/cache_tracker.hh
@@ -68,6 +68,8 @@ public:
uint64_t pinned_dirty_memory_overload;
uint64_t range_tombstone_reads;
uint64_t row_tombstone_reads;
uint64_t rows_compacted;
uint64_t rows_compacted_away;

uint64_t active_reads() const {
return reads - reads_done;
@@ -115,6 +117,8 @@ public:
void on_row_merged_from_memtable() noexcept { ++_stats.rows_merged_from_memtable; }
void on_range_tombstone_read() noexcept { ++_stats.range_tombstone_reads; }
void on_row_tombstone_read() noexcept { ++_stats.row_tombstone_reads; }
void on_row_compacted() noexcept { ++_stats.rows_compacted; }
void on_row_compacted_away() noexcept { ++_stats.rows_compacted_away; }
void pinned_dirty_memory_overload(uint64_t bytes) noexcept;
allocation_strategy& allocator() noexcept;
logalloc::region& region() noexcept;
9 changes: 9 additions & 0 deletions partition_snapshot_row_cursor.hh
@@ -564,6 +564,15 @@ public:
return cr;
}

const schema& latest_row_schema() const noexcept {
return *_current_row[0].schema;
}

// Can be called only when cursor is valid and pointing at a row.
deletable_row& latest_row() const noexcept {
return _current_row[0].it->row();
}

// Can be called only when cursor is valid and pointing at a row.
void latest_row_prepare_hash() const {
_current_row[0].it->row().cells().prepare_hash(*_current_row[0].schema, column_kind::regular_column);
4 changes: 4 additions & 0 deletions read_context.hh
@@ -126,6 +126,7 @@ class read_context final : public enable_lw_shared_from_this<read_context> {
tracing::trace_state_ptr _trace_state;
mutation_reader::forwarding _fwd_mr;
bool _range_query;
const tombstone_gc_state* _tombstone_gc_state;
// When reader enters a partition, it must be set up for reading that
// partition from the underlying mutation source (_underlying) in one of two ways:
//
@@ -148,6 +149,7 @@ public:
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const tombstone_gc_state* gc_state,
tracing::trace_state_ptr trace_state,
mutation_reader::forwarding fwd_mr)
: _cache(cache)
@@ -158,6 +160,7 @@ public:
, _trace_state(std::move(trace_state))
, _fwd_mr(fwd_mr)
, _range_query(!query::is_single_partition(range))
, _tombstone_gc_state(gc_state)
, _underlying(_cache, *this)
{
if (_slice.options.contains(query::partition_slice::option::reversed)) {
@@ -195,6 +198,7 @@ public:
bool partition_exists() const { return _partition_exists; }
void on_underlying_created() { ++_underlying_created; }
bool digest_requested() const { return _slice.options.contains<query::partition_slice::option::with_digest>(); }
const tombstone_gc_state* tombstone_gc_state() const { return _tombstone_gc_state; }
public:
future<> ensure_underlying() {
if (_underlying_snapshot) {
2 changes: 1 addition & 1 deletion replica/table.cc
@@ -261,7 +261,7 @@ table::make_reader_v2(schema_ptr s,

const auto bypass_cache = slice.options.contains(query::partition_slice::option::bypass_cache);
if (cache_enabled() && !bypass_cache && !(reversed && _config.reversed_reads_auto_bypass_cache())) {
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, std::move(trace_state), fwd, fwd_mr)) {
if (auto reader_opt = _cache.make_reader_opt(s, permit, range, slice, &_compaction_manager.get_tombstone_gc_state(), std::move(trace_state), fwd, fwd_mr)) {
readers.emplace_back(std::move(*reader_opt));
}
} else {
7 changes: 6 additions & 1 deletion row_cache.cc
@@ -140,6 +140,10 @@ cache_tracker::setup_metrics() {
sm::description("total amount of range tombstones processed during read")),
sm::make_counter("row_tombstone_reads", _stats.row_tombstone_reads,
sm::description("total amount of row tombstones processed during read")),
sm::make_counter("rows_compacted", _stats.rows_compacted,
sm::description("total amount of attempts to compact expired rows during read")),
sm::make_counter("rows_compacted_away", _stats.rows_compacted_away,
sm::description("total amount of compacted and removed rows during read")),
});
}

@@ -725,12 +729,13 @@ row_cache::make_reader_opt(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
const tombstone_gc_state* gc_state,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr)
{
auto make_context = [&] {
return std::make_unique<read_context>(*this, s, std::move(permit), range, slice, trace_state, fwd_mr);
return std::make_unique<read_context>(*this, s, std::move(permit), range, slice, gc_state, trace_state, fwd_mr);
};

if (query::is_single_partition(range) && !fwd_mr) {
15 changes: 11 additions & 4 deletions row_cache.hh
@@ -358,14 +358,16 @@ public:
// User needs to ensure that the row_cache object stays alive
// as long as the reader is used.
// The range must not wrap around.

flat_mutation_reader_v2 make_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range,
const query::partition_slice& slice,
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no) {
if (auto reader_opt = make_reader_opt(s, permit, range, slice, std::move(trace_state), fwd, fwd_mr)) {
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no,
const tombstone_gc_state* gc_state = nullptr) {
if (auto reader_opt = make_reader_opt(s, permit, range, slice, gc_state, std::move(trace_state), fwd, fwd_mr)) {
return std::move(*reader_opt);
}
[[unlikely]] return make_empty_flat_reader_v2(std::move(s), std::move(permit));
@@ -376,13 +378,18 @@ public:
reader_permit permit,
const dht::partition_range&,
const query::partition_slice&,
const tombstone_gc_state*,
tracing::trace_state_ptr trace_state = nullptr,
streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no,
mutation_reader::forwarding fwd_mr = mutation_reader::forwarding::no);

flat_mutation_reader_v2 make_reader(schema_ptr s, reader_permit permit, const dht::partition_range& range = query::full_partition_range) {
flat_mutation_reader_v2 make_reader(schema_ptr s,
reader_permit permit,
const dht::partition_range& range = query::full_partition_range,
const tombstone_gc_state* gc_state = nullptr) {
auto& full_slice = s->full_slice();
return make_reader(std::move(s), std::move(permit), range, full_slice);
return make_reader(std::move(s), std::move(permit), range, full_slice, nullptr,
streamed_mutation::forwarding::no, mutation_reader::forwarding::no, gc_state);
}

const stats& stats() const { return _stats; }
56 changes: 56 additions & 0 deletions test/boost/row_cache_test.cc
@@ -4543,3 +4543,59 @@ SEASTAR_THREAD_TEST_CASE(test_digest_read_during_schema_upgrade) {
m2.upgrade(s2);
assert_that(std::move(rd)).produces(m2);
}

SEASTAR_TEST_CASE(test_cache_compacts_expired_tombstones_on_read) {
return seastar::async([] {
auto s = schema_builder("ks", "cf")
.with_column("pk", int32_type, column_kind::partition_key)
.with_column("ck", int32_type, column_kind::clustering_key)
.with_column("v", int32_type)
.build();

tests::reader_concurrency_semaphore_wrapper semaphore;
auto pkey = tests::generate_partition_key(s);

auto make_ck = [&s] (int v) {
return clustering_key::from_deeply_exploded(*s, {data_value{v}});
};

auto make_prefix = [&s] (int v) {
return clustering_key_prefix::from_deeply_exploded(*s, {data_value{v}});
};

auto ck1 = make_ck(1);
auto ck2 = make_ck(2);
auto ck3 = make_ck(3);
auto dt_noexp = gc_clock::now();
auto dt_exp = gc_clock::now() - std::chrono::seconds(s->gc_grace_seconds().count() + 1);

auto mt = make_lw_shared<replica::memtable>(s);
cache_tracker tracker;
row_cache cache(s, snapshot_source_from_snapshot(mt->as_data_source()), tracker);

{
mutation m(s, pkey);
m.set_clustered_cell(ck1, "v", data_value(101), 1);
m.partition().apply_delete(*s, make_prefix(2), tombstone(1, dt_noexp)); // create non-expired tombstone
m.partition().apply_delete(*s, make_prefix(3), tombstone(2, dt_exp)); // create expired tombstone
cache.populate(m);
}

tombstone_gc_state gc_state(nullptr);
auto rd1 = cache.make_reader(s, semaphore.make_permit(), query::full_partition_range, &gc_state);
auto close_rd = deferred_close(rd1);
rd1.fill_buffer().get(); // cache_flat_mutation_reader compacts cache on fill buffer

cache_entry& entry = cache.lookup(pkey);
auto& cp = entry.partition().version()->partition();

BOOST_REQUIRE(cp.find_row(*s, ck1) != nullptr); // live row is in cache
BOOST_REQUIRE_EQUAL(cp.clustered_row(*s, ck2).deleted_at(), row_tombstone(tombstone(1, dt_noexp))); // non-expired tombstone is in cache
BOOST_REQUIRE(cp.find_row(*s, ck3) == nullptr); // expired tombstone isn't in cache

// check tracker stats
auto &tracker_stats = tracker.get_stats();
BOOST_REQUIRE(tracker_stats.rows_compacted == 1);
BOOST_REQUIRE(tracker_stats.rows_compacted_away == 1);
});
}
