Skip to content

Commit

Permalink
Clean up position_in_partition.
Browse files Browse the repository at this point in the history
Introduce position_in_partition_view and use it in
position() method in mutation_fragment, range_tombstone,
static_row and clustering_row.
Clean up comparators in position_in_partition.

Signed-off-by: Piotr Jastrzebski <piotr@scylladb.com>
Message-Id: <c65293c71a6aa23cf930ed317fb63df1fdc34fd1.1477399763.git.piotr@scylladb.com>
  • Loading branch information
Piotr Jastrzebski authored and pdziepak committed Oct 25, 2016
1 parent cbaae2b commit 27726ce
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 104 deletions.
37 changes: 27 additions & 10 deletions partition_version.hh
Expand Up @@ -286,13 +286,28 @@ class partition_snapshot_reader : public streamed_mutation::impl, public MemoryA
};

class heap_compare {
position_in_partition::less_compare& _cmp;
rows_entry::compare _cmp;
public:
explicit heap_compare(position_in_partition::less_compare cmp) : _cmp(cmp) { }
explicit heap_compare(const schema& s) : _cmp(s) { }
bool operator()(const rows_position& a, const rows_position& b) {
return _cmp(*b._position, *a._position);
}
};
class rows_entry_compare {
position_in_partition::less_compare _cmp;
public:
explicit rows_entry_compare(const schema& s) : _cmp(s) { }
bool operator()(const rows_entry& a, const position_in_partition& b) {
position_in_partition_view a_view(position_in_partition_view::clustering_row_tag_t(),
a.key());
return _cmp(a_view, b);
}
bool operator()(const position_in_partition& a, const rows_entry& b) {
position_in_partition_view b_view(position_in_partition_view::clustering_row_tag_t(),
b.key());
return _cmp(a, b_view);
}
};
private:
// Keeps shared pointer to the container we read mutation from to make sure
// that its lifetime is appropriately extended.
Expand All @@ -303,8 +318,9 @@ private:
query::clustering_row_ranges::const_iterator _ck_range_end;
bool _in_ck_range = false;

position_in_partition::less_compare _cmp;
position_in_partition::equal_compare _eq;
rows_entry_compare _cmp;
clustering_key_prefix::equality _eq;
heap_compare _heap_cmp;

lw_shared_ptr<partition_snapshot> _snapshot;
stdx::optional<position_in_partition> _last_entry;
Expand Down Expand Up @@ -346,7 +362,7 @@ private:
}

_in_ck_range = true;
boost::range::make_heap(_clustering_rows, heap_compare(_cmp));
boost::range::make_heap(_clustering_rows, _heap_cmp);
}

void pop_clustering_row() {
Expand All @@ -355,7 +371,7 @@ private:
if (current._position == current._end) {
_clustering_rows.pop_back();
} else {
boost::range::push_heap(_clustering_rows, heap_compare(_cmp));
boost::range::push_heap(_clustering_rows, _heap_cmp);
}
}

Expand All @@ -381,16 +397,16 @@ private:
return mf;
}

boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
boost::range::pop_heap(_clustering_rows, _heap_cmp);
clustering_row result = *_clustering_rows.back()._position;
pop_clustering_row();
while (!_clustering_rows.empty() && _eq(*_clustering_rows.front()._position, result)) {
boost::range::pop_heap(_clustering_rows, heap_compare(_cmp));
while (!_clustering_rows.empty() && _eq(_clustering_rows.front()._position->key(), result.key())) {
boost::range::pop_heap(_clustering_rows, _heap_cmp);
auto& current = _clustering_rows.back();
result.apply(*_schema, *current._position);
pop_clustering_row();
}
_last_entry = result.position();
_last_entry = position_in_partition(result.position());
return mutation_fragment(std::move(result));
}
return _range_tombstones.get_next();
Expand Down Expand Up @@ -453,6 +469,7 @@ public:
, _ck_range_end(_ck_ranges.end())
, _cmp(*s)
, _eq(*s)
, _heap_cmp(*s)
, _snapshot(snp)
, _range_tombstones(*s)
, _lsa_region(region)
Expand Down
5 changes: 5 additions & 0 deletions range_tombstone.cc
Expand Up @@ -20,6 +20,7 @@
*/

#include "range_tombstone.hh"
#include "streamed_mutation.hh"

