Skip to content

Commit

Permalink
Merge 'view_updating_consumer: account empty partitions memory usage'…
Browse files Browse the repository at this point in the history
… from Botond Dénes

Te view updating consumer uses `_buffer_size` to decide when to flush the accumulated mutations, passing them to the actual view building code. This `_buffer_size` is incremented every time a mutation fragment is consumed. This is not exact, as e.g. range tombstones are represented differently in the mutation object, than in the fragment, but it is good enough. There is one flaw however: `_buffer_size` is not incremented when consuming a partition-start fragment. This is when the mutation object is created in the mutation rebuilder. This is not a big problem when partition have many rows, but if the partitions are tiny, the error in accounting quickly becomes significant. If the partitions are empty, `_buffer_size` is not bumped at all for empty partitions, and any number of these can accumulate in the buffer. We have recently seen this causing stalls and OOM as the buffer got to immense size, only containing empty and tiny partitions.
This PR fixes this by accounting the size of the freshly created `mutation` object in `_buffer_size`, after the partition-start fragment is consumed.

Fixes: #14819

Closes #14821

* github.com:scylladb/scylladb:
  test/boost/view_build_test: add test_view_update_generator_buffering_with_empty_mutations
  db/view/view_updating_consumer: account for the size of mutations
  mutation/mutation_rebuilder*: return const mutation& from consume_new_partition()
  mutation/mutation: add memory_usage()
  • Loading branch information
nyh committed Jul 26, 2023
2 parents d2ca600 + d0f725c commit 056d049
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 4 deletions.
6 changes: 5 additions & 1 deletion db/view/view_updating_consumer.hh
Expand Up @@ -83,7 +83,11 @@ public:

void consume_new_partition(const dht::decorated_key& dk) {
_mut_builder.emplace(_schema);
_mut_builder->consume_new_partition(dk);
// Further accounting is inaccurate as we base it on the consumed
// mutation-fragments, not on their final form in the mutation.
// This is good enough, as long as the difference is small and mostly
// constant (per fragment).
_buffer_size += _mut_builder->consume_new_partition(dk).memory_usage(*_schema);
}

void consume(tombstone t) {
Expand Down
10 changes: 10 additions & 0 deletions mutation/mutation.cc
Expand Up @@ -176,6 +176,16 @@ mutation mutation::compacted() const {
return m;
}

size_t mutation::memory_usage(const ::schema& s) const {
auto res = sizeof(*this);
if (_ptr) {
res += sizeof(data);
res += _ptr->_dk.external_memory_usage();
res += _ptr->_p.external_memory_usage(s);
}
return res;
}

mutation reverse(mutation mut) {
auto reverse_schema = mut.schema()->make_reversed();
mutation_rebuilder_v2 reverse_rebuilder(reverse_schema);
Expand Down
2 changes: 2 additions & 0 deletions mutation/mutation.hh
Expand Up @@ -183,6 +183,8 @@ public:
// Does not drop expired tombstones.
// Does not expire TTLed data.
mutation compacted() const;

size_t memory_usage(const ::schema& s) const;
private:
friend std::ostream& operator<<(std::ostream& os, const mutation& m);
};
Expand Down
7 changes: 4 additions & 3 deletions mutation/mutation_rebuilder.hh
Expand Up @@ -18,9 +18,10 @@ class mutation_rebuilder {
public:
explicit mutation_rebuilder(schema_ptr s) : _s(std::move(s)) { }

void consume_new_partition(const dht::decorated_key& dk) {
const mutation& consume_new_partition(const dht::decorated_key& dk) {
assert(!_m);
_m = mutation(_s, std::move(dk));
return *_m;
}

stop_iteration consume(tombstone t) {
Expand Down Expand Up @@ -93,8 +94,8 @@ public:
return std::move(mf).consume(*this);
}
public:
void consume_new_partition(const dht::decorated_key& dk) {
_builder.consume_new_partition(dk);
const mutation& consume_new_partition(const dht::decorated_key& dk) {
return _builder.consume_new_partition(dk);
}

stop_iteration consume(tombstone t) {
Expand Down
48 changes: 48 additions & 0 deletions test/boost/view_build_test.cc
Expand Up @@ -974,3 +974,51 @@ SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_random_mutati
}
assert_that(total).is_equal_to_compacted(mut);
}

// Reproducer for #14819
// Push an partition containing only a tombstone to the view update generator
// (with soft limit set to 1) and expect it to trigger flushing the buffer on
// finishing the partition.
SEASTAR_THREAD_TEST_CASE(test_view_update_generator_buffering_with_empty_mutations) {
class consumer_verifier {
std::unique_ptr<row_locker> _rl;
std::unique_ptr<row_locker::stats> _rl_stats;
std::optional<dht::decorated_key> _last_dk;
bool& _buffer_flushed;

public:
consumer_verifier(schema_ptr schema, bool& buffer_flushed)
: _rl(std::make_unique<row_locker>(std::move(schema)))
, _rl_stats(std::make_unique<row_locker::stats>())
, _buffer_flushed(buffer_flushed)
{ }
future<row_locker::lock_holder> operator()(mutation mut) {
_buffer_flushed = true;
_last_dk = mut.decorated_key();
return _rl->lock_pk(*_last_dk, true, db::no_timeout, *_rl_stats);
}
};

simple_schema ss;
auto schema = ss.schema();
reader_concurrency_semaphore sem(reader_concurrency_semaphore::for_tests{}, get_name(), 1, replica::new_reader_base_cost);
auto stop_sem = deferred_stop(sem);
auto permit = sem.make_tracking_only_permit(schema.get(), "test", db::no_timeout, {});
abort_source as;
auto [staging_reader, staging_reader_handle] = make_manually_paused_evictable_reader_v2(make_empty_mutation_source(), schema, permit,
query::full_partition_range, schema->full_slice(), {}, mutation_reader::forwarding::no);
auto close_staging_reader = deferred_close(staging_reader);
bool buffer_flushed = false;

auto vuc = db::view::view_updating_consumer(schema, permit, as, staging_reader_handle, consumer_verifier(schema, buffer_flushed));
vuc.set_buffer_size_limit_for_testing_purposes(1);

vuc.consume_new_partition(ss.make_pkey(0));
vuc.consume(ss.new_tombstone());
vuc.consume_end_of_partition();

// consume_end_of_stream() forces a flush, so we need to check before it.
BOOST_REQUIRE(buffer_flushed);

vuc.consume_end_of_stream();
}

0 comments on commit 056d049

Please sign in to comment.