Skip to content

Commit

Permalink
Merge 'system_keyspace: remove flushes when writing to system tables'…
Browse files Browse the repository at this point in the history
… from Petr Gusev

There are several system tables with strict durability requirements.
This means that if we have written to such a table, we want to be sure
that the write won't be lost in case of node failure. We currently
accomplish this by accompanying each write to these tables with
`db.flush()` on all shards. This is expensive, since it causes all the
memtables to be written to sstables, which causes a lot of disk writes.
This overhead can become painful during node startup, when we write the
current boot state to `system.local`/`system.scylla_local` or during
topology change, when `update_peer_info`/`update_tokens` write to
`system.peers`.

In this series we remove flushes on writes to the `system.local`,
`system.peers`, `system.scylla_local` and `system.cdc_local` tables and
start using schema commitlog for durability.

Fixes: #15133

Closes #15279

* github.com:scylladb/scylladb:
  system_keyspace: switch CDC_LOCAL to schema commitlog
  system_keyspace: scylla_local: use schema commitlog
  database.cc: make _uses_schema_commitlog optional
  system_keyspace: drop load phases
  database.hh: add_column_family: add readonly parameter
  schema_tables: merge_tables_and_views: delay events until tables/views are created on all shards
  system_keyspace: switch system.peers to schema commitlog
  system_keyspace: switch system.local to schema commitlog
  main.cc: move schema commitlog replay earlier
  sstables_format_selector: extract listener
  sstables_format_selector: wrap when_enabled with seastar::async
  main.cc: inline and split system_keyspace.setup
  system_keyspace: refactor save_system_schema function
  system_keyspace: move initialize_virtual_tables into virtual_tables.hh
  system_keyspace: remove unused parameter
  config.cc: drop db::config::host_id
  main.cc: extract local_info initialization into function
  schema.cc: check static_props for sanity
  system_keyspace: set null sharder when configuring schema commitlog
  system_keyspace: rename static variables
  system_keyspace: remove redundant wait_for_sync_to_commitlog
  • Loading branch information
kbr-scylla committed Sep 14, 2023
2 parents 25457fc + 082cd3b commit bff9ced
Show file tree
Hide file tree
Showing 36 changed files with 422 additions and 383 deletions.
2 changes: 1 addition & 1 deletion api/storage_service.cc
Expand Up @@ -467,7 +467,7 @@ static future<json::json_return_type> describe_ring_as_json(sharded<service::sto

void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, gms::gossiper& g, sharded<db::system_keyspace>& sys_ks) {
ss::local_hostid.set(r, [&ctx](std::unique_ptr<http::request> req) {
auto id = ctx.db.local().get_config().host_id;
auto id = ctx.db.local().get_token_metadata().get_my_id();
return make_ready_future<json::json_return_type>(id.to_sstring());
});

Expand Down
1 change: 0 additions & 1 deletion db/config.hh
Expand Up @@ -460,7 +460,6 @@ public:

const db::extensions& extensions() const;

locator::host_id host_id;
utils::updateable_value<std::unordered_map<sstring, s3::endpoint_config>> object_storage_config;

named_value<std::vector<error_injection_at_startup>> error_injections_at_startup;
Expand Down
28 changes: 10 additions & 18 deletions db/schema_tables.cc
Expand Up @@ -88,15 +88,9 @@ static logging::logger diff_logger("schema_diff");
/** system.schema_* tables used to store keyspace/table/type attributes prior to C* 3.0 */
namespace db {
namespace {
const auto set_null_sharder = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
if (ks_name == schema_tables::NAME) {
props.use_null_sharder = true;
}
});
const auto set_use_schema_commitlog = schema_builder::register_static_configurator([](const sstring& ks_name, const sstring& cf_name, schema_static_props& props) {
if (ks_name == schema_tables::NAME) {
props.use_schema_commitlog = true;
props.load_phase = system_table_load_phase::phase2;
props.enable_schema_commitlog();
}
});
}
Expand Down Expand Up @@ -215,7 +209,7 @@ using namespace v3;

using days = std::chrono::duration<int, std::ratio<24 * 3600>>;

