From c27d212f4bc29b94a6fa5445fd0ce3e35fb95059 Mon Sep 17 00:00:00 2001
From: Tomasz Grabiec <tgrabiec@scylladb.com>
Date: Wed, 13 Sep 2023 00:57:11 +0200
Subject: [PATCH] api, storage_service: Recalculate table digests on
 relocal_schema api call

Currently, the API call recalculates only the per-node schema version. To
work around issues like #4485 we also want to recalculate per-table
digests. One way to do that is to restart the node, but that's slow and
has an impact on availability.

Use like this:

   curl -X POST http://127.0.0.1:10000/storage_service/relocal_schema

Fixes #15380

Closes #15381
---
 api/api-doc/storage_service.json |  2 +-
 api/storage_service.cc           |  8 +++-----
 replica/database.cc              | 13 +++++++++++++
 replica/database.hh              |  3 +++
 service/storage_service.cc       | 10 ++++++++++
 service/storage_service.hh       |  3 +++
 6 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/api/api-doc/storage_service.json b/api/api-doc/storage_service.json
index a01a3abbdbb4..ed70c27e87fb 100644
--- a/api/api-doc/storage_service.json
+++ b/api/api-doc/storage_service.json
@@ -1954,7 +1954,7 @@
       "operations":[
          {
             "method":"POST",
-            "summary":"Reset local schema",
+            "summary":"Forces this node to recalculate versions of schema objects.",
             "type":"void",
             "nickname":"reset_local_schema",
             "produces":[
diff --git a/api/storage_service.cc b/api/storage_service.cc
index b7071c85b51c..e6ab11934b40 100644
--- a/api/storage_service.cc
+++ b/api/storage_service.cc
@@ -1014,13 +1014,11 @@ void set_storage_service(http_context& ctx, routes& r, sharded
         return make_ready_future<json::json_return_type>(res);
     });
 
-    ss::reset_local_schema.set(r, [&ctx, &sys_ks](std::unique_ptr<http::request> req) {
+    ss::reset_local_schema.set(r, [&ss](std::unique_ptr<http::request> req) -> future<json::json_return_type> {
         // FIXME: We should truncate schema tables if more than one node in the cluster.
-        auto& fs = ctx.sp.local().features();
         apilog.info("reset_local_schema");
-        return db::schema_tables::recalculate_schema_version(sys_ks, ctx.sp, fs).then([] {
-            return make_ready_future<json::json_return_type>(json_void());
-        });
+        co_await ss.local().reload_schema();
+        co_return json_void();
     });
 
     ss::set_trace_probability.set(r, [](std::unique_ptr<http::request> req) {
diff --git a/replica/database.cc b/replica/database.cc
index 8b12bd4225d9..09ef1ba093cd 100644
--- a/replica/database.cc
+++ b/replica/database.cc
@@ -2460,6 +2460,12 @@ future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, tabl
     });
 }
 
+future<> database::drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id) {
+    return sharded_db.invoke_on_all([id] (replica::database& db) {
+        return db.find_column_family(id).get_row_cache().invalidate(row_cache::external_updater([] {}));
+    });
+}
+
 future<> database::flush_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::string_view table_name) {
     return flush_table_on_all_shards(sharded_db, sharded_db.local().find_uuid(ks_name, table_name));
 }
@@ -2477,6 +2483,13 @@ future<> database::flush_keyspace_on_all_shards(sharded<database>& sharded_db, s
     });
 }
 
+future<> database::drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name) {
+    auto& ks = sharded_db.local().find_keyspace(ks_name);
+    return parallel_for_each(ks.metadata()->cf_meta_data(), [&] (auto& pair) {
+        return drop_cache_for_table_on_all_shards(sharded_db, pair.second->id());
+    });
+}
+
 future<> database::snapshot_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring table_name, sstring tag, db::snapshot_ctl::snap_views snap_views, bool skip_flush) {
     if (!skip_flush) {
         co_await flush_table_on_all_shards(sharded_db, ks_name, table_name);
diff --git a/replica/database.hh b/replica/database.hh
index 54e10cbd3f0e..3c778acc9259 100644
--- a/replica/database.hh
+++ b/replica/database.hh
@@ -1734,6 +1734,9 @@ public:
     // flush all tables in a keyspace on all shards.
     static future<> flush_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
 
+    static future<> drop_cache_for_table_on_all_shards(sharded<database>& sharded_db, table_id id);
+    static future<> drop_cache_for_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name);
+
     static future<> snapshot_table_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring table_name, sstring tag, db::snapshot_ctl::snap_views, bool skip_flush);
     static future<> snapshot_tables_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, std::vector<sstring> table_names, sstring tag, db::snapshot_ctl::snap_views, bool skip_flush);
     static future<> snapshot_keyspace_on_all_shards(sharded<database>& sharded_db, std::string_view ks_name, sstring tag, bool skip_flush);
diff --git a/service/storage_service.cc b/service/storage_service.cc
index 5f00db251990..746cfabf3cac 100644
--- a/service/storage_service.cc
+++ b/service/storage_service.cc
@@ -5175,6 +5175,16 @@ future<node_ops_cmd_response> storage_service::node_ops_cmd_handler(gms::inet_ad
     });
 }
 
+future<> storage_service::reload_schema() {
+    // Flush memtables and clear cache so that we use the same state we would after node restart
+    // to rule out potential discrepancies which could stem from merging with memtable/cache readers.
+    co_await replica::database::flush_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME);
+    co_await replica::database::drop_cache_for_keyspace_on_all_shards(_db, db::schema_tables::v3::NAME);
+    co_await _migration_manager.invoke_on(0, [] (auto& mm) {
+        return mm.reload_schema();
+    });
+}
+
 future<> storage_service::drain() {
     return run_with_api_lock(sstring("drain"), [] (storage_service& ss) {
         if (ss._operation_mode == mode::DRAINED) {
diff --git a/service/storage_service.hh b/service/storage_service.hh
index a06bf05f1ba5..0072214d687d 100644
--- a/service/storage_service.hh
+++ b/service/storage_service.hh
@@ -687,6 +687,9 @@ public:
      */
     future<> drain();
 
+    // Recalculates schema digests on this node from contents of tables on disk.
+    future<> reload_schema();
+
     future<std::map<gms::inet_address, float>> get_ownership();
     future<std::map<gms::inet_address, float>> effective_ownership(sstring keyspace_name);
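
A quick way to exercise the change and observe the result, as a sketch rather
than part of the patch: it assumes a locally running node with the REST API on
the default port 10000, and uses nodetool describecluster only as one generic
way to inspect the schema version the node advertises afterwards.

   # Recalculate the per-node schema version and per-table schema digests
   curl -X POST http://127.0.0.1:10000/storage_service/relocal_schema

   # The cluster should converge on a single schema version once
   # recalculation and gossip propagation are done
   nodetool describecluster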