Skip to content

Commit

Permalink
Merge 'distributed_loader: Restore separate processing of keyspace in…
Browse files Browse the repository at this point in the history
…it prio/normal' from Calle Wilund

Fixes #11349

In 7396de7 (and refactorings before it) the set of prioritized keyspaces (and processing thereof)
was removed, due to apparent non-usage (which is true for open-source version).

This functionality is however required for certain features of the enterprise version (ear).
As such is needs to be restored and reenabled. This patch set does so, adapted
to the recent version of this file.

Closes #11350

* github.com:scylladb/scylladb:
  distributed_loader: Restore separate processing of keyspace init prio/normal
  Revert "distributed_loader: Remove unused load-prio manipulations"
  • Loading branch information
avikivity committed Aug 23, 2022
2 parents 5d1ff17 + 54aca8e commit fd9d8dd
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 26 deletions.
80 changes: 54 additions & 26 deletions replica/distributed_loader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,23 @@ static const std::unordered_set<std::string_view> system_keyspaces = {
db::system_keyspace::NAME, db::schema_tables::NAME
};

// Not super nice. Adding statefulness to the file.
static std::unordered_set<sstring> load_prio_keyspaces;
static bool population_started = false;

void replica::distributed_loader::mark_keyspace_as_load_prio(const sstring& ks) {
assert(!population_started);
load_prio_keyspaces.insert(ks);
}

bool is_system_keyspace(std::string_view name) {
return system_keyspaces.contains(name);
}

bool is_load_prio_keyspace(const sstring& name) {
return load_prio_keyspaces.contains(name);
}

static const std::unordered_set<std::string_view> internal_keyspaces = {
db::system_distributed_keyspace::NAME,
db::system_distributed_keyspace::NAME_EVERYWHERE,
Expand Down Expand Up @@ -569,6 +582,8 @@ future<> distributed_loader::populate_keyspace(distributed<replica::database>& d
}

future<> distributed_loader::init_system_keyspace(distributed<replica::database>& db, distributed<service::storage_service>& ss, sharded<gms::gossiper>& g, db::config& cfg, db::table_selector& tables) {
population_started = true;

return seastar::async([&db, &ss, &cfg, &g, &tables] {
db.invoke_on_all([&db, &ss, &cfg, &g, &tables] (replica::database&) {
return db::system_keyspace::make(db, ss, g, cfg, tables);
Expand Down Expand Up @@ -638,33 +653,46 @@ future<> distributed_loader::init_non_system_keyspaces(distributed<replica::data
}
}).get();

std::vector<future<>> futures;

// treat "dirs" as immutable to avoid modifying it while still in
// a range-iteration. Also to simplify the "finally"
for (auto i = dirs.begin(); i != dirs.end();) {
auto& ks_name = i->first;
auto e = dirs.equal_range(ks_name).second;
auto j = i++;
// might have more than one dir for a keyspace iff data_file_directories is > 1 and
// somehow someone placed sstables in more than one of them for a given ks. (import?)
futures.emplace_back(parallel_for_each(j, e, [&](const std::pair<sstring, sstring>& p) {
auto& datadir = p.second;
return distributed_loader::populate_keyspace(db, datadir, ks_name);
}).finally([&] {
return db.invoke_on_all([ks_name] (replica::database& db) {
// can be false if running test environment
// or ks_name was just a borked directory not representing
// a keyspace in schema tables.
if (db.has_keyspace(ks_name)) {
db.find_keyspace(ks_name).mark_as_populated();
}
return make_ready_future<>();
});
}));
}
for (bool prio_only : { true, false}) {
std::vector<future<>> futures;

// treat "dirs" as immutable to avoid modifying it while still in
// a range-iteration. Also to simplify the "finally"
for (auto i = dirs.begin(); i != dirs.end();) {
auto& ks_name = i->first;
auto j = i++;

/**
* Must process in two phases: Prio and non-prio.
* This looks like it is not needed. And it is not
* in open-source version. But essential for enterprise.
* Do _not_ remove or refactor away.
*/
if (prio_only != is_load_prio_keyspace(ks_name)) {
continue;
}

when_all_succeed(futures.begin(), futures.end()).discard_result().get();
auto e = dirs.equal_range(ks_name).second;
// might have more than one dir for a keyspace iff data_file_directories is > 1 and
// somehow someone placed sstables in more than one of them for a given ks. (import?)
futures.emplace_back(parallel_for_each(j, e, [&](const std::pair<sstring, sstring>& p) {
auto& datadir = p.second;
return distributed_loader::populate_keyspace(db, datadir, ks_name);
}).finally([&] {
return db.invoke_on_all([ks_name] (replica::database& db) {
// can be false if running test environment
// or ks_name was just a borked directory not representing
// a keyspace in schema tables.
if (db.has_keyspace(ks_name)) {
db.find_keyspace(ks_name).mark_as_populated();
}
return make_ready_future<>();
});
}));
}

when_all_succeed(futures.begin(), futures.end()).discard_result().get();
}

db.invoke_on_all([] (replica::database& db) {
return parallel_for_each(db.get_non_system_column_families(), [] (lw_shared_ptr<replica::table> table) {
Expand Down
11 changes: 11 additions & 0 deletions replica/distributed_loader.hh
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ public:
static future<> init_system_keyspace(distributed<replica::database>& db, distributed<service::storage_service>& ss, sharded<gms::gossiper>& g, db::config& cfg, db::table_selector&);
static future<> init_non_system_keyspaces(distributed<replica::database>& db, distributed<service::storage_proxy>& proxy, sharded<db::system_keyspace>& sys_ks);

/**
* Marks a keyspace (by name) as "prioritized" on bootstrap.
* This will effectively let it bypass concurrency control.
* The only real use for this is to avoid certain chicken and
* egg issues.
*
* May only be called pre-bootstrap on main shard.
* Required for enterprise. Do _not_ remove.
*/
static void mark_keyspace_as_load_prio(const sstring&);

// Scan sstables under upload directory. Return a vector with smp::count entries.
// Each entry with index of idx should be accessed on shard idx only.
// Each entry contains a vector of sstables for this shard.
Expand Down

0 comments on commit fd9d8dd

Please sign in to comment.