Skip to content

Commit

Permalink
db: Avoid checking bloom filters during compaction
Browse files Browse the repository at this point in the history
Checking the bloom filters of sstables to compute the max purgeable
timestamp for compaction is expensive in terms of CPU time. We can avoid
calculating it if we're not about to GC any tombstones.

This patch changes the compacting functions to accept a function instead
of a precomputed value for max_purgeable.

I verified that bloom filter operations no longer appear on flame
graphs during a compaction-heavy workload (without tombstones).

Refs #1322.
  • Loading branch information
tgrabiec committed Jul 10, 2016
1 parent 74ff30a commit 8c4b5e4
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 40 deletions.
2 changes: 1 addition & 1 deletion db/schema_tables.hh
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ void feed_hash_for_schema_digest(Hasher& h, const mutation& m) {
// See https://issues.apache.org/jira/browse/CASSANDRA-6862.
// We achieve similar effect with compact_for_compaction().
mutation m_compacted(m);
m_compacted.partition().compact_for_compaction(*m.schema(), api::max_timestamp, gc_clock::time_point::max());
m_compacted.partition().compact_for_compaction(*m.schema(), always_gc, gc_clock::time_point::max());
feed_hash(h, m_compacted);
}

Expand Down
32 changes: 24 additions & 8 deletions mutation_compactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ class compact_mutation {
gc_clock::time_point _query_time;
gc_clock::time_point _gc_before;
std::function<api::timestamp_type(const dht::decorated_key&)> _get_max_purgeable;
api::timestamp_type _max_purgeable = api::max_timestamp;
can_gc_fn _can_gc;
api::timestamp_type _max_purgeable = api::missing_timestamp;
const query::partition_slice& _slice;
uint32_t _row_limit{};
uint32_t _partition_limit{};
Expand Down Expand Up @@ -103,14 +104,30 @@ private:
}

bool can_purge_tombstone(const tombstone& t) {
return (!sstable_compaction() || t.timestamp < _max_purgeable) && t.deletion_time < _gc_before;
return t.deletion_time < _gc_before && can_gc(t);
};

// Decides whether tombstone `t` may be garbage-collected with respect to the
// max purgeable timestamp of the current partition.
//
// - When not compacting sstables (sstable_compaction() is false) every
//   tombstone is allowed, so this returns true without consulting
//   _max_purgeable.
// - A tombstone that evaluates to false (`!t`; presumably an empty/missing
//   tombstone -- confirm against tombstone's operator bool) is never reported
//   as collectable, and does not trigger the max purgeable computation.
// - Otherwise `t` is collectable iff its timestamp is strictly below
//   _max_purgeable.
//
// _max_purgeable is computed lazily: api::missing_timestamp acts as the
// "not computed yet" sentinel, and _get_max_purgeable() is only invoked the
// first time a real tombstone has to be judged. Per the commit description
// that computation checks sstable bloom filters and is CPU-expensive, so the
// order of the checks below is deliberate.
//
// This function does not look at t.deletion_time; the callers visible in this
// change test against gc_before before calling it.
bool can_gc(tombstone t) {
if (!sstable_compaction()) {
return true;
}
if (!t) {
return false;
}
// Compute and cache on first use.
// NOTE(review): assumes _dk refers to the decorated key of the partition
// currently being consumed; its assignment is not visible here -- confirm.
if (_max_purgeable == api::missing_timestamp) {
_max_purgeable = _get_max_purgeable(*_dk);
}
return t.timestamp < _max_purgeable;
};
public:
compact_mutation(compact_mutation&&) = delete; // Because 'this' is captured

compact_mutation(const schema& s, gc_clock::time_point query_time, const query::partition_slice& slice, uint32_t limit,
uint32_t partition_limit, CompactedMutationsConsumer consumer)
: _schema(s)
, _query_time(query_time)
, _gc_before(query_time - s.gc_grace_seconds())
, _can_gc(always_gc)
, _slice(slice)
, _row_limit(limit)
, _partition_limit(partition_limit)
Expand All @@ -126,6 +143,7 @@ public:
, _query_time(compaction_time)
, _gc_before(_query_time - s.gc_grace_seconds())
, _get_max_purgeable(std::move(get_max_purgeable))
, _can_gc([this] (tombstone t) { return can_gc(t); })
, _slice(query::full_slice)
, _consumer(std::move(consumer))
{
Expand All @@ -143,9 +161,7 @@ public:
_current_tombstone = { };
_partition_tombstone = { };
_current_partition_limit = std::min(_row_limit, _partition_row_limit);
if (sstable_compaction()) {
_max_purgeable = _get_max_purgeable(dk);
}
_max_purgeable = api::missing_timestamp;
}