std::ostream& operator<<(std::ostream& out, const range_tombstone& rt) {
if (rt) {
Expand Down Expand Up @@ -48,6 +49,10 @@ stdx::optional<range_tombstone> range_tombstone::apply(const schema& s, range_to
return { };
}

position_in_partition_view range_tombstone::position() const {
return position_in_partition_view(position_in_partition_view::range_tombstone_tag_t(), start_bound());
}

void range_tombstone_accumulator::update_current_tombstone() {
_current_tombstone = boost::accumulate(_range_tombstones, _partition_tombstone, [] (tombstone t, const range_tombstone& rt) {
t.apply(rt.tomb);
Expand Down
3 changes: 3 additions & 0 deletions range_tombstone.hh
Expand Up @@ -32,6 +32,8 @@
namespace bi = boost::intrusive;
namespace stdx = std::experimental;

class position_in_partition_view;

/**
* Represents a ranged deletion operation. Can be empty.
*/
Expand Down Expand Up @@ -90,6 +92,7 @@ public:
const bound_view end_bound() const {
return bound_view(end, end_kind);
}
position_in_partition_view position() const;
bool empty() const {
return !bool(tomb);
}
Expand Down
10 changes: 5 additions & 5 deletions sstables/partition.cc
Expand Up @@ -388,7 +388,7 @@ class mp_row_consumer : public row_consumer {
proceed flush_if_needed(bool is_static, position_in_partition&& pos) {
position_in_partition::equal_compare eq(*_schema);
proceed ret = proceed::yes;
if (_in_progress && !eq(*_in_progress, pos)) {
if (_in_progress && !eq(_in_progress->position(), pos)) {
ret = _skip_clustering_row ? proceed::yes : proceed::no;
flush();
}
Expand Down Expand Up @@ -701,18 +701,18 @@ class sstable_streamed_mutation : public streamed_mutation::impl {
// If sstable uses promoted index it will repeat relevant range tombstones in
// each block. Do not emit these duplicates as they will break the guarantee
// that mutation fragment are produced in ascending order.
if (!_last_position || !_cmp(*mf, *_last_position)) {
_last_position = mf->position();
if (!_last_position || !_cmp(mf->position(), *_last_position)) {
_last_position = position_in_partition(mf->position());
_range_tombstones.apply(std::move(mf->as_range_tombstone()));
}
} else {
// mp_row_consumer may produce mutation_fragments in parts if they are
// interrupted by range tombstone duplicate. Make sure they are merged
// before emitting them.
_last_position = mf->position();
_last_position = position_in_partition(mf->position());
if (!_current_candidate) {
_current_candidate = std::move(mf);
} else if (_current_candidate && _eq(*_current_candidate, *mf)) {
} else if (_current_candidate && _eq(_current_candidate->position(), mf->position())) {
_current_candidate->apply(*_schema, std::move(*mf));
} else {
_next_candidate = std::move(mf);
Expand Down
34 changes: 9 additions & 25 deletions streamed_mutation.cc
Expand Up @@ -71,16 +71,6 @@ const clustering_key_prefix& mutation_fragment::key() const
return visit(get_key_visitor());
}

int mutation_fragment::bound_kind_weight() const {
assert(has_key());
struct get_bound_kind_weight {
int operator()(const clustering_row&) { return 0; }
int operator()(const range_tombstone& rt) { return weight(rt.start_kind); }
int operator()(...) { abort(); }
};
return visit(get_bound_kind_weight());
}

void mutation_fragment::apply(const schema& s, mutation_fragment&& mf)
{
assert(_kind == mf._kind);
Expand All @@ -99,16 +89,9 @@ void mutation_fragment::apply(const schema& s, mutation_fragment&& mf)
mf._data.reset();
}

position_in_partition mutation_fragment::position() const
position_in_partition_view mutation_fragment::position() const
{
struct get_position {
position_in_partition operator()(const static_row& sr) { return sr.position(); }
position_in_partition operator()(const clustering_row& cr) { return cr.position(); }
position_in_partition operator()(const range_tombstone& rt) {
return position_in_partition(position_in_partition::range_tombstone_tag_t(), rt.start_bound());
}
};
return visit(get_position());
return visit([] (auto& mf) { return mf.position(); });
}

std::ostream& operator<<(std::ostream& os, const streamed_mutation& sm) {
Expand Down Expand Up @@ -153,7 +136,7 @@ streamed_mutation streamed_mutation_from_mutation(mutation m)
}
}
mutation_fragment_opt read_next() {
if (_cr && (!_rt || _cmp(*_cr, *_rt))) {
if (_cr && (!_rt || _cmp(_cr->position(), _rt->position()))) {
auto cr = move_and_disengage(_cr);
prepare_next_clustering_row();
return cr;
Expand Down Expand Up @@ -241,7 +224,7 @@ class mutation_merger final : public streamed_mutation::impl {
}

position_in_partition::less_compare cmp(*_schema);
auto heap_compare = [&] (auto& a, auto& b) { return cmp(b.row, a.row); };
auto heap_compare = [&] (auto& a, auto& b) { return cmp(b.row.position(), a.row.position()); };

auto result = [&] {
auto rt = _deferred_tombstones.get_next(_readers.front().row);
Expand All @@ -256,7 +239,7 @@ class mutation_merger final : public streamed_mutation::impl {
}();

while (!_readers.empty()) {
if (cmp(result, _readers.front().row)) {
if (cmp(result.position(), _readers.front().row.position())) {
break;
}
boost::range::pop_heap(_readers, heap_compare);
Expand All @@ -277,7 +260,7 @@ class mutation_merger final : public streamed_mutation::impl {

void do_fill_buffer() {
position_in_partition::less_compare cmp(*_schema);
auto heap_compare = [&] (auto& a, auto& b) { return cmp(b.row, a.row); };
auto heap_compare = [&] (auto& a, auto& b) { return cmp(b.row.position(), a.row.position()); };

for (auto& rd : _next_readers) {
if (rd->is_buffer_empty()) {
Expand Down Expand Up @@ -360,15 +343,16 @@ mutation_fragment_opt range_tombstone_stream::do_get_next()
mutation_fragment_opt range_tombstone_stream::get_next(const rows_entry& re)
{
if (!_list.empty()) {
return !_cmp(re, _list.begin()->start_bound()) ? do_get_next() : mutation_fragment_opt();
position_in_partition_view view(position_in_partition_view::clustering_row_tag_t(), re.key());
return !_cmp(view, _list.begin()->position()) ? do_get_next() : mutation_fragment_opt();
}
return { };
}

mutation_fragment_opt range_tombstone_stream::get_next(const mutation_fragment& mf)
{
if (!_list.empty()) {
return !_cmp(mf, *_list.begin()) ? do_get_next() : mutation_fragment_opt();
return !_cmp(mf.position(), _list.begin()->position()) ? do_get_next() : mutation_fragment_opt();
}
return { };
}
Expand Down

0 comments on commit 27726ce

Please sign in to comment.