Merge 'repair: streaming: handle no_such_column_family from remote node' from Aleksandra Martyniuk

RPC calls lose information about the type of the returned exception.
Thus, if a table is dropped on the receiver node but still exists on
the sender node, and the sender streams the table's data, the whole
operation fails.

To prevent that, add a method which synchronizes the schema and then
checks whether the exception was caused by a table drop. If so,
the exception is swallowed.

Use the method in streaming and repair so that they can continue when
the table is dropped in the meantime.
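
For illustration, a minimal sketch of how a call site might use the new helper,
assuming a coroutine context with db (replica::database&), mm
(service::migration_manager&) and uuid (table_id) in scope; stream_table_data
is a hypothetical stand-in for any RPC-backed operation that can fail because
the peer dropped the table.

auto dropped = co_await repair::with_table_drop_silenced(db, mm, uuid,
        [&] (const table_id& id) -> future<> {
    // Hypothetical operation; may fail with an opaque RPC error if the
    // remote node has already dropped the table.
    return stream_table_data(id);
});
if (dropped) {
    // The failure was caused by the table being dropped: skip this table
    // and continue with the remaining work.
}
// Any other failure propagates out of with_table_drop_silenced unchanged.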

Fixes: #17028.
Fixes: #15370.
Fixes: #15598.

Closes #17231

* github.com:scylladb/scylladb:
  repair: handle no_such_column_family from remote node gracefully
  test: test drop table on receiver side during streaming
  streaming: fix indentation
  streaming: handle no_such_column_family from remote node gracefully
  repair: add methods to skip dropped table
denesb committed Feb 23, 2024
2 parents 3574c22 + cf36015 commit 959d33b
Showing 9 changed files with 180 additions and 55 deletions.
1 change: 1 addition & 0 deletions configure.py
@@ -1126,6 +1126,7 @@ def find_ninja():
'utils/lister.cc',
'repair/repair.cc',
'repair/row_level.cc',
'repair/table_check.cc',
'exceptions/exceptions.cc',
'auth/allow_all_authenticator.cc',
'auth/allow_all_authorizer.cc',
10 changes: 7 additions & 3 deletions repair/repair.cc
@@ -14,6 +14,7 @@
#include "gms/inet_address.hh"
#include "gms/gossiper.hh"
#include "message/messaging_service.hh"
#include "repair/table_check.hh"
#include "replica/database.hh"
#include "service/migration_manager.hh"
#include "service/storage_service.hh"
@@ -740,9 +741,12 @@ future<> repair::shard_repair_task_impl::repair_range(const dht::token_range& ra
co_return;
}
try {
co_await repair_cf_range_row_level(*this, cf, table.id, range, neighbors, _small_table_optimization);
} catch (replica::no_such_column_family&) {
dropped_tables.insert(cf);
auto dropped = co_await with_table_drop_silenced(db.local(), mm, table.id, [&] (const table_id& uuid) {
return repair_cf_range_row_level(*this, cf, table.id, range, neighbors, _small_table_optimization);
});
if (dropped) {
dropped_tables.insert(cf);
}
} catch (...) {
nr_failed_ranges++;
throw;
60 changes: 60 additions & 0 deletions repair/table_check.cc
@@ -0,0 +1,60 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#include "replica/database.hh"
#include "repair/table_check.hh"
#include "service/migration_manager.hh"

namespace repair {

future<table_dropped> table_sync_and_check(replica::database& db, service::migration_manager& mm, const table_id& uuid) {
if (mm.use_raft()) {
abort_on_expiry aoe(lowres_clock::now() + std::chrono::seconds{10});
auto& as = aoe.abort_source();
auto sub = mm.get_abort_source().subscribe([&as] () noexcept {
if (!as.abort_requested()) {
as.request_abort();
}
});

// Trigger read barrier to synchronize schema.
co_await mm.get_group0_barrier().trigger(as);
}

co_return !db.column_family_exists(uuid);
}

future<table_dropped> with_table_drop_silenced(replica::database& db, service::migration_manager& mm, const table_id& uuid,
std::function<future<>(const table_id&)> f) {
std::exception_ptr ex = nullptr;
try {
co_await f(uuid);
co_return table_dropped::no;
} catch (replica::no_such_column_family&) {
// No need to synchronize, since we already know the table was dropped.
} catch (...) {
// This node may still see a table while it is dropped on the remote node
// and so the remote node returns an error. In that case we want to skip
// that table and continue with the operation.
//
// But since RPC does not preserve the exception type, the cause of the
// failure cannot be determined here. Synchronize the schema to see the latest
// changes and determine whether the table was dropped.
ex = std::current_exception();
}

if (ex) {
auto dropped = co_await table_sync_and_check(db, mm, uuid);
if (!dropped) {
co_await coroutine::return_exception_ptr(std::move(ex));
}
}
co_return table_dropped::yes;
}

}
42 changes: 42 additions & 0 deletions repair/table_check.hh
@@ -0,0 +1,42 @@
/*
* Copyright (C) 2024-present ScyllaDB
*/

/*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/

#pragma once

#include <seastar/core/future.hh>
#include <seastar/util/bool_class.hh>

#include "schema/schema_fwd.hh"

using table_dropped = bool_class<class table_dropped_tag>;

namespace raft {
class server;
}

namespace replica {
class database;
}

namespace service {
class migration_manager;
}

namespace repair {

class database;

future<table_dropped> table_sync_and_check(replica::database& db, service::migration_manager& mm, const table_id& uuid);

// Runs function f on the given table. If f throws and the table turns out to be dropped, the exception is swallowed.
// The function is meant to handle no_such_column_family from a remote node or a different shard: it synchronizes
// the schema before checking whether the table still exists. Prefer standard error handling whenever possible.
future<table_dropped> with_table_drop_silenced(replica::database& db, service::migration_manager& mm, const table_id& uuid,
std::function<future<>(const table_id&)> f);

}
5 changes: 5 additions & 0 deletions service/migration_manager.hh
@@ -94,6 +94,11 @@ public:
const migration_notifier& get_notifier() const { return _notifier; }
service::storage_proxy& get_storage_proxy() { return _storage_proxy; }
const service::storage_proxy& get_storage_proxy() const { return _storage_proxy; }
abort_source& get_abort_source() noexcept { return _as; }
const abort_source& get_abort_source() const noexcept { return _as; }
serialized_action& get_group0_barrier() noexcept { return _group0_barrier; }
const serialized_action& get_group0_barrier() const noexcept { return _group0_barrier; }
bool use_raft() const noexcept { return !_enable_schema_pulls; }

// Disable schema pulls when Raft group 0 is fully responsible for managing schema.
future<> disable_schema_pulls();
1 change: 1 addition & 0 deletions streaming/stream_manager.hh
@@ -127,6 +127,7 @@ public:

replica::database& db() noexcept { return _db.local(); }
netw::messaging_service& ms() noexcept { return _ms.local(); }
service::migration_manager& mm() noexcept { return _mm.local(); }

const std::unordered_map<plan_id, shared_ptr<stream_result_future>>& get_initiated_streams() const {
return _initiated_streams;
3 changes: 3 additions & 0 deletions streaming/stream_session.cc
@@ -191,6 +191,9 @@ void stream_manager::init_messaging_service_handler(abort_source& as) {
// Make sure the table with cf_id is still present at this point.
// Close the sink in case the table is dropped.
auto& table = _db.local().find_column_family(cf_id);
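// Error injection used by tests: looking up a table id that cannot exist
// throws no_such_column_family, simulating the table being dropped on the
// receiving side.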
utils::get_local_injector().inject("stream_mutation_fragments_table_dropped", [this] () {
_db.local().find_column_family(table_id::create_null_id());
});
auto op = table.stream_in_progress();
auto sharder_ptr = std::make_unique<dht::auto_refreshing_sharder>(table.shared_from_this());
auto& sharder = *sharder_ptr;
102 changes: 50 additions & 52 deletions streaming/stream_transfer_task.cc
@@ -30,7 +30,9 @@
#include <boost/icl/interval_set.hpp>
#include "sstables/sstables.hh"
#include "replica/database.hh"
#include "repair/table_check.hh"
#include "gms/feature_service.hh"
#include "utils/error_injection.hh"

namespace streaming {

@@ -203,63 +205,59 @@ future<> send_mutation_fragments(lw_shared_ptr<send_info> si) {
future<> stream_transfer_task::execute() {
auto plan_id = session->plan_id();
auto cf_id = this->cf_id;
auto dst_cpu_id = session->dst_cpu_id;
auto id = netw::messaging_service::msg_addr{session->peer, session->dst_cpu_id};
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
sort_and_merge_ranges();
auto reason = session->get_reason();
auto& sm = session->manager();
auto topo_guard = session->topo_guard();
return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason, topo_guard] (stream_manager& sm) mutable {
auto tbl = sm.db().find_column_family(cf_id).shared_from_this();
return sm.db().obtain_reader_permit(*tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason, topo_guard] (reader_permit permit) mutable {
auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, topo_guard, [&sm, plan_id, addr = id.addr] (size_t sz) {
sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz);
});
return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {
if (!has_relevant_range_on_this_shard) {
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}",
plan_id, cf_id, this_shard_id());
return make_ready_future<>();
}
return send_mutation_fragments(std::move(si));
}).finally([si] {
return si->reader.close();
});
});
}).then([this, plan_id, cf_id, id, &sm] {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
return sm.ms().send_stream_mutation_done(id, plan_id, _ranges,
cf_id, session->dst_cpu_id).handle_exception([plan_id, id] (auto ep) {
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
auto table_dropped = co_await repair::with_table_drop_silenced(sm.db(), sm.mm(), cf_id, [this, &sm, cf_id, plan_id, id] (const table_id &) {
auto dst_cpu_id = session->dst_cpu_id;
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}", plan_id, cf_id);
sort_and_merge_ranges();
auto reason = session->get_reason();
auto topo_guard = session->topo_guard();
return sm.container().invoke_on_all([plan_id, cf_id, id, dst_cpu_id, ranges=this->_ranges, reason, topo_guard] (stream_manager& sm) mutable {
auto tbl = sm.db().find_column_family(cf_id).shared_from_this();
return sm.db().obtain_reader_permit(*tbl, "stream-transfer-task", db::no_timeout, {}).then([&sm, tbl, plan_id, cf_id, id, dst_cpu_id, ranges=std::move(ranges), reason, topo_guard] (reader_permit permit) mutable {
auto si = make_lw_shared<send_info>(sm.ms(), plan_id, tbl, std::move(permit), std::move(ranges), id, dst_cpu_id, reason, topo_guard, [&sm, plan_id, addr = id.addr] (size_t sz) {
sm.update_progress(plan_id, addr, streaming::progress_info::direction::OUT, sz);
});
return si->has_relevant_range_on_this_shard().then([si, plan_id, cf_id] (bool has_relevant_range_on_this_shard) {
if (!has_relevant_range_on_this_shard) {
sslog.debug("[Stream #{}] stream_transfer_task: cf_id={}: ignore ranges on shard={}",
plan_id, cf_id, this_shard_id());
return make_ready_future<>();
}
return send_mutation_fragments(std::move(si));
}).finally([si] {
return si->reader.close();
});
});
}).then([this, plan_id, cf_id, id, &sm] {
sslog.debug("[Stream #{}] SEND STREAM_MUTATION_DONE to {}, cf_id={}", plan_id, id, cf_id);
return sm.ms().send_stream_mutation_done(id, plan_id, _ranges,
cf_id, session->dst_cpu_id).handle_exception([plan_id, id] (auto ep) {
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send STREAM_MUTATION_DONE to {}: {}", plan_id, id, ep);
std::rethrow_exception(ep);
});
}).then([this, id, plan_id] {
_mutation_done_sent = true;
sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr);
}).handle_exception([plan_id, id, &sm] (std::exception_ptr ep) {
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
utils::get_local_injector().inject("stream_mutation_fragments_table_dropped", [&sm] () {
sm.db().find_column_family(table_id::create_null_id());
});
std::rethrow_exception(ep);
});
}).then([this, id, plan_id] {
_mutation_done_sent = true;
sslog.debug("[Stream #{}] GOT STREAM_MUTATION_DONE Reply from {}", plan_id, id.addr);
}).handle_exception([this, plan_id, cf_id, id] (std::exception_ptr ep) {
// If the table is dropped during streaming, we can ignore the
// errors and make the stream successful. This allows user to
// drop tables during node operations like decommission or
// bootstrap.
//
// The db table metadata on different shards are not necessarily in
// sync, but if the table is dropped on any one of them, the exception
// is thrown. So we need to check on all shards.
return session->manager().db().container().map_reduce0(
[cf_id] (const replica::database& db) { return db.column_family_exists(cf_id); },
true, std::logical_and<bool>()).then([this, plan_id, cf_id, id, ep] (bool cf_exists) {
if (cf_exists) {
sslog.warn("[Stream #{}] stream_transfer_task: Fail to send to {}: {}", plan_id, id, ep);
std::rethrow_exception(ep);
}
sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming: {}", plan_id, cf_id, ep);
if (_mutation_done_sent) {
return make_ready_future();
}
return session->manager().ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id);
});
});
// If the table is dropped during streaming, we can ignore the
// errors and make the stream successful. This allows user to
// drop tables during node operations like decommission or
// bootstrap.
if (table_dropped) {
sslog.warn("[Stream #{}] Ignore the table with table_id {} which is dropped during streaming", plan_id, cf_id);
if (!_mutation_done_sent) {
co_await session->manager().ms().send_stream_mutation_done(id, plan_id, _ranges, cf_id, session->dst_cpu_id);
}
}
}

void stream_transfer_task::append_ranges(const dht::token_range_vector& ranges) {
11 changes: 11 additions & 0 deletions test/topology_custom/test_table_drop.py
@@ -0,0 +1,11 @@
from test.pylib.manager_client import ManagerClient
import pytest

@pytest.mark.asyncio
async def test_drop_table_during_streaming_receiver_side(manager: ManagerClient):
servers = [await manager.server_add(config={
'error_injections_at_startup': ['stream_mutation_fragments_table_dropped'],
'enable_repair_based_node_ops': False,
'enable_user_defined_functions': False,
'experimental_features': []
}) for _ in range(2)]
