Skip to content

Commit

Permalink
Revert "cql3: Extend the scope of group0_guard during DDL statement e…
Browse files Browse the repository at this point in the history
…xecution"

This reverts commit 70b5360. It generates
a failure in group0_test .test_concurrent_group0_modifications in debug
mode with about 4% probability.

Fixes #15050
  • Loading branch information
avikivity committed Aug 14, 2023
1 parent e7077da commit d57a951
Show file tree
Hide file tree
Showing 63 changed files with 136 additions and 252 deletions.
10 changes: 3 additions & 7 deletions cql3/cql_statement.hh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
#pragma once

#include "timeout_config.hh"
#include "service/raft/raft_group0_client.hh"

namespace service {

Expand Down Expand Up @@ -49,9 +48,6 @@ public:
// CQL statement text
seastar::sstring raw_cql_statement;

// True for statements that needs guard to be taken before the execution
bool needs_guard = false;

explicit cql_statement(timeout_config_selector timeout_selector) : _timeout_config_selector(timeout_selector) {}

virtual ~cql_statement()
Expand Down Expand Up @@ -86,7 +82,7 @@ public:
* @param options options for this query (consistency, variables, pageSize, ...)
*/
virtual seastar::future<seastar::shared_ptr<cql_transport::messages::result_message>>
execute(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const = 0;
execute(query_processor& qp, service::query_state& state, const query_options& options) const = 0;

/**
* Execute the statement and return the resulting result or null if there is no result.
Expand All @@ -98,8 +94,8 @@ public:
* @param options options for this query (consistency, variables, pageSize, ...)
*/
virtual seastar::future<seastar::shared_ptr<cql_transport::messages::result_message>>
execute_without_checking_exception_message(query_processor& qp, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) const {
return execute(qp, state, options, std::move(guard));
execute_without_checking_exception_message(query_processor& qp, service::query_state& state, const query_options& options) const {
return execute(qp, state, options);
}

virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const = 0;
Expand Down
141 changes: 42 additions & 99 deletions cql3/query_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -484,56 +484,16 @@ future<> query_processor::stop() {
});
}

future<::shared_ptr<cql_transport::messages::result_message>> query_processor::execute_with_guard(
std::function<future<::shared_ptr<cql_transport::messages::result_message>>(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>)> fn,
::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options) {
// execute all statements that need group0 guard on shard0
if (this_shard_id() != 0) {
co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0,
std::move(const_cast<cql3::query_options&>(options).take_cached_pk_function_calls()));
}

auto [remote_, holder] = remote();
size_t retries = remote_.get().mm.get_concurrent_ddl_retries();
while (true) {
try {
auto guard = co_await remote_.get().mm.start_group0_operation();
co_return co_await fn(query_state, statement, options, std::move(guard));
} catch (const service::group0_concurrent_modification& ex) {
retries--;
log.warn("Failed to execute statement \"{}\" due to guard conflict.{}.",
statement->raw_cql_statement, retries ? " Retrying" : " Number of retries exceeded, giving up");
if (retries) {
continue;
}
throw;
}
}
}

template<typename... Args>
future<::shared_ptr<result_message>>
query_processor::execute_maybe_with_guard(service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options,
future<::shared_ptr<result_message>>(query_processor::*fn)(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>, Args...), Args... args) {
if (!statement->needs_guard) {
return (this->*fn)(query_state, std::move(statement), options, std::nullopt, std::forward<Args>(args)...);
}
static auto exec = [fn] (query_processor& qp, Args... args, service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options, std::optional<service::group0_guard> guard) {
return (qp.*fn)(query_state, std::move(statement), options, std::move(guard), std::forward<Args>(args)...);
};
return execute_with_guard(std::bind_front(exec, std::ref(*this), std::forward<Args>(args)...), std::move(statement), query_state, options);
}

