Skip to content

Commit

Permalink
Merge "Introduce sharded<system_keyspace> instance" from Pavel Emelyanov
Browse files Browse the repository at this point in the history
"
Making the system-keyspace into a standard sharded instance will
help to fix several dependency knots.

First, the global qctx and local-cache both will be moved onto the
sys-ks, all their users will be patched to depend on system-keyspace.
Now it's not quite so, but we're moving towards this state.

Second, snitch instance now sits in the middle of another dependency
loop. To untie one the preferred ip and dc/rack info should be
moved onto system keyspace altogether (now it's scattered over several
places). The sys-ks thus needs to be a sharded service with some
state.

This set makes system-keyspace sharded instance, equipps it with all
the dependencies it needs and passes it as dependency into storage
service, migration manager and API. This helps eliminating a good
portion of global qctx/cache usage and prepares the ground for snitch
rework.

tests: unit(dev)
       v1: unit(debug), dtest.simple_boot_shutdown(dev)
"

* 'br-sharded-system-keyspace-instance-2' of https://github.com/xemul/scylla: (25 commits)
  system_keyspace: Make load_host_ids non-static
  system_keyspace: Make load_tokens non-static
  system_keyspace: Make remove_endpoint and update_tokens non-static
  system_keyspace: Coroutinize update_tokens
  system_keyspace: Coroutinize remove_endpoint
  system_keyspace: Make update_cached_values non-static
  system_keyspace: Coroutinuze update_peer_info
  system_keyspace: Make update_schema_version non-static
  schema_tables: Add sharded<system_keyspace> argument to update_schema_version_and_announce
  replica: Push sharded<system_keyspace> down to parse_system_tables
  api: Carry sharded<system_keyspace> reference along
  storage_service: Keep sharded<system_keyspace> reference
  migration_manager: Keep sharded<system_keyspace> reference
  system_keyspace: Remove temporary qp variable
  system_keyspace: Make get_preferred_ips non-static
  system_keyspace: Make cache_truncation_record non-static
  system_keyspace: Make check_health non-static
  system_keyspace: Make build_bootstrap_info non-static
  system_keyspace: Make build_dc_rack_info non-static
  system_keyspace: Make setup_version non-static
  ...
  • Loading branch information
denesb committed Mar 17, 2022
2 parents a1604aa + e8ba395 commit c450508
Show file tree
Hide file tree
Showing 18 changed files with 199 additions and 145 deletions.
6 changes: 3 additions & 3 deletions api/api.cc
Expand Up @@ -96,9 +96,9 @@ future<> unset_rpc_controller(http_context& ctx) {
return ctx.http_server.set_routes([&ctx] (routes& r) { unset_rpc_controller(ctx, r); });
}