void consume(tombstone t) {
Expand All @@ -159,7 +175,7 @@ public:
stop_iteration consume(static_row&& sr) {
bool is_live = sr.cells().compact_and_expire(_schema, column_kind::static_column,
_partition_tombstone,
_query_time, _max_purgeable, _gc_before);
_query_time, _can_gc, _gc_before);
_static_row_live = is_live;
if (is_live || (!only_live() && !sr.empty())) {
partition_is_not_empty();
Expand All @@ -174,8 +190,8 @@ public:
if (cr.tomb() <= _current_tombstone || can_purge_tombstone(cr.tomb())) {
cr.remove_tombstone();
}
bool is_live = cr.marker().compact_and_expire(t, _query_time, _max_purgeable, _gc_before);
is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _max_purgeable, _gc_before);
bool is_live = cr.marker().compact_and_expire(t, _query_time, _can_gc, _gc_before);
is_live |= cr.cells().compact_and_expire(_schema, column_kind::regular_column, t, _query_time, _can_gc, _gc_before);
if (only_live() && is_live) {
partition_is_not_empty();
_consumer.consume(std::move(cr), true);
Expand Down
40 changes: 21 additions & 19 deletions mutation_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1144,30 +1144,30 @@ uint32_t mutation_partition::do_compact(const schema& s,
const std::vector<query::clustering_range>& row_ranges,
bool reverse,
uint32_t row_limit,
api::timestamp_type max_purgeable)
can_gc_fn& can_gc)
{
assert(row_limit > 0);

auto gc_before = query_time - s.gc_grace_seconds();

auto should_purge_tombstone = [&] (const tombstone& t) {
return t.deletion_time < gc_before && can_gc(t);
};

bool static_row_live = _static_row.compact_and_expire(s, column_kind::static_column, _tombstone,
query_time, max_purgeable, gc_before);
query_time, can_gc, gc_before);

uint32_t row_count = 0;

auto can_purge_tombstone = [&] (const tombstone& t) {
return t.timestamp < max_purgeable && t.deletion_time < gc_before;
};

auto row_callback = [&] (rows_entry& e) {
deletable_row& row = e.row();

tombstone tomb = tombstone_for_row(s, e);

bool is_live = row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, max_purgeable, gc_before);
is_live |= row.marker().compact_and_expire(tomb, query_time, max_purgeable, gc_before);
bool is_live = row.cells().compact_and_expire(s, column_kind::regular_column, tomb, query_time, can_gc, gc_before);
is_live |= row.marker().compact_and_expire(tomb, query_time, can_gc, gc_before);

if (can_purge_tombstone(row.deleted_at())) {
if (should_purge_tombstone(row.deleted_at())) {
row.remove_tombstone();
}

Expand Down Expand Up @@ -1198,9 +1198,9 @@ uint32_t mutation_partition::do_compact(const schema& s,
}

_row_tombstones.erase_where([&] (auto&& rt) {
return can_purge_tombstone(rt.tomb) || rt.tomb.timestamp <= _tombstone.timestamp;
return should_purge_tombstone(rt.tomb) || rt.tomb.timestamp <= _tombstone.timestamp;
});
if (can_purge_tombstone(_tombstone)) {
if (should_purge_tombstone(_tombstone)) {
_tombstone = tombstone();
}

Expand All @@ -1217,17 +1217,17 @@ mutation_partition::compact_for_query(
bool reverse,
uint32_t row_limit)
{
return do_compact(s, query_time, row_ranges, reverse, row_limit, api::max_timestamp);
return do_compact(s, query_time, row_ranges, reverse, row_limit, always_gc);
}

void mutation_partition::compact_for_compaction(const schema& s,
api::timestamp_type max_purgeable, gc_clock::time_point compaction_time)
can_gc_fn& can_gc, gc_clock::time_point compaction_time)
{
static const std::vector<query::clustering_range> all_rows = {
query::clustering_range::make_open_ended_both_sides()
};

do_compact(s, compaction_time, all_rows, false, query::max_rows, max_purgeable);
do_compact(s, compaction_time, all_rows, false, query::max_rows, can_gc);
}

// Returns true if there is no live data or tombstones.
Expand Down Expand Up @@ -1486,7 +1486,7 @@ void row::revert(const schema& s, column_kind kind, row& other) noexcept {
}

bool row::compact_and_expire(const schema& s, column_kind kind, tombstone tomb, gc_clock::time_point query_time,
api::timestamp_type max_purgeable, gc_clock::time_point gc_before)
can_gc_fn& can_gc, gc_clock::time_point gc_before)
{
bool any_live = false;
remove_if([&] (column_id id, atomic_cell_or_collection& c) {
Expand All @@ -1499,7 +1499,7 @@ bool row::compact_and_expire(const schema& s, column_kind kind, tombstone tomb,
} else if (cell.has_expired(query_time)) {
c = atomic_cell::make_dead(cell.timestamp(), cell.deletion_time());
} else if (!cell.is_live()) {
erase = cell.timestamp() < max_purgeable && cell.deletion_time() < gc_before;
erase = cell.deletion_time() < gc_before && can_gc(tombstone(cell.timestamp(), cell.deletion_time()));
} else {
any_live |= true;
}
Expand All @@ -1508,7 +1508,7 @@ bool row::compact_and_expire(const schema& s, column_kind kind, tombstone tomb,
auto&& ctype = static_pointer_cast<const collection_type_impl>(def.type);
auto m_view = ctype->deserialize_mutation_form(cell);
collection_type_impl::mutation m = m_view.materialize();
any_live |= m.compact_and_expire(tomb, query_time, max_purgeable, gc_before);
any_live |= m.compact_and_expire(tomb, query_time, can_gc, gc_before);
if (m.cells.empty() && m.tomb <= tomb) {
erase = true;
} else {
Expand Down Expand Up @@ -1814,7 +1814,8 @@ future<data_query_result> data_query(schema_ptr s, const mutation_source& source
auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);

auto qrb = query_result_builder(*s, builder);
auto cfq = compact_for_query<emit_only_live_rows::yes, query_result_builder>(*s, query_time, slice, row_limit, partition_limit, std::move(qrb));
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::yes, query_result_builder>>(
*s, query_time, slice, row_limit, partition_limit, std::move(qrb));

auto reader = source(s, range, query::clustering_key_filtering_context::create(s, slice), service::get_local_sstable_query_read_priority());
return consume_flattened(std::move(reader), std::move(cfq), is_reversed);
Expand Down Expand Up @@ -1890,7 +1891,8 @@ mutation_query(schema_ptr s,
auto is_reversed = slice.options.contains(query::partition_slice::option::reversed);

auto rrb = reconcilable_result_builder(*s, slice);
auto cfq = compact_for_query<emit_only_live_rows::no, reconcilable_result_builder>(*s, query_time, slice, row_limit, partition_limit, std::move(rrb));
auto cfq = make_stable_flattened_mutations_consumer<compact_for_query<emit_only_live_rows::no, reconcilable_result_builder>>(
*s, query_time, slice, row_limit, partition_limit, std::move(rrb));

auto reader = source(s, range, query::clustering_key_filtering_context::create(s, slice), service::get_local_sstable_query_read_priority());
return consume_flattened(std::move(reader), std::move(cfq), is_reversed);
Expand Down
10 changes: 5 additions & 5 deletions mutation_partition.hh
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public:
// and max_purgeable. Removes cells covered by tomb.
// Returns true iff there are any live cells left.
bool compact_and_expire(const schema& s, column_kind kind, tombstone tomb, gc_clock::time_point query_time,
api::timestamp_type max_purgeable, gc_clock::time_point gc_before);
can_gc_fn&, gc_clock::time_point gc_before);

row difference(const schema&, column_kind, const row& other) const;

Expand Down Expand Up @@ -344,7 +344,7 @@ public:
// tombstones.
// Returns true if row marker is live.
bool compact_and_expire(tombstone tomb, gc_clock::time_point now,
api::timestamp_type max_purgeable, gc_clock::time_point gc_before) {
can_gc_fn& can_gc, gc_clock::time_point gc_before) {
if (is_missing()) {
return false;
}
Expand All @@ -356,7 +356,7 @@ public:
_expiry -= _ttl;
_ttl = dead;
}
if (_ttl == dead && _timestamp < max_purgeable && _expiry < gc_before) {
if (_ttl == dead && _expiry < gc_before && can_gc(tombstone(_timestamp, _expiry))) {
_timestamp = api::missing_timestamp;
}
return !is_missing() && _ttl != dead;
Expand Down Expand Up @@ -626,7 +626,7 @@ private:
const std::vector<query::clustering_range>& row_ranges,
bool reverse,
uint32_t row_limit,
api::timestamp_type max_purgeable);
can_gc_fn&);

// Calls func for each row entry inside row_ranges until func returns stop_iteration::yes.
// Removes all entries for which func didn't return stop_iteration::no or wasn't called at all.
Expand Down Expand Up @@ -661,7 +661,7 @@ public:
// - expires cells based on compaction_time
// - drops cells covered by higher-level tombstones
// - drops expired tombstones whose timestamp is before max_purgeable
void compact_for_compaction(const schema& s, api::timestamp_type max_purgeable,
void compact_for_compaction(const schema& s, can_gc_fn&,
gc_clock::time_point compaction_time);

// Returns the minimal mutation_partition that when applied to "other" will
Expand Down
3 changes: 2 additions & 1 deletion sstables/compaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ compact_sstables(std::vector<shared_sstable> sstables, column_family& cf, std::f
return get_max_purgeable_timestamp(schema, not_compacted_sstables, dk);
};
auto cr = compacting_sstable_writer(*schema, creator, partitions_per_sstable, max_sstable_size, sstable_level, rp, std::move(ancestors), *info);
auto cfc = compact_for_compaction<compacting_sstable_writer>(*schema, gc_clock::now(), std::move(cr), get_max_purgeable);
auto cfc = make_stable_flattened_mutations_consumer<compact_for_compaction<compacting_sstable_writer>>(
*schema, gc_clock::now(), std::move(cr), get_max_purgeable);

auto filter = [cleanup, sorted_owned_ranges = std::move(owned_ranges)] (const streamed_mutation& sm) {
if (dht::shard_of(sm.decorated_key().token()) != engine().cpu_id()) {
Expand Down
4 changes: 2 additions & 2 deletions tests/mutation_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1045,7 +1045,7 @@ SEASTAR_TEST_CASE(test_mutation_hash) {

static mutation compacted(const mutation& m) {
auto result = m;
result.partition().compact_for_compaction(*result.schema(), api::max_timestamp, gc_clock::now());
result.partition().compact_for_compaction(*result.schema(), always_gc, gc_clock::now());
return result;
}

Expand Down Expand Up @@ -1307,7 +1307,7 @@ SEASTAR_TEST_CASE(test_tombstone_purge) {
tombstone tomb(api::new_timestamp(), gc_clock::now() - std::chrono::seconds(1));
m.partition().apply(tomb);
BOOST_REQUIRE(!m.partition().empty());
m.partition().compact_for_compaction(*s, api::max_timestamp, gc_clock::now());
m.partition().compact_for_compaction(*s, always_gc, gc_clock::now());
// Check that row was covered by tombstone.
BOOST_REQUIRE(m.partition().empty());
// Check that tombstone was purged after compact_for_compaction().
Expand Down
7 changes: 7 additions & 0 deletions tombstone.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

#pragma once

#include <functional>

#include "timestamp.hh"
#include "gc_clock.hh"
#include "hashing.hh"
Expand Down Expand Up @@ -118,3 +120,8 @@ struct appending_hash<tombstone> {
feed_hash(h, t.deletion_time);
}
};

// Determines whether tombstone may be GC-ed.
// Called with the tombstone under consideration; returns true if it may be
// purged. Time-based eligibility (gc_before) is checked separately by the
// compacting code, not by this predicate.
using can_gc_fn = std::function<bool(tombstone)>;

// Predicate which allows every tombstone to be GC-ed. Passed by callers which
// have no max purgeable constraint.
// It is not const because the compacting functions take their predicate as a
// non-const `can_gc_fn&`.
// NOTE(review): being `static` in a header, every translation unit which
// includes this file gets its own copy of this object.
static can_gc_fn always_gc = [] (tombstone) { return true; };
6 changes: 3 additions & 3 deletions types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1914,7 +1914,7 @@ do_serialize_mutation_form(
}

bool collection_type_impl::mutation::compact_and_expire(tombstone base_tomb, gc_clock::time_point query_time,
api::timestamp_type max_purgeable, gc_clock::time_point gc_before)
can_gc_fn& can_gc, gc_clock::time_point gc_before)
{
bool any_live = false;
tomb.apply(base_tomb);
Expand All @@ -1928,7 +1928,7 @@ bool collection_type_impl::mutation::compact_and_expire(tombstone base_tomb, gc_
survivors.emplace_back(std::make_pair(
std::move(name_and_cell.first), atomic_cell::make_dead(cell.timestamp(), cell.deletion_time())));
} else if (!cell.is_live()) {
if (cell.timestamp() >= max_purgeable || cell.deletion_time() >= gc_before) {
if (cell.deletion_time() >= gc_before || !can_gc(tombstone(cell.timestamp(), cell.deletion_time()))) {
survivors.emplace_back(std::move(name_and_cell));
}
} else {
Expand All @@ -1937,7 +1937,7 @@ bool collection_type_impl::mutation::compact_and_expire(tombstone base_tomb, gc_
}
}
cells = std::move(survivors);
if (tomb.timestamp < max_purgeable && tomb.deletion_time < gc_before) {
if (tomb.deletion_time < gc_before && can_gc(tomb)) {
tomb = tombstone();
}
return any_live;
Expand Down
2 changes: 1 addition & 1 deletion types.hh
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ public:
// Expires cells based on query_time. Expires tombstones based on max_purgeable and gc_before.
// Removes cells covered by tomb or this->tomb.
bool compact_and_expire(tombstone tomb, gc_clock::time_point query_time,
api::timestamp_type max_purgeable, gc_clock::time_point gc_before);
can_gc_fn&, gc_clock::time_point gc_before);
};
struct mutation_view {
tombstone tomb;
Expand Down

0 comments on commit 8c4b5e4

Please sign in to comment.