Merge "Introduce compacting reader" from Botond
"
Allow adding compaction to any reader pipeline. The intended users are
streaming and repair, with the goal of not wasting transfer bandwidth on
purgeable data.
No current user in the tree.

Tests: unit(dev), mutation_reader_test.compacting_reader_*(debug)
"

* 'compacting-reader/v3' of https://github.com/denesb/scylla:
  test: boost/mutation_reader_test: add unit test for compacting_reader
  test: lib/flat_mutation_reader_assertions: be more lenient about empty mutations
  test: lib/mutation_source_test: make data compaction friendly
  test: random_mutation_generator: add generate_uncompactable mode
  mutation_reader: introduce compacting_reader
avikivity committed Mar 16, 2020
2 parents 35d95d6 + 837b79c commit 342c967
Showing 6 changed files with 325 additions and 31 deletions.
153 changes: 153 additions & 0 deletions mutation_reader.cc
@@ -28,6 +28,7 @@
#include <seastar/core/future-util.hh>
#include "flat_mutation_reader.hh"
#include "schema_registry.hh"
#include "mutation_compactor.hh"


static constexpr size_t merger_small_vector_size = 4;
@@ -1840,3 +1841,155 @@ std::pair<flat_mutation_reader, queue_reader_handle> make_queue_reader(schema_ptr s) {
    auto handle = queue_reader_handle(*impl);
    return {flat_mutation_reader(std::move(impl)), std::move(handle)};
}

namespace {

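// Wraps a source reader and compacts its output: fill_buffer() pops
// fragments from `_reader` and feeds them to `_compactor`, which calls back
// into the `consume*()` methods below; those push the surviving, compacted
// fragments into this reader's buffer.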
class compacting_reader : public flat_mutation_reader::impl {
    friend class compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::yes>;

private:
    flat_mutation_reader _reader;
    compact_mutation_state<emit_only_live_rows::no, compact_for_sstables::yes> _compactor;
    noop_compacted_fragments_consumer _gc_consumer;

    // Uncompacted stream
    partition_start _last_uncompacted_partition_start;
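    // Kind of the last fragment popped from the uncompacted stream; lets
    // maybe_inject_partition_end() detect a mid-partition position.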
    mutation_fragment::kind _last_uncompacted_kind = mutation_fragment::kind::partition_end;

    // Compacted stream
    bool _has_compacted_partition_start = false;
    bool _ignore_partition_end = false;

private:
    void maybe_push_partition_start() {
        if (_has_compacted_partition_start) {
            push_mutation_fragment(std::move(_last_uncompacted_partition_start));
            _has_compacted_partition_start = false;
        }
    }
    void maybe_inject_partition_end() {
        // The compactor needs a valid stream, but downstream doesn't care about
        // the injected partition end, so ignore it.
        if (_last_uncompacted_kind != mutation_fragment::kind::partition_end) {
            _ignore_partition_end = true;
            _compactor.consume_end_of_partition(*this, _gc_consumer);
            _ignore_partition_end = false;
        }
    }
    void consume_new_partition(const dht::decorated_key& dk) {
        _has_compacted_partition_start = true;
        // We need to reset the partition's tombstone here. If the tombstone is
        // compacted away, `consume(tombstone)` below is simply not called. If
        // it is not compacted away, `consume(tombstone)` below will restore it.
        _last_uncompacted_partition_start.partition_tombstone() = {};
    }
    void consume(tombstone t) {
        _last_uncompacted_partition_start.partition_tombstone() = t;
        maybe_push_partition_start();
    }
    stop_iteration consume(static_row&& sr, tombstone, bool) {
        maybe_push_partition_start();
        push_mutation_fragment(std::move(sr));
        return stop_iteration::no;
    }
    stop_iteration consume(clustering_row&& cr, row_tombstone, bool) {
        maybe_push_partition_start();
        push_mutation_fragment(std::move(cr));
        return stop_iteration::no;
    }
    stop_iteration consume(range_tombstone&& rt) {
        maybe_push_partition_start();
        push_mutation_fragment(std::move(rt));
        return stop_iteration::no;
    }
    stop_iteration consume_end_of_partition() {
        maybe_push_partition_start();
        if (!_ignore_partition_end) {
            push_mutation_fragment(partition_end{});
        }
        return stop_iteration::no;
    }
    void consume_end_of_stream() {
    }

public:
    compacting_reader(flat_mutation_reader source, gc_clock::time_point compaction_time,
            std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable)
        : impl(source.schema())
        , _reader(std::move(source))
        , _compactor(*_schema, compaction_time, get_max_purgeable)
        , _last_uncompacted_partition_start(dht::decorated_key(dht::minimum_token(), partition_key::make_empty()), tombstone{}) {
    }
    virtual future<> fill_buffer(db::timeout_clock::time_point timeout) override {
        return do_until([this] { return is_end_of_stream() || is_buffer_full(); }, [this, timeout] {
            return _reader.fill_buffer(timeout).then([this, timeout] {
                if (_reader.is_buffer_empty()) {
                    _end_of_stream = _reader.is_end_of_stream();
                }
                // It is important to not consume more than we actually need.
                // Doing so leads to corner cases around `next_partition()`. The
                // fragments consumed after our buffer is full might not be
                // emitted by the compactor, so on a following `next_partition()`
                // call we won't be able to determine whether we are at a
                // partition boundary or not and thus whether we need to forward
                // it to the underlying reader or not.
                // This problem doesn't exist when we want more fragments, in this
                // case we'll keep reading until the compactor emits something or
                // we read EOS, and thus we'll know where we are.
                while (!_reader.is_buffer_empty() && !is_buffer_full()) {
                    auto mf = _reader.pop_mutation_fragment();
                    _last_uncompacted_kind = mf.mutation_fragment_kind();
                    switch (mf.mutation_fragment_kind()) {
                    case mutation_fragment::kind::static_row:
                        _compactor.consume(std::move(mf).as_static_row(), *this, _gc_consumer);
                        break;
                    case mutation_fragment::kind::clustering_row:
                        _compactor.consume(std::move(mf).as_clustering_row(), *this, _gc_consumer);
                        break;
                    case mutation_fragment::kind::range_tombstone:
                        _compactor.consume(std::move(mf).as_range_tombstone(), *this, _gc_consumer);
                        break;
                    case mutation_fragment::kind::partition_start:
                        _last_uncompacted_partition_start = std::move(mf).as_partition_start();
                        _compactor.consume_new_partition(_last_uncompacted_partition_start.key());
                        if (_last_uncompacted_partition_start.partition_tombstone()) {
                            _compactor.consume(_last_uncompacted_partition_start.partition_tombstone(), *this, _gc_consumer);
                        }
                        break;
                    case mutation_fragment::kind::partition_end:
                        _compactor.consume_end_of_partition(*this, _gc_consumer);
                        break;
                    }
                }
            });
        });
    }
    virtual void next_partition() override {
        clear_buffer_to_next_partition();
        if (!is_buffer_empty()) {
            return;
        }
        _end_of_stream = false;
        maybe_inject_partition_end();
        _reader.next_partition();
    }
    virtual future<> fast_forward_to(const dht::partition_range& pr, db::timeout_clock::time_point timeout) override {
        clear_buffer();
        _end_of_stream = false;
        maybe_inject_partition_end();
        return _reader.fast_forward_to(pr, timeout);
    }
    virtual future<> fast_forward_to(position_range pr, db::timeout_clock::time_point timeout) override {
        throw std::bad_function_call();
    }
    virtual size_t buffer_size() const override {
        return flat_mutation_reader::impl::buffer_size() + _reader.buffer_size();
    }
};

} // anonymous namespace

flat_mutation_reader make_compacting_reader(flat_mutation_reader source, gc_clock::time_point compaction_time,
        std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable) {
    return make_flat_mutation_reader<compacting_reader>(std::move(source), compaction_time, get_max_purgeable);
}
18 changes: 18 additions & 0 deletions mutation_reader.hh
@@ -530,3 +530,21 @@ public:
};

std::pair<flat_mutation_reader, queue_reader_handle> make_queue_reader(schema_ptr s);

/// Creates a compacting reader.
///
/// The compaction is done with a \ref mutation_compactor, using compaction-type
/// compaction (`compact_for_sstables::yes`).
///
/// \param source the reader whose output to compact.
///
/// Params \c compaction_time and \c get_max_purgeable are forwarded to the
/// \ref mutation_compactor instance.
///
/// Inter-partition forwarding: `next_partition()` and
/// `fast_forward_to(const dht::partition_range&)` are supported if the source
/// reader supports them.
/// Intra-partition forwarding: `fast_forward_to(position_range)` is *not*
/// supported.
flat_mutation_reader make_compacting_reader(flat_mutation_reader source, gc_clock::time_point compaction_time,
        std::function<api::timestamp_type(const dht::decorated_key&)> get_max_purgeable);
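A hedged usage sketch, mirroring the unit test added below: `rd` stands for an arbitrary flat_mutation_reader, and the always-api::min_timestamp callback is an assumption, not a prescribed policy.

// Sketch only: the callback returns, per partition, the highest timestamp
// for which expired tombstones may be purged; api::min_timestamp makes no
// tombstone purgeable by timestamp.
auto compacted = make_compacting_reader(std::move(rd), gc_clock::now(),
        [] (const dht::decorated_key&) { return api::min_timestamp; });
// `compacted` is itself a flat_mutation_reader and can replace `rd` anywhere
// in the pipeline.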
86 changes: 86 additions & 0 deletions test/boost/mutation_reader_test.cc
@@ -2862,3 +2862,89 @@ SEASTAR_THREAD_TEST_CASE(test_queue_reader) {
        }
    }
}

SEASTAR_THREAD_TEST_CASE(test_compacting_reader_as_mutation_source) {
    auto make_populate = [] (bool single_fragment_buffer) {
        return [single_fragment_buffer] (schema_ptr s, const std::vector<mutation>& mutations, gc_clock::time_point query_time) mutable {
            auto mt = make_lw_shared<memtable>(s);
            for (auto& mut : mutations) {
                mt->apply(mut);
            }
            return mutation_source([=] (
                    schema_ptr s,
                    reader_permit,
                    const dht::partition_range& range,
                    const query::partition_slice& slice,
                    const io_priority_class& pc,
                    tracing::trace_state_ptr trace_state,
                    streamed_mutation::forwarding fwd_sm,
                    mutation_reader::forwarding fwd_mr) mutable {
                auto source = mt->make_flat_reader(s, range, slice, pc, std::move(trace_state), streamed_mutation::forwarding::no, fwd_mr);
                auto mr = make_compacting_reader(std::move(source), query_time, [] (const dht::decorated_key&) { return api::min_timestamp; });
                if (single_fragment_buffer) {
                    mr.set_max_buffer_size(1);
                }
                if (fwd_sm == streamed_mutation::forwarding::yes) {
                    return make_forwardable(std::move(mr));
                }
                return mr;
            });
        };
    };

    BOOST_TEST_MESSAGE("run_mutation_source_tests(single_fragment_buffer=false)");
    run_mutation_source_tests(make_populate(false));
    BOOST_TEST_MESSAGE("run_mutation_source_tests(single_fragment_buffer=true)");
    run_mutation_source_tests(make_populate(true));
}

// Check that next_partition() in the middle of a partition works properly.
SEASTAR_THREAD_TEST_CASE(test_compacting_reader_next_partition) {
    simple_schema ss(simple_schema::with_static::no);
    const auto& schema = *ss.schema();
    std::deque<mutation_fragment> expected;

    auto mr = [&] () {
        const size_t buffer_size = 1024;
        std::deque<mutation_fragment> mfs;
        auto dk0 = ss.make_pkey(0);
        auto dk1 = ss.make_pkey(1);

        mfs.emplace_back(partition_start(dk0, tombstone{}));

        auto i = 0;
        size_t mfs_size = 0;
        while (mfs_size <= buffer_size) {
            mfs.emplace_back(ss.make_row(ss.make_ckey(i++), "v"));
            mfs_size += mfs.back().memory_usage(schema);
        }
        mfs.emplace_back(partition_end{});

        mfs.emplace_back(partition_start(dk1, tombstone{}));
        mfs.emplace_back(ss.make_row(ss.make_ckey(0), "v"));
        mfs.emplace_back(partition_end{});

        for (const auto& mf : mfs) {
            expected.emplace_back(*ss.schema(), mf);
        }

        auto mr = make_compacting_reader(make_flat_mutation_reader_from_fragments(ss.schema(), std::move(mfs)),
                gc_clock::now(), [] (const dht::decorated_key&) { return api::min_timestamp; });
        mr.set_max_buffer_size(buffer_size);

        return mr;
    }();

    auto reader_assertions = assert_that(std::move(mr));

    reader_assertions
        .produces(schema, expected[0]) // partition start
        .produces(schema, expected[1]) // first row
        .next_partition();

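    // Jump to the second partition: its partition-start, single row and
    // partition-end are the last three fragments in `expected`.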
    auto it = expected.end() - 3;
    while (it != expected.end()) {
        reader_assertions.produces(schema, *it++);
    }
    reader_assertions.produces_end_of_stream();
}
4 changes: 4 additions & 0 deletions test/lib/flat_mutation_reader_assertions.hh
@@ -442,6 +442,10 @@ public:
    flat_reader_assertions& produces_compacted(const mutation& m, gc_clock::time_point query_time,
            const std::optional<query::clustering_row_ranges>& ck_ranges = {}) {
        auto mo = read_mutation_from_flat_mutation_reader(_reader, db::no_timeout).get0();
        // If the passed in mutation is empty, allow for the reader to produce an empty or no partition.
        if (m.partition().empty() && !mo) {
            return *this;
        }
        BOOST_REQUIRE(bool(mo));
        memory::disable_failure_guard dfg;
        mutation got = *mo;