future<::shared_ptr<result_message>>
query_processor::execute_direct_without_checking_exception_message(const sstring_view& query_string, service::query_state& query_state, query_options& options) {
log.trace("execute_direct: \"{}\"", query_string);
tracing::trace(query_state.get_trace_state(), "Parsing a statement");
auto p = get_statement(query_string, query_state.get_client_state());
auto statement = p->statement;
auto cql_statement = p->statement;
const auto warnings = std::move(p->warnings);
if (statement->get_bound_terms() != options.get_values_count()) {
if (cql_statement->get_bound_terms() != options.get_values_count()) {
const auto msg = format("Invalid amount of bind variables: expected {:d} received {:d}",
statement->get_bound_terms(),
cql_statement->get_bound_terms(),
options.get_values_count());
throw exceptions::invalid_request_exception(msg);
}
Expand All @@ -545,18 +505,8 @@ query_processor::execute_direct_without_checking_exception_message(const sstring
metrics.regularStatementsExecuted.inc();
#endif
tracing::trace(query_state.get_trace_state(), "Processing a statement");
return execute_maybe_with_guard(query_state, std::move(statement), options, &query_processor::do_execute_direct, std::move(warnings));
}

future<::shared_ptr<result_message>>
query_processor::do_execute_direct(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
std::optional<service::group0_guard> guard,
cql3::cql_warnings_vec warnings) {
co_await statement->check_access(*this, query_state.get_client_state());
auto m = co_await process_authorized_statement(statement, query_state, options, std::move(guard));
co_await cql_statement->check_access(*this, query_state.get_client_state());
auto m = co_await process_authorized_statement(std::move(cql_statement), query_state, options);
for (const auto& w : warnings) {
m->add_warning(w);
}
Expand All @@ -565,24 +515,14 @@ query_processor::do_execute_direct(

future<::shared_ptr<result_message>>
query_processor::execute_prepared_without_checking_exception_message(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
statements::prepared_statement::checked_weak_ptr prepared,
cql3::prepared_cache_key_type cache_key,
bool needs_authorization) {
return execute_maybe_with_guard(query_state, std::move(statement), options, &query_processor::do_execute_prepared, std::move(prepared), std::move(cache_key), needs_authorization);
}

future<::shared_ptr<result_message>>
query_processor::do_execute_prepared(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
std::optional<service::group0_guard> guard,
statements::prepared_statement::checked_weak_ptr prepared,
cql3::prepared_cache_key_type cache_key,
bool needs_authorization) {

::shared_ptr<cql_statement> statement = prepared->statement;

if (needs_authorization) {
co_await statement->check_access(*this, query_state.get_client_state());
try {
Expand All @@ -591,18 +531,19 @@ query_processor::do_execute_prepared(
log.error("failed to cache the entry: {}", std::current_exception());
}
}
co_return co_await process_authorized_statement(std::move(statement), query_state, options, std::move(guard));

co_return co_await process_authorized_statement(std::move(statement), query_state, options);
}

future<::shared_ptr<result_message>>
query_processor::process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options, std::optional<service::group0_guard> guard) {
query_processor::process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options) {
auto& client_state = query_state.get_client_state();

++_stats.queries_by_cl[size_t(options.get_consistency())];

statement->validate(*this, client_state);

auto msg = co_await statement->execute_without_checking_exception_message(*this, query_state, options, std::move(guard));
auto msg = co_await statement->execute_without_checking_exception_message(*this, query_state, options);

if (msg) {
co_return std::move(msg);
Expand Down Expand Up @@ -808,7 +749,7 @@ query_processor::execute_paged_internal(internal_query_state& state) {
state.p->statement->validate(*this, service::client_state::for_internal_calls());
auto qs = query_state_for_internal_call();
::shared_ptr<cql_transport::messages::result_message> msg =
co_await state.p->statement->execute(*this, qs, *state.opts, std::nullopt);
co_await state.p->statement->execute(*this, qs, *state.opts);

class visitor : public result_message::visitor_base {
internal_query_state& _state;
Expand Down Expand Up @@ -882,22 +823,11 @@ query_processor::execute_with_params(
service::query_state& query_state,
const std::initializer_list<data_value>& values) {
auto opts = make_internal_options(p, values, cl);
auto statement = p->statement;

auto msg = co_await execute_maybe_with_guard(query_state, std::move(statement), opts, &query_processor::do_execute_with_params);
p->statement->validate(*this, service::client_state::for_internal_calls());
auto msg = co_await p->statement->execute(*this, query_state, opts);
co_return ::make_shared<untyped_result_set>(msg);
}

future<::shared_ptr<result_message>>
query_processor::do_execute_with_params(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options, std::optional<service::group0_guard> guard) {
statement->validate(*this, service::client_state::for_internal_calls());
co_return co_await statement->execute(*this, query_state, options, std::move(guard));
}


future<::shared_ptr<cql_transport::messages::result_message>>
query_processor::execute_batch_without_checking_exception_message(
::shared_ptr<statements::batch_statement> batch,
Expand All @@ -922,7 +852,7 @@ query_processor::execute_batch_without_checking_exception_message(
}
log.trace("execute_batch({}): {}", batch->get_statements().size(), oss.str());
}
co_return co_await batch->execute(*this, query_state, options, std::nullopt);
co_return co_await batch->execute(*this, query_state, options);
}

future<service::broadcast_tables::query_result>
Expand All @@ -938,31 +868,44 @@ query_processor::forward(query::forward_request req, tracing::trace_state_ptr tr
}

future<::shared_ptr<messages::result_message>>
query_processor::execute_schema_statement(const statements::schema_altering_statement& stmt, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard) {
query_processor::execute_schema_statement(const statements::schema_altering_statement& stmt, service::query_state& state, const query_options& options) {
::shared_ptr<cql_transport::event::schema_change> ce;

if (this_shard_id() != 0) {
on_internal_error(log, "DDL must be executed on shard 0");
// execute all schema altering statements on a shard zero since this is where raft group 0 is
co_return ::make_shared<cql_transport::messages::result_message::bounce_to_shard>(0,
std::move(const_cast<cql3::query_options&>(options).take_cached_pk_function_calls()));
}

if (!guard) {
on_internal_error(log, "Guard must be present when executing DDL");
}
cql3::cql_warnings_vec warnings;

auto [remote_, holder] = remote();
auto& mm = remote_.get().mm;
auto retries = mm.get_concurrent_ddl_retries();
while (true) {
try {
auto group0_guard = co_await mm.start_group0_operation();

cql3::cql_warnings_vec warnings;
auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, group0_guard.write_timestamp());
warnings = std::move(cql_warnings);

auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, guard->write_timestamp());
warnings = std::move(cql_warnings);
if (!m.empty()) {
auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement);
co_await mm.announce(std::move(m), std::move(group0_guard), description);
}

if (!m.empty()) {
auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement);
co_await remote_.get().mm.announce(std::move(m), std::move(*guard), description);
ce = std::move(ret);
} catch (const service::group0_concurrent_modification&) {
log.warn("Failed to execute DDL statement \"{}\" due to concurrent group 0 modification.{}.",
stmt.raw_cql_statement, retries ? " Retrying" : " Number of retries exceeded, giving up");
if (retries--) {
continue;
}
throw;
}
break;
}

ce = std::move(ret);

// If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing
// extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600)
::shared_ptr<messages::result_message> result;
Expand Down
57 changes: 9 additions & 48 deletions cql3/query_processor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -205,13 +205,11 @@ public:
service::query_state& query_state,
const query_options& options,
bool needs_authorization) {
auto cql_statement = statement->statement;
return execute_prepared_without_checking_exception_message(
query_state,
std::move(cql_statement),
options,
std::move(statement),
std::move(cache_key),
query_state,
options,
needs_authorization)
.then(cql_transport::messages::propagate_exception_as_future<::shared_ptr<cql_transport::messages::result_message>>);
}
Expand All @@ -220,22 +218,11 @@ public:
// The result_message::exception must be explicitly handled.
future<::shared_ptr<cql_transport::messages::result_message>>
execute_prepared_without_checking_exception_message(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
statements::prepared_statement::checked_weak_ptr prepared,
cql3::prepared_cache_key_type cache_key,
bool needs_authorization);

future<::shared_ptr<cql_transport::messages::result_message>>
do_execute_prepared(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
std::optional<service::group0_guard> guard,
statements::prepared_statement::checked_weak_ptr prepared,
cql3::prepared_cache_key_type cache_key,
bool needs_authorization);
statements::prepared_statement::checked_weak_ptr statement,
cql3::prepared_cache_key_type cache_key,
service::query_state& query_state,
const query_options& options,
bool needs_authorization);

/// Execute a client statement that was not prepared.
inline
Expand All @@ -259,14 +246,6 @@ public:
service::query_state& query_state,
query_options& options);

future<::shared_ptr<cql_transport::messages::result_message>>
do_execute_direct(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
std::optional<service::group0_guard> guard,
cql3::cql_warnings_vec warnings);

statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query);

/*!
Expand Down Expand Up @@ -371,13 +350,6 @@ public:
service::query_state& query_state,
const std::initializer_list<data_value>& = { });

future<::shared_ptr<cql_transport::messages::result_message>> do_execute_with_params(
service::query_state& query_state,
shared_ptr<cql_statement> statement,
const query_options& options,
std::optional<service::group0_guard> guard);


future<::shared_ptr<cql_transport::messages::result_message::prepared>>
prepare(sstring query_string, service::query_state& query_state);

Expand Down Expand Up @@ -417,10 +389,8 @@ public:
future<query::forward_result>
forward(query::forward_request, tracing::trace_state_ptr);

struct retry_statement_execution_error : public std::exception {};

future<::shared_ptr<cql_transport::messages::result_message>>
execute_schema_statement(const statements::schema_altering_statement&, service::query_state& state, const query_options& options, std::optional<service::group0_guard> guard);
execute_schema_statement(const statements::schema_altering_statement&, service::query_state& state, const query_options& options);

future<std::string>
execute_thrift_schema_command(
Expand Down Expand Up @@ -450,7 +420,7 @@ private:
int32_t page_size = -1) const;

future<::shared_ptr<cql_transport::messages::result_message>>
process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options, std::optional<service::group0_guard> guard);
process_authorized_statement(const ::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options);

/*!
* \brief created a state object for paging
Expand Down Expand Up @@ -485,15 +455,6 @@ private:
*/
bool has_more_results(cql3::internal_query_state& state) const;

template<typename... Args>
future<::shared_ptr<cql_transport::messages::result_message>>
execute_maybe_with_guard(service::query_state& query_state, ::shared_ptr<cql_statement> statement, const query_options& options,
future<::shared_ptr<cql_transport::messages::result_message>>(query_processor::*fn)(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>, Args...), Args... args);

future<::shared_ptr<cql_transport::messages::result_message>> execute_with_guard(
std::function<future<::shared_ptr<cql_transport::messages::result_message>>(service::query_state&, ::shared_ptr<cql_statement>, const query_options&, std::optional<service::group0_guard>)> fn,
::shared_ptr<cql_statement> statement, service::query_state& query_state, const query_options& options);

///
/// \tparam ResultMsgType type of the returned result message (CQL or Thrift)
/// \tparam PreparedKeyGenerator a function that generates the prepared statement cache key for given query and
Expand Down

0 comments on commit d57a951

Please sign in to comment.