Merge "Implement partial cache" from Tomasz and Piotr
"This series enables cache to keep partial partitions.
Reads no longer have to read whole partition from sstables
in order to cache the result.

The 10MB threshold for partition size in cache is lifted.

Known issues:

 - There is no partial eviction yet: whole partitions are still evicted,
   and partition snapshots held by active reads are not evictable at all
 - Information about range continuity is not recorded if that
   would require inserting a dummy entry, or if the previous entry
   doesn't belong to the latest snapshot
 - A cache update after a memtable flush happening concurrently with reads
   may inhibit those reads' ability to populate the cache (new issue)
 - Cache update from flushed memtables has partition granularity,
   so it may cause latency problems with large partitions
 - Schema is still tracked per partition, so after schema changes
   reads may incur high latency because the whole partition needs
   to be converted atomically
 - Range tombstones are repeated in the stream for every range between
   cache entries they cover (new issue)
 - Populating scans for both small and large partitions (perf_fast_forward)
   experienced a 40% reduction in throughput (CPU bound)

How was this tested:

 - test.py --mode release
 - row_cache_stress_test -c1 -m1G
 - perf_fast_forward: passes except for the test case checking range continuity
   population, which would require inserting a dummy entry (mentioned above)
 - perf_simple_query (-c1 -m1G --duration 32):
     before: 90k [ops/s] stdev: 4k [ops/s]
     after:  94k [ops/s] stdev: 2k [ops/s]"
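
As an illustration of the idea described above, here is a minimal, self-contained C++ toy sketch. All names in it (cached_partition, read_range, and so on) are hypothetical; it is not the actual row_cache or cache_streamed_mutation code from this series. It only shows the principle: a cached partition holds a subset of its rows plus the clustering ranges known to be fully cached ("continuous"), so a read populates only the queried range from sstables instead of the whole partition.

#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Toy model of a partially cached partition: clustering keys are ints and
// rows are strings. "continuous" lists half-open key ranges [first, second)
// whose full contents are known to be present in "rows".
struct cached_partition {
    std::map<int, std::string> rows;
    std::vector<std::pair<int, int>> continuous;

    bool covers(int first, int last) const {
        for (const auto& r : continuous) {
            if (r.first <= first && last < r.second) {
                return true;
            }
        }
        return false;
    }
};

// Hypothetical read path: serve [first, last] from cache when the range is
// continuous; otherwise populate only that range from the sstable view and
// record the newly learned continuity.
std::vector<std::string> read_range(cached_partition& p, int first, int last,
                                    const std::map<int, std::string>& sstable_rows) {
    if (!p.covers(first, last)) {
        for (auto it = sstable_rows.lower_bound(first);
             it != sstable_rows.end() && it->first <= last; ++it) {
            p.rows.insert(*it);                       // partial population
        }
        p.continuous.emplace_back(first, last + 1);   // remember what we now know
    }
    std::vector<std::string> out;
    for (auto it = p.rows.lower_bound(first);
         it != p.rows.end() && it->first <= last; ++it) {
        out.push_back(it->second);
    }
    return out;
}

int main() {
    const std::map<int, std::string> sstable_rows = {{1, "a"}, {2, "b"}, {5, "c"}, {9, "d"}};
    cached_partition p;
    auto r1 = read_range(p, 1, 5, sstable_rows);  // misses cache, populates keys 1..5 only
    auto r2 = read_range(p, 2, 5, sstable_rows);  // served entirely from the cached range
    std::cout << r1.size() << " " << r2.size() << "\n";  // prints: 3 2
}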

* tag 'tgrabiec/introduce-partial-cache-v8' of github.com:cloudius-systems/seastar-dev: (130 commits)
  tests: row_cache: Add test_tombstone_merging_in_partial_partition test case
  tests: Introduce row_cache_stress_test
  utils: Add helpers for dealing with nonwrapping_range<int>
  tests: simple_schema: Allow passing the tombstone to make_range_tombstone()
  tests: simple_schema: Accept value by reference
  tests: simple_schema: Make add_row() accept optional timestamp
  tests: simple_schema: Make new_timestamp() public
  tests: simple_schema: Introduce make_ckeys()
  tests: simple_schema: Introduce get_value(const clustered_row&) helper
  tests: simple_schema: Fix comment
  tests: simple_schema: Add missing include
  row_cache: Introduce evict()
  tests: Add cache_streamed_mutation_test
  tests: mutation_assertions: Allow expecting fragments
  mutation_fragment: Implement equality check
  tests: row_cache: Add test for population of random partitions
  tests: row_cache: Add test for partition tombstone population
  tests: row_cache: Test reading randomly populated partition
  tests: row_cache: Add test_single_partition_update()
  tests: row_cache: Add test_scan_with_partial_partitions
  ...
avikivity committed Jun 26, 2017
2 parents 555621b + b0bcf2b commit 9b21a9b
Showing 54 changed files with 5,954 additions and 1,568 deletions.
482 changes: 482 additions & 0 deletions cache_streamed_mutation.hh

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion clustering_bounds_comparator.hh
@@ -54,8 +54,8 @@ static inline bound_kind flip_bound_kind(bound_kind bk)
}

class bound_view {
const static thread_local clustering_key empty_prefix;
public:
const static thread_local clustering_key empty_prefix;
const clustering_key_prefix& prefix;
bound_kind kind;
bound_view(const clustering_key_prefix& prefix, bound_kind kind)
3 changes: 3 additions & 0 deletions configure.py
@@ -184,6 +184,8 @@ def endswith(self, end):
'tests/perf/perf_cql_parser',
'tests/perf/perf_simple_query',
'tests/perf/perf_fast_forward',
'tests/cache_streamed_mutation_test',
'tests/row_cache_stress_test',
'tests/memory_footprint',
'tests/perf/perf_sstable',
'tests/cql_query_test',
@@ -625,6 +627,7 @@ def endswith(self, end):
'tests/message',
'tests/perf/perf_simple_query',
'tests/perf/perf_fast_forward',
'tests/row_cache_stress_test',
'tests/memory_footprint',
'tests/gossip',
'tests/perf/perf_sstable',
19 changes: 15 additions & 4 deletions converting_mutation_partition_applier.hh
@@ -22,6 +22,7 @@
#pragma once

#include "mutation_partition_view.hh"
#include "mutation_partition.hh"
#include "schema.hh"

// Mutation partition visitor which applies visited data into
@@ -37,12 +38,12 @@ private:
static bool is_compatible(const column_definition& new_def, const data_type& old_type, column_kind kind) {
return ::is_compatible(new_def.kind, kind) && new_def.type->is_value_compatible_with(*old_type);
}
void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, atomic_cell_view cell) {
static void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, atomic_cell_view cell) {
if (is_compatible(new_def, old_type, kind) && cell.timestamp() > new_def.dropped_at()) {
dst.apply(new_def, atomic_cell_or_collection(cell));
}
}
void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, collection_mutation_view cell) {
static void accept_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, collection_mutation_view cell) {
if (!is_compatible(new_def, old_type, kind)) {
return;
}
@@ -94,8 +95,8 @@ public:
_p.apply_row_tombstone(_p_schema, rt);
}

virtual void accept_row(clustering_key_view key, const row_tombstone& deleted_at, const row_marker& rm) override {
deletable_row& r = _p.clustered_row(_p_schema, key);
virtual void accept_row(position_in_partition_view key, const row_tombstone& deleted_at, const row_marker& rm, is_dummy dummy, is_continuous continuous) override {
deletable_row& r = _p.clustered_row(_p_schema, key, dummy, continuous);
r.apply(rm);
r.apply(deleted_at);
_current_row = &r;
@@ -116,4 +117,14 @@ public:
accept_cell(_current_row->cells(), column_kind::regular_column, *def, col.type(), collection);
}
}

// Appends the cell to dst upgrading it to the new schema.
// Cells must have monotonic names.
static void append_cell(row& dst, column_kind kind, const column_definition& new_def, const data_type& old_type, const atomic_cell_or_collection& cell) {
if (new_def.is_atomic()) {
accept_cell(dst, kind, new_def, old_type, cell.as_atomic_cell());
} else {
accept_cell(dst, kind, new_def, old_type, cell.as_collection_mutation());
}
}
};
2 changes: 1 addition & 1 deletion cql3/statements/batch_statement.cc
@@ -233,7 +233,7 @@ void batch_statement::verify_batch_size(const std::vector<mutation>& mutations)
size += v.data.size();
}
void accept_row_tombstone(const range_tombstone&) override {}
void accept_row(clustering_key_view, const row_tombstone&, const row_marker&) override {}
void accept_row(position_in_partition_view, const row_tombstone&, const row_marker&, is_dummy, is_continuous) override {}
void accept_row_cell(column_id, atomic_cell_view v) override {
size += v.value().size();
}
86 changes: 49 additions & 37 deletions database.cc
@@ -144,7 +144,7 @@ column_family::column_family(schema_ptr schema, config config, db::commitlog* cl
, _streaming_memtables(_config.enable_disk_writes ? make_streaming_memtable_list() : make_memory_only_memtable_list())
, _compaction_strategy(make_compaction_strategy(_schema->compaction_strategy(), _schema->compaction_strategy_options()))
, _sstables(make_lw_shared(_compaction_strategy.make_sstable_set(_schema)))
, _cache(_schema, sstables_as_mutation_source(), global_cache_tracker(), _config.max_cached_partition_size_in_bytes)
, _cache(_schema, sstables_as_snapshot_source(), global_cache_tracker())
, _commitlog(cl)
, _compaction_manager(compaction_manager)
, _flush_queue(std::make_unique<memtable_flush_queue>())
@@ -183,7 +183,24 @@ column_family::sstables_as_mutation_source() {
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return make_sstable_reader(std::move(s), r, slice, pc, std::move(trace_state), fwd, fwd_mr);
return make_sstable_reader(std::move(s), _sstables, r, slice, pc, std::move(trace_state), fwd, fwd_mr);
});
}

snapshot_source
column_family::sstables_as_snapshot_source() {
return snapshot_source([this] () {
// FIXME: Will keep sstables on disk until next memtable flush. Make compaction force cache refresh.
auto sst_set = _sstables;
return mutation_source([this, sst_set = std::move(sst_set)] (schema_ptr s,
const dht::partition_range& r,
const query::partition_slice& slice,
const io_priority_class& pc,
tracing::trace_state_ptr trace_state,
streamed_mutation::forwarding fwd,
mutation_reader::forwarding fwd_mr) {
return make_sstable_reader(std::move(s), sst_set, r, slice, pc, std::move(trace_state), fwd, fwd_mr);
});
});
}
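
A note on the new sstables_as_snapshot_source() above: each call to the returned snapshot_source copies the sstable set that is current at that moment, so readers built from that snapshot keep seeing a stable set even if the column family later replaces _sstables. The following is a minimal standalone sketch of that capture-per-call behaviour; the names (sstable_set, mutation_source_like, snapshot_source_like, column_family_like) are hypothetical stand-ins, not the real Scylla classes.

#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <vector>

using sstable_set = std::vector<std::string>;

// Stand-in for mutation_source: it only remembers which sstable set it was
// created over, fixed at snapshot time.
struct mutation_source_like {
    std::shared_ptr<const sstable_set> set;
};

// Stand-in for snapshot_source: a factory invoked once per cache refresh.
using snapshot_source_like = std::function<mutation_source_like()>;

struct column_family_like {
    std::shared_ptr<const sstable_set> sstables =
        std::make_shared<const sstable_set>(sstable_set{"sst-1"});

    snapshot_source_like as_snapshot_source() {
        return [this] {
            auto set = sstables;                 // copy the pointer to the current set
            return mutation_source_like{std::move(set)};
        };
    }
};

int main() {
    column_family_like cf;
    auto src = cf.as_snapshot_source();
    auto before = src();                          // pinned to {"sst-1"}
    cf.sstables = std::make_shared<const sstable_set>(sstable_set{"sst-1", "sst-2"});
    auto after = src();                           // pinned to {"sst-1", "sst-2"}
    assert(before.set->size() == 1);              // earlier snapshot unaffected by the swap
    assert(after.set->size() == 2);
    return 0;
}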

@@ -529,6 +546,7 @@ class single_key_sstable_reader final : public mutation_reader::impl {

mutation_reader
column_family::make_sstable_reader(schema_ptr s,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& pr,
const query::partition_slice& slice,
const io_priority_class& pc,
@@ -555,11 +573,11 @@ column_family::make_sstable_reader(schema_ptr s,
if (dht::shard_of(pos.token()) != engine().cpu_id()) {
return make_empty_reader(); // range doesn't belong to this shard
}
return restrict_reader(make_mutation_reader<single_key_sstable_reader>(const_cast<column_family*>(this), std::move(s), _sstables,
return restrict_reader(make_mutation_reader<single_key_sstable_reader>(const_cast<column_family*>(this), std::move(s), std::move(sstables),
_stats.estimated_sstable_per_read, pr, slice, pc, std::move(trace_state), fwd));
} else {
// range_sstable_reader is not movable so we need to wrap it
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), _sstables, pr, slice, pc, std::move(trace_state), fwd, fwd_mr));
return restrict_reader(make_mutation_reader<range_sstable_reader>(std::move(s), std::move(sstables), pr, slice, pc, std::move(trace_state), fwd, fwd_mr));
}
}

@@ -643,7 +661,7 @@ column_family::make_reader(schema_ptr s,
if (_config.enable_cache) {
readers.emplace_back(_cache.make_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
} else {
readers.emplace_back(make_sstable_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
}

return make_combined_reader(std::move(readers));
@@ -662,7 +680,7 @@ column_family::make_streaming_reader(schema_ptr s,
readers.emplace_back(mt->make_reader(s, range, slice, pc, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
}

readers.emplace_back(make_sstable_reader(s, range, slice, pc, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no));
readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, nullptr, streamed_mutation::forwarding::no, mutation_reader::forwarding::no));

return make_combined_reader(std::move(readers));
}
@@ -680,7 +698,7 @@ column_family::make_streaming_reader(schema_ptr s,
for (auto&& mt : *_memtables) {
readers.emplace_back(mt->make_reader(s, range, slice, pc, trace_state, fwd, fwd_mr));
}
readers.emplace_back(make_sstable_reader(s, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
readers.emplace_back(make_sstable_reader(s, _sstables, range, slice, pc, std::move(trace_state), fwd, fwd_mr));
return make_combined_reader(std::move(readers));
});

@@ -866,19 +884,19 @@ column_family::seal_active_streaming_memtable_immediate() {
// If we ever need to, we'll keep them separate statistics, but we don't want to polute the
// main stats about memtables with streaming memtables.
//
// Second, we will not bother touching the cache after this flush. The current streaming code
// will invalidate the ranges it touches, so we won't do it twice. Even when that changes, the
// cache management code in here will have to differ from the main memtable's one. Please see
// the comment at flush_streaming_mutations() for details.
//
// Lastly, we don't have any commitlog RP to update, and we don't need to deal manipulate the
// memtable list, since this memtable was not available for reading up until this point.
return write_memtable_to_sstable(*old, newtab, incremental_backups_enabled(), priority).then([this, newtab, old] {
return newtab->open_data();
}).then([this, old, newtab] () {
add_sstable(newtab, {engine().cpu_id()});
trigger_compaction();
return old->clear_gently();
// Cache synchronization must be started atomically with add_sstable()
if (_config.enable_cache) {
return _cache.update_invalidating(*old);
} else {
return old->clear_gently();
}
}).handle_exception([old] (auto ep) {
dblog.error("failed to write streamed sstable: {}", ep);
return make_exception_future<>(ep);
@@ -1791,7 +1809,7 @@ future<> distributed_loader::load_new_sstables(distributed<database>& db, sstrin
cf.trigger_compaction();
// Drop entire cache for this column family because it may be populated
// with stale data.
return cf.get_row_cache().clear();
return cf.get_row_cache().invalidate();
});
}).then([&db, ks, cf] () mutable {
return smp::submit_to(0, [&db, ks = std::move(ks), cf = std::move(cf)] () mutable {
@@ -1824,6 +1842,7 @@ future<sstables::entry_descriptor> distributed_loader::probe_file(distributed<da
return cf.open_sstable(std::move(info), sstdir, comps.generation, comps.version, comps.format).then([&cf] (sstables::shared_sstable sst) mutable {
if (sst) {
cf.load_sstable(sst);
return cf.get_row_cache().invalidate();
}
return make_ready_future<>();
});
@@ -2566,7 +2585,6 @@ keyspace::make_column_family_config(const schema& s, const db::config& db_config
cfg.streaming_read_concurrency_config = _config.streaming_read_concurrency_config;
cfg.cf_stats = _config.cf_stats;
cfg.enable_incremental_backups = _config.enable_incremental_backups;
cfg.max_cached_partition_size_in_bytes = db_config.max_cached_partition_size_in_kb() * 1024;

return cfg;
}
@@ -3797,28 +3815,26 @@ future<> column_family::flush_streaming_mutations(utils::UUID plan_id, dht::part
// be to change seal_active_streaming_memtable_delayed to take a range parameter. However, we
// need this code to go away as soon as we can (see FIXME above). So the double gate is a better
// temporary counter measure.
return with_gate(_streaming_flush_gate, [this, plan_id, ranges = std::move(ranges)] {
return flush_streaming_big_mutations(plan_id).then([this] {
return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed);
}).finally([this] {
return _streaming_flush_phaser.advance_and_await();
}).finally([this, ranges = std::move(ranges)] {
if (!_config.enable_cache) {
return make_ready_future<>();
}
return do_with(std::move(ranges), [this] (auto& ranges) {
return parallel_for_each(ranges, [this](auto&& range) {
return _cache.invalidate(range);
});
return with_gate(_streaming_flush_gate, [this, plan_id, ranges = std::move(ranges)] () mutable {
return flush_streaming_big_mutations(plan_id).then([this, ranges = std::move(ranges)] (auto sstables) mutable {
return _streaming_memtables->seal_active_memtable(memtable_list::flush_behavior::delayed).then([this] {
return _streaming_flush_phaser.advance_and_await();
}).then([this, sstables = std::move(sstables), ranges = std::move(ranges)] () mutable {
for (auto&& sst : sstables) {
// seal_active_streaming_memtable_big() ensures sst is unshared.
this->add_sstable(sst, {engine().cpu_id()});
}
this->trigger_compaction();
return _cache.invalidate(std::move(ranges));
});
});
});
}

future<> column_family::flush_streaming_big_mutations(utils::UUID plan_id) {
future<std::vector<sstables::shared_sstable>> column_family::flush_streaming_big_mutations(utils::UUID plan_id) {
auto it = _streaming_memtables_big.find(plan_id);
if (it == _streaming_memtables_big.end()) {
return make_ready_future<>();
return make_ready_future<std::vector<sstables::shared_sstable>>(std::vector<sstables::shared_sstable>());
}
auto entry = it->second;
_streaming_memtables_big.erase(it);
@@ -3830,11 +3846,7 @@ future<> column_family::flush_streaming_big_mutations(utils::UUID plan_id) {
return sst->open_data();
});
}).then([this, entry] {
for (auto&& sst : entry->sstables) {
// seal_active_streaming_memtable_big() ensures sst is unshared.
add_sstable(sst, {engine().cpu_id()});
}
trigger_compaction();
return std::move(entry->sstables);
});
});
}
Expand Down Expand Up @@ -3862,7 +3874,7 @@ future<> column_family::clear() {
_streaming_memtables->clear();
_streaming_memtables->add_memtable();
_streaming_memtables_big.clear();
return _cache.clear();
return _cache.invalidate();
}

// NOTE: does not need to be futurized, but might eventually, depending on
@@ -3888,7 +3900,7 @@ future<db::replay_position> column_family::discard_sstables(db_clock::time_point

_sstables = std::move(pruned);
dblog.debug("cleaning out row cache");
return _cache.clear().then([rp, remove = std::move(remove)] () mutable {
return _cache.invalidate().then([rp, remove = std::move(remove)] () mutable {
return parallel_for_each(remove, [](sstables::shared_sstable s) {
return sstables::delete_atomically({s});
}).then([rp] {
11 changes: 7 additions & 4 deletions database.hh
@@ -429,7 +429,6 @@ public:
restricted_mutation_reader_config read_concurrency_config;
restricted_mutation_reader_config streaming_read_concurrency_config;
::cf_stats* cf_stats = nullptr;
uint64_t max_cached_partition_size_in_bytes;
};
struct no_commitlog {};
struct stats {
@@ -505,7 +504,7 @@ private:
};
std::unordered_map<utils::UUID, lw_shared_ptr<streaming_memtable_big>> _streaming_memtables_big;

future<> flush_streaming_big_mutations(utils::UUID plan_id);
future<std::vector<sstables::shared_sstable>> flush_streaming_big_mutations(utils::UUID plan_id);
void apply_streaming_big_mutation(schema_ptr m_schema, utils::UUID plan_id, const frozen_mutation& m);
future<> seal_active_streaming_memtable_big(streaming_memtable_big& smb);

@@ -575,7 +574,9 @@ private:
private:
void update_stats_for_new_sstable(uint64_t disk_space_used_by_sstable, std::vector<unsigned>&& shards_for_the_sstable);
// Adds new sstable to the set of sstables
// Doesn't update the cache.
// Doesn't update the cache. The cache must be synchronized in order for reads to see
// the writes contained in this sstable.
// Cache must be synchronized atomically with this, otherwise write atomicity may not be respected.
// Doesn't trigger compaction.
void add_sstable(lw_shared_ptr<sstables::sstable> sstable, std::vector<unsigned>&& shards_for_the_sstable);
// returns an empty pointer if sstable doesn't belong to current shard.
@@ -619,11 +620,12 @@ private:
void remove_ancestors_needed_rewrite(std::unordered_set<uint64_t> ancestors);
private:
mutation_source_opt _virtual_reader;
// Creates a mutation reader which covers sstables.
// Creates a mutation reader which covers given sstables.
// Caller needs to ensure that column_family remains live (FIXME: relax this).
// The 'range' parameter must be live as long as the reader is used.
// Mutations returned by the reader will all have given schema.
mutation_reader make_sstable_reader(schema_ptr schema,
lw_shared_ptr<sstables::sstable_set> sstables,
const dht::partition_range& range,
const query::partition_slice& slice,
const io_priority_class& pc,
Expand All @@ -632,6 +634,7 @@ private:
mutation_reader::forwarding fwd_mr) const;

mutation_source sstables_as_mutation_source();
snapshot_source sstables_as_snapshot_source();
partition_presence_checker make_partition_presence_checker(lw_shared_ptr<sstables::sstable_set>);
std::chrono::steady_clock::time_point _sstable_writes_disabled_at;
void do_trigger_compaction();
3 changes: 0 additions & 3 deletions db/config.hh
@@ -373,9 +373,6 @@ public:
val(reduce_cache_sizes_at, double, .85, Invalid, \
"When Java heap usage (after a full concurrent mark sweep (CMS) garbage collection) exceeds this percentage, Cassandra reduces the cache capacity to the fraction of the current size as specified by reduce_cache_capacity_to. To disable, set the value to 1.0." \
) \
val(max_cached_partition_size_in_kb, uint64_t, 10240uLL, Used, \
"Partitions with size greater than this value won't be cached." \
) \
/* Disks settings */ \
val(stream_throughput_outbound_megabits_per_sec, uint32_t, 400, Unused, \
"Throttles all outbound streaming file transfers on a node to the specified throughput. Cassandra does mostly sequential I/O when streaming data during bootstrap or repair, which can lead to saturating the network connection and degrading client (RPC) performance." \
