Merge 'Store and propagate GC timestamp markers from commitlog' from Calle Wilund

Fixes scylladb#14870

(Originally suggested by @avikivity.) Use commitlog-stored GC clock minimum positions to narrow compaction GC bounds.
(Manual flushes still need to be augmented with extensive CL clearing to pass various dtests, but this does not affect "real" execution.)

Records the lowest GC clock timestamp whenever a CF is first added to a CL segment. Because the GC clock is wall-clock
time and tied only to TTL (not to cell/row timestamps), this gives a fairly accurate per-segment view of the GC lower
bound. This is then (in a rather ugly way) propagated to tombstone_gc_state to narrow the allowed GC bounds for a CF,
based on what currently remains in the CL.

Note: this is a rather unoptimized version - no caching or anything. Even so, it should not be excessively expensive,
especially since various other code paths already cache the results.
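
In essence, the mechanism looks like this (a minimal, self-contained sketch; segment, on_write, narrow_gc_before and
the int table_id are illustrative names and simplified types, not the actual Scylla code):

#include <algorithm>
#include <chrono>
#include <unordered_map>
#include <vector>

using gc_time = std::chrono::system_clock::time_point;
using table_id = int;

struct segment {
    std::unordered_map<table_id, gc_time> cf_min_time;

    // First write of a table into this segment records "now"; later writes are no-ops.
    void on_write(table_id id) {
        cf_min_time.emplace(id, std::chrono::system_clock::now());
    }

    gc_time min_time(table_id id) const {
        auto i = cf_min_time.find(id);
        return i == cf_min_time.end() ? gc_time::max() : i->second;
    }
};

// gc_before may never advance past the oldest data still held in the commitlog for the table.
gc_time narrow_gc_before(gc_time gc_before, const std::vector<segment>& live, table_id id) {
    auto lower = gc_time::max();
    for (const auto& s : live) {
        lower = std::min(lower, s.min_time(id));
    }
    return std::min(gc_before, lower);
}

The bound is taken from the wall clock at first write, not from mutation timestamps, which is why it stays valid
regardless of the timestamps the writes themselves carry.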

Closes scylladb#15060

* github.com:scylladb/scylladb:
  main/cql_test_env: Augment compaction mgr tombstone_gc_state with CL GC info
  tombstone_gc_state: Add optional callback to augment GC bounds
  commitlog: Add keeping track of approximate lowest GC clock for CF entries
  database: Force new commitlog segment on user initiated flush
  commitlog: Add helper to force new active segment
avikivity committed Oct 17, 2023
2 parents 7718f76 + 3378c24 commit f42eb4d
Showing 7 changed files with 130 additions and 11 deletions.
39 changes: 39 additions & 0 deletions db/commitlog/commitlog.cc
@@ -460,6 +460,9 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this<segment_
void discard_unused_segments() noexcept;
void discard_completed_segments(const cf_id_type&) noexcept;
void discard_completed_segments(const cf_id_type&, const rp_set&) noexcept;

future<> force_new_active_segment() noexcept;

void on_timer();
void sync();
void arm(uint32_t extra = 0) {
@@ -480,6 +483,8 @@ class db::commitlog::segment_manager : public ::enable_shared_from_this<segment_
future<std::vector<descriptor>> list_descriptors(sstring dir) const;
future<std::vector<sstring>> get_segments_to_replay() const;

gc_clock::time_point min_gc_time(const cf_id_type&) const;

flush_handler_id add_flush_handler(flush_handler h) {
auto id = ++_flush_ids;
_flush_handlers[id] = std::move(h);
@@ -689,6 +694,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
buffer_type _buffer;
fragmented_temporary_buffer::ostream _buffer_ostream;
std::unordered_map<cf_id_type, uint64_t> _cf_dirty;
std::unordered_map<cf_id_type, gc_clock::time_point> _cf_min_time;
time_point _sync_time;
utils::flush_queue<replay_position, std::less<replay_position>, clock_type> _pending_ops;

@@ -1194,6 +1200,7 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
auto es = entry_size + entry_overhead_size;

_cf_dirty[id]++; // increase use count for cf.
_cf_min_time.emplace(id, gc_clock::now()); // if value already exists this does nothing.

rp_handle h(static_pointer_cast<cf_holder>(shared_from_this()), std::move(id), rp);

@@ -1297,6 +1304,10 @@ class db::commitlog::segment : public enable_shared_from_this<segment>, public c
sstring get_segment_name() const {
return _desc.filename();
}
gc_clock::time_point min_time(const cf_id_type& id) const {
auto i = _cf_min_time.find(id);
return i == _cf_min_time.end() ? gc_clock::time_point::max() : i->second;
}
};

template<typename T, typename R>
@@ -1483,6 +1494,14 @@ future<std::vector<sstring>> db::commitlog::segment_manager::get_segments_to_rep
co_return segments_to_replay;
}

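// Lower bound on tombstone GC time for a table: the oldest first-write time across all live segments.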
gc_clock::time_point db::commitlog::segment_manager::min_gc_time(const cf_id_type& id) const {
auto res = gc_clock::time_point::max();
for (auto& s : _segments) {
res = std::min(res, s->min_time(id));
}
return res;
}

future<> db::commitlog::segment_manager::init() {
auto descs = co_await list_descriptors(cfg.commit_log_location);

@@ -1899,6 +1918,18 @@ void db::commitlog::segment_manager::discard_completed_segments(const cf_id_type
discard_unused_segments();
}

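// Closes the currently allocating segment, if any and if used, so subsequent writes go to a fresh
// segment and flushed data can be discarded sooner.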
future<> db::commitlog::segment_manager::force_new_active_segment() noexcept {
if (_segments.empty() || !_segments.back()->is_still_allocating()) {
co_return;
}

auto& s = _segments.back();
if (s->position()) { // only if the segment has actually been written to.
co_await s->close();
discard_unused_segments();
}
}

namespace db {

std::ostream& operator<<(std::ostream& out, const db::commitlog::segment& s) {
@@ -2517,6 +2548,10 @@ void db::commitlog::discard_completed_segments(const cf_id_type& id) {
_segment_manager->discard_completed_segments(id);
}

future<> db::commitlog::force_new_active_segment() noexcept {
co_await _segment_manager->force_new_active_segment();
}

future<> db::commitlog::sync_all_segments() {
return _segment_manager->sync_all_segments();
}
@@ -2968,6 +3003,10 @@ future<std::vector<sstring>> db::commitlog::list_existing_segments(const sstring
});
}

gc_clock::time_point db::commitlog::min_gc_time(const cf_id_type& id) const {
return _segment_manager->min_gc_time(id);
}

future<std::vector<sstring>> db::commitlog::get_segments_to_replay() const {
return _segment_manager->get_segments_to_replay();
}
10 changes: 10 additions & 0 deletions db/commitlog/commitlog.hh
@@ -18,6 +18,7 @@
#include "replay_position.hh"
#include "commitlog_entry.hh"
#include "db/timeout_clock.hh"
#include "gc_clock.hh"
#include "utils/fragmented_temporary_buffer.hh"

namespace seastar { class file; }
@@ -228,6 +229,13 @@ public:

void discard_completed_segments(const cf_id_type&);

/**
* Forces a switch to a new active segment.
* Invoked via the API to help tests that need predictable
* compaction behaviour.
*/
future<> force_new_active_segment() noexcept;

/**
* A 'flush_handler' is invoked when the CL determines that size on disk has
* exceeded allowable threshold. It is called once for every currently active
@@ -350,6 +358,8 @@ public:
future<std::vector<sstring>> list_existing_segments() const;
future<std::vector<sstring>> list_existing_segments(const sstring& dir) const;

gc_clock::time_point min_gc_time(const cf_id_type&) const;

typedef std::function<future<>(buffer_and_replay_position)> commit_load_reader_func;

class segment_error : public std::exception {};
18 changes: 18 additions & 0 deletions main.cc
@@ -1428,6 +1428,24 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl
// making compaction manager api available, after system keyspace has already been established.
api::set_server_compaction_manager(ctx).get();

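// Wire the compaction manager's tombstone GC state to the commitlog(s): gc_before for a table is
// capped by the oldest data still present in the commitlog / schema commitlog.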
cm.invoke_on_all([&](compaction_manager& cm) {
auto cl = db.local().commitlog();
auto scl = db.local().schema_commitlog();
if (cl && scl) {
cm.get_tombstone_gc_state().set_gc_time_min_source([cl, scl](const table_id& id) {
return std::min(cl->min_gc_time(id), scl->min_gc_time(id));
});
} else if (cl) {
cm.get_tombstone_gc_state().set_gc_time_min_source([cl](const table_id& id) {
return cl->min_gc_time(id);
});
} else if (scl) {
cm.get_tombstone_gc_state().set_gc_time_min_source([scl](const table_id& id) {
return scl->min_gc_time(id);
});
}
}).get();

supervisor::notify("loading tablet metadata");
ss.local().load_tablet_metadata().get();

27 changes: 22 additions & 5 deletions replica/database.cc
@@ -2483,15 +2483,32 @@ future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, std:
}

future<> database::flush_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names) {
return parallel_for_each(table_names, [&, ks_name] (const auto& table_name) {
return flush_table_on_all_shards(sharded_db, ks_name, table_name);
/**
* #14870
* To ensure that tests which use nodetool flush to force data
* to sstables, and then act on the result, get what they expect,
* we make an extra call here and below, asking the commitlog
* to discard the currently active segment. This ensures we get
* as sstable-ish a universe as we can, as soon as we can.
*/
return sharded_db.invoke_on_all([] (replica::database& db) {
return db._commitlog->force_new_active_segment();
}).then([&, ks_name, table_names = std::move(table_names)] {
return parallel_for_each(table_names, [&, ks_name] (const auto& table_name) {
return flush_table_on_all_shards(sharded_db, ks_name, table_name);
});
});
}

future<> database::flush_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name) {
auto& ks = sharded_db.local().find_keyspace(ks_name);
return parallel_for_each(ks.metadata()->cf_meta_data(), [&] (auto& pair) {
return flush_table_on_all_shards(sharded_db, pair.second->id());
// see above
return sharded_db.invoke_on_all([] (replica::database& db) {
return db._commitlog->force_new_active_segment();
}).then([&, ks_name] {
auto& ks = sharded_db.local().find_keyspace(ks_name);
return parallel_for_each(ks.metadata()->cf_meta_data(), [&] (auto& pair) {
return flush_table_on_all_shards(sharded_db, pair.second->id());
});
});
}

18 changes: 18 additions & 0 deletions test/lib/cql_test_env.cc
@@ -750,6 +750,24 @@ class single_node_cql_env : public cql_test_env {
return db::initialize_virtual_tables(_db, _ss, _gossiper, _group0_registry, _sys_ks, *cfg);
}).get();

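// Same commitlog-to-tombstone-GC wiring as in main.cc, for the single-node test environment.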
_cm.invoke_on_all([&](compaction_manager& cm) {
auto cl = _db.local().commitlog();
auto scl = _db.local().schema_commitlog();
if (cl && scl) {
cm.get_tombstone_gc_state().set_gc_time_min_source([cl, scl](const table_id& id) {
return std::min(cl->min_gc_time(id), scl->min_gc_time(id));
});
} else if (cl) {
cm.get_tombstone_gc_state().set_gc_time_min_source([cl](const table_id& id) {
return cl->min_gc_time(id);
});
} else if (scl) {
cm.get_tombstone_gc_state().set_gc_time_min_source([scl](const table_id& id) {
return scl->min_gc_time(id);
});
}
}).get();

replica::distributed_loader::init_non_system_keyspaces(_db, _proxy, _sys_ks).get();

_db.invoke_on_all([] (replica::database& db) {
21 changes: 15 additions & 6 deletions tombstone_gc.cc
@@ -68,7 +68,7 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be
switch (options.mode()) {
case tombstone_gc_mode::timeout: {
dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=timeout", s->ks_name(), s->cf_name(), range);
auto gc_before = saturating_subtract(query_time, s->gc_grace_seconds());
auto gc_before = check_min(s, saturating_subtract(query_time, s->gc_grace_seconds()));
return {gc_before, gc_before, knows_entire_range};
}
case tombstone_gc_mode::disabled: {
@@ -77,7 +77,8 @@
}
case tombstone_gc_mode::immediate: {
dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=immediate", s->ks_name(), s->cf_name(), range);
return {query_time, query_time, knows_entire_range};
auto t = check_min(s, query_time);
return {t, t, knows_entire_range};
}
case tombstone_gc_mode::repair: {
const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds();
@@ -109,8 +110,8 @@ tombstone_gc_state::get_gc_before_for_range_result tombstone_gc_state::get_gc_be
min_repair_timestamp = min;
max_repair_timestamp = max;
}
min_gc_before = saturating_subtract(min_repair_timestamp, propagation_delay);
max_gc_before = saturating_subtract(max_repair_timestamp, propagation_delay);
min_gc_before = check_min(s, saturating_subtract(min_repair_timestamp, propagation_delay));
max_gc_before = check_min(s, saturating_subtract(max_repair_timestamp, propagation_delay));
};
dblog.trace("Get gc_before for ks={}, table={}, range={}, mode=repair, min_repair_timestamp={}, max_repair_timestamp={}, propagation_delay={}, min_gc_before={}, max_gc_before={}, hits={}, knows_entire_range={}",
s->ks_name(), s->cf_name(), range, min_repair_timestamp, max_repair_timestamp, propagation_delay.count(), min_gc_before, max_gc_before, hits, knows_entire_range);
@@ -124,6 +125,13 @@ bool tombstone_gc_state::cheap_to_get_gc_before(const schema& s) const noexcept
return s.tombstone_gc_options().mode() != tombstone_gc_mode::repair;
}

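// Clamps a computed gc_before to the commitlog-derived minimum for the table;
// time_point::min() (GC disabled) passes through unchanged.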
gc_clock::time_point tombstone_gc_state::check_min(schema_ptr s, gc_clock::time_point t) const {
if (_gc_min_source && t != gc_clock::time_point::min()) {
return std::min(t, _gc_min_source(s->id()));
}
return t;
}

gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, const dht::decorated_key& dk, const gc_clock::time_point& query_time) const {
// if mode = timeout // default option, if user does not specify tombstone_gc options
// if mode = disabled // never gc tombstone
@@ -133,13 +141,13 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con
switch (options.mode()) {
case tombstone_gc_mode::timeout:
dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=timeout", s->ks_name(), s->cf_name(), dk);
return saturating_subtract(query_time, s->gc_grace_seconds());
return check_min(s, saturating_subtract(query_time, s->gc_grace_seconds()));
case tombstone_gc_mode::disabled:
dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=disabled", s->ks_name(), s->cf_name(), dk);
return gc_clock::time_point::min();
case tombstone_gc_mode::immediate:
dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=immediate", s->ks_name(), s->cf_name(), dk);
return query_time;
return check_min(s, query_time);
case tombstone_gc_mode::repair:
const std::chrono::seconds& propagation_delay = options.propagation_delay_in_seconds();
auto gc_before = gc_clock::time_point::min();
@@ -154,6 +162,7 @@ gc_clock::time_point tombstone_gc_state::get_gc_before_for_key(schema_ptr s, con
gc_before = saturating_subtract(repair_timestamp, propagation_delay);
}
}
gc_before = check_min(s, gc_before);
dblog.trace("Get gc_before for ks={}, table={}, dk={}, mode=repair, repair_timestamp={}, propagation_delay={}, gc_before={}",
s->ks_name(), s->cf_name(), dk, repair_timestamp, propagation_delay.count(), gc_before);
return gc_before;
8 changes: 8 additions & 0 deletions tombstone_gc.hh
@@ -34,8 +34,12 @@ using per_table_history_maps = std::unordered_map<table_id, seastar::lw_shared_p

class tombstone_gc_options;

using gc_time_min_source = std::function<gc_clock::time_point(const table_id&)>;

class tombstone_gc_state {
gc_time_min_source _gc_min_source;
per_table_history_maps* _repair_history_maps;
gc_clock::time_point check_min(schema_ptr, gc_clock::time_point) const;
public:
tombstone_gc_state() = delete;
tombstone_gc_state(per_table_history_maps* maps) noexcept : _repair_history_maps(maps) {}
@@ -44,6 +48,10 @@ public:
return _repair_history_maps != nullptr;
}

void set_gc_time_min_source(gc_time_min_source src) {
_gc_min_source = std::move(src);
}

// Returns true if it's cheap to retrieve gc_before, e.g. the mode will not require accessing a system table.
bool cheap_to_get_gc_before(const schema& s) const noexcept;