future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<gms::gossiper>& g, sharded<cdc::generation_service>& cdc_gs) {
return register_api(ctx, "storage_service", "The storage service API", [&ss, &g, &cdc_gs] (http_context& ctx, routes& r) {
set_storage_service(ctx, r, ss, g.local(), cdc_gs);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<gms::gossiper>& g, sharded<cdc::generation_service>& cdc_gs, sharded<db::system_keyspace>& sys_ks) {
return register_api(ctx, "storage_service", "The storage service API", [&ss, &g, &cdc_gs, &sys_ks] (http_context& ctx, routes& r) {
set_storage_service(ctx, r, ss, g.local(), cdc_gs, sys_ks);
});
}

Expand Down
3 changes: 2 additions & 1 deletion api/api_init.hh
Expand Up @@ -42,6 +42,7 @@ class config;
namespace view {
class view_builder;
}
class system_keyspace;
}
namespace netw { class messaging_service; }
class repair_service;
Expand Down Expand Up @@ -76,7 +77,7 @@ struct http_context {
future<> set_server_init(http_context& ctx);
future<> set_server_config(http_context& ctx, const db::config& cfg);
future<> set_server_snitch(http_context& ctx);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<gms::gossiper>& g, sharded<cdc::generation_service>& cdc_gs);
future<> set_server_storage_service(http_context& ctx, sharded<service::storage_service>& ss, sharded<gms::gossiper>& g, sharded<cdc::generation_service>& cdc_gs, sharded<db::system_keyspace>& sys_ks);
future<> set_server_sstables_loader(http_context& ctx, sharded<sstables_loader>& sst_loader);
future<> unset_server_sstables_loader(http_context& ctx);
future<> set_server_view_builder(http_context& ctx, sharded<db::view::view_builder>& vb);
Expand Down
6 changes: 3 additions & 3 deletions api/storage_service.cc
Expand Up @@ -394,7 +394,7 @@ static future<json::json_return_type> describe_ring_as_json(sharded<service::sto
co_return json::json_return_type(stream_range_as_array(co_await ss.local().describe_ring(keyspace), token_range_endpoints_to_json));
}

void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, gms::gossiper& g, sharded<cdc::generation_service>& cdc_gs) {
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, gms::gossiper& g, sharded<cdc::generation_service>& cdc_gs, sharded<db::system_keyspace>& sys_ks) {
ss::local_hostid.set(r, [](std::unique_ptr<request> req) {
return db::system_keyspace::load_local_host_id().then([](const utils::UUID& id) {
return make_ready_future<json::json_return_type>(id.to_sstring());
Expand Down Expand Up @@ -964,11 +964,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
return make_ready_future<json::json_return_type>(res);
});

ss::reset_local_schema.set(r, [](std::unique_ptr<request> req) {
ss::reset_local_schema.set(r, [&sys_ks](std::unique_ptr<request> req) {
// FIXME: We should truncate schema tables if more than one node in the cluster.
auto& sp = service::get_storage_proxy();
auto& fs = sp.local().features();
return db::schema_tables::recalculate_schema_version(sp, fs).then([] {
return db::schema_tables::recalculate_schema_version(sys_ks, sp, fs).then([] {
return make_ready_future<json::json_return_type>(json_void());
});
});
Expand Down
3 changes: 2 additions & 1 deletion api/storage_service.hh
Expand Up @@ -19,6 +19,7 @@ class snapshot_ctl;
namespace view {
class view_builder;
}
class system_keyspace;
}
namespace netw { class messaging_service; }
class repair_service;
Expand All @@ -42,7 +43,7 @@ sstring validate_keyspace(http_context& ctx, const parameters& param);
// containing the description of the respective no_such_column_family error.
std::vector<sstring> parse_tables(const sstring& ks_name, http_context& ctx, const std::unordered_map<sstring, sstring>& query_params, sstring param_name);

void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, gms::gossiper& g, sharded<cdc::generation_service>& cdc_gs);
void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_service>& ss, gms::gossiper& g, sharded<cdc::generation_service>& cdc_gs, sharded<db::system_keyspace>& sys_ls);
void set_sstables_loader(http_context& ctx, routes& r, sharded<sstables_loader>& sst_loader);
void unset_sstables_loader(http_context& ctx, routes& r);
void set_view_builder(http_context& ctx, routes& r, sharded<db::view::view_builder>& vb);
Expand Down
12 changes: 6 additions & 6 deletions db/schema_tables.cc
Expand Up @@ -858,9 +858,9 @@ static future<> with_merge_lock(noncopyable_function<future<> ()> func) {
}

static
future<> update_schema_version_and_announce(distributed<service::storage_proxy>& proxy, schema_features features) {
future<> update_schema_version_and_announce(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, schema_features features) {
auto uuid = co_await calculate_schema_digest(proxy, features);
co_await db::system_keyspace::update_schema_version(uuid);
co_await sys_ks.local().update_schema_version(uuid);
co_await proxy.local().get_db().invoke_on_all([uuid] (replica::database& db) {
db.update_version(uuid);
});
Expand All @@ -876,18 +876,18 @@ future<> update_schema_version_and_announce(distributed<service::storage_proxy>&
* @throws ConfigurationException If one of metadata attributes has invalid value
* @throws IOException If data was corrupted during transportation or failed to apply fs operations
*/
future<> merge_schema(distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations)
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations)
{
co_await with_merge_lock([&] () mutable -> future<> {
bool flush_schema = proxy.local().get_db().local().get_config().flush_schema_tables_after_modification();
co_await do_merge_schema(proxy, std::move(mutations), flush_schema);
co_await update_schema_version_and_announce(proxy, feat.cluster_schema_features());
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
});
}

future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy, gms::feature_service& feat) {
future<> recalculate_schema_version(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat) {
co_await with_merge_lock([&] () -> future<> {
co_await update_schema_version_and_announce(proxy, feat.cluster_schema_features());
co_await update_schema_version_and_announce(sys_ks, proxy, feat.cluster_schema_features());
});
}

Expand Down
5 changes: 3 additions & 2 deletions db/schema_tables.hh
Expand Up @@ -56,6 +56,7 @@ class feature_service;

namespace db {

class system_keyspace;
class extensions;
class config;

Expand Down Expand Up @@ -170,14 +171,14 @@ future<mutation> read_keyspace_mutation(distributed<service::storage_proxy>&, co
// Must be called on shard 0.
future<semaphore_units<>> hold_merge_lock() noexcept;

future<> merge_schema(distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations);
future<> merge_schema(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat, std::vector<mutation> mutations);

// Recalculates the local schema version.
//
// It is safe to call concurrently with recalculate_schema_version() and merge_schema() in which case it
// is guaranteed that the schema version we end up with after all calls will reflect the most recent state
// of feature_service and schema tables.
future<> recalculate_schema_version(distributed<service::storage_proxy>& proxy, gms::feature_service& feat);
future<> recalculate_schema_version(sharded<db::system_keyspace>& sys_ks, distributed<service::storage_proxy>& proxy, gms::feature_service& feat);

future<std::set<sstring>> merge_keyspaces(distributed<service::storage_proxy>& proxy, schema_result&& before, schema_result&& after);

Expand Down

0 comments on commit c450508

Please sign in to comment.