Skip to content

Commit

Permalink
compaction_manager: flush_all_tables before major compaction
Browse files Browse the repository at this point in the history
Major compaction already flushes each table to make
sure it considers any mutations that are present in the
memtable for the purpose of tombstone purging.
See 64ec1c6

However, tombstone purging may be inhibited by data
in commitlog segments based on `gc_time_min` in the
`tombstone_gc_state` (See f42eb4d).

Flushing all sstables in the database release
all references to commitlog segments and there
it maximizes the potential for tombstone purging,
which is typically the reason for running major compaction.

However, flushing all tables too frequently might
result in tiny sstables.  Since when flushing all
keyspaces using `nodetool flush` the `force_keyspace_compaction`
api is invoked for keyspace successively, we need a mechanism
to prevent too frequent flushes by major compaction.

Hence a `compaction_flush_all_tables_before_major_seconds` interval
configuration option is added (defaults to 24 hours).

In the case that not all tables are flushed prior
to major compaction, we revert to the old behavior of
flushing each table in the keyspace before major-compacting it.

Fixes scylladb#15777

Signed-off-by: Benny Halevy <bhalevy@scylladb.com>
(cherry picked from commit 66ba983)
  • Loading branch information
bhalevy authored and tchaikov committed Jan 12, 2024
1 parent 7bb6386 commit 5d88e99
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 2 deletions.
29 changes: 28 additions & 1 deletion compaction/task_manager_module.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
#include "sstables/sstables.hh"
#include "sstables/sstable_directory.hh"
#include "utils/pretty_printers.hh"
#include "db/config.hh"

using namespace std::chrono_literals;

namespace replica {

Expand Down Expand Up @@ -263,11 +266,35 @@ sstring major_compaction_task_impl::to_string(flush_mode fm) {
__builtin_unreachable();
}

static future<bool> maybe_flush_all_tables(sharded<replica::database>& db) {
auto interval = db.local().get_config().compaction_flush_all_tables_before_major_seconds();
if (interval) {
auto when = db_clock::now() - interval * 1s;
if (co_await replica::database::get_all_tables_flushed_at(db) <= when) {
co_await db.invoke_on_all([&] (replica::database& db) -> future<> {
co_await db.flush_all_tables();
});
co_return true;
}
}
co_return false;
}

future<> major_keyspace_compaction_task_impl::run() {
// TODO: implement a `compact_all_keyspaces` api
// that will flush all tables once for all keyspaces
// rather than for each keyspace, using a mechanism similar to `run_table_tasks`.
// It can be called from `scylla-nodetool compact` with keyspace arg.
bool flushed_all_tables = false;
if (_flush_mode == flush_mode::all_tables) {
flushed_all_tables = co_await maybe_flush_all_tables(_db);
}

flush_mode fm = flushed_all_tables ? flush_mode::skip : _flush_mode;
co_await _db.invoke_on_all([&] (replica::database& db) -> future<> {
tasks::task_info parent_info{_status.id, _status.shard};
auto& module = db.get_compaction_manager().get_task_manager_module();
auto task = co_await module.make_and_start_task<shard_major_keyspace_compaction_task_impl>(parent_info, _status.keyspace, _status.id, db, _table_infos, _flush_mode);
auto task = co_await module.make_and_start_task<shard_major_keyspace_compaction_task_impl>(parent_info, _status.keyspace, _status.id, db, _table_infos, fm);
co_await task->done();
});
}
Expand Down
1 change: 0 additions & 1 deletion compaction/task_manager_module.hh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ public:
enum class flush_mode {
skip, // Skip flushing. Useful when application explicitly flushes all tables prior to compaction
compacted_tables, // Flush only the compacted keyspace/tables
// FIXME: flushing all_tables is not implemented yet
all_tables // Flush all tables in the database prior to compaction
};

Expand Down
4 changes: 4 additions & 0 deletions db/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,10 @@ db::config::config(std::shared_ptr<db::extensions> exts)
"If set to higher than 0, ignore the controller's output and set the compaction shares statically. Do not set this unless you know what you are doing and suspect a problem in the controller. This option will be retired when the controller reaches more maturity")
, compaction_enforce_min_threshold(this, "compaction_enforce_min_threshold", liveness::LiveUpdate, value_status::Used, false,
"If set to true, enforce the min_threshold option for compactions strictly. If false (default), Scylla may decide to compact even if below min_threshold")
, compaction_flush_all_tables_before_major_seconds(this, "compaction_flush_all_tables_before_major_seconds", value_status::Used, 86400,
"Set the minimum interval in seconds between flushing all tables before each major compaction (default is 86400). "
"This option is useful for maximizing tombstone garbage collection by releasing all active commitlog segments. "
"Set to 0 to disable automatic flushing all tables before major compaction")
/**
* @Group Initialization properties
* @GroupDescription The minimal properties needed for configuring a cluster.
Expand Down
1 change: 1 addition & 0 deletions db/config.hh
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public:
named_value<float> memtable_flush_static_shares;
named_value<float> compaction_static_shares;
named_value<bool> compaction_enforce_min_threshold;
named_value<uint32_t> compaction_flush_all_tables_before_major_seconds;
named_value<sstring> cluster_name;
named_value<sstring> listen_address;
named_value<sstring> listen_interface;
Expand Down
11 changes: 11 additions & 0 deletions replica/database.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2518,6 +2518,17 @@ future<> database::flush_all_tables() {
co_await get_tables_metadata().parallel_for_each_table([] (table_id, lw_shared_ptr<table> t) {
return t->flush();
});
_all_tables_flushed_at = db_clock::now();
}

future<db_clock::time_point> database::get_all_tables_flushed_at(sharded<database>& sharded_db) {
db_clock::time_point min_all_tables_flushed_at;
co_await sharded_db.map_reduce0([&] (const database& db) {
return db._all_tables_flushed_at;
}, db_clock::now(), [] (db_clock::time_point l, db_clock::time_point r) {
return std::min(l, r);
});
co_return min_all_tables_flushed_at;
}

future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name) {
Expand Down
4 changes: 4 additions & 0 deletions replica/database.hh
Original file line number Diff line number Diff line change
Expand Up @@ -1446,6 +1446,8 @@ private:
serialized_action _update_memtable_flush_static_shares_action;
utils::observer<float> _memtable_flush_static_shares_observer;

db_clock::time_point _all_tables_flushed_at;

public:
data_dictionary::database as_data_dictionary() const;
db::commitlog* commitlog_for(const schema_ptr& schema);
Expand Down Expand Up @@ -1738,6 +1740,8 @@ public:
// flushing all tables will allow reclaiming of all commitlog segments
future<> flush_all_tables();

static future<db_clock::time_point> get_all_tables_flushed_at(sharded<database>& sharded_db);

static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);

Expand Down

0 comments on commit 5d88e99

Please sign in to comment.