diff --git a/configure.py b/configure.py
index f6bfa81dc64d..280826a43d69 100755
--- a/configure.py
+++ b/configure.py
@@ -1126,6 +1126,7 @@ def find_ninja():
     'utils/lister.cc',
     'repair/repair.cc',
     'repair/row_level.cc',
+    'repair/table_check.cc',
     'exceptions/exceptions.cc',
     'auth/allow_all_authenticator.cc',
     'auth/allow_all_authorizer.cc',
diff --git a/repair/repair.cc b/repair/repair.cc
index f63972dc0c66..2ce721791621 100644
--- a/repair/repair.cc
+++ b/repair/repair.cc
@@ -14,6 +14,7 @@
 #include "gms/inet_address.hh"
 #include "gms/gossiper.hh"
 #include "message/messaging_service.hh"
+#include "repair/table_check.hh"
 #include "replica/database.hh"
 #include "service/migration_manager.hh"
 #include "service/storage_service.hh"
@@ -740,9 +741,12 @@ future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& ra
         co_return;
     }
     try {
-        co_await repair_cf_range_row_level(*this, cf, table.id, range, neighbors, _small_table_optimization);
-    } catch (replica::no_such_column_family&) {
-        dropped_tables.insert(cf);
+        auto dropped = co_await with_table_drop_silenced(db.local(), mm, table.id, [&] (const table_id& uuid) {
+            return repair_cf_range_row_level(*this, cf, table.id, range, neighbors, _small_table_optimization);
+        });
+        if (dropped) {
+            dropped_tables.insert(cf);
+        }
     } catch (...) {
         nr_failed_ranges++;
         throw;
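Note on the repair change: the old code treated a locally-thrown `replica::no_such_column_family` as proof of a drop, while any remote failure counted against `nr_failed_ranges`. The new helper also classifies remote failures by re-checking the schema. A minimal sketch of the calling pattern it enables is below (not part of the patch; `do_table_work` and `stream_one_table` are hypothetical stand-ins):

```cpp
// Sketch only: how a caller consumes repair::with_table_drop_silenced.
future<> do_table_work(replica::database& db, service::migration_manager& mm, table_id uuid) {
    auto dropped = co_await repair::with_table_drop_silenced(db, mm, uuid, [&] (const table_id& id) -> future<> {
        // Any step that can fail with no_such_column_family, locally or remotely.
        co_await stream_one_table(id); // hypothetical helper
    });
    if (dropped) {
        co_return; // table vanished mid-operation: treat this step as a no-op
    }
    // Otherwise the step completed; unrelated errors were rethrown above.
}
```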
diff --git a/repair/table_check.cc b/repair/table_check.cc
new file mode 100644
index 000000000000..7095749b89f3
--- /dev/null
+++ b/repair/table_check.cc
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2024-present ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+#include "replica/database.hh"
+#include "repair/table_check.hh"
+#include "service/migration_manager.hh"
+
+namespace repair {
+
+future<table_dropped> table_sync_and_check(replica::database& db, service::migration_manager& mm, const table_id& uuid) {
+    if (mm.use_raft()) {
+        abort_on_expiry aoe(lowres_clock::now() + std::chrono::seconds{10});
+        auto& as = aoe.abort_source();
+        auto sub = mm.get_abort_source().subscribe([&as] () noexcept {
+            if (!as.abort_requested()) {
+                as.request_abort();
+            }
+        });
+
+        // Trigger read barrier to synchronize schema.
+        co_await mm.get_group0_barrier().trigger(as);
+    }
+
+    co_return !db.column_family_exists(uuid);
+}
+
+future<table_dropped> with_table_drop_silenced(replica::database& db, service::migration_manager& mm, const table_id& uuid,
+        std::function<future<>(const table_id&)> f) {
+    std::exception_ptr ex = nullptr;
+    try {
+        co_await f(uuid);
+        co_return table_dropped::no;
+    } catch (replica::no_such_column_family&) {
+        // No need to synchronize when we know the table was dropped.
+    } catch (...) {
+        // This node may still see the table while it has already been dropped
+        // on the remote node, in which case the remote node returns an error.
+        // We then want to skip the table and continue with the operation.
+        //
+        // But since the RPC layer does not preserve the exception type, the cause
+        // of the failure cannot be determined from the error alone. Synchronize the
+        // schema to see the latest changes and check whether the table was dropped.
+        ex = std::current_exception();
+    }
+
+    if (ex) {
+        auto dropped = co_await table_sync_and_check(db, mm, uuid);
+        if (!dropped) {
+            co_await coroutine::return_exception_ptr(std::move(ex));
+        }
+    }
+    co_return table_dropped::yes;
+}
+
+}
diff --git a/repair/table_check.hh b/repair/table_check.hh
new file mode 100644
index 000000000000..587ba87b5a83
--- /dev/null
+++ b/repair/table_check.hh
@@ -0,0 +1,42 @@
+/*
+ * Copyright (C) 2024-present ScyllaDB
+ */
+
+/*
+ * SPDX-License-Identifier: AGPL-3.0-or-later
+ */
+
+#pragma once
+
+#include <seastar/core/future.hh>
+#include <seastar/util/bool_class.hh>
+
+#include "schema/schema_fwd.hh"
+
+using table_dropped = bool_class<class table_dropped_tag>;
+
+namespace raft {
+class server;
+}
+
+namespace replica {
+class database;
+}
+
+namespace service {
+class migration_manager;
+}
+
+namespace repair {
+
+class database;
+
+future<table_dropped> table_sync_and_check(replica::database& db, service::migration_manager& mm, const table_id& uuid);
+
+// Runs function f on the given table. If f throws and the table turns out to be dropped, the exception is swallowed.
+// This function is meant to handle no_such_column_family from a remote node or a different shard, as it synchronizes
+// the schema before checking for the table. Prefer standard error handling whenever possible.
+future<table_dropped> with_table_drop_silenced(replica::database& db, service::migration_manager& mm, const table_id& uuid,
+        std::function<future<>(const table_id&)> f);
+
+}
diff --git a/service/migration_manager.hh b/service/migration_manager.hh
index 71e9d645fc48..f90d5756afe9 100644
--- a/service/migration_manager.hh
+++ b/service/migration_manager.hh
@@ -94,6 +94,11 @@ public:
     const migration_notifier& get_notifier() const { return _notifier; }
     service::storage_proxy& get_storage_proxy() { return _storage_proxy; }
     const service::storage_proxy& get_storage_proxy() const { return _storage_proxy; }
+    abort_source& get_abort_source() noexcept { return _as; }
+    const abort_source& get_abort_source() const noexcept { return _as; }
+    serialized_action& get_group0_barrier() noexcept { return _group0_barrier; }
+    const serialized_action& get_group0_barrier() const noexcept { return _group0_barrier; }
+    bool use_raft() const noexcept { return !_enable_schema_pulls; }
 
     // Disable schema pulls when Raft group 0 is fully responsible for managing schema.
     future<> disable_schema_pulls();
diff --git a/streaming/stream_manager.hh b/streaming/stream_manager.hh
index 5b8e036603df..b080dba3edab 100644
--- a/streaming/stream_manager.hh
+++ b/streaming/stream_manager.hh
@@ -127,6 +127,7 @@ public:
 
     replica::database& db() noexcept { return _db.local(); }
     netw::messaging_service& ms() noexcept { return _ms.local(); }
+    service::migration_manager& mm() noexcept { return _mm.local(); }
 
     const std::unordered_map<plan_id, shared_ptr<stream_result_future>>& get_initiated_streams() const {
         return _initiated_streams;
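The getters added to `migration_manager` exist so that `table_sync_and_check()` can run a group 0 read barrier before consulting the local schema: the barrier is bounded by a 10-second expiry and is additionally aborted if the migration manager itself shuts down. The same pattern in isolation, as a sketch (`parent` stands for `mm.get_abort_source()`):

```cpp
// Sketch only: run a barrier bounded by a timeout and by a parent abort source.
future<> barrier_with_timeout(serialized_action& barrier, abort_source& parent) {
    abort_on_expiry aoe(lowres_clock::now() + std::chrono::seconds{10});
    auto& as = aoe.abort_source();
    // Forward an abort of the parent to the timed source, so the barrier
    // stops on whichever fires first.
    auto sub = parent.subscribe([&as] () noexcept {
        if (!as.abort_requested()) {
            as.request_abort();
        }
    });
    co_await barrier.trigger(as);
}
```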
diff --git a/streaming/stream_session.cc b/streaming/stream_session.cc
index 4347e8bc643b..f511cce17618 100644
--- a/streaming/stream_session.cc
+++ b/streaming/stream_session.cc
@@ -191,6 +191,9 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
         // Make sure the table with cf_id is still present at this point.
         // Close the sink in case the table is dropped.
         auto& table = _db.local().find_column_family(cf_id);
+        utils::get_local_injector().inject("stream_mutation_fragments_table_dropped", [this] () {
+            _db.local().find_column_family(table_id::create_null_id());
+        });
         auto op = table.stream_in_progress();
         auto sharder_ptr = std::make_unique<dht::auto_refreshing_sharder>(table.shared_from_this());
         auto& sharder = *sharder_ptr;
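The receiver-side hook above only fires when the `stream_mutation_fragments_table_dropped` injection is enabled (the new test enables it via `error_injections_at_startup`); looking up a null table id then throws `replica::no_such_column_family` deterministically, simulating a `DROP TABLE` racing with the incoming stream. An annotated restatement of the injected lines:

```cpp
// The callback runs only while the named injection is enabled; otherwise
// inject() is a no-op, so production behavior is unchanged.
utils::get_local_injector().inject("stream_mutation_fragments_table_dropped", [this] () {
    // create_null_id() never names a real table, so this lookup always
    // throws replica::no_such_column_family at this exact point.
    _db.local().find_column_family(table_id::create_null_id());
});
```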
diff --git a/streaming/stream_transfer_task.cc b/streaming/stream_transfer_task.cc
index f78e3912cf6f..a40746dd6880 100644
--- a/streaming/stream_transfer_task.cc
+++ b/streaming/stream_transfer_task.cc
@@ -30,7 +30,9 @@
 #include
 #include "sstables/sstables.hh"
 #include "replica/database.hh"
+#include "repair/table_check.hh"
 #include "gms/feature_service.hh"
+#include "utils/error_injection.hh"
 
 namespace streaming {
 
@@ -203,63 +205,59 @@ future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
 future<> stream_transfer_task::execute() {
     auto plan_id = session->plan_id();
     auto cf_id = this->cf_id;
-    auto dst_cpu_id = session->dst_cpu_id;
     auto id = netw::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
-    sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
-    sort_and_merge_ranges();
-    auto reason = session->get_reason();
     auto& sm = session->manager();
-    auto topo_guard = session->topo_guard();
-    return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason, topo_guard] (stream_manager& sm) mutable {
-        auto tbl = sm.db().find_column_family(cf_id).shared_from_this();
-        return sm.db().obtain_reader_permit(*tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason, topo_guard] (reader_permit permit) mutable {
-            auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, topo_guard, [&sm, plan_id, addr = id.addr] (size_t sz) {
-                sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz);
-            });
-            return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {
-                if (!has_relevant_range_on_this_shard) {
-                    sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}",
-                        plan_id, cf_id, this_shard_id());
-                    return make_ready_future<>();
-                }
-                return send_mutation_fragments(std::move(si));
-            }).finally([si] {
-                return si->reader.close();
-            });
-        });
-    }).then([this, plan_id, cf_id, id, &sm] {
-        sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
-        return sm.ms().send_stream_mutation_done(id, plan_id, _ranges,
-            cf_id, session->dst_cpu_id).handle_exception([plan_id, id] (auto ep) {
-            sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
+    auto table_dropped = co_await repair::with_table_drop_silenced(sm.db(), sm.mm(), cf_id, [this, &sm, cf_id, plan_id, id] (const table_id &) {
+        auto dst_cpu_id = session->dst_cpu_id;
+        sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
+        sort_and_merge_ranges();
+        auto reason = session->get_reason();
+        auto topo_guard = session->topo_guard();
+        return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason, topo_guard] (stream_manager& sm) mutable {
+            auto tbl = sm.db().find_column_family(cf_id).shared_from_this();
+            return sm.db().obtain_reader_permit(*tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason, topo_guard] (reader_permit permit) mutable {
+                auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, topo_guard, [&sm, plan_id, addr = id.addr] (size_t sz) {
+                    sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz);
+                });
+                return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {
+                    if (!has_relevant_range_on_this_shard) {
+                        sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}",
+                            plan_id, cf_id, this_shard_id());
+                        return make_ready_future<>();
+                    }
+                    return send_mutation_fragments(std::move(si));
+                }).finally([si] {
+                    return si->reader.close();
+                });
+            });
+        }).then([this, plan_id, cf_id, id, &sm] {
+            sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
+            return sm.ms().send_stream_mutation_done(id, plan_id, _ranges,
+                cf_id, session->dst_cpu_id).handle_exception([plan_id, id] (auto ep) {
+                sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
+                std::rethrow_exception(ep);
+            });
+        }).then([this, id, plan_id] {
+            _mutation_done_sent = true;
+            sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr);
+        }).handle_exception([plan_id, id, &sm] (std::exception_ptr ep) {
+            sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
+            utils::get_local_injector().inject("stream_mutation_fragments_table_dropped", [&sm] () {
+                sm.db().find_column_family(table_id::create_null_id());
+            });
             std::rethrow_exception(ep);
         });
-    }).then([this, id, plan_id] {
-        _mutation_done_sent = true;
-        sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr);
-    }).handle_exception([this, plan_id, cf_id, id] (std::exception_ptr ep) {
-        // If the table is dropped during streaming, we can ignore the
-        // errors and make the stream successful. This allows user to
-        // drop tables during node operations like decommission or
-        // bootstrap.
-        //
-        // The db table metadata on different shards are not necessarily in
-        // sync, but if the table is dropped on any one of them, the exception
-        // is thrown. So we need to check on all shards.
-        return session->manager().db().container().map_reduce0(
-            [cf_id] (const replica::database& db) { return db.column_family_exists(cf_id); },
-            true, std::logical_and<bool>()).then([this, plan_id, cf_id, id, ep] (bool cf_exists) {
-            if (cf_exists) {
-                sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
-                std::rethrow_exception(ep);
-            }
-            sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming: {}", plan_id, cf_id, ep);
-            if (_mutation_done_sent) {
-                return make_ready_future<>();
-            }
-            return session->manager().ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id);
-        });
     });
+    // If the table is dropped during streaming, we can ignore the
+    // errors and make the stream successful. This allows users to
+    // drop tables during node operations like decommission or
+    // bootstrap.
+    if (table_dropped) {
+        sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming", plan_id, cf_id);
+        if (!_mutation_done_sent) {
+            co_await session->manager().ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id);
+        }
+    }
 }
 
 void stream_transfer_task::append_ranges(const dht::token_range_vector& ranges) {
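`stream_transfer_task::execute()` is now a coroutine: the old tail that probed `column_family_exists` on every shard via `map_reduce0` is replaced by the schema-synchronizing check inside `with_table_drop_silenced()`, and the dropped-table handling moves out of the continuation chain. Condensed shape of the new flow, as a sketch (`do_streaming()` and `send_done()` are hypothetical stand-ins for the nested chain and the `send_stream_mutation_done()` call):

```cpp
// Sketch only: control flow of execute() after the patch.
future<> stream_transfer_task::execute() {
    auto& sm = session->manager();
    auto dropped = co_await repair::with_table_drop_silenced(sm.db(), sm.mm(), cf_id, [&] (const table_id&) {
        return do_streaming(); // the former continuation chain, nested unchanged
    });
    if (dropped && !_mutation_done_sent) {
        // Still tell the receiver this table is done, so the overall
        // stream plan can complete despite the drop.
        co_await send_done();
    }
}
```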
diff --git a/test/topology_custom/test_table_drop.py b/test/topology_custom/test_table_drop.py
new file mode 100644
index 000000000000..421a8817c785
--- /dev/null
+++ b/test/topology_custom/test_table_drop.py
@@ -0,0 +1,11 @@
+from test.pylib.manager_client import ManagerClient
+import pytest
+
+@pytest.mark.asyncio
+async def test_drop_table_during_streaming_receiver_side(manager: ManagerClient):
+    servers = [await manager.server_add(config={
+        'error_injections_at_startup': ['stream_mutation_fragments_table_dropped'],
+        'enable_repair_based_node_ops': False,
+        'enable_user_defined_functions': False,
+        'experimental_features': []
+    }) for _ in range(2)]