future<> save_system_schema(cql3::query_processor& qp, const sstring & ksname) {
static future<> save_system_schema_to_keyspace(cql3::query_processor& qp, const sstring & ksname) {
auto ks = qp.db().find_keyspace(ksname);
auto ksm = ks.metadata();

Expand All @@ -231,9 +225,10 @@ future<> save_system_schema(cql3::query_processor& qp, const sstring & ksname) {
}
}

/** add entries to system_schema.* for the hardcoded system definitions */
future<> save_system_keyspace_schema(cql3::query_processor& qp) {
return save_system_schema(qp, NAME);
future<> save_system_schema(cql3::query_processor& qp) {
co_await save_system_schema_to_keyspace(qp, schema_tables::NAME);
// #2514 - make sure "system" is written to system_schema.keyspaces.
co_await save_system_schema_to_keyspace(qp, system_keyspace::NAME);
}

namespace v3 {
Expand Down Expand Up @@ -1531,22 +1526,19 @@ static future<> merge_tables_and_views(distributed<service::storage_proxy>& prox
// In order to avoid possible races we first create the tables and only then the views.
// That way if a view seeks information about its base table it's guaranteed to find it.
co_await max_concurrent_for_each(tables_diff.created, max_concurrent, [&] (global_schema_ptr& gs) -> future<> {
co_await db.add_column_family_and_make_directory(gs);
co_await db.add_column_family_and_make_directory(gs, false);
});
co_await max_concurrent_for_each(views_diff.created, max_concurrent, [&] (global_schema_ptr& gs) -> future<> {
co_await db.add_column_family_and_make_directory(gs);
co_await db.add_column_family_and_make_directory(gs, false);
});
for (auto&& gs : boost::range::join(tables_diff.created, views_diff.created)) {
db.find_column_family(gs).mark_ready_for_writes();
co_await coroutine::maybe_yield();
}
});
co_await db.invoke_on_all([&](replica::database& db) -> future<> {
std::vector<bool> columns_changed;
columns_changed.reserve(tables_diff.altered.size() + views_diff.altered.size());
for (auto&& altered : boost::range::join(tables_diff.altered, views_diff.altered)) {
columns_changed.push_back(db.update_column_family(altered.new_schema));
co_await coroutine::maybe_yield();
}

auto it = columns_changed.begin();
auto notify = [&] (auto& r, auto&& f) -> future<> {
co_await max_concurrent_for_each(r, max_concurrent, std::move(f));
Expand Down
8 changes: 3 additions & 5 deletions db/schema_tables.hh
Expand Up @@ -176,11 +176,9 @@ std::vector<schema_ptr> all_tables(schema_features);
// Like all_tables(), but returns schema::cf_name() of each table.
std::vector<sstring> all_table_names(schema_features);

// saves/creates "ks" + all tables etc, while first deleting all old schema entries (will be rewritten)
future<> save_system_schema(cql3::query_processor& qp, const sstring & ks);

// saves/creates "system_schema" keyspace
future<> save_system_keyspace_schema(cql3::query_processor& qp);
// saves/creates all the system objects in the appropriate keyspaces;
// deletes them first, so they will be effectively overwritten.
future<> save_system_schema(cql3::query_processor& qp);

future<table_schema_version> calculate_schema_digest(distributed<service::storage_proxy>& proxy, schema_features, noncopyable_function<bool(std::string_view)> accept_keyspace);
// Calculates schema digest for all non-system keyspaces
Expand Down
76 changes: 47 additions & 29 deletions db/sstables-format-selector.cc
Expand Up @@ -24,57 +24,75 @@ static const sstring SSTABLE_FORMAT_PARAM_NAME = "sstable_format";
void feature_enabled_listener::on_enabled() {
if (!_started) {
_started = true;
_selector.maybe_select_format(_format).get();
_listener.maybe_select_format(_format).get();
}
}

sstables_format_selector::sstables_format_selector(gms::gossiper& g, sharded<gms::feature_service>& f, sharded<replica::database>& db, db::system_keyspace& sys_ks)
sstables_format_selector::sstables_format_selector(sharded<replica::database>& db)
: _db(db)
{
}

// Records the system keyspace to use for reading and persisting the sstable
// format, then loads whatever format was stored there earlier (if any).
//
// Only the address of sys_ks is kept, so the caller must keep it alive for
// as long as this selector may touch it.
future<> sstables_format_selector::on_system_tables_loaded(db::system_keyspace& sys_ks) {
    _sys_ks = &sys_ks;
    co_await read_sstables_format();
}

// Looks up the value stored under SSTABLE_FORMAT_PARAM_NAME in the system
// keyspace and, when one is present, selects the corresponding sstable format.
// When nothing is stored the currently selected format is left untouched.
//
// Dereferences _sys_ks without a check; the only caller visible here,
// on_system_tables_loaded(), assigns it first.
future<> sstables_format_selector::read_sstables_format() {
    const std::optional<sstring> stored = co_await _sys_ks->get_scylla_local_param(SSTABLE_FORMAT_PARAM_NAME);
    if (!stored) {
        co_return;
    }
    const sstables::sstable_version_types persisted = sstables::version_from_string(*stored);
    co_await select_format(persisted);
}

// Persists new_format under SSTABLE_FORMAT_PARAM_NAME in the system keyspace
// and then makes it the selected format.
//
// The value is written before it is applied, so the format is never selected
// without first having been stored.
//
// Reports an internal error if the system keyspace has not been supplied yet
// via on_system_tables_loaded().
future<> sstables_format_selector::update_format(sstables::sstable_version_types new_format) {
    if (!_sys_ks) {
        // The message is a constant, so there is nothing to format.
        on_internal_error(logger, "system keyspace is not loaded");
    }
    // NOTE(review): the meaning of the trailing `true` flag is defined by
    // system_keyspace::set_scylla_local_param — confirm against its declaration.
    co_await _sys_ks->set_scylla_local_param(SSTABLE_FORMAT_PARAM_NAME, fmt::to_string(new_format), true);
    co_await select_format(new_format);
}

// Makes `format` the selected sstable format: logs the choice, remembers it in
// this selector and pushes it to the database via _db.invoke_on_all().
future<> sstables_format_selector::select_format(sstables::sstable_version_types format) {
    logger.info("Selected {} sstables format", format);
    _selected_format = format;
    // Capture the value rather than `this`: the callback is invoked through
    // the sharded database, so it should not reach back into this selector's
    // state, and every invocation is guaranteed to see the same format.
    co_await _db.invoke_on_all([chosen = format] (replica::database& db) {
        db.set_format(chosen);
    });
}

sstables_format_listener::sstables_format_listener(gms::gossiper& g, sharded<gms::feature_service>& f, sstables_format_selector& selector)
: _gossiper(g)
, _features(f)
, _db(db)
, _sys_ks(sys_ks)
, _selector(selector)
, _md_feature_listener(*this, sstables::sstable_version_types::md)
, _me_feature_listener(*this, sstables::sstable_version_types::me)
{ }

future<> sstables_format_selector::maybe_select_format(sstables::sstable_version_types new_format) {
future<> sstables_format_listener::maybe_select_format(sstables::sstable_version_types new_format) {
auto hg = _sel.hold();
auto units = co_await get_units(_sem, 1);

if (new_format > _selected_format) {
co_await _sys_ks.set_scylla_local_param(SSTABLE_FORMAT_PARAM_NAME, fmt::to_string(new_format));
co_await select_format(new_format);
if (new_format > _selector.selected_format()) {
co_await _selector.update_format(new_format);
// FIXME discarded future
(void)_gossiper.add_local_application_state(gms::application_state::SUPPORTED_FEATURES,
gms::versioned_value::supported_features(_features.local().supported_feature_set())).finally([h = std::move(hg)] {});
}
}

future<> sstables_format_selector::start() {
future<> sstables_format_listener::start() {
assert(this_shard_id() == 0);
co_await read_sstables_format();
_features.local().me_sstable.when_enabled(_me_feature_listener);
_features.local().md_sstable.when_enabled(_md_feature_listener);
// The listener may fire immediately, create a thread for that case.
co_await seastar::async([this] {
_features.local().me_sstable.when_enabled(_me_feature_listener);
_features.local().md_sstable.when_enabled(_md_feature_listener);
});
}

future<> sstables_format_selector::stop() {
future<> sstables_format_listener::stop() {
co_await _sel.close();
}

future<> sstables_format_selector::read_sstables_format() {
std::optional<sstring> format_opt = co_await _sys_ks.get_scylla_local_param(SSTABLE_FORMAT_PARAM_NAME);
if (format_opt) {
sstables::sstable_version_types format = sstables::version_from_string(*format_opt);
co_await select_format(format);
}
}

future<> sstables_format_selector::select_format(sstables::sstable_version_types format) {
logger.info("Selected {} sstables format", format);
_selected_format = format;
co_await _db.invoke_on_all([this] (replica::database& db) {
db.set_format(_selected_format);
});
}

} // namespace sstables
35 changes: 23 additions & 12 deletions db/sstables-format-selector.hh
Expand Up @@ -30,37 +30,48 @@ class feature_service;
namespace db {

class system_keyspace;
class sstables_format_selector;
class sstables_format_listener;

class feature_enabled_listener : public gms::feature::listener {
sstables_format_selector& _selector;
sstables_format_listener& _listener;
sstables::sstable_version_types _format;

public:
feature_enabled_listener(sstables_format_selector& s, sstables::sstable_version_types format)
: _selector(s)
feature_enabled_listener(sstables_format_listener& l, sstables::sstable_version_types format)
: _listener(l)
, _format(format)
{ }
void on_enabled() override;
};

// Keeps track of the sstable format currently in use and applies it to the
// sharded database. The format can be loaded from, and persisted to, the
// system keyspace once that keyspace has been handed over through
// on_system_tables_loaded().
class sstables_format_selector {
// Database that receives the selected format.
sharded<replica::database>& _db;
// Null until on_system_tables_loaded() is called.
db::system_keyspace* _sys_ks = nullptr;
// Format in effect; starts out as `mc` until another one is selected.
sstables::sstable_version_types _selected_format = sstables::sstable_version_types::mc;
// Records new_format as the selected format and passes it to the database.
future<> select_format(sstables::sstable_version_types new_format);
// Selects the format stored in the system keyspace, if one is stored.
future<> read_sstables_format();
public:
explicit sstables_format_selector(sharded<replica::database>& db);

// Stores a pointer to sys_ks and loads the persisted format from it.
future<> on_system_tables_loaded(db::system_keyspace& sys_ks);

// Returns the format currently selected.
inline sstables::sstable_version_types selected_format() const noexcept {
return _selected_format;
}
// Persists new_format in the system keyspace and selects it. Reports an
// internal error if on_system_tables_loaded() has not been called yet.
future<> update_format(sstables::sstable_version_types new_format);
};

class sstables_format_listener {
gms::gossiper& _gossiper;
sharded<gms::feature_service>& _features;
sharded<replica::database>& _db;
db::system_keyspace& _sys_ks;
sstables_format_selector& _selector;
seastar::named_semaphore _sem = {1, named_semaphore_exception_factory{"feature listeners"}};
seastar::gate _sel;

feature_enabled_listener _md_feature_listener;
feature_enabled_listener _me_feature_listener;

sstables::sstable_version_types _selected_format = sstables::sstable_version_types::mc;
future<> select_format(sstables::sstable_version_types new_format);
future<> read_sstables_format();

public:
sstables_format_selector(gms::gossiper& g, sharded<gms::feature_service>& f, sharded<replica::database>& db, db::system_keyspace& sys_ks);
sstables_format_listener(gms::gossiper& g, sharded<gms::feature_service>& f, sstables_format_selector& selector);

future<> start();
future<> stop();
Expand Down
4 changes: 2 additions & 2 deletions db/system_distributed_keyspace.cc
Expand Up @@ -375,7 +375,7 @@ future<std::unordered_map<locator::host_id, sstring>> system_distributed_keyspac
}

future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring view_name) const {
auto host_id = _sp.local_db().get_config().host_id;
auto host_id = _sp.local_db().get_token_metadata().get_my_id();
return _qp.execute_internal(
format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
Expand All @@ -385,7 +385,7 @@ future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring
}

future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring view_name) const {
auto host_id = _sp.local_db().get_config().host_id;
auto host_id = _sp.local_db().get_token_metadata().get_my_id();
return _qp.execute_internal(
format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS),
db::consistency_level::ONE,
Expand Down

0 comments on commit bff9ced

Please sign in to comment.