Skip to content

Commit

Permalink
check if topology sm processes a global req
Browse files Browse the repository at this point in the history
  • Loading branch information
ptrsmrn committed Jan 17, 2024
1 parent 5aac4bd commit a7b8d74
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 7 deletions.
19 changes: 14 additions & 5 deletions cql3/query_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ const sstring query_processor::CQL_VERSION = "3.3.1";
const std::chrono::minutes prepared_statements_cache::entry_expiry = std::chrono::minutes(60);

struct query_processor::remote {
remote(service::migration_manager& mm, service::forward_service& fwd, service::raft_group0_client& group0_client)
: mm(mm), forwarder(fwd), group0_client(group0_client) {}
remote(service::migration_manager& mm, service::forward_service& fwd,
service::storage_service& ss, service::raft_group0_client& group0_client)
: mm(mm), forwarder(fwd), ss(ss), group0_client(group0_client) {}

service::migration_manager& mm;
service::forward_service& forwarder;
service::raft_group0_client& group0_client;

service::storage_service& ss;
seastar::gate gate;
};

Expand Down Expand Up @@ -498,8 +499,8 @@ query_processor::~query_processor() {
}

void query_processor::start_remote(service::migration_manager& mm, service::forward_service& forwarder,
service::raft_group0_client& group0_client) {
_remote = std::make_unique<struct remote>(mm, forwarder, group0_client);
service::storage_service& ss, service::raft_group0_client& group0_client) {
_remote = std::make_unique<struct remote>(mm, forwarder, ss, group0_client);
}

future<> query_processor::stop_remote() {
Expand All @@ -511,6 +512,14 @@ future<> query_processor::stop_remote() {
_remote = nullptr;
}

#include "service/storage_service.hh"

bool query_processor::global_topology_queue_empty() {
auto [remote_, _] = remote();
remote_.get().ss.;
return true;
}

future<> query_processor::stop() {
return _mnotifier.unregister_listener(_migration_subscriber.get()).then([this] {
return _authorized_prepared_cache.stop().finally([this] { return _prepared_cache.stop(); });
Expand Down
5 changes: 4 additions & 1 deletion cql3/query_processor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ public:

~query_processor();

void start_remote(service::migration_manager&, service::forward_service&, service::raft_group0_client&);
void start_remote(service::migration_manager&, service::forward_service&,
service::storage_service& ss, service::raft_group0_client&);
future<> stop_remote();

data_dictionary::database db() {
Expand Down Expand Up @@ -445,6 +446,8 @@ public:

void reset_cache();

bool global_topology_queue_empty();

private:
// Keep the holder until you stop using the `remote` services.
std::pair<std::reference_wrapper<remote>, gate::holder> remote();
Expand Down
3 changes: 3 additions & 0 deletions cql3/statements/alter_keyspace_statement.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ void cql3::statements::alter_keyspace_statement::validate(query_processor& qp, c
#include "locator/load_sketch.hh"
#include "service/topology_guard.hh"
#include "service/topology_mutation.hh"
#include "service/storage_service.hh"
#include "locator/tablet_replication_strategy.hh"

future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>>
Expand All @@ -101,8 +102,10 @@ cql3::statements::alter_keyspace_statement::prepare_schema_mutations(query_proce

// Check if topology_state_machine handles other global request
// TODO: - #include topology_state_machine to this module
qp.remote().first.get()->
if (topology_state_machine._topology.global_request) // TODO: refactor this check into a separate function
{
auto topology = qp.proxy().get_token_metadata_ptr()->get_topology_change_info();
// TODO: make this function accept group0_guard instead of timestamp_type
// this will require changing signature in the whole class hierarchy
service::group0_guard guard;
Expand Down
3 changes: 2 additions & 1 deletion main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1400,7 +1400,8 @@ To start the scylla server proper, simply invoke as: scylla server (or just scyl

supervisor::notify("initializing query processor remote part");
// TODO: do this together with proxy.start_remote(...)
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(forward_service), std::ref(group0_client)).get();
qp.invoke_on_all(&cql3::query_processor::start_remote, std::ref(mm), std::ref(forward_service),
std::ref(ss), std::ref(group0_client)).get();
auto stop_qp_remote = defer_verbose_shutdown("query processor remote part", [&qp] {
qp.invoke_on_all(&cql3::query_processor::stop_remote).get();
});
Expand Down

0 comments on commit a7b8d74

Please sign in to comment.