diff --git a/cache_streamed_mutation.hh b/cache_streamed_mutation.hh
new file mode 100644
index 000000000000..dfb579ca3d9e
--- /dev/null
+++ b/cache_streamed_mutation.hh
@@ -0,0 +1,482 @@
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#pragma once
+
+#include <vector>
+#include "row_cache.hh"
+#include "mutation_reader.hh"
+#include "streamed_mutation.hh"
+#include "partition_version.hh"
+#include "utils/logalloc.hh"
+#include "query-request.hh"
+#include "partition_snapshot_reader.hh"
+#include "partition_snapshot_row_cursor.hh"
+#include "read_context.hh"
+
+namespace cache {
+
+class lsa_manager {
+ row_cache& _cache;
+public:
+ lsa_manager(row_cache& cache) : _cache(cache) { }
+ template <typename Func>
+ decltype(auto) run_in_read_section(const Func& func) {
+ return _cache._read_section(_cache._tracker.region(), [&func] () {
+ return with_linearized_managed_bytes([&func] () {
+ return func();
+ });
+ });
+ }
+ template <typename Func>
+ decltype(auto) run_in_update_section(const Func& func) {
+ return _cache._update_section(_cache._tracker.region(), [&func] () {
+ return with_linearized_managed_bytes([&func] () {
+ return func();
+ });
+ });
+ }
+ template <typename Func>
+ void run_in_update_section_with_allocator(Func&& func) {
+ return _cache._update_section(_cache._tracker.region(), [this, &func] () {
+ return with_linearized_managed_bytes([this, &func] () {
+ return with_allocator(_cache._tracker.region().allocator(), [this, &func] () mutable {
+ return func();
+ });
+ });
+ });
+ }
+ logalloc::region& region() { return _cache._tracker.region(); }
+ logalloc::allocating_section& read_section() { return _cache._read_section; }
+};
+
+class cache_streamed_mutation final : public streamed_mutation::impl {
+ lw_shared_ptr<partition_snapshot> _snp;
+ position_in_partition::tri_compare _position_cmp;
+
+ query::clustering_key_filter_ranges _ck_ranges;
+ query::clustering_row_ranges::const_iterator _ck_ranges_curr;
+ query::clustering_row_ranges::const_iterator _ck_ranges_end;
+
+ lsa_manager _lsa_manager;
+
+ stdx::optional<clustering_key> _last_row_key;
+
+ // We need to be prepared that we may get overlapping and out of order
+ // range tombstones. We must emit fragments with strictly monotonic positions,
+ // so we can't just trim such tombstones to the position of the last fragment.
+ // To solve that, range tombstones are accumulated first in a range_tombstone_stream
+ // and emitted once we have a fragment with a larger position.
+ range_tombstone_stream _tombstones;
+
+ // Holds the lower bound of a position range which hasn't been processed yet.
+ // Only fragments with positions < _lower_bound have been emitted.
+ position_in_partition _lower_bound;
+ position_in_partition_view _upper_bound;
+
+ bool _static_row_done = false;
+ bool _reading_underlying = false;
+ lw_shared_ptr<read_context> _read_context;
+ partition_snapshot_row_cursor _next_row;
+ bool _next_row_in_range = false;
+
+ future<> do_fill_buffer();
+ future<> copy_from_cache_to_buffer();
+ future<> process_static_row();
+ void move_to_end();
+ future<> move_to_next_range();
+ future<> move_to_current_range();
+ future<> move_to_next_entry();
+ // Emits all delayed range tombstones with positions smaller than upper_bound.
+ void drain_tombstones(position_in_partition_view upper_bound);
+ // Emits all delayed range tombstones.
+ void drain_tombstones();
+ void add_to_buffer(const partition_snapshot_row_cursor&);
+ void add_to_buffer(clustering_row&&);
+ void add_to_buffer(range_tombstone&&);
+ void add_to_buffer(mutation_fragment&&);
+ future<> read_from_underlying();
+ future<> start_reading_from_underlying();
+ bool after_current_range(position_in_partition_view position);
+ bool can_populate() const;
+ void maybe_update_continuity();
+ void maybe_add_to_cache(const mutation_fragment& mf);
+ void maybe_add_to_cache(const clustering_row& cr);
+ void maybe_add_to_cache(const range_tombstone& rt);
+ void maybe_add_to_cache(const static_row& sr);
+ void maybe_set_static_row_continuous();
+public:
+ cache_streamed_mutation(schema_ptr s,
+ dht::decorated_key dk,
+ query::clustering_key_filter_ranges&& crr,
+ lw_shared_ptr<read_context> ctx,
+ lw_shared_ptr<partition_snapshot> snp,
+ row_cache& cache)
+ : streamed_mutation::impl(std::move(s), dk, snp->partition_tombstone())
+ , _snp(std::move(snp))
+ , _position_cmp(*_schema)
+ , _ck_ranges(std::move(crr))
+ , _ck_ranges_curr(_ck_ranges.begin())
+ , _ck_ranges_end(_ck_ranges.end())
+ , _lsa_manager(cache)
+ , _tombstones(*_schema)
+ , _lower_bound(position_in_partition::before_all_clustered_rows())
+ , _upper_bound(position_in_partition_view::before_all_clustered_rows())
+ , _read_context(std::move(ctx))
+ , _next_row(*_schema, cache._tracker.region(), *_snp)
+ { }
+ cache_streamed_mutation(const cache_streamed_mutation&) = delete;
+ cache_streamed_mutation(cache_streamed_mutation&&) = delete;
+ virtual future<> fill_buffer() override;
+ virtual ~cache_streamed_mutation() {
+ maybe_merge_versions(_snp, _lsa_manager.region(), _lsa_manager.read_section());
+ }
+};
+
+inline
+future<> cache_streamed_mutation::process_static_row() {
+ if (_snp->version()->partition().static_row_continuous()) {
+ row sr = _snp->static_row();
+ if (!sr.empty()) {
+ push_mutation_fragment(mutation_fragment(static_row(std::move(sr))));
+ }
+ return make_ready_future<>();
+ } else {
+ return _read_context->get_next_fragment().then([this] (mutation_fragment_opt&& sr) {
+ if (sr) {
+ assert(sr->is_static_row());
+ maybe_add_to_cache(sr->as_static_row());
+ push_mutation_fragment(std::move(*sr));
+ }
+ maybe_set_static_row_continuous();
+ });
+ }
+}
+
+inline
+future<> cache_streamed_mutation::fill_buffer() {
+ if (!_static_row_done) {
+ _static_row_done = true;
+ return process_static_row().then([this] {
+ return _lsa_manager.run_in_read_section([this] {
+ return move_to_current_range();
+ }).then([this] {
+ return fill_buffer();
+ });
+ });
+ }
+ return do_until([this] { return _end_of_stream || is_buffer_full(); }, [this] {
+ return do_fill_buffer();
+ });
+}
+
+inline
+future<> cache_streamed_mutation::do_fill_buffer() {
+ if (_reading_underlying) {
+ return read_from_underlying();
+ }
+ return _lsa_manager.run_in_read_section([this] {
+ auto same_pos = _next_row.maybe_refresh();
+ // FIXME: If continuity changed anywhere between _lower_bound and _next_row.position()
+ // we need to redo the lookup with _lower_bound. There is no eviction yet, so not yet a problem.
+ assert(same_pos);
+ while (!is_buffer_full() && !_end_of_stream && !_reading_underlying) {
+ future<> f = copy_from_cache_to_buffer();
+ if (!f.available() || need_preempt()) {
+ return f;
+ }
+ }
+ return make_ready_future<>();
+ });
+}
+
+inline
+future<> cache_streamed_mutation::read_from_underlying() {
+ return do_until([this] { return !_reading_underlying || is_buffer_full(); }, [this] {
+ return _read_context->get_next_fragment().then([this] (auto&& mfopt) {
+ if (!mfopt) {
+ _reading_underlying = false;
+ return _lsa_manager.run_in_update_section([this] {
+ auto same_pos = _next_row.maybe_refresh();
+ assert(same_pos); // FIXME: handle eviction
+ if (_next_row_in_range) {
+ this->maybe_update_continuity();
+ this->add_to_buffer(_next_row);
+ return this->move_to_next_entry();
+ } else {
+ if (no_clustering_row_between(*_schema, _upper_bound, _next_row.position())) {
+ this->maybe_update_continuity();
+ } else {
+ // FIXME: Insert dummy entry at _upper_bound.
+ }
+ return this->move_to_next_range();
+ }
+ });
+ } else {
+ this->maybe_add_to_cache(*mfopt);
+ this->add_to_buffer(std::move(*mfopt));
+ return make_ready_future<>();
+ }
+ });
+ });
+}
+
+inline
+void cache_streamed_mutation::maybe_update_continuity() {
+ if (can_populate() && _next_row.is_in_latest_version()) {
+ if (_last_row_key) {
+ if (_next_row.previous_row_in_latest_version_has_key(*_last_row_key)) {
+ _next_row.set_continuous(true);
+ }
+ } else if (!_ck_ranges_curr->start()) {
+ _next_row.set_continuous(true);
+ }
+ }
+}
+
+inline
+void cache_streamed_mutation::maybe_add_to_cache(const mutation_fragment& mf) {
+ if (mf.is_range_tombstone()) {
+ maybe_add_to_cache(mf.as_range_tombstone());
+ } else {
+ assert(mf.is_clustering_row());
+ const clustering_row& cr = mf.as_clustering_row();
+ maybe_add_to_cache(cr);
+ }
+}
+
+inline
+void cache_streamed_mutation::maybe_add_to_cache(const clustering_row& cr) {
+ if (!can_populate()) {
+ return;
+ }
+ _lsa_manager.run_in_update_section_with_allocator([this, &cr] {
+ mutation_partition& mp = _snp->version()->partition();
+ rows_entry::compare less(*_schema);
+
+ // FIXME: If _next_row is up to date, but latest version doesn't have iterator in
+ // current row (could be far away, so we'd do this often), then this will do
+ // the lookup in mp. This is not necessary, because _next_row has iterators for
+ // next rows in each version, even if they're not part of the current row.
+ // They're currently buried in the heap, but you could keep a vector of
+ // iterators per each version in addition to the heap.
+ auto new_entry = alloc_strategy_unique_ptr<rows_entry>(
+ current_allocator().construct<rows_entry>(cr.key(), cr.tomb(), cr.marker(), cr.cells()));
+ new_entry->set_continuous(false);
+ auto it = _next_row.has_up_to_date_row_from_latest_version()
+ ? _next_row.get_iterator_in_latest_version() : mp.clustered_rows().lower_bound(cr.key(), less);
+ auto insert_result = mp.clustered_rows().insert_check(it, *new_entry, less);
+ if (insert_result.second) {
+ new_entry.release();
+ }
+ it = insert_result.first;
+
+ rows_entry& e = *it;
+ if (_last_row_key) {
+ if (it == mp.clustered_rows().begin()) {
+ // FIXME: check whether entry for _last_row_key is in older versions and if so set
+ // continuity to true.
+ } else {
+ auto prev_it = it;
+ --prev_it;
+ clustering_key_prefix::tri_compare tri_comp(*_schema);
+ if (tri_comp(*_last_row_key, prev_it->key()) == 0) {
+ e.set_continuous(true);
+ }
+ }
+ } else if (!_ck_ranges_curr->start()) {
+ e.set_continuous(true);
+ } else {
+ // FIXME: Insert dummy entry at _ck_ranges_curr->start()
+ }
+ });
+}
+
+inline
+bool cache_streamed_mutation::after_current_range(position_in_partition_view p) {
+ return _position_cmp(p, _upper_bound) >= 0;
+}
+
+inline
+future<> cache_streamed_mutation::start_reading_from_underlying() {
+ _reading_underlying = true;
+ auto end = _next_row_in_range ? position_in_partition(_next_row.position())
+ : position_in_partition(_upper_bound);
+ return _read_context->fast_forward_to(position_range{_lower_bound, std::move(end)});
+}
+
+inline
+future<> cache_streamed_mutation::copy_from_cache_to_buffer() {
+ position_in_partition_view next_lower_bound = _next_row.dummy() ? _next_row.position() : position_in_partition_view::after_key(_next_row.key());
+ for (auto&& rts : _snp->range_tombstones(*_schema, _lower_bound, _next_row_in_range ? next_lower_bound : _upper_bound)) {
+ add_to_buffer(std::move(rts));
+ if (is_buffer_full()) {
+ return make_ready_future<>();
+ }
+ }
+ if (_next_row_in_range) {
+ add_to_buffer(_next_row);
+ return move_to_next_entry();
+ } else {
+ return move_to_next_range();
+ }
+}
+
+inline
+void cache_streamed_mutation::move_to_end() {
+ drain_tombstones();
+ _end_of_stream = true;
+}
+
+inline
+future<> cache_streamed_mutation::move_to_next_range() {
+ ++_ck_ranges_curr;
+ if (_ck_ranges_curr == _ck_ranges_end) {
+ move_to_end();
+ return make_ready_future<>();
+ } else {
+ return move_to_current_range();
+ }
+}
+
+inline
+future<> cache_streamed_mutation::move_to_current_range() {
+ _last_row_key = std::experimental::nullopt;
+ _lower_bound = position_in_partition::for_range_start(*_ck_ranges_curr);
+ _upper_bound = position_in_partition_view::for_range_end(*_ck_ranges_curr);
+ auto complete_until_next = _next_row.advance_to(_lower_bound) || _next_row.continuous();
+ _next_row_in_range = !after_current_range(_next_row.position());
+ if (!complete_until_next) {
+ return start_reading_from_underlying();
+ }
+ return make_ready_future<>();
+}
+
+// _next_row must be inside the range.
+inline
+future<> cache_streamed_mutation::move_to_next_entry() {
+ if (no_clustering_row_between(*_schema, _next_row.position(), _upper_bound)) {
+ return move_to_next_range();
+ } else {
+ if (!_next_row.next()) {
+ move_to_end();
+ return make_ready_future<>();
+ }
+ _next_row_in_range = !after_current_range(_next_row.position());
+ if (!_next_row.continuous()) {
+ return start_reading_from_underlying();
+ }
+ return make_ready_future<>();
+ }
+}
+
+inline
+void cache_streamed_mutation::drain_tombstones(position_in_partition_view pos) {
+ while (auto mfo = _tombstones.get_next(pos)) {
+ push_mutation_fragment(std::move(*mfo));
+ }
+}
+
+inline
+void cache_streamed_mutation::drain_tombstones() {
+ while (auto mfo = _tombstones.get_next()) {
+ push_mutation_fragment(std::move(*mfo));
+ }
+}
+
+inline
+void cache_streamed_mutation::add_to_buffer(mutation_fragment&& mf) {
+ if (mf.is_clustering_row()) {
+ add_to_buffer(std::move(std::move(mf).as_clustering_row()));
+ } else {
+ assert(mf.is_range_tombstone());
+ add_to_buffer(std::move(mf).as_range_tombstone());
+ }
+}
+
+inline
+void cache_streamed_mutation::add_to_buffer(const partition_snapshot_row_cursor& row) {
+ if (!row.dummy()) {
+ add_to_buffer(row.row());
+ }
+}
+
+inline
+void cache_streamed_mutation::add_to_buffer(clustering_row&& row) {
+ drain_tombstones(row.position());
+ _last_row_key = row.key();
+ _lower_bound = position_in_partition::after_key(row.key());
+ push_mutation_fragment(std::move(row));
+}
+
+inline
+void cache_streamed_mutation::add_to_buffer(range_tombstone&& rt) {
+ // This guarantees that rt starts after any emitted clustering_row
+ if (!rt.trim_front(*_schema, _lower_bound)) {
+ return;
+ }
+ _lower_bound = position_in_partition(rt.position());
+ _tombstones.apply(std::move(rt));
+ drain_tombstones(_lower_bound);
+}
+
+inline
+void cache_streamed_mutation::maybe_add_to_cache(const range_tombstone& rt) {
+ if (can_populate()) {
+ _lsa_manager.run_in_update_section_with_allocator([&] {
+ _snp->version()->partition().apply_row_tombstone(*_schema, rt);
+ });
+ }
+}
+
+inline
+void cache_streamed_mutation::maybe_add_to_cache(const static_row& sr) {
+ if (can_populate()) {
+ _lsa_manager.run_in_update_section_with_allocator([&] {
+ _snp->version()->partition().static_row().apply(*_schema, column_kind::static_column, sr.cells());
+ });
+ }
+}
+
+inline
+void cache_streamed_mutation::maybe_set_static_row_continuous() {
+ if (can_populate()) {
+ _snp->version()->partition().set_static_row_continuous(true);
+ }
+}
+
+inline
+bool cache_streamed_mutation::can_populate() const {
+ return _snp->at_latest_version() && _read_context->cache().phase_of(_read_context->key()) == _read_context->phase();
+}
+
+} // namespace cache
+
+inline streamed_mutation make_cache_streamed_mutation(schema_ptr s,
+ dht::decorated_key dk,
+ query::clustering_key_filter_ranges crr,
+ row_cache& cache,
+ lw_shared_ptr<read_context> ctx,
+ lw_shared_ptr<partition_snapshot> snp)
+{
+ return make_streamed_mutation<cache::cache_streamed_mutation>(
+ std::move(s), std::move(dk), std::move(crr), std::move(ctx), std::move(snp), cache);
+}
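
The comment on _tombstones above explains why range tombstones are first accumulated in a range_tombstone_stream and only emitted once a fragment with a larger position shows up. Below is a minimal, self-contained sketch of that buffering idea, using integer positions and a std::multimap in place of the real classes; all names are illustrative, not Scylla APIs.

// Simplified model of "delay range tombstones, flush them before emitting
// anything at a larger position". Positions are ints, fragments are strings.
#include <iostream>
#include <map>
#include <string>
#include <vector>

struct tombstone { int start; int end; };

class tombstone_stream {
    std::multimap<int, tombstone> _pending;   // ordered by start position
public:
    void apply(tombstone t) { _pending.emplace(t.start, t); }

    // Move every buffered tombstone starting before upper_bound to `out`,
    // in position order (analogous to drain_tombstones(pos)).
    void drain(int upper_bound, std::vector<std::string>& out) {
        auto last = _pending.lower_bound(upper_bound);
        for (auto it = _pending.begin(); it != last; ++it) {
            out.push_back("rt[" + std::to_string(it->second.start) + ","
                          + std::to_string(it->second.end) + ")");
        }
        _pending.erase(_pending.begin(), last);
    }
};

int main() {
    tombstone_stream ts;
    std::vector<std::string> out;
    ts.apply({5, 9});          // tombstones arrive overlapping and
    ts.apply({2, 7});          // out of order
    ts.drain(4, out);          // about to emit a row at position 4:
    out.push_back("row@4");    // flush everything positioned before it
    ts.drain(100, out);        // end of stream: flush the rest
    for (auto& frag : out) {
        std::cout << frag << "\n";   // rt[2,7) row@4 rt[5,9) -- monotonic
    }
}
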
diff --git a/clustering_bounds_comparator.hh b/clustering_bounds_comparator.hh
index f55d6a2e520b..61445e96f61d 100644
--- a/clustering_bounds_comparator.hh
+++ b/clustering_bounds_comparator.hh
@@ -54,8 +54,8 @@ static inline bound_kind flip_bound_kind(bound_kind bk)
}
class bound_view {
- const static thread_local clustering_key empty_prefix;
public:
+ const static thread_local clustering_key empty_prefix;
const clustering_key_prefix& prefix;
bound_kind kind;
bound_view(const clustering_key_prefix& prefix, bound_kind kind)
diff --git a/configure.py b/configure.py
index 859ae4ce4007..d5edf1e93326 100755
--- a/configure.py
+++ b/configure.py
@@ -184,6 +184,8 @@ def endswith(self, end):
'tests/perf/perf_cql_parser',
'tests/perf/perf_simple_query',
'tests/perf/perf_fast_forward',
+ 'tests/cache_streamed_mutation_test',
+ 'tests/row_cache_stress_test',
'tests/memory_footprint',
'tests/perf/perf_sstable',
'tests/cql_query_test',
@@ -625,6 +627,7 @@ def endswith(self, end):
'tests/message',
'tests/perf/perf_simple_query',
'tests/perf/perf_fast_forward',
+ 'tests/row_cache_stress_test',
'tests/memory_footprint',
'tests/gossip',
'tests/perf/perf_sstable',
diff --git a/converting_mutation_partition_applier.hh b/converting_mutation_partition_applier.hh
index d06228ccda36..58799887770d 100644
--- a/converting_mutation_partition_applier.hh
+++ b/converting_mutation_partition_applier.hh
@@ -22,6 +22,7 @@
#pragma once
#include "mutation_partition_view.hh"
+#include "mutation_partition.hh"
#include "schema.hh"
// Mutation partition visitor which applies visited data into
@@ -37,12 +38,12 @@ private:
static bool is_compatible(const column_definition& new_def, const data_type& old_type, column_kind kind) {
return ::is_compatible(new_def.kind, kind) && new_def.type->is_value_compatible_with(*old_type);
}
- void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, atomic_cell_view cell) {
+ static void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, atomic_cell_view cell) {
if (is_compatible(new_def, old_type, kind) && cell.timestamp() > new_def.dropped_at()) {
dst.apply(new_def, atomic_cell_or_collection(cell));
}
}
- void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, collection_mutation_view cell) {
+ static void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, collection_mutation_view cell) {
if (!is_compatible(new_def, old_type, kind)) {
return;
}
@@ -94,8 +95,8 @@ public:
_p.apply_row_tombstone(_p_schema, rt);
}
- virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) override {
- deletable_row& r = _p.clustered_row(_p_schema, key);
+ virtual void accept_row(position_in_partition_view key, const row_tombstone& deleted_at, const row_marker& rm, is_dummy dummy, is_continuous continuous) override {
+ deletable_row& r = _p.clustered_row(_p_schema, key, dummy, continuous);
r.apply(rm);
r.apply(deleted_at);
_current_row = &r;
@@ -116,4 +117,14 @@ public:
accept_cell(_current_row->cells(), column_kind::regular_column, *def, col.type(), collection);
}
}
+
+ // Appends the cell to dst upgrading it to the new schema.
+ // Cells must have monotonic names.
+ static void append_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, const atomic_cell_or_collection& cell) {
+ if (new_def.is_atomic()) {
+ accept_cell(dst, kind, new_def, old_type, cell.as_atomic_cell());
+ } else {
+ accept_cell(dst, kind, new_def, old_type, cell.as_collection_mutation());
+ }
+ }
};
diff --git a/cql3/statements/batch_statement.cc b/cql3/statements/batch_statement.cc
index fac623ddb06a..248e8b7d07d3 100644
--- a/cql3/statements/batch_statement.cc
+++ b/cql3/statements/batch_statement.cc
@@ -233,7 +233,7 @@ void batch_statement::verify_batch_size(const std::vector<mutation>& mutations)
size += v.data.size();
}
void accept_row_tombstone(const range_tombstone&) override {}
- void accept_row(clustering_key_view, const row_tombstone&, const row_marker&) override {}
+ void accept_row(position_in_partition_view, const row_tombstone&, const row_marker&, is_dummy, is_continuous) override {}
void accept_row_cell(column_id, atomic_cell_view v) override {
size += v.value().size();
}
diff --git a/database.cc b/database.cc
index 98bda7f76e63..8d7b1011ea72 100644
--- a/database.cc
+++ b/database.cc
@@ -144,7 +144,7 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog* cl
, _streaming_memtables(_config.enable_disk_writes ? make_streaming_memtable_list() : make_memory_only_memtable_list())
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
, _sstables(make_lw_shared(_compaction_strategy.make_sstable_set(_schema)))
- , _cache(_schema, sstables_as_mutation_source(), global_cache_tracker(), _config.max_cached_partition_size_in_bytes)
+ , _cache(_schema, sstables_as_snapshot_source(), global_cache_tracker())
, _commitlog(cl)
, _compaction_manager(compaction_manager)
, _flush_queue(std::make_unique<memtable_flush_queue>())
@@ -183,7 +183,24 @@ column_family::sstables_as_mutation_source() {
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
- return make_sstable_reader(std::move(s), r, slice, pc, std::move(trace_state), fwd, fwd_mr);
+ return make_sstable_reader(std::move(s), _sstables, r, slice, pc, std::move(trace_state), fwd, fwd_mr);
+ });
+}
+
+snapshot_source
+column_family::sstables_as_snapshot_source() {
+ return snapshot_source([this] () {
+ // FIXME: Will keep sstables on disk until next memtable flush. Make compaction force cache refresh.
+ auto sst_set = _sstables;
+ return mutation_source([this, sst_set = std::move(sst_set)] (schema_ptr s,
+ const dht::partition_range& r,
+ const query::partition_slice& slice,
+ const io_priority_class& pc,
+ tracing::trace_state_ptr trace_state,
+ streamed_mutation::forwarding fwd,
+ mutation_reader::forwarding fwd_mr) {
+ return make_sstable_reader(std::move(s), sst_set, r, slice, pc, std::move(trace_state), fwd, fwd_mr);
+ });
});
}
@@ -529,6 +546,7 @@ class single_key_sstable_reader final : public mutation_reader::impl {
mutation_reader
column_family::make_sstable_reader(schema_ptr s,
+ lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -555,11 +573,11 @@ column_family::make_sstable_reader(schema_ptr s,
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
return make_empty_reader(); // range doesn't belong to this shard
}
- return restrict_reader(make_mutation_reader<single_key_sstable_reader>(const_cast<column_family*>(this), std::move(s), _sstables,
+ return restrict_reader(make_mutation_reader<single_key_sstable_reader>(const_cast<column_family*>(this), std::move(s), std::move(sstables),
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd));
} else {
// range_sstable_reader is not movable so we need to wrap it
- return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, slice, pc, std::move(trace_state), fwd, fwd_mr));
+ return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), std::move(sstables), pr, slice, pc, std::move(trace_state), fwd, fwd_mr));
}
}
@@ -643,7 +661,7 @@ column_family::make_reader(schema_ptr s,
if (_config.enable_cache) {
readers.emplace_back(_cache.make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
} else {
- readers.emplace_back(make_sstable_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
+ readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
}
return make_combined_reader(std::move(readers));
@@ -662,7 +680,7 @@ column_family::make_streaming_reader(schema_ptr s,
readers.emplace_back(mt->make_reader(s, range, slice, pc, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
}
- readers.emplace_back(make_sstable_reader(s, range, slice, pc, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
+ readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
return make_combined_reader(std::move(readers));
}
@@ -680,7 +698,7 @@ column_family::make_streaming_reader(schema_ptr s,
for (auto&& mt : *_memtables) {
readers.emplace_back(mt->make_reader(s, range, slice, pc, trace_state, fwd, fwd_mr));
}
- readers.emplace_back(make_sstable_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
+ readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(std::move(readers));
});
@@ -866,11 +884,6 @@ column_family::seal_active_streaming_memtable_immediate() {
// If we ever need to, we'll keep them separate statistics, but we don't want to pollute the
// main stats about memtables with streaming memtables.
//
- // Second, we will not bother touching the cache after this flush. The current streaming code
- // will invalidate the ranges it touches, so we won't do it twice. Even when that changes, the
- // cache management code in here will have to differ from the main memtable's one. Please see
- // the comment at flush_streaming_mutations() for details.
- //
// Lastly, we don't have any commitlog RP to update, and we don't need to manipulate the
// memtable list, since this memtable was not available for reading up until this point.
return write_memtable_to_sstable(*old, newtab, incremental_backups_enabled(), priority).then([this, newtab, old] {
@@ -878,7 +891,12 @@ column_family::seal_active_streaming_memtable_immediate() {
}).then([this, old, newtab] () {
add_sstable(newtab, {engine().cpu_id()});
trigger_compaction();
- return old->clear_gently();
+ // Cache synchronization must be started atomically with add_sstable()
+ if (_config.enable_cache) {
+ return _cache.update_invalidating(*old);
+ } else {
+ return old->clear_gently();
+ }
}).handle_exception([old] (auto ep) {
dblog.error("failed to write streamed sstable: {}", ep);
return make_exception_future<>(ep);
@@ -1791,7 +1809,7 @@ future<> distributed_loader::load_new_sstables(distributed& db, sstrin
cf.trigger_compaction();
// Drop entire cache for this column family because it may be populated
// with stale data.
- return cf.get_row_cache().clear();
+ return cf.get_row_cache().invalidate();
});
}).then([&db, ks, cf] () mutable {
return smp::submit_to(0, [&db, ks = std::move(ks), cf = std::move(cf)] () mutable {
@@ -1824,6 +1842,7 @@ future distributed_loader::probe_file(distributed();
});
@@ -2566,7 +2585,6 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
cfg.streaming_read_concurrency_config = _config.streaming_read_concurrency_config;
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
- cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024;
return cfg;
}
@@ -3797,28 +3815,26 @@ future<> column_family::flush_streaming_mutations(utils::UUID plan_id, dht::part
// be to change seal_active_streaming_memtable_delayed to take a range parameter. However, we
// need this code to go away as soon as we can (see FIXME above). So the double gate is a better
// temporary counter measure.
- return with_gate(_streaming_flush_gate, [this, plan_id, ranges = std::move(ranges)] {
- return flush_streaming_big_mutations(plan_id).then([this] {
- return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed);
- }).finally([this] {
- return _streaming_flush_phaser.advance_and_await();
- }).finally([this, ranges = std::move(ranges)] {
- if (!_config.enable_cache) {
- return make_ready_future<>();
- }
- return do_with(std::move(ranges), [this] (auto& ranges) {
- return parallel_for_each(ranges, [this](auto&& range) {
- return _cache.invalidate(range);
- });
+ return with_gate(_streaming_flush_gate, [this, plan_id, ranges = std::move(ranges)] () mutable {
+ return flush_streaming_big_mutations(plan_id).then([this, ranges = std::move(ranges)] (auto sstables) mutable {
+ return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed).then([this] {
+ return _streaming_flush_phaser.advance_and_await();
+ }).then([this, sstables = std::move(sstables), ranges = std::move(ranges)] () mutable {
+ for (auto&& sst : sstables) {
+ // seal_active_streaming_memtable_big() ensures sst is unshared.
+ this->add_sstable(sst, {engine().cpu_id()});
+ }
+ this->trigger_compaction();
+ return _cache.invalidate(std::move(ranges));
});
});
});
}
-future<> column_family::flush_streaming_big_mutations(utils::UUID plan_id) {
+future<std::vector<sstables::shared_sstable>> column_family::flush_streaming_big_mutations(utils::UUID plan_id) {
auto it = _streaming_memtables_big.find(plan_id);
if (it == _streaming_memtables_big.end()) {
- return make_ready_future<>();
+ return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>());
}
auto entry = it->second;
_streaming_memtables_big.erase(it);
@@ -3830,11 +3846,7 @@ future<> column_family::flush_streaming_big_mutations(utils::UUID plan_id) {
return sst->open_data();
});
}).then([this, entry] {
- for (auto&& sst : entry->sstables) {
- // seal_active_streaming_memtable_big() ensures sst is unshared.
- add_sstable(sst, {engine().cpu_id()});
- }
- trigger_compaction();
+ return std::move(entry->sstables);
});
});
}
@@ -3862,7 +3874,7 @@ future<> column_family::clear() {
_streaming_memtables->clear();
_streaming_memtables->add_memtable();
_streaming_memtables_big.clear();
- return _cache.clear();
+ return _cache.invalidate();
}
// NOTE: does not need to be futurized, but might eventually, depending on
@@ -3888,7 +3900,7 @@ future column_family::discard_sstables(db_clock::time_point
_sstables = std::move(pruned);
dblog.debug("cleaning out row cache");
- return _cache.clear().then([rp, remove = std::move(remove)] () mutable {
+ return _cache.invalidate().then([rp, remove = std::move(remove)] () mutable {
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
diff --git a/database.hh b/database.hh
index 4692733c59e9..7e20335c7487 100644
--- a/database.hh
+++ b/database.hh
@@ -429,7 +429,6 @@ public:
restricted_mutation_reader_config read_concurrency_config;
restricted_mutation_reader_config streaming_read_concurrency_config;
::cf_stats* cf_stats = nullptr;
- uint64_t max_cached_partition_size_in_bytes;
};
struct no_commitlog {};
struct stats {
@@ -505,7 +504,7 @@ private:
};
std::unordered_map<utils::UUID, lw_shared_ptr<streaming_memtable_big>> _streaming_memtables_big;
- future<> flush_streaming_big_mutations(utils::UUID plan_id);
+ future<std::vector<sstables::shared_sstable>> flush_streaming_big_mutations(utils::UUID plan_id);
void apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m);
future<> seal_active_streaming_memtable_big(streaming_memtable_big& smb);
@@ -575,7 +574,9 @@ private:
private:
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable, std::vector<unsigned>&& shards_for_the_sstable);
// Adds new sstable to the set of sstables
- // Doesn't update the cache.
+ // Doesn't update the cache. The cache must be synchronized in order for reads to see
+ // the writes contained in this sstable.
+ // Cache must be synchronized atomically with this, otherwise write atomicity may not be respected.
// Doesn't trigger compaction.
void add_sstable(lw_shared_ptr<sstables::sstable> sstable, std::vector<unsigned>&& shards_for_the_sstable);
// returns an empty pointer if sstable doesn't belong to current shard.
@@ -619,11 +620,12 @@ private:
void remove_ancestors_needed_rewrite(std::unordered_set ancestors);
private:
mutation_source_opt _virtual_reader;
- // Creates a mutation reader which covers sstables.
+ // Creates a mutation reader which covers given sstables.
// Caller needs to ensure that column_family remains live (FIXME: relax this).
// The 'range' parameter must be live as long as the reader is used.
// Mutations returned by the reader will all have given schema.
mutation_reader make_sstable_reader(schema_ptr schema,
+ lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -632,6 +634,7 @@ private:
mutation_reader::forwarding fwd_mr) const;
mutation_source sstables_as_mutation_source();
+ snapshot_source sstables_as_snapshot_source();
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<sstables::sstable_set>);
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
void do_trigger_compaction();
diff --git a/db/config.hh b/db/config.hh
index d8a35bbb23b2..5bc2d9693e5a 100644
--- a/db/config.hh
+++ b/db/config.hh
@@ -373,9 +373,6 @@ public:
val(reduce_cache_sizes_at, double, .85, Invalid, \
"When Java heap usage (after a full concurrent mark sweep (CMS) garbage collection) exceeds this percentage, Cassandra reduces the cache capacity to the fraction of the current size as specified by reduce_cache_capacity_to. To disable, set the value to 1.0." \
) \
- val(max_cached_partition_size_in_kb, uint64_t, 10240uLL, Used, \
- "Partitions with size greater than this value won't be cached." \
- ) \
/* Disks settings */ \
val(stream_throughput_outbound_megabits_per_sec, uint32_t, 400, Unused, \
"Throttles all outbound streaming file transfers on a node to the specified throughput. Cassandra does mostly sequential I/O when streaming data during bootstrap or repair, which can lead to saturating the network connection and degrading client (RPC) performance." \
diff --git a/dht/i_partitioner.cc b/dht/i_partitioner.cc
index 0e035e06e9ef..0a1d183edfe6 100644
--- a/dht/i_partitioner.cc
+++ b/dht/i_partitioner.cc
@@ -442,7 +442,7 @@ bool ring_position::less_compare(const schema& s, const ring_position& other) co
}
int ring_position_comparator::operator()(ring_position_view lh, ring_position_view rh) const {
- auto token_cmp = tri_compare(lh._token, rh._token);
+ auto token_cmp = tri_compare(*lh._token, *rh._token);
if (token_cmp) {
return token_cmp;
}
@@ -464,7 +464,7 @@ int ring_position_comparator::operator()(ring_position_view lh, ring_position_vi
int ring_position_comparator::operator()(ring_position_view lh, sstables::key_view rh) const {
auto rh_token = global_partitioner().get_token(rh);
- auto token_cmp = tri_compare(lh._token, rh_token);
+ auto token_cmp = tri_compare(*lh._token, rh_token);
if (token_cmp) {
return token_cmp;
}
diff --git a/dht/i_partitioner.hh b/dht/i_partitioner.hh
index a568b90f6ae3..5474cb6d8e56 100644
--- a/dht/i_partitioner.hh
+++ b/dht/i_partitioner.hh
@@ -374,6 +374,14 @@ private:
token_bound _token_bound; // valid when !_key
std::experimental::optional<partition_key> _key;
public:
+ static ring_position min() {
+ return { minimum_token(), token_bound::start };
+ }
+
+ static ring_position max() {
+ return { maximum_token(), token_bound::end };
+ }
+
static ring_position starting_at(dht::token token) {
return { std::move(token), token_bound::start };
}
@@ -463,7 +471,7 @@ class ring_position_view {
// For example {_token=t1, _key=nullptr, _weight=1} is ordered after {_token=t1, _key=k1, _weight=0},
// but {_token=t1, _key=nullptr, _weight=-1} is ordered before it.
//
- const dht::token& _token;
+ const dht::token* _token; // always not nullptr
const partition_key* _key; // Can be nullptr
int8_t _weight;
public:
@@ -479,11 +487,11 @@ public:
}
bool is_min() const {
- return _token.is_minimum();
+ return _token->is_minimum();
}
bool is_max() const {
- return _token.is_maximum();
+ return _token->is_maximum();
}
static ring_position_view for_range_start(const partition_range& r) {
@@ -503,11 +511,14 @@ public:
}
ring_position_view(const dht::ring_position& pos, after_key after = after_key::no)
- : _token(pos.token())
+ : _token(&pos.token())
, _key(pos.has_key() ? &*pos.key() : nullptr)
, _weight(pos.has_key() ? bool(after) : pos.relation_to_keys())
{ }
+ ring_position_view(const ring_position_view& pos) = default;
+ ring_position_view& operator=(const ring_position_view& other) = default;
+
ring_position_view(after_key_tag, const ring_position_view& v)
: _token(v._token)
, _key(v._key)
@@ -515,13 +526,13 @@ public:
{ }
ring_position_view(const dht::decorated_key& key, after_key after_key = after_key::no)
- : _token(key.token())
+ : _token(&key.token())
, _key(&key.key())
, _weight(bool(after_key))
{ }
ring_position_view(const dht::token& token, partition_key* key, int8_t weight)
- : _token(token)
+ : _token(&token)
, _key(key)
, _weight(weight)
{ }
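
The change of ring_position_view::_token from a const reference to a never-null pointer is what allows the copy assignment operator added above to be defaulted: a class with a reference member has its copy assignment implicitly deleted. A small stand-alone illustration of that rule, with placeholder types rather than the real ones:

// Why a "view" type holding a reference cannot be reassigned, while the
// pointer-based version can. `token` here is a stand-in, not dht::token.
#include <cassert>

struct token { long value; };

// Reference member: copy construction works, but operator= is implicitly
// deleted, so `view_by_ref v(t1); v = view_by_ref(t2);` would not compile.
struct view_by_ref {
    const token& _token;
    explicit view_by_ref(const token& t) : _token(t) {}
};

// Pointer member (never null by convention): assignment can be defaulted,
// matching the `ring_position_view& operator=(...) = default;` added above.
struct view_by_ptr {
    const token* _token;
    explicit view_by_ptr(const token& t) : _token(&t) {}
    view_by_ptr(const view_by_ptr&) = default;
    view_by_ptr& operator=(const view_by_ptr&) = default;
    const token& get() const { return *_token; }
};

int main() {
    token t1{1}, t2{2};
    view_by_ptr v(t1);
    v = view_by_ptr(t2);   // fine: rebinds the view to another token
    assert(v.get().value == 2);
}
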
diff --git a/hashing_partition_visitor.hh b/hashing_partition_visitor.hh
index 53e284dee8b8..83c5bb785a72 100644
--- a/hashing_partition_visitor.hh
+++ b/hashing_partition_visitor.hh
@@ -63,8 +63,11 @@ public:
rt.feed_hash(_h, _s);
}
- virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) {
- key.feed_hash(_h, _s);
+ virtual void accept_row(position_in_partition_view pos, const row_tombstone& deleted_at, const row_marker& rm, is_dummy dummy, is_continuous continuous) override {
+ if (dummy) {
+ return;
+ }
+ pos.key().feed_hash(_h, _s);
feed_hash(_h, deleted_at);
feed_hash(_h, rm);
}
diff --git a/intrusive_set_external_comparator.hh b/intrusive_set_external_comparator.hh
index 6e2c51311f90..b1a0f2fb2889 100644
--- a/intrusive_set_external_comparator.hh
+++ b/intrusive_set_external_comparator.hh
@@ -208,6 +208,10 @@ public:
}
template <typename Elem, typename ElemCompare>
iterator insert(const_iterator hint, Elem& value, ElemCompare cmp) {
+ return insert_check(hint, value, std::move(cmp)).first;
+ }
+ template <typename Elem, typename ElemCompare>
+ std::pair<iterator, bool> insert_check(const_iterator hint, Elem& value, ElemCompare cmp) {
algo::insert_commit_data commit_data;
std::pair ret =
algo::insert_unique_check(_header.this_ptr(),
@@ -215,8 +219,8 @@ public:
key_of_value()(value),
key_node_comp(cmp),
commit_data);
- return ret.second ? insert_unique_commit(value, commit_data)
- : iterator(ret.first, priv_value_traits_ptr());
+ return ret.second ? std::make_pair(insert_unique_commit(value, commit_data), true)
+ : std::make_pair(iterator(ret.first, priv_value_traits_ptr()), false);
}
};
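
insert_check() returns an (iterator, inserted) pair so a caller, such as maybe_add_to_cache() in cache_streamed_mutation.hh, can allocate a candidate entry, attempt a hinted insert, and release ownership only if the entry was actually linked into the set. A simplified stand-alone sketch of that ownership hand-off, using std::set and unique_ptr rather than the intrusive container (names are illustrative):

// Candidate entry is owned by a unique_ptr until the container accepts it.
#include <cassert>
#include <memory>
#include <set>

struct row_entry { int key; };

struct by_key {
    bool operator()(const row_entry* a, const row_entry* b) const {
        return a->key < b->key;
    }
};

using row_set = std::set<row_entry*, by_key>;

// Insert `candidate` unless an entry with the same key exists. Returns the
// entry that ends up in the set (the new one, or the pre-existing one).
row_entry* insert_or_get(row_set& rows, std::unique_ptr<row_entry> candidate) {
    auto [it, inserted] = rows.insert(candidate.get());
    if (inserted) {
        candidate.release();  // the set now owns the node
    }                         // otherwise the duplicate is freed on return
    return *it;
}

int main() {
    row_set rows;
    row_entry* a = insert_or_get(rows, std::make_unique<row_entry>(row_entry{1}));
    row_entry* b = insert_or_get(rows, std::make_unique<row_entry>(row_entry{1}));
    assert(a == b && rows.size() == 1);
    for (auto* e : rows) { delete e; }  // manual cleanup for the sketch
}
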
diff --git a/mutation.cc b/mutation.cc
index 6dbb47f51016..e801c2a2c5e2 100644
--- a/mutation.cc
+++ b/mutation.cc
@@ -206,37 +206,20 @@ mutation& mutation::operator+=(mutation&& other) {
return *this;
}
-enum class limit_mutation_size { yes, no };
+mutation mutation::sliced(const query::clustering_row_ranges& ranges) const {
+ auto m = mutation(schema(), decorated_key(), mutation_partition(partition(), *schema(), ranges));
+ m.partition().row_tombstones().trim(*schema(), ranges);
+ return m;
+}
-template <limit_mutation_size with_limit>
class mutation_rebuilder {
mutation _m;
streamed_mutation& _sm;
size_t _remaining_limit;
- template <typename T> bool check_remaining_limit(const T& e) {
- if (with_limit == limit_mutation_size::no) {
- return true;
- }
- size_t size = e.memory_usage();
- if (_remaining_limit <= size) {
- _remaining_limit = 0;
- } else {
- _remaining_limit -= size;
- }
- return _remaining_limit > 0;
- }
public:
mutation_rebuilder(streamed_mutation& sm)
: _m(sm.decorated_key(), sm.schema()), _sm(sm), _remaining_limit(0) {
- static_assert(with_limit == limit_mutation_size::no,
- "This constructor should be used only for mutation_rebuildeer with no limit");
- }
- mutation_rebuilder(streamed_mutation& sm, size_t limit)
- : _m(sm.decorated_key(), sm.schema()), _sm(sm), _remaining_limit(limit) {
- static_assert(with_limit == limit_mutation_size::yes,
- "This constructor should be used only for mutation_rebuildeer with limit");
- check_remaining_limit(_m.key());
}
stop_iteration consume(tombstone t) {
@@ -245,25 +228,16 @@ class mutation_rebuilder {
}
stop_iteration consume(range_tombstone&& rt) {
- if (!check_remaining_limit(rt)) {
- return stop_iteration::yes;
- }
_m.partition().apply_row_tombstone(*_m.schema(), std::move(rt));
return stop_iteration::no;
}
stop_iteration consume(static_row&& sr) {
- if (!check_remaining_limit(sr)) {
- return stop_iteration::yes;
- }
_m.partition().static_row().apply(*_m.schema(), column_kind::static_column, std::move(sr.cells()));
return stop_iteration::no;
}
stop_iteration consume(clustering_row&& cr) {
- if (!check_remaining_limit(cr)) {
- return stop_iteration::yes;
- }
auto& dr = _m.partition().clustered_row(*_m.schema(), std::move(cr.key()));
dr.apply(cr.tomb());
dr.apply(cr.marker());
@@ -272,29 +246,21 @@ class mutation_rebuilder {
}
mutation_opt consume_end_of_stream() {
- return with_limit == limit_mutation_size::yes && _remaining_limit == 0 ? mutation_opt()
- : mutation_opt(std::move(_m));
+ return mutation_opt(std::move(_m));
}
};
-future<mutation_opt>
-mutation_from_streamed_mutation_with_limit(streamed_mutation sm, size_t limit) {
- return do_with(std::move(sm), [limit] (auto& sm) {
- return consume(sm, mutation_rebuilder<limit_mutation_size::yes>(sm, limit));
- });
-}
-
future<mutation_opt> mutation_from_streamed_mutation(streamed_mutation_opt sm) {
if (!sm) {
return make_ready_future<mutation_opt>();
}
return do_with(std::move(*sm), [] (auto& sm) {
- return consume(sm, mutation_rebuilder<limit_mutation_size::no>(sm));
+ return consume(sm, mutation_rebuilder(sm));
});
}
future<mutation> mutation_from_streamed_mutation(streamed_mutation& sm) {
- return consume(sm, mutation_rebuilder<limit_mutation_size::no>(sm)).then([] (mutation_opt&& mo) {
+ return consume(sm, mutation_rebuilder(sm)).then([] (mutation_opt&& mo) {
return std::move(*mo);
});
}
diff --git a/mutation.hh b/mutation.hh
index 10c29c02db93..f3f634553239 100644
--- a/mutation.hh
+++ b/mutation.hh
@@ -133,6 +133,10 @@ public:
mutation operator+(const mutation& other) const;
mutation& operator+=(const mutation& other);
mutation& operator+=(mutation&& other);
+
+ // Returns a subset of this mutation holding only information relevant for given clustering ranges.
+ // Range tombstones will be trimmed to the boundaries of the clustering ranges.
+ mutation sliced(const query::clustering_row_ranges&) const;
private:
friend std::ostream& operator<<(std::ostream& os, const mutation& m);
};
@@ -185,4 +189,3 @@ boost::iterator_range::const_iterator> slice(
future<mutation_opt> mutation_from_streamed_mutation(streamed_mutation_opt sm);
future<mutation> mutation_from_streamed_mutation(streamed_mutation& sm);
-future<mutation_opt> mutation_from_streamed_mutation_with_limit(streamed_mutation sm, size_t limit);
diff --git a/mutation_partition.cc b/mutation_partition.cc
index 1d1665ddff2b..2a1a54f5b5c8 100644
--- a/mutation_partition.cc
+++ b/mutation_partition.cc
@@ -175,7 +175,7 @@ void revert_intrusive_set_range(const schema& s, mutation_partition::rows_type&
assert(i != dst.end());
rows_entry& dst_e = *i;
- if (e.empty()) {
+ if (e.erased()) {
dst.erase(i);
start = src.erase_and_dispose(start, deleter);
start = src.insert_before(start, dst_e);
@@ -203,18 +203,10 @@ auto apply_reversibly_intrusive_set(const schema& s, mutation_partition::rows_ty
while (src_i != src.end()) {
rows_entry& src_e = *src_i;
- // neutral entries will be given special meaning for the purpose of revert, so
- // get rid of empty rows from the input as if they were not there. This doesn't change
- // the value of src.
- if (src_e.empty()) {
- src_i = src.erase_and_dispose(src_i, current_deleter<rows_entry>());
- continue;
- }
-
auto i = dst.lower_bound(src_e, cmp);
if (i == dst.end() || cmp(src_e, *i)) {
- // Construct neutral entry which will represent missing dst entry for revert.
- rows_entry* empty_e = current_allocator().construct<rows_entry>(src_e.key());
+ // Construct erased entry which will represent missing dst entry for revert.
+ rows_entry* empty_e = current_allocator().construct<rows_entry>(rows_entry::erased_tag{}, src_e);
[&] () noexcept {
src_i = src.erase(src_i);
src_i = src.insert_before(src_i, *empty_e);
@@ -235,6 +227,7 @@ auto apply_reversibly_intrusive_set(const schema& s, mutation_partition::rows_ty
mutation_partition::mutation_partition(const mutation_partition& x)
: _tombstone(x._tombstone)
, _static_row(x._static_row)
+ , _static_row_continuous(x._static_row_continuous)
, _rows()
, _row_tombstones(x._row_tombstones) {
auto cloner = [] (const auto& x) {
@@ -247,6 +240,7 @@ mutation_partition::mutation_partition(const mutation_partition& x, const schema
query::clustering_key_filter_ranges ck_ranges)
: _tombstone(x._tombstone)
, _static_row(x._static_row)
+ , _static_row_continuous(x._static_row_continuous)
, _rows()
, _row_tombstones(x._row_tombstones, range_tombstone_list::copy_comparator_only()) {
try {
@@ -271,6 +265,7 @@ mutation_partition::mutation_partition(mutation_partition&& x, const schema& sch
query::clustering_key_filter_ranges ck_ranges)
: _tombstone(x._tombstone)
, _static_row(std::move(x._static_row))
+ , _static_row_continuous(x._static_row_continuous)
, _rows(std::move(x._rows))
, _row_tombstones(std::move(x._row_tombstones))
{
@@ -319,6 +314,13 @@ mutation_partition::operator=(mutation_partition&& x) noexcept {
return *this;
}
+void mutation_partition::ensure_last_dummy(const schema& s) {
+ if (_rows.empty() || !_rows.rbegin()->position().is_after_all_clustered_rows(s)) {
+ _rows.insert_before(_rows.end(),
+ *current_allocator().construct<rows_entry>(s, position_in_partition_view::after_all_clustered_rows(), is_dummy::yes, is_continuous::yes));
+ }
+}
+
void
mutation_partition::apply(const schema& s, const mutation_partition& p, const schema& p_schema) {
if (s.version() != p_schema.version()) {
@@ -507,7 +509,7 @@ mutation_partition::clustered_row(const schema& s, const clustering_key& key) {
}
deletable_row&
-mutation_partition::clustered_row(const schema& s, const clustering_key_view& key) {
+mutation_partition::clustered_row(const schema& s, clustering_key_view key) {
auto i = _rows.find(key, rows_entry::compare(s));
if (i == _rows.end()) {
auto e = current_allocator().construct<rows_entry>(key);
@@ -517,6 +519,17 @@ mutation_partition::clustered_row(const schema& s, const clustering_key_view& ke
return i->row();
}
+deletable_row&
+mutation_partition::clustered_row(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous) {
+ auto i = _rows.find(pos, rows_entry::compare(s));
+ if (i == _rows.end()) {
+ auto e = current_allocator().construct<rows_entry>(s, pos, dummy, continuous);
+ _rows.insert(i, *e, rows_entry::compare(s));
+ return e->row();
+ }
+ return i->row();
+}
+
mutation_partition::rows_type::const_iterator
mutation_partition::lower_bound(const schema& schema, const query::clustering_range& r) const {
auto cmp = rows_entry::key_comparator(clustering_key_prefix::prefix_equality_less_compare(schema));
@@ -759,6 +772,9 @@ mutation_partition::query_compacted(query::result::partition_writer& pw, const s
auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);
auto send_ck = slice.options.contains(query::partition_slice::option::send_clustering_key);
for_each_row(s, query::clustering_range::make_open_ended_both_sides(), is_reversed, [&] (const rows_entry& e) {
+ if (e.dummy()) {
+ return stop_iteration::no;
+ }
auto& row = e.row();
auto row_tombstone = tombstone_for_row(s, e);
@@ -843,13 +859,13 @@ operator<<(std::ostream& os, const deletable_row& dr) {
std::ostream&
operator<<(std::ostream& os, const rows_entry& re) {
- return fprint(os, "{rows_entry: %s %s}", re._key, re._row);
+ return fprint(os, "{rows_entry: cont=%d dummy=%d %s %s}", re.continuous(), re.dummy(), re._key, re._row);
}
std::ostream&
operator<<(std::ostream& os, const mutation_partition& mp) {
- return fprint(os, "{mutation_partition: %s (%s) static %s clustered %s}",
- mp._tombstone, ::join(", ", mp._row_tombstones), mp._static_row,
+ return fprint(os, "{mutation_partition: %s (%s) static cont=%d %s clustered %s}",
+ mp._tombstone, ::join(", ", mp._row_tombstones), mp._static_row_continuous, mp._static_row,
::join(", ", mp._rows));
}
@@ -905,14 +921,30 @@ void deletable_row::revert(const schema& s, deletable_row& src) {
_marker.revert(src._marker);
}
+void deletable_row::apply(const schema& s, deletable_row&& src) {
+ _cells.apply(s, column_kind::regular_column, std::move(src._cells));
+ _marker.apply(src._marker);
+ _deleted_at.apply(src._deleted_at, _marker);
+}
+
bool
rows_entry::equal(const schema& s, const rows_entry& other) const {
return equal(s, other, s);
}
+position_in_partition_view rows_entry::position() const {
+ if (_flags._last) {
+ return position_in_partition_view::after_all_clustered_rows();
+ } else {
+ return position_in_partition_view(
+ position_in_partition_view::clustering_row_tag_t(), _key);
+ }
+}
+
bool
rows_entry::equal(const schema& s, const rows_entry& other, const schema& other_schema) const {
- return key().equal(s, other.key()) // Only representation-compatible changes are allowed
+ position_in_partition::equal_compare eq(s);
+ return eq(position(), other.position())
&& row().equal(column_kind::regular_column, s, other.row(), other_schema);
}
@@ -925,7 +957,7 @@ bool mutation_partition::equal(const schema& this_schema, const mutation_partiti
return false;
}
- if (!std::equal(_rows.begin(), _rows.end(), p._rows.begin(), p._rows.end(),
+ if (!boost::equal(non_dummy_rows(), p.non_dummy_rows(),
[&] (const rows_entry& e1, const rows_entry& e2) {
return e1.equal(this_schema, e2, p_schema);
}
@@ -943,6 +975,16 @@ bool mutation_partition::equal(const schema& this_schema, const mutation_partiti
return _static_row.equal(column_kind::static_column, this_schema, p._static_row, p_schema);
}
+bool mutation_partition::equal_continuity(const schema& s, const mutation_partition& p) const {
+ return _static_row_continuous == p._static_row_continuous
+ && boost::equal(_rows, p._rows, [&] (const rows_entry& e1, const rows_entry& e2) {
+ position_in_partition::equal_compare eq(s);
+ return eq(e1.position(), e2.position())
+ && e1.continuous() == e2.continuous()
+ && e1.dummy() == e2.dummy();
+ });
+}
+
void
apply_reversibly(const column_definition& def, atomic_cell_or_collection& dst, atomic_cell_or_collection& src) {
// Must be run via with_linearized_managed_bytes() context, but assume it is
@@ -1216,8 +1258,10 @@ uint32_t mutation_partition::do_compact(const schema& s,
uint32_t row_count = 0;
auto row_callback = [&] (rows_entry& e) {
+ if (e.dummy()) {
+ return stop_iteration::no;
+ }
deletable_row& row = e.row();
-
row_tombstone tomb = tombstone_for_row(s, e);
bool is_live = row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, can_gc, gc_before);
@@ -1315,7 +1359,7 @@ size_t
mutation_partition::live_row_count(const schema& s, gc_clock::time_point query_time) const {
size_t count = 0;
- for (const rows_entry& e : _rows) {
+ for (const rows_entry& e : non_dummy_rows()) {
tombstone base_tombstone = range_tombstone_for_row(s, e.key());
if (e.row().is_live(s, base_tombstone, query_time)) {
++count;
@@ -1333,6 +1377,7 @@ rows_entry::rows_entry(rows_entry&& o) noexcept
: _link(std::move(o._link))
, _key(std::move(o._key))
, _row(std::move(o._row))
+ , _flags(std::move(o._flags))
{ }
row::row(const row& o)
@@ -1641,7 +1686,10 @@ mutation_partition mutation_partition::difference(schema_ptr s, const mutation_p
auto it_r = other._rows.begin();
rows_entry::compare cmp_r(*s);
for (auto&& r : _rows) {
- while (it_r != other._rows.end() && cmp_r(*it_r, r)) {
+ if (r.dummy()) {
+ continue;
+ }
+ while (it_r != other._rows.end() && (it_r->dummy() || cmp_r(*it_r, r))) {
++it_r;
}
if (it_r == other._rows.end() || !it_r->key().equal(*s, r.key())) {
@@ -1671,7 +1719,7 @@ void mutation_partition::accept(const schema& s, mutation_partition_visitor& v)
}
for (const rows_entry& e : _rows) {
const deletable_row& dr = e.row();
- v.accept_row(e.key(), dr.deleted_at(), dr.marker());
+ v.accept_row(e.position(), dr.deleted_at(), dr.marker(), e.dummy(), e.continuous());
dr.cells().for_each_cell([&] (column_id id, const atomic_cell_or_collection& cell) {
const column_definition& def = s.regular_column_at(id);
if (def.is_atomic()) {
@@ -2069,6 +2117,41 @@ class counter_write_query_result_builder {
}
};
+mutation_partition::mutation_partition(mutation_partition::incomplete_tag, const schema& s, tombstone t)
+ : _tombstone(t)
+ , _static_row_continuous(false)
+ , _rows()
+ , _row_tombstones(s)
+{
+ _rows.insert_before(_rows.end(),
+ *current_allocator().construct<rows_entry>(s, position_in_partition_view::after_all_clustered_rows(), is_dummy::yes, is_continuous::no));
+}
+
+bool mutation_partition::is_fully_continuous() const {
+ if (!_static_row_continuous) {
+ return false;
+ }
+ for (auto&& row : _rows) {
+ if (!row.continuous()) {
+ return false;
+ }
+ }
+ return true;
+}
+
+void mutation_partition::make_fully_continuous() {
+ _static_row_continuous = true;
+ auto i = _rows.begin();
+ while (i != _rows.end()) {
+ if (i->dummy()) {
+ i = _rows.erase_and_dispose(i, alloc_strategy_deleter<rows_entry>());
+ } else {
+ i->set_continuous(true);
+ ++i;
+ }
+ }
+}
+
future<mutation_opt> counter_write_query(schema_ptr s, const mutation_source& source,
const dht::decorated_key& dk,
const query::partition_slice& slice,
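
is_fully_continuous() and make_fully_continuous() above work purely on per-entry flags: the key range immediately preceding an entry is continuous iff the entry's continuous flag is set, and dummy entries carry only position and continuity information. A simplified stand-alone model of those two operations (illustrative types; the static-row flag is omitted):

// Minimal model of row-entry continuity flags.
#include <cassert>
#include <list>

struct entry {
    int key;
    bool continuous;   // the key range right before this entry is complete
    bool dummy;        // marks a position only, carries no row data
};

using rows = std::list<entry>;

bool is_fully_continuous(const rows& r) {
    for (const auto& e : r) {
        if (!e.continuous) { return false; }
    }
    return true;
}

void make_fully_continuous(rows& r) {
    for (auto it = r.begin(); it != r.end();) {
        if (it->dummy) {
            it = r.erase(it);      // dummies only carried continuity info
        } else {
            it->continuous = true;
            ++it;
        }
    }
}

int main() {
    // An "incomplete" partition: one trailing dummy with continuous=false,
    // meaning nothing is known about the clustering range yet.
    rows r = { {0, false, true} };
    assert(!is_fully_continuous(r));
    r.push_front({5, false, false});   // a cached row, range before it unknown
    make_fully_continuous(r);
    assert(is_fully_continuous(r) && r.size() == 1);
}
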
diff --git a/mutation_partition.hh b/mutation_partition.hh
index b8ff57d5b9ff..ff4b14a9b118 100644
--- a/mutation_partition.hh
+++ b/mutation_partition.hh
@@ -33,6 +33,7 @@
#include "schema.hh"
#include "tombstone.hh"
#include "keys.hh"
+#include "position_in_partition.hh"
#include "atomic_cell_or_collection.hh"
#include "query-result.hh"
#include "mutation_partition_view.hh"
@@ -598,6 +599,9 @@ class deletable_row final {
public:
deletable_row() {}
explicit deletable_row(clustering_row&&);
+ deletable_row(row_tombstone tomb, const row_marker& marker, const row& cells)
+ : _deleted_at(tomb), _marker(marker), _cells(cells)
+ {}
void apply(tombstone deleted_at) {
_deleted_at.apply(deleted_at);
@@ -624,6 +628,10 @@ public:
void apply_reversibly(const schema& s, deletable_row& src);
// See reversibly_mergeable.hh
void revert(const schema& s, deletable_row& src);
+
+ // Weak exception guarantees. After exception, both src and this will commute to the same value as
+ // they would should the exception not happen.
+ void apply(const schema& s, deletable_row&& src);
public:
row_tombstone deleted_at() const { return _deleted_at; }
api::timestamp_type created_at() const { return _marker.timestamp(); }
@@ -642,28 +650,59 @@ class rows_entry {
intrusive_set_external_comparator_member_hook _link;
clustering_key _key;
deletable_row _row;
+ struct flags {
+ bool _continuous : 1; // See doc of is_continuous.
+ bool _dummy : 1;
+ bool _last : 1;
+ bool _erased : 1; // Used only temporarily during apply_reversibly(). Refs #2012.
+ flags() : _continuous(true), _dummy(false), _last(false), _erased(false) { }
+ } _flags{};
friend class mutation_partition;
public:
+ struct erased_tag {};
+ rows_entry(erased_tag, const rows_entry& e)
+ : _key(e._key)
+ {
+ _flags._erased = true;
+ _flags._last = e._flags._last;
+ }
explicit rows_entry(clustering_key&& key)
: _key(std::move(key))
{ }
explicit rows_entry(const clustering_key& key)
: _key(key)
{ }
+ rows_entry(const schema& s, position_in_partition_view pos, is_dummy dummy, is_continuous continuous)
+ : _key(pos.key())
+ {
+ if (!pos.is_clustering_row()) {
+ assert(bool(dummy));
+ assert(pos.is_after_all_clustered_rows(s)); // FIXME: Support insertion at any position
+ _flags._last = true;
+ }
+ _flags._dummy = bool(dummy);
+ _flags._continuous = bool(continuous);
+ }
rows_entry(const clustering_key& key, deletable_row&& row)
: _key(key), _row(std::move(row))
{ }
rows_entry(const clustering_key& key, const deletable_row& row)
: _key(key), _row(row)
{ }
+ rows_entry(const clustering_key& key, row_tombstone tomb, const row_marker& marker, const row& row)
+ : _key(key), _row(tomb, marker, row)
+ { }
rows_entry(rows_entry&& o) noexcept;
rows_entry(const rows_entry& e)
: _key(e._key)
, _row(e._row)
+ , _flags(e._flags)
{ }
+ // Valid only if !dummy()
clustering_key& key() {
return _key;
}
+ // Valid only if !dummy()
const clustering_key& key() const {
return _key;
}
@@ -673,6 +712,11 @@ public:
const deletable_row& row() const {
return _row;
}
+ position_in_partition_view position() const;
+ is_continuous continuous() const { return is_continuous(_flags._continuous); }
+ void set_continuous(bool value) { _flags._continuous = value; }
+ void set_continuous(is_continuous value) { set_continuous(bool(value)); }
+ is_dummy dummy() const { return is_dummy(_flags._dummy); }
void apply(row_tombstone t) {
_row.apply(t);
}
@@ -687,23 +731,54 @@ public:
bool empty() const {
return _row.empty();
}
+ bool erased() const {
+ return _flags._erased;
+ }
+ struct tri_compare {
+ position_in_partition::tri_compare _c;
+ explicit tri_compare(const schema& s) : _c(s) {}
+ int operator()(const rows_entry& e1, const rows_entry& e2) const {
+ return _c(e1.position(), e2.position());
+ }
+ int operator()(const clustering_key& key, const rows_entry& e) const {
+ return _c(position_in_partition_view::for_key(key), e.position());
+ }
+ int operator()(const rows_entry& e, const clustering_key& key) const {
+ return _c(e.position(), position_in_partition_view::for_key(key));
+ }
+ int operator()(const rows_entry& e, position_in_partition_view p) const {
+ return _c(e.position(), p);
+ }
+ int operator()(position_in_partition_view p, const rows_entry& e) const {
+ return _c(p, e.position());
+ }
+ int operator()(position_in_partition_view p1, position_in_partition_view p2) const {
+ return _c(p1, p2);
+ }
+ };
struct compare {
- clustering_key::less_compare _c;
- compare(const schema& s) : _c(s) {}
+ tri_compare _c;
+ explicit compare(const schema& s) : _c(s) {}
bool operator()(const rows_entry& e1, const rows_entry& e2) const {
- return _c(e1._key, e2._key);
+ return _c(e1, e2) < 0;
}
bool operator()(const clustering_key& key, const rows_entry& e) const {
- return _c(key, e._key);
+ return _c(key, e) < 0;
}
bool operator()(const rows_entry& e, const clustering_key& key) const {
- return _c(e._key, key);
+ return _c(e, key) < 0;
}
bool operator()(const clustering_key_view& key, const rows_entry& e) const {
- return _c(key, e._key);
+ return _c(key, e) < 0;
}
bool operator()(const rows_entry& e, const clustering_key_view& key) const {
- return _c(e._key, key);
+ return _c(e, key) < 0;
+ }
+ bool operator()(const rows_entry& e, position_in_partition_view p) const {
+ return _c(e.position(), p) < 0;
+ }
+ bool operator()(position_in_partition_view p, const rows_entry& e) const {
+ return _c(p, e.position()) < 0;
}
};
template <typename Comparator>
@@ -712,10 +787,16 @@ public:
delegating_compare(Comparator&& c) : _c(std::move(c)) {}
    template <typename Comparable>
bool operator()(const Comparable& v, const rows_entry& e) const {
+ if (e._flags._last) {
+ return true;
+ }
return _c(v, e._key);
}
    template <typename Comparable>
bool operator()(const rows_entry& e, const Comparable& v) const {
+ if (e._flags._last) {
+ return false;
+ }
return _c(e._key, v);
}
};
@@ -728,6 +809,47 @@ public:
bool equal(const schema& s, const rows_entry& other, const schema& other_schema) const;
};
+// Represents a set of writes made to a single partition.
+//
+// The object is schema-dependent. Each instance is governed by some
+// specific schema version. Accessors require a reference to the schema object
+// of that version.
+//
+// There is an operation of addition defined on mutation_partition objects
+// (also called "apply"), which gives as a result an object representing the
+// sum of writes contained in the addends. For instances governed by the same
+// schema, addition is commutative and associative.
+//
+// In addition to representing writes, the object supports specifying a set of
+// partition elements called "continuity". This set can be used to represent
+// lack of information about certain parts of the partition. It can be
+// specified which ranges of clustering keys belong to that set. We say that a
+// key range is continuous if all keys in that range belong to the continuity
+// set, and discontinuous otherwise. By default everything is continuous.
+// The static row may also be continuous or not.
+// Partition tombstone is always continuous.
+//
+// Continuity is ignored by instance equality. It's also transient, not
+// preserved by serialization.
+//
+// Continuity is represented internally using flags on row entries. The key
+// range between two consecutive entries (both ends exclusive) is continuous
+// if and only if rows_entry::continuous() is true for the later entry. The
+// range starting after the last entry is assumed to be continuous. The range
+// corresponding to the key of the entry is continuous if and only if
+// rows_entry::dummy() is false.
+//
+// Adding two fully-continuous instances gives a fully-continuous instance.
+// Continuity doesn't affect how the write part is added.
+//
+// Addition of continuity is not commutative in general, but is associative.
+// Continuity flags on objects representing the same thing (e.g. rows_entry
+// with the same key) are merged such that the information stored in the left-
+// hand operand wins. Flags on objects which are present only in one of the
+// operands are transferred as-is. Such merging rules are useful for layering
+// information in MVCC, where newer versions specify continuity with respect
+// to the combined set of rows in all prior versions, not just in their own
+// versions.
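+//
+// For illustration (a sketch): suppose the only entries are for clustering
+// keys 1 and 3, plus a dummy entry after all clustered rows:
+//
+//   {key=1, dummy=no, continuous=no} {key=3, dummy=no, continuous=yes} {last, dummy=yes, continuous=no}
+//
+// Then (-inf, 1) is discontinuous, key 1 is continuous, (1, 3) is continuous,
+// key 3 is continuous, and (3, +inf) is discontinuous.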
class mutation_partition final {
public:
    using rows_type = intrusive_set_external_comparator<rows_entry, &rows_entry::_link>;
@@ -736,6 +858,7 @@ public:
private:
tombstone _tombstone;
row _static_row;
+ bool _static_row_continuous = true;
rows_type _rows;
// Contains only strict prefixes so that we don't have to lookup full keys
// in both _row_tombstones and _rows.
@@ -745,6 +868,12 @@ private:
friend class converting_mutation_partition_applier;
public:
struct copy_comparators_only {};
+ struct incomplete_tag {};
+ // Constructs an empty instance which is fully discontinuous except for the partition tombstone.
+ mutation_partition(incomplete_tag, const schema& s, tombstone);
+ static mutation_partition make_incomplete(const schema& s, tombstone t = {}) {
+ return mutation_partition(incomplete_tag(), s, t);
+ }
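+    // For example (a sketch): a cache can start from an instance which knows
+    // only the partition tombstone,
+    //
+    //   auto mp = mutation_partition::make_incomplete(*s, t);
+    //
+    // and later add row entries learned from the underlying source, marking
+    // the ranges it has read as continuous.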
mutation_partition(schema_ptr s)
: _rows()
, _row_tombstones(*s)
@@ -762,6 +891,7 @@ public:
mutation_partition& operator=(mutation_partition&& x) noexcept;
bool equal(const schema&, const mutation_partition&) const;
bool equal(const schema& this_schema, const mutation_partition& p, const schema& p_schema) const;
+ bool equal_continuity(const schema&, const mutation_partition&) const;
// Consistent with equal()
    template<typename Hasher>
void feed_hash(Hasher& h, const schema& s) const {
@@ -770,6 +900,13 @@ public:
}
friend std::ostream& operator<<(std::ostream& os, const mutation_partition& mp);
public:
+ // Makes sure there is a dummy entry after all clustered rows. Doesn't affect continuity.
+ // Doesn't invalidate iterators.
+ void ensure_last_dummy(const schema&);
+ bool static_row_continuous() const { return _static_row_continuous; }
+ void set_static_row_continuous(bool value) { _static_row_continuous = value; }
+ bool is_fully_continuous() const;
+ void make_fully_continuous();
void apply(tombstone t) { _tombstone.apply(t); }
void apply_delete(const schema& schema, const clustering_key_prefix& prefix, tombstone t);
void apply_delete(const schema& schema, range_tombstone rt);
@@ -866,7 +1003,8 @@ public:
public:
deletable_row& clustered_row(const schema& s, const clustering_key& key);
deletable_row& clustered_row(const schema& s, clustering_key&& key);
- deletable_row& clustered_row(const schema& s, const clustering_key_view& key);
+ deletable_row& clustered_row(const schema& s, clustering_key_view key);
+ deletable_row& clustered_row(const schema& s, position_in_partition_view pos, is_dummy, is_continuous);
public:
tombstone partition_tombstone() const { return _tombstone; }
row& static_row() { return _static_row; }
@@ -879,6 +1017,7 @@ public:
const row* find_row(const schema& s, const clustering_key& key) const;
tombstone range_tombstone_for_row(const schema& schema, const clustering_key& key) const;
row_tombstone tombstone_for_row(const schema& schema, const clustering_key& key) const;
+ // Can be called only for non-dummy entries
row_tombstone tombstone_for_row(const schema& schema, const rows_entry& e) const;
    boost::iterator_range<rows_type::const_iterator> range(const schema& schema, const query::clustering_range& r) const;
rows_type::const_iterator lower_bound(const schema& schema, const query::clustering_range& r) const;
@@ -886,6 +1025,11 @@ public:
rows_type::iterator lower_bound(const schema& schema, const query::clustering_range& r);
rows_type::iterator upper_bound(const schema& schema, const query::clustering_range& r);
    boost::iterator_range<rows_type::iterator> range(const schema& schema, const query::clustering_range& r);
+ // Returns an iterator range of rows_entry, with only non-dummy entries.
+ auto non_dummy_rows() const {
+ return boost::make_iterator_range(_rows.begin(), _rows.end())
+ | boost::adaptors::filtered([] (const rows_entry& e) { return bool(!e.dummy()); });
+ }
// Writes this partition using supplied query result writer.
// The partition should be first compacted with compact_for_query(), otherwise
// results may include data which is deleted/expired.
diff --git a/mutation_partition_applier.hh b/mutation_partition_applier.hh
index 23a2600e4f79..2414150add31 100644
--- a/mutation_partition_applier.hh
+++ b/mutation_partition_applier.hh
@@ -50,8 +50,8 @@ public:
_p.apply_row_tombstone(_schema, rt);
}
- virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) override {
- deletable_row& r = _p.clustered_row(_schema, key);
+ virtual void accept_row(position_in_partition_view key, const row_tombstone& deleted_at, const row_marker& rm, is_dummy dummy, is_continuous continuous) override {
+ deletable_row& r = _p.clustered_row(_schema, key, dummy, continuous);
r.apply(rm);
r.apply(deleted_at);
_current_row = &r;
diff --git a/mutation_partition_serializer.cc b/mutation_partition_serializer.cc
index 9413f61c577b..7788e0fbed6f 100644
--- a/mutation_partition_serializer.cc
+++ b/mutation_partition_serializer.cc
@@ -196,7 +196,7 @@ void mutation_partition_serializer::write_serialized(Writer&& writer, const sche
auto row_tombstones = write_row_cells(std::move(srow_writer), mp.static_row(), s, column_kind::static_column).end_static_row().start_range_tombstones();
write_tombstones(s, row_tombstones, mp.row_tombstones());
auto clustering_rows = std::move(row_tombstones).end_range_tombstones().start_rows();
- for (auto&& cr : mp.clustered_rows()) {
+ for (auto&& cr : mp.non_dummy_rows()) {
write_row(clustering_rows.add(), s, cr.key(), cr.row().cells(), cr.row().marker(), cr.row().deleted_at());
}
std::move(clustering_rows).end_rows().end_mutation_partition();
diff --git a/mutation_partition_view.cc b/mutation_partition_view.cc
index f96645014c83..d2d555c602ce 100644
--- a/mutation_partition_view.cc
+++ b/mutation_partition_view.cc
@@ -210,7 +210,7 @@ mutation_partition_view::accept(const column_mapping& cm, mutation_partition_vis
for (auto&& cr : mpv.rows()) {
auto t = row_tombstone(cr.deleted_at(), shadowable_tombstone(cr.shadowable_deleted_at()));
- visitor.accept_row(cr.key(), t, read_row_marker(cr.marker()));
+ visitor.accept_row(position_in_partition_view::for_key(cr.key()), t, read_row_marker(cr.marker()));
struct cell_visitor {
mutation_partition_visitor& _visitor;
diff --git a/mutation_partition_visitor.hh b/mutation_partition_visitor.hh
index 3e3f014bcdf4..9a7c5ac50c65 100644
--- a/mutation_partition_visitor.hh
+++ b/mutation_partition_visitor.hh
@@ -29,6 +29,19 @@
class row_marker;
class row_tombstone;
+// When used on an entry, marks the range between this entry and the previous
+// one as continuous or discontinuous, excluding the keys of both entries.
+// This information doesn't apply to the continuity of the entries themselves;
+// that is specified by the is_dummy flag.
+// See class doc of mutation_partition.
+using is_continuous = bool_class<class continuous_tag>;
+
+// A dummy entry is an entry which is incomplete.
+// Typically used for marking the bounds of a continuity range.
+// See class doc of mutation_partition.
+class dummy_tag {};
+using is_dummy = bool_class<dummy_tag>;
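+
+// For example (a sketch): an entry inserted via
+// mutation_partition::clustered_row(s, pos, is_dummy::yes, is_continuous::no)
+// acts only as a bound: it carries no row data and marks the key range
+// preceding `pos` as discontinuous.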
+
// Guarantees:
//
// - any tombstones which affect cell's liveness are visited before that cell
@@ -56,7 +69,8 @@ public:
virtual void accept_row_tombstone(const range_tombstone&) = 0;
- virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) = 0;
+ virtual void accept_row(position_in_partition_view key, const row_tombstone& deleted_at, const row_marker& rm,
+ is_dummy = is_dummy::no, is_continuous = is_continuous::yes) = 0;
virtual void accept_row_cell(column_id id, atomic_cell_view) = 0;
diff --git a/mutation_reader.cc b/mutation_reader.cc
index 4f28c64891f6..8eb1cc861f15 100644
--- a/mutation_reader.cc
+++ b/mutation_reader.cc
@@ -153,8 +153,8 @@ class reader_returning final : public mutation_reader::impl {
}
};
-mutation_reader make_reader_returning(mutation m) {
- return make_mutation_reader(streamed_mutation_from_mutation(std::move(m)));
+mutation_reader make_reader_returning(mutation m, streamed_mutation::forwarding fwd) {
+ return make_mutation_reader(streamed_mutation_from_mutation(std::move(m), std::move(fwd)));
}
mutation_reader make_reader_returning(streamed_mutation m) {
@@ -324,3 +324,36 @@ make_multi_range_reader(schema_ptr s, mutation_source source, const dht::partiti
return make_mutation_reader(std::move(s), std::move(source), ranges,
slice, pc, std::move(trace_state), fwd, fwd_mr);
}
+
+snapshot_source make_empty_snapshot_source() {
+ return snapshot_source([] {
+ return make_empty_mutation_source();
+ });
+}
+
+mutation_source make_empty_mutation_source() {
+ return mutation_source([](schema_ptr s,
+ const dht::partition_range& pr,
+ const query::partition_slice& slice,
+ const io_priority_class& pc,
+ tracing::trace_state_ptr tr,
+ streamed_mutation::forwarding fwd) {
+ return make_empty_reader();
+ });
+}
+
+mutation_source make_combined_mutation_source(std::vector<mutation_source> addends) {
+ return mutation_source([addends = std::move(addends)] (schema_ptr s,
+ const dht::partition_range& pr,
+ const query::partition_slice& slice,
+ const io_priority_class& pc,
+ tracing::trace_state_ptr tr,
+ streamed_mutation::forwarding fwd) {
+ std::vector<mutation_reader> rd;
+ rd.reserve(addends.size());
+ for (auto&& ms : addends) {
+ rd.emplace_back(ms(s, pr, slice, pc, tr, fwd));
+ }
+ return make_combined_reader(std::move(rd));
+ });
+}
diff --git a/mutation_reader.hh b/mutation_reader.hh
index c7aa94d6db5c..cee317a067b7 100644
--- a/mutation_reader.hh
+++ b/mutation_reader.hh
@@ -159,7 +159,7 @@ public:
mutation_reader make_combined_reader(std::vector<mutation_reader>);
mutation_reader make_combined_reader(mutation_reader&& a, mutation_reader&& b);
// reads from the input readers, in order
-mutation_reader make_reader_returning(mutation);
+mutation_reader make_reader_returning(mutation, streamed_mutation::forwarding fwd = streamed_mutation::forwarding::no);
mutation_reader make_reader_returning(streamed_mutation);
mutation_reader make_reader_returning_many(std::vector<mutation>,
const query::partition_slice& slice = query::full_slice,
@@ -279,34 +279,36 @@ class mutation_source {
// We could have our own version of std::function<> that is nothrow
// move constructible and save some indirection and allocation.
// Probably not worth the effort though.
- std::unique_ptr<func_type> _fn;
+ lw_shared_ptr<func_type> _fn;
private:
mutation_source() = default;
explicit operator bool() const { return bool(_fn); }
    friend class optimized_optional<mutation_source>;
public:
- mutation_source(func_type fn) : _fn(std::make_unique<func_type>(std::move(fn))) {}
+ mutation_source(func_type fn) : _fn(make_lw_shared<func_type>(std::move(fn))) {}
+ // For sources which don't care about the mutation_reader::forwarding flag (always fast forwardable)
+ mutation_source(std::function<mutation_reader(schema_ptr, partition_range, const query::partition_slice&, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding)> fn)
+ : _fn(make_lw_shared<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr tr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) {
+ return fn(s, range, slice, pc, std::move(tr), fwd);
+ })) {}
    mutation_source(std::function<mutation_reader(schema_ptr, partition_range, const query::partition_slice&, io_priority)> fn)
- : _fn(std::make_unique<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding) {
+ : _fn(make_lw_shared<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority pc, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) {
+ assert(!fwd);
return fn(s, range, slice, pc);
})) {}
    mutation_source(std::function<mutation_reader(schema_ptr, partition_range, const query::partition_slice&)> fn)
- : _fn(std::make_unique<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding) {
+ : _fn(make_lw_shared<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice& slice, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) {
+ assert(!fwd);
return fn(s, range, slice);
})) {}
    mutation_source(std::function<mutation_reader(schema_ptr, partition_range)> fn)
- : _fn(std::make_unique<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice&, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding, mutation_reader::forwarding) {
+ : _fn(make_lw_shared<func_type>([fn = std::move(fn)] (schema_ptr s, partition_range range, const query::partition_slice&, io_priority, tracing::trace_state_ptr, streamed_mutation::forwarding fwd, mutation_reader::forwarding) {
+ assert(!fwd);
return fn(s, range);
})) {}
- mutation_source(const mutation_source& other)
- : _fn(std::make_unique<func_type>(*other._fn)) { }
-
- mutation_source& operator=(const mutation_source& other) {
- _fn = std::make_unique<func_type>(*other._fn);
- return *this;
- }
-
+ mutation_source(const mutation_source& other) = default;
+ mutation_source& operator=(const mutation_source& other) = default;
mutation_source(mutation_source&&) = default;
mutation_source& operator=(mutation_source&&) = default;
@@ -326,6 +328,32 @@ public:
}
};
+// Returns a mutation_source which is the sum of given mutation_sources.
+//
+// Adding two mutation sources gives a mutation source which contains
+// the sum of writes contained in the addends.
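+//
+// For example (a sketch; `a` and `b` are any existing mutation_sources):
+//
+//   mutation_source sum = make_combined_mutation_source({a, b});
+//
+// A reader obtained from `sum` returns the combined writes visible through
+// `a` and `b`.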
+mutation_source make_combined_mutation_source(std::vector<mutation_source>);
+
+// Represents a mutation_source which can be snapshotted.
+class snapshot_source {
+private:
+ std::function<mutation_source()> _func;
+public:
+ snapshot_source(std::function<mutation_source()> func)
+ : _func(std::move(func))
+ { }
+
+ // Creates a new snapshot.
+ // The returned mutation_source represents all earlier writes and only those.
+ // Note though that the mutations in the snapshot may get compacted over time.
+ mutation_source operator()() {
+ return _func();
+ }
+};
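+
+// A usage sketch (assuming make_current_snapshot() is some function which
+// returns a mutation_source over the writes made so far):
+//
+//   snapshot_source src([] { return make_current_snapshot(); });
+//   mutation_source before = src();  // sees all writes made up to this point
+//   // ... more writes ...
+//   mutation_source after = src();   // sees those writes as well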
+
+mutation_source make_empty_mutation_source();
+snapshot_source make_empty_snapshot_source();
+
template<>
struct move_constructor_disengages<mutation_source> {
enum { value = true };
diff --git a/partition_builder.hh b/partition_builder.hh
index f26a9d2f3604..e04ce2ff99a4 100644
--- a/partition_builder.hh
+++ b/partition_builder.hh
@@ -56,8 +56,8 @@ public:
_partition.apply_row_tombstone(_schema, rt);
}
- virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) override {
- deletable_row& r = _partition.clustered_row(_schema, key);
+ virtual void accept_row(position_in_partition_view key, const row_tombstone& deleted_at, const row_marker& rm, is_dummy dummy, is_continuous continuous) override {
+ deletable_row& r = _partition.clustered_row(_schema, key, dummy, continuous);
r.apply(rm);
r.apply(deleted_at);
_current_row = &r;
diff --git a/partition_snapshot_reader.hh b/partition_snapshot_reader.hh
index 3173d20c0281..a1445382bb18 100644
--- a/partition_snapshot_reader.hh
+++ b/partition_snapshot_reader.hh
@@ -30,6 +30,26 @@ struct partition_snapshot_reader_dummy_accounter {
};
extern partition_snapshot_reader_dummy_accounter no_accounter;
+inline void maybe_merge_versions(lw_shared_ptr<partition_snapshot>& snp,
+ logalloc::region& lsa_region,
+ logalloc::allocating_section& read_section) {
+ if (!snp.owned()) {
+ return;
+ }
+ // If no one else is using this particular snapshot try to merge partition
+ // versions.
+ with_allocator(lsa_region.allocator(), [&snp, &lsa_region, &read_section] {
+ return with_linearized_managed_bytes([&snp, &lsa_region, &read_section] {
+ try {
+ read_section(lsa_region, [&snp] {
+ snp->merge_partition_versions();
+ });
+ } catch (...) { }
+ snp = {};
+ });
+ });
+}
+
template <typename MemoryAccounter>
class partition_snapshot_reader : public streamed_mutation::impl, public MemoryAccounter {
struct rows_position {
@@ -45,21 +65,6 @@ class partition_snapshot_reader : public streamed_mutation::impl, public MemoryA
return _cmp(*b._position, *a._position);
}
};
- class rows_entry_compare {
- position_in_partition::less_compare _cmp;
- public:
- explicit rows_entry_compare(const schema& s) : _cmp(s) { }
- bool operator()(const rows_entry& a, const position_in_partition& b) const {
- position_in_partition_view a_view(position_in_partition_view::clustering_row_tag_t(),
- a.key());
- return _cmp(a_view, b);
- }
- bool operator()(const position_in_partition& a, const rows_entry& b) const {
- position_in_partition_view b_view(position_in_partition_view::clustering_row_tag_t(),
- b.key());
- return _cmp(a, b_view);
- }
- };
private:
// Keeps shared pointer to the container we read mutation from to make sure
// that its lifetime is appropriately extended.
@@ -70,8 +75,8 @@ private:
query::clustering_row_ranges::const_iterator _ck_range_end;
bool _in_ck_range = false;
- rows_entry_compare _cmp;
- clustering_key_prefix::equality _eq;
+ rows_entry::compare _cmp;
+ position_in_partition::equal_compare _eq;
heap_compare _heap_cmp;
    lw_shared_ptr<partition_snapshot> _snapshot;
@@ -94,8 +99,14 @@ private:
void refresh_iterators() {
_clustering_rows.clear();
- if (!_in_ck_range && _current_ck_range == _ck_range_end) {
- return;
+ if (!_in_ck_range) {
+ if (_current_ck_range == _ck_range_end) {
+ _end_of_stream = true;
+ return;
+ }
+ for (auto&& v : _snapshot->versions()) {
+ _range_tombstones.apply(v.partition().row_tombstones(), *_current_ck_range);
+ }
}
for (auto&& v : _snapshot->versions()) {
@@ -117,14 +128,27 @@ private:
boost::range::make_heap(_clustering_rows, _heap_cmp);
}
- void pop_clustering_row() {
+ // Valid if has_more_rows()
+ const rows_entry& pop_clustering_row() {
+ boost::range::pop_heap(_clustering_rows, _heap_cmp);
auto& current = _clustering_rows.back();
+ const rows_entry& e = *current._position;
current._position = std::next(current._position);
if (current._position == current._end) {
_clustering_rows.pop_back();
} else {
boost::range::push_heap(_clustering_rows, _heap_cmp);
}
+ return e;
+ }
+
+ // Valid if has_more_rows()
+ const rows_entry& peek_row() const {
+ return *_clustering_rows.front()._position;
+ }
+
+ bool has_more_rows() const {
+ return !_clustering_rows.empty();
}
mutation_fragment_opt read_static_row() {
@@ -143,20 +167,18 @@ private:
}
mutation_fragment_opt read_next() {
- if (!_clustering_rows.empty()) {
- auto mf = _range_tombstones.get_next(*_clustering_rows.front()._position);
+ while (has_more_rows()) {
+ auto mf = _range_tombstones.get_next(peek_row());
if (mf) {
return mf;
}
-
- boost::range::pop_heap(_clustering_rows, _heap_cmp);
- clustering_row result = *_clustering_rows.back()._position;
- pop_clustering_row();
- while (!_clustering_rows.empty() && _eq(_clustering_rows.front()._position->key(), result.key())) {
- boost::range::pop_heap(_clustering_rows, _heap_cmp);
- auto& current = _clustering_rows.back();
- result.apply(*_schema, *current._position);
- pop_clustering_row();
+ const rows_entry& e = pop_clustering_row();
+ if (e.dummy()) {
+ continue;
+ }
+ clustering_row result = e;
+ while (has_more_rows() && _eq(peek_row().position(), result.position())) {
+ result.apply(*_schema, pop_clustering_row());
}
_last_entry = position_in_partition(result.position());
return mutation_fragment(std::move(result));
@@ -184,18 +206,13 @@ private:
}
while (!is_end_of_stream() && !is_buffer_full()) {
- if (_in_ck_range && _clustering_rows.empty()) {
- _in_ck_range = false;
- _current_ck_range = std::next(_current_ck_range);
- refresh_iterators();
- continue;
- }
-
auto mfopt = read_next();
if (mfopt) {
emplace_mutation_fragment(std::move(*mfopt));
} else {
- _end_of_stream = true;
+ _in_ck_range = false;
+ _current_ck_range = std::next(_current_ck_range);
+ refresh_iterators();
}
}
}
@@ -226,31 +243,11 @@ public:
, _range_tombstones(*s)
, _lsa_region(region)
, _read_section(read_section) {
- for (auto&& v : _snapshot->versions()) {
- auto&& rt_list = v.partition().row_tombstones();
- for (auto&& range : _ck_ranges.ranges()) {
- _range_tombstones.apply(rt_list, range);
- }
- }
do_fill_buffer();
}
~partition_snapshot_reader() {
- if (!_snapshot.owned()) {
- return;
- }
- // If no one else is using this particular snapshot try to merge partition
- // versions.
- with_allocator(_lsa_region.allocator(), [this] {
- return with_linearized_managed_bytes([this] {
- try {
- _read_section(_lsa_region, [this] {
- _snapshot->merge_partition_versions();
- });
- } catch (...) { }
- _snapshot = {};
- });
- });
+ maybe_merge_versions(_snapshot, _lsa_region, _read_section);
}
virtual future<> fill_buffer() override {
diff --git a/partition_snapshot_row_cursor.hh b/partition_snapshot_row_cursor.hh
new file mode 100644
index 000000000000..e91fe7294ca7
--- /dev/null
+++ b/partition_snapshot_row_cursor.hh
@@ -0,0 +1,208 @@
+/*
+ * Copyright (C) 2017 ScyllaDB
+ */
+
+/*
+ * This file is part of Scylla.
+ *
+ * Scylla is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * Scylla is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Scylla. If not, see .
+ */
+
+#pragma once
+
+#include "partition_version.hh"
+
+// Allows iterating over the rows of the mutation_partition represented by a given partition_snapshot.
+//
+// The cursor initially has a position before all rows and is not pointing at any row.
+// To position the cursor, use advance_to().
+//
+// All methods should be called with the region of the snapshot locked. The cursor is invalidated
+// when that lock section is left, or if the snapshot is modified.
+//
+// When the cursor is invalidated, it still maintains its previous position. It can be brought
+// back to validity by calling maybe_refresh(), or advance_to().
+//
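+// A usage sketch (assuming the snapshot has at least one row entry, that the
+// region stays locked for the duration of the loop, and that a lower bound
+// such as position_in_partition_view::before_all_clustered_rows() is available):
+//
+//   partition_snapshot_row_cursor cur(s, region, snp);
+//   cur.advance_to(position_in_partition_view::before_all_clustered_rows());
+//   do {
+//       if (!cur.dummy()) {
+//           handle_row(cur.row());  // handle_row() is a placeholder
+//       }
+//   } while (cur.next());
+//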
+class partition_snapshot_row_cursor final {
+ struct position_in_version {
+ mutation_partition::rows_type::iterator it;
+ mutation_partition::rows_type::iterator end;
+ int version_no;
+
+ struct less_compare {
+ rows_entry::tri_compare _cmp;
+ public:
+ explicit less_compare(const schema& s) : _cmp(s) { }
+ bool operator()(const position_in_version& a, const position_in_version& b) {
+ auto res = _cmp(*a.it, *b.it);
+ return res > 0 || (res == 0 && a.version_no > b.version_no);
+ }
+ };
+ };
+
+ const schema& _schema;
+ logalloc::region& _region;
+ partition_snapshot& _snp;
+ std::vector _heap;
+ std::vector _current_row;
+ position_in_partition _position;
+ uint64_t _last_reclaim_count = 0;
+ size_t _last_versions_count = 0;
+
+ // Removes the next row from _heap and puts it into _current_row
+ void recreate_current_row() {
+ position_in_version::less_compare heap_less(_schema);
+ position_in_partition::equal_compare eq(_schema);
+ do {
+ boost::range::pop_heap(_heap, heap_less);
+ _current_row.push_back(_heap.back());
+ _heap.pop_back();
+ } while (!_heap.empty() && eq(_current_row[0].it->position(), _heap[0].it->position()));
+ _position = position_in_partition(_current_row[0].it->position());
+ }
+public:
+ partition_snapshot_row_cursor(const schema& s, logalloc::region& region, partition_snapshot& snp)
+ : _schema(s)
+ , _region(region)
+ , _snp(snp)
+ , _position(position_in_partition::static_row_tag_t{})
+ { }
+ bool has_up_to_date_row_from_latest_version() const {
+ return up_to_date() && _current_row[0].version_no == 0;
+ }
+ mutation_partition::rows_type::iterator get_iterator_in_latest_version() const {
+ return _current_row[0].it;
+ }
+ bool up_to_date() const {
+ return _region.reclaim_counter() == _last_reclaim_count && _last_versions_count == _snp.version_count();
+ }
+
+ // Brings back the cursor to validity.
+ // Can be only called when cursor is pointing at a row.
+ //
+ // Semantically equivalent to:
+ //
+ // advance_to(position());
+ //
+ // but avoids work if not necessary.
+ bool maybe_refresh() {
+ if (!up_to_date()) {
+ return advance_to(_position);
+ }
+ return true;
+ }
+
+ // Moves the cursor to the first entry with position >= pos.
+ //
+ // The caller must ensure that such entry exists.
+ //
+ // Returns true iff there can't be any clustering row entries
+ // between lower_bound (inclusive) and the entry to which the cursor
+ // was advanced.
+ //
+ // May be called when cursor is not valid.
+ // The cursor is valid after the call.
+ // Must be called under reclaim lock.
+ bool advance_to(position_in_partition_view lower_bound) {
+ rows_entry::compare less(_schema);
+ position_in_version::less_compare heap_less(_schema);
+ _heap.clear();
+ _current_row.clear();
+ int version_no = 0;
+ for (auto&& v : _snp.versions()) {
+ auto& rows = v.partition().clustered_rows();
+ auto pos = rows.lower_bound(lower_bound, less);
+ auto end = rows.end();
+ if (pos != end) {
+ _heap.push_back({pos, end, version_no});
+ }
+ ++version_no;
+ }
+ boost::range::make_heap(_heap, heap_less);
+ _last_reclaim_count = _region.reclaim_counter();
+ _last_versions_count = _snp.version_count();
+ bool found = no_clustering_row_between(_schema, lower_bound, _heap[0].it->position());
+ recreate_current_row();
+ return found;
+ }
+
+ // Advances the cursor to the next row.
+ // If there is no next row, returns false and the cursor is no longer pointing at a row.
+ // Can be only called on a valid cursor pointing at a row.
+ bool next() {
+ position_in_version::less_compare heap_less(_schema);
+ assert(up_to_date());
+ for (auto&& curr : _current_row) {
+ ++curr.it;
+ if (curr.it != curr.end) {
+ _heap.push_back(curr);
+ boost::range::push_heap(_heap, heap_less);
+ }
+ }
+ _current_row.clear();
+ if (_heap.empty()) {
+ return false;
+ }
+ recreate_current_row();
+ return true;
+ }
+
+ // Can be called only when cursor is valid and pointing at a row.
+ bool continuous() const { return bool(_current_row[0].it->continuous()); }
+
+ // Can be called only when cursor is valid and pointing at a row.
+ bool dummy() const { return bool(_current_row[0].it->dummy()); }
+
+ // Can be called only when cursor is valid and pointing at a row, and !dummy().
+ const clustering_key& key() const { return _current_row[0].it->key(); }
+
+ // Can be called only when cursor is valid and pointing at a row.
+ clustering_row row() const {
+ clustering_row result(key());
+ for (auto&& v : _current_row) {
+ result.apply(_schema, *v.it);
+ }
+ return result;
+ }
+
+ // Can be called when cursor is pointing at a row, even when invalid.
+ const position_in_partition& position() const {
+ return _position;
+ }
+
+ bool is_in_latest_version() const;
+ bool previous_row_in_latest_version_has_key(const clustering_key_prefix& key) const;
+ void set_continuous(bool val);
+};
+
+inline
+bool partition_snapshot_row_cursor::is_in_latest_version() const {
+ return _current_row[0].version_no == 0;
+}
+
+inline
+bool partition_snapshot_row_cursor::previous_row_in_latest_version_has_key(const clustering_key_prefix& key) const {
+ if (_current_row[0].it == _snp.version()->partition().clustered_rows().begin()) {
+ return false;
+ }
+ auto prev_it = _current_row[0].it;
+ --prev_it;
+ clustering_key_prefix::tri_compare tri_comp(_schema);
+ return tri_comp(prev_it->key(), key) == 0;
+}
+
+inline
+void partition_snapshot_row_cursor::set_continuous(bool val) {
+ _current_row[0].it->set_continuous(val);
+}
diff --git a/partition_version.cc b/partition_version.cc
index 6cbb0eb1744d..fa52861831c0 100644
--- a/partition_version.cc
+++ b/partition_version.cc
@@ -20,6 +20,7 @@
*/
#include
+#include
#include "partition_version.hh"
@@ -62,6 +63,72 @@ partition_version::~partition_version()
}
}
+namespace {
+
+GCC6_CONCEPT(
+
+// A functor which transforms objects from Domain into objects from CoDomain
+template<typename U, typename Domain, typename CoDomain>
+concept bool Mapper() {
+ return requires(U obj, const Domain& src) {
+ { obj(src) } -> const CoDomain&;
+ };
+}
+
+// A functor which merges two objects from Domain into one. The result is stored in the first argument.
+template<typename U, typename Domain>
+concept bool Reducer() {
+ return requires(U obj, Domain& dst, const Domain& src) {
+ { obj(dst, src) } -> void;
+ };
+}
+
+)
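+
+// For illustration (a sketch): with Domain = mutation_partition and CoDomain = row,
+// a matching Mapper/Reducer pair could look like the following, assuming a const
+// overload of static_row() and a row::apply(const schema&, column_kind, const row&)
+// overload are available:
+//
+//   auto map = [] (const mutation_partition& mp) -> const row& { return mp.static_row(); };
+//   auto reduce = [&s] (row& dst, const row& src) { dst.apply(s, column_kind::static_column, src); };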
+
+// Calculates the value of particular part of mutation_partition represented by
+// the version chain starting from v.
+// |map| extracts the part from each version.
+// |reduce| Combines parts from the two versions.
+template
+GCC6_CONCEPT(
+requires Mapper