Skip to content

Commit

Permalink
reconcilable_result_builder: add v2 support
Browse files Browse the repository at this point in the history
Add a `consume()` overload for range tombstone changes and convert them
internally to range tombstones, as the underlying reconcilable result
is still v1.
  • Loading branch information
denesb committed Mar 11, 2022
1 parent 728c145 commit 4629f7d
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
14 changes: 14 additions & 0 deletions mutation_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1990,6 +1990,7 @@ stop_iteration query::result_memory_accounter::check_local_limit() const {
}

void reconcilable_result_builder::consume_new_partition(const dht::decorated_key& dk) {
_rt_assembler.reset();
_return_static_content_on_partition_with_no_rows =
_slice.options.contains(query::partition_slice::option::always_return_static_content) ||
!has_ck_selector(_slice.row_ranges(_schema, dk.key()));
Expand All @@ -2009,6 +2010,11 @@ stop_iteration reconcilable_result_builder::consume(static_row&& sr, tombstone,
}

stop_iteration reconcilable_result_builder::consume(clustering_row&& cr, row_tombstone, bool is_alive) {
if (_rt_assembler.needs_flush()) {
if (auto rt_opt = _rt_assembler.flush(_schema, position_in_partition::after_key(cr.key()))) {
consume(std::move(*rt_opt));
}
}
_live_rows += is_alive;
auto stop = _memory_accounter.update_and_check(cr.memory_usage(_schema));
if (is_alive) {
Expand All @@ -2031,7 +2037,15 @@ stop_iteration reconcilable_result_builder::consume(range_tombstone&& rt) {
return _mutation_consumer->consume(std::move(rt));
}

stop_iteration reconcilable_result_builder::consume(range_tombstone_change&& rtc) {
if (auto rt_opt = _rt_assembler.consume(_schema, std::move(rtc))) {
return consume(std::move(*rt_opt));
}
return stop_iteration::no;
}

stop_iteration reconcilable_result_builder::consume_end_of_partition() {
_rt_assembler.on_end_of_stream();
if (_live_rows == 0 && _static_row_is_alive && _return_static_content_on_partition_with_no_rows) {
++_live_rows;
// Normally we count only live clustering rows, to guarantee that
Expand Down
3 changes: 3 additions & 0 deletions mutation_query.hh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "db/timeout_clock.hh"
#include "mutation.hh"
#include "utils/chunked_vector.hh"
#include "range_tombstone_assembler.hh"

class reconcilable_result;
class frozen_reconcilable_result;
Expand Down Expand Up @@ -132,6 +133,7 @@ class reconcilable_result_builder {
query::result_memory_accounter _memory_accounter;
stop_iteration _stop;
std::optional<streamed_mutation_freezer> _mutation_consumer;
range_tombstone_assembler _rt_assembler;

uint64_t _live_rows{};
// make this the last member so it is destroyed first. #7240
Expand All @@ -149,6 +151,7 @@ public:
stop_iteration consume(static_row&& sr, tombstone, bool is_alive);
stop_iteration consume(clustering_row&& cr, row_tombstone, bool is_alive);
stop_iteration consume(range_tombstone&& rt);
stop_iteration consume(range_tombstone_change&& rtc);
stop_iteration consume_end_of_partition();
reconcilable_result consume_end_of_stream();
};
Expand Down

0 comments on commit 4629f7d

Please sign in to comment.