api, storage_service: Recalculate table digests on relocal_schema api call

Currently, the API call recalculates only the per-node schema version. To
work around issues like #4485 we want to recalculate per-table digests as
well. One way to do that is to restart the node, but that is slow and
affects availability.

Use like this:

  curl -X POST http://127.0.0.1:10000/storage_service/relocal_schema
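
One way to observe the effect is to compare the node's reported schema
version before and after the call (it should change only if the
recalculated digests differ). A minimal sketch, assuming the node also
exposes a GET /storage_service/schema_version endpoint (a hypothetical
path here; check api-doc/storage_service.json for the exact one):

  curl -s http://127.0.0.1:10000/storage_service/schema_version
  curl -s -X POST http://127.0.0.1:10000/storage_service/relocal_schema
  curl -s http://127.0.0.1:10000/storage_service/schema_version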

Fixes #15380

Closes #15381
tgrabiec authored and denesb committed Sep 13, 2023
1 parent 0a5d953 commit c27d212
Showing 6 changed files with 33 additions and 6 deletions.
2 changes: 1 addition & 1 deletion api/api-doc/storage_service.json
@@ -1954,7 +1954,7 @@
 "operations":[
 {
 "method":"POST",
-"summary":"Reset local schema",
+"summary":"Forces this node to recalculate versions of schema objects.",
 "type":"void",
 "nickname":"reset_local_schema",
 "produces":[
8 changes: 3 additions & 5 deletions api/storage_service.cc
@@ -1014,13 +1014,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded<service::storage_
 return make_ready_future<json::json_return_type>(res);
 });
 
-ss::reset_local_schema.set(r, [&ctx, &sys_ks](std::unique_ptr<http::request> req) {
+ss::reset_local_schema.set(r, [&ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
 // FIXME: We should truncate schema tables if more than one node in the cluster.
-auto& fs = ctx.sp.local().features();
 apilog.info("reset_local_schema");
-return db::schema_tables::recalculate_schema_version(sys_ks, ctx.sp, fs).then([] {
-return make_ready_future<json::json_return_type>(json_void());
-});
+co_await ss.local().reload_schema();
+co_return json_void();
 });
 
 ss::set_trace_probability.set(r, [](std::unique_ptr<http::request> req) {
13 changes: 13 additions & 0 deletions replica/database.cc
@@ -2460,6 +2460,12 @@ future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, tabl
 });
 }
 
+future<> database::drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id) {
+return sharded_db.invoke_on_all([id] (replica::database& db) {
+return db.find_column_family(id).get_row_cache().invalidate(row_cache::external_updater([] {}));
+});
+}
+
 future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::string_view table_name) {
 return flush_table_on_all_shards(sharded_db, sharded_db.local().find_uuid(ks_name, table_name));
 }
@@ -2477,6 +2483,13 @@ future<> database::flush_keyspace_on_all_shards(sharded<database>& sharded_db, s
 });
 }
 
+future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name) {
+auto& ks = sharded_db.local().find_keyspace(ks_name);
+return parallel_for_each(ks.metadata()->cf_meta_data(), [&] (auto& pair) {
+return drop_cache_for_table_on_all_shards(sharded_db, pair.second->id());
+});
+}
+
 future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring table_name, sstring tag, db::snapshot_ctl::snap_views snap_views, bool skip_flush) {
 if (!skip_flush) {
 co_await flush_table_on_all_shards(sharded_db, ks_name, table_name);
3 changes: 3 additions & 0 deletions replica/database.hh
@@ -1734,6 +1734,9 @@ public:
 // flush all tables in a keyspace on all shards.
 static future<> flush_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
 
+static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
+static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
+
 static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring table_name, sstring tag, db::snapshot_ctl::snap_views, bool skip_flush);
 static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_ctl::snap_views, bool skip_flush);
 static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush);
10 changes: 10 additions & 0 deletions service/storage_service.cc
@@ -5175,6 +5175,16 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
 });
 }
 
+future<> storage_service::reload_schema() {
+// Flush memtables and clear cache so that we use the same state we would after node restart
+// to rule out potential discrepancies which could stem from merging with memtable/cache readers.
+co_await replica::database::flush_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME);
+co_await replica::database::drop_cache_for_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME);
+co_await _migration_manager.invoke_on(0, [] (auto& mm) {
+return mm.reload_schema();
+});
+}
+
 future<> storage_service::drain() {
 return run_with_api_lock(sstring("drain"), [] (storage_service& ss) {
 if (ss._operation_mode == mode::DRAINED) {
3 changes: 3 additions & 0 deletions service/storage_service.hh
@@ -687,6 +687,9 @@ public:
 */
 future<> drain();
 
+// Recalculates schema digests on this node from contents of tables on disk.
+future<> reload_schema();
+
 future<std::map<gms::inet_address, float>> get_ownership();
 
 future<std::map<gms::inet_address, float>> effective_ownership(sstring keyspace_name);
