Skip to content

Commit

Permalink
Merge 'move migration_request handling to shard0' from Gleb
Browse files Browse the repository at this point in the history
The RPC is used by group0 now which is available only on shard0

Fixes #17565

* 'gleb/migration-request-shard0' of github.com:scylladb/scylla-dev:
  raft_group0_client: assert that hold_read_apply_mutex is called on shard 0
  migration_manager: fix indentation after the previous patch.
  messaging_service: process migration_request rpc on shard 0
  • Loading branch information
kbr-scylla committed Feb 29, 2024
2 parents 6afa80a + 9847e27 commit 57b1458
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 36 deletions.
73 changes: 37 additions & 36 deletions service/migration_manager.cc
Expand Up @@ -141,48 +141,49 @@ void migration_manager::init_messaging_service()
});
return netw::messaging_service::no_wait();
});
_messaging.register_migration_request(std::bind_front(
[] (migration_manager& self, const rpc::client_info& cinfo, rpc::optional<netw::schema_pull_options> options)
_messaging.register_migration_request([this] (const rpc::client_info& cinfo, rpc::optional<netw::schema_pull_options> options) {
return container().invoke_on(0, std::bind_front(
[] (netw::msg_addr src, rpc::optional<netw::schema_pull_options> options, migration_manager& self)
-> future<rpc::tuple<std::vector<frozen_mutation>, std::vector<canonical_mutation>>> {
const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval;
if (!cm_retval_supported) {
// Canonical mutations support was added way back in scylla-3.2 and we don't support
// skipping versions during upgrades (certainly not a 3.2 -> 5.4 upgrade).
auto src = netw::messaging_service::get_source(cinfo);
on_internal_error(mlogger, ::format(
"canonical mutations not supported by {}", src));
}
const auto cm_retval_supported = options && options->remote_supports_canonical_mutation_retval;
if (!cm_retval_supported) {
// Canonical mutations support was added way back in scylla-3.2 and we don't support
// skipping versions during upgrades (certainly not a 3.2 -> 5.4 upgrade).
on_internal_error(mlogger, ::format(
"canonical mutations not supported by {}", src));
}

auto features = self._feat.cluster_schema_features();
auto& proxy = self._storage_proxy.container();
auto& db = proxy.local().get_db();
semaphore_units<> guard;
if (options->group0_snapshot_transfer) {
guard = co_await self._group0_client.hold_read_apply_mutex(self._as);
}
auto cm = co_await db::schema_tables::convert_schema_to_mutations(proxy, features);
if (options->group0_snapshot_transfer) {
cm.emplace_back(co_await db::system_keyspace::get_group0_history(db));
if (proxy.local().local_db().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
for (auto&& m: co_await replica::read_tablet_mutations(db)) {
cm.emplace_back(std::move(m));
auto features = self._feat.cluster_schema_features();
auto& proxy = self._storage_proxy.container();
auto& db = proxy.local().get_db();
semaphore_units<> guard;
if (options->group0_snapshot_transfer) {
guard = co_await self._group0_client.hold_read_apply_mutex(self._as);
}
auto cm = co_await db::schema_tables::convert_schema_to_mutations(proxy, features);
if (options->group0_snapshot_transfer) {
cm.emplace_back(co_await db::system_keyspace::get_group0_history(db));
if (proxy.local().local_db().get_config().check_experimental(db::experimental_features_t::feature::TABLETS)) {
for (auto&& m: co_await replica::read_tablet_mutations(db)) {
cm.emplace_back(std::move(m));
}
}
}
}

// If the schema we're returning was last modified in group 0 mode, we also need to return
// the persisted schema version so the pulling node uses it instead of calculating a schema digest.
//
// If it was modified in RECOVERY mode, we still need to return the mutation as it may contain a tombstone
// that will force the pulling node to revert to digest calculation instead of using a version that it
// could've persisted earlier.
auto group0_schema_version = co_await self._sys_ks.local().get_group0_schema_version();
if (group0_schema_version) {
cm.emplace_back(std::move(*group0_schema_version));
}
// If the schema we're returning was last modified in group 0 mode, we also need to return
// the persisted schema version so the pulling node uses it instead of calculating a schema digest.
//
// If it was modified in RECOVERY mode, we still need to return the mutation as it may contain a tombstone
// that will force the pulling node to revert to digest calculation instead of using a version that it
// could've persisted earlier.
auto group0_schema_version = co_await self._sys_ks.local().get_group0_schema_version();
if (group0_schema_version) {
cm.emplace_back(std::move(*group0_schema_version));
}

co_return rpc::tuple(std::vector<frozen_mutation>{}, std::move(cm));
}, std::ref(*this)));
co_return rpc::tuple(std::vector<frozen_mutation>{}, std::move(cm));
}, netw::messaging_service::get_source(cinfo), std::move(options)));
});
_messaging.register_schema_check([this] {
return make_ready_future<table_schema_version>(_storage_proxy.get_db().local().get_version());
});
Expand Down
4 changes: 4 additions & 0 deletions service/raft/raft_group0_client.cc
Expand Up @@ -430,6 +430,10 @@ future<semaphore_units<>> raft_group0_client::hold_read_apply_mutex() {
}

future<semaphore_units<>> raft_group0_client::hold_read_apply_mutex(abort_source& as) {
if (this_shard_id() != 0) {
on_internal_error(logger, "hold_read_apply_mutex: must run on shard 0");
}

return get_units(_read_apply_mutex, 1, as);
}

Expand Down

0 comments on commit 57b1458

Please sign in to comment.