Skip to content

Commit

Permalink
Merge "Multiple fixes to tests/normalizing_reader" from Vladimir
Browse files Browse the repository at this point in the history
This patchset addresses multiple errors in normalizing_reader
implementation found during review.

I have decided to not make a clustering key full inside
before_key()/after_key() helpers. The reason is that for this they
would need the schema to be passed as another parameter, so the existing
methods are not suitable. On the other hand, introducing new members into
a class solely for testing purposes seems like overkill.

* github.com/argenet/scylla.git projects/sstables-30/normalizing_reader_fixes/v1:
  range_tombstone: Add constructor accepting position_in_partition_views
    for range bounds.
  tests: Make sure range tombstone is properly split over rows with
    non-full keys.
  tests: Multiple fixes for draining and clearing range tombstones in
    normalizing_reader.
  • Loading branch information
tgrabiec committed Sep 27, 2018
2 parents 653fb37 + b74706a commit 78d9205
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 15 deletions.
5 changes: 5 additions & 0 deletions range_tombstone.hh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ public:
// Constructs a range tombstone from explicit start and end bound_views.
// Delegates to the prefix/kind constructor, decomposing each bound into
// its clustering prefix and bound kind; the tombstone is moved in.
range_tombstone(bound_view start, bound_view end, tombstone tomb)
: range_tombstone(start.prefix(), start.kind(), end.prefix(), end.kind(), std::move(tomb))
{ }

// Constructs a range tombstone whose bounds are taken from two
// position_in_partition_views, converted via as_start_bound_view() /
// as_end_bound_view().
// Precondition: can be called only when both start and end satisfy
// !is_static_row() && !is_clustering_row() (i.e. they are range-bound
// positions, convertible to bound views).
range_tombstone(position_in_partition_view start, position_in_partition_view end, tombstone tomb)
: range_tombstone(start.as_start_bound_view(), end.as_end_bound_view(), tomb)
{}
// Constructs a range tombstone covering the inclusive range [start, end]
// of clustering key prefixes (incl_start / incl_end bound kinds).
range_tombstone(clustering_key_prefix&& start, clustering_key_prefix&& end, tombstone tomb)
: range_tombstone(std::move(start), bound_kind::incl_start, std::move(end), bound_kind::incl_end, std::move(tomb))
{ }
Expand Down
31 changes: 16 additions & 15 deletions tests/normalizing_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ future<> normalizing_reader::fill_buffer(db::timeout_clock::time_point timeout)
return do_until([this] { return is_buffer_full() || is_end_of_stream(); }, [this, timeout] {
return _rd.fill_buffer(timeout).then([this] {
position_in_partition::less_compare less{*_rd.schema()};
position_in_partition::equal_compare eq{*_rd.schema()};
while (!_rd.is_buffer_empty()) {
auto mf = _rd.pop_mutation_fragment();
if (mf.is_end_of_partition()) {
Expand All @@ -43,35 +42,35 @@ future<> normalizing_reader::fill_buffer(db::timeout_clock::time_point timeout)
_range_tombstones.apply(std::move(mf).as_range_tombstone());
continue;
} else if (mf.is_clustering_row()) {
const clustering_row& cr = mf.as_clustering_row();
auto ck = cr.key();
auto end_kind = clustering_key::make_full(*_schema, ck) ? bound_kind::excl_end : bound_kind::incl_end;
auto ck = mf.as_clustering_row().key();
clustering_key::make_full(*_rd.schema(), ck);
auto after_pos = position_in_partition::after_key(ck);
while (auto mfo = _range_tombstones.get_next(mf)) {
range_tombstone&& rt = std::move(*mfo).as_range_tombstone();
if (less(rt.end_position(), cr.position())
|| eq(rt.end_position(), position_in_partition::after_key(ck))) {
if (!less(after_pos, rt.end_position())) {
push_mutation_fragment(std::move(rt));
} else {
push_mutation_fragment(range_tombstone{
rt.start_bound(),
bound_view{ck, end_kind},
rt.tomb});

rt.trim_front(*_rd.schema(), position_in_partition::after_key(ck));
_range_tombstones.apply(std::move(rt));
break;
push_mutation_fragment(range_tombstone{rt.position(), after_pos, rt.tomb});
_range_tombstones.apply(range_tombstone{after_pos, rt.end_position(), rt.tomb});
}
}
}

push_mutation_fragment(std::move(mf));
}
_end_of_stream = _rd.is_end_of_stream() && _range_tombstones.empty();

if (_rd.is_end_of_stream()) {
while (auto mfo = _range_tombstones.get_next()) {
push_mutation_fragment(std::move(*mfo));
}
_end_of_stream = true;
}
});
});
}

void normalizing_reader::next_partition() {
_range_tombstones.reset();
clear_buffer_to_next_partition();
if (is_buffer_empty()) {
_end_of_stream = false;
Expand All @@ -80,12 +79,14 @@ void normalizing_reader::next_partition() {
}
// Fast-forwards this reader to a new partition range.
// Discards all locally buffered state before forwarding the wrapped
// reader: pending range tombstones are reset (they were accumulated for
// the previous range — presumably stale after the jump; semantics follow
// the _range_tombstones helper) and the fragment buffer is cleared.
future<> normalizing_reader::fast_forward_to(
const dht::partition_range& pr, db::timeout_clock::time_point timeout) {
_range_tombstones.reset(); // drop tombstones accumulated for the previous range
clear_buffer();
_end_of_stream = false; // the new range may yield more fragments
return _rd.fast_forward_to(pr, timeout);
}
future<> normalizing_reader::fast_forward_to(
position_range pr, db::timeout_clock::time_point timeout) {
_range_tombstones.forward_to(pr.start());
forward_buffer_to(pr.start());
_end_of_stream = false;
return _rd.fast_forward_to(std::move(pr), timeout);
Expand Down

0 comments on commit 78d9205

Please sign in to comment.