Skip to content

Commit

Permalink
Revert "cql3: Extend the scope of group0_guard during DDL statement e…
Browse files Browse the repository at this point in the history
…xecution"

This reverts commit c42a91e.

A significant performance regression was observed due to this change.

From Avi:
> perf-simple-query --smp 1
>
> before:
>
> 216489.88 tps ( 61.1 allocs/op, 13.1 tasks/op, 43558 insns/op, 0 errors)
> 217708.69 tps ( 61.1 allocs/op, 13.1 tasks/op, 43542 insns/op, 0 errors)
> 219495.02 tps ( 61.1 allocs/op, 13.1 tasks/op, 43538 insns/op, 0 errors)
> 216863.84 tps ( 61.1 allocs/op, 13.1 tasks/op, 43567 insns/op, 0 errors)
> 218936.48 tps ( 61.1 allocs/op, 13.1 tasks/op, 43546 insns/op, 0 errors)
>
> after:
>
> 201773.52 tps ( 63.1 allocs/op, 15.1 tasks/op, 44600 insns/op, 0 errors)
> 210875.48 tps ( 63.1 allocs/op, 15.1 tasks/op, 44558 insns/op, 0 errors)
> 210186.55 tps ( 63.1 allocs/op, 15.1 tasks/op, 44588 insns/op, 0 errors)
> 211021.76 tps ( 63.1 allocs/op, 15.1 tasks/op, 44569 insns/op, 0 errors)
> 208597.52 tps ( 63.1 allocs/op, 15.1 tasks/op, 44587 insns/op, 0 errors)
>
> Two extra allocations, two extra tasks, 1k extra instructions, for
> something that is DDL only.

Fixes #14590
  • Loading branch information
kbr-scylla committed Jul 10, 2023
1 parent ec29272 commit 3d58e8e
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 118 deletions.
10 changes: 0 additions & 10 deletions cql3/cql_statement.hh
Expand Up @@ -42,12 +42,6 @@ class query_options;
// A vector of CQL warnings generated during execution of a statement.
using cql_warnings_vec = std::vector<sstring>;

struct statement_guard {
size_t retry_count;
statement_guard(size_t rc) : retry_count(rc) {}
virtual ~statement_guard() {}
};

class cql_statement {
timeout_config_selector _timeout_config_selector;
public:
Expand Down Expand Up @@ -104,10 +98,6 @@ public:
return execute(qp, state, options);
}

virtual future<std::unique_ptr<statement_guard>> take_guard(query_processor& qp) const {
return make_ready_future<std::unique_ptr<statement_guard>>(nullptr);
}

virtual bool depends_on(std::string_view ks_name, std::optional<std::string_view> cf_name) const = 0;

virtual seastar::shared_ptr<const metadata> get_result_metadata() const = 0;
Expand Down
123 changes: 40 additions & 83 deletions cql3/query_processor.cc
Expand Up @@ -29,7 +29,6 @@
#include "data_dictionary/data_dictionary.hh"
#include "utils/hashers.hh"
#include "utils/error_injection.hh"
#include "service/migration_manager.hh"

namespace cql3 {

Expand Down Expand Up @@ -489,33 +488,6 @@ future<> query_processor::stop() {
});
}

template<std::invocable<service::query_state&> F>
future<::shared_ptr<result_message>>
query_processor::execute_with_retry(::shared_ptr<cql_statement> statement, service::query_state& query_state, const F& fn) {
size_t retries = 0;
while (true) {
size_t max_retries;
try {
query_state.statement_guard = co_await statement->take_guard(*this);
max_retries = query_state.statement_guard ? query_state.statement_guard->retry_count : 0;
auto res = co_await fn(query_state);
// Reset guard in case it was not consumed
query_state.statement_guard.reset();
co_return std::move(res);
} catch (const retry_statement_execution_error& ex) {
bool retry = ++retries <= max_retries;
log.warn("Failed to execute statement \"{}\" due to guard conflict.{}.",
statement->raw_cql_statement, retry ? " Retrying" : " Number of retries exceeded, giving up");
if (retry) {
continue;
}
std::rethrow_if_nested(ex);
throw;
}
}

}

future<::shared_ptr<result_message>>
query_processor::execute_direct_without_checking_exception_message(const sstring_view& query_string, service::query_state& query_state, query_options& options) {
log.trace("execute_direct: \"{}\"", query_string);
Expand All @@ -537,15 +509,12 @@ query_processor::execute_direct_without_checking_exception_message(const sstring
metrics.regularStatementsExecuted.inc();
#endif
tracing::trace(query_state.get_trace_state(), "Processing a statement");
co_return co_await execute_with_retry(cql_statement, query_state,
coroutine::lambda([this, cql_statement, &options, &warnings = std::move(warnings)] (service::query_state& query_state) -> future<::shared_ptr<result_message>> {
co_await cql_statement->check_access(*this, query_state.get_client_state());
auto m = co_await process_authorized_statement(cql_statement, query_state, options);
for (const auto& w : warnings) {
m->add_warning(w);
}
co_return std::move(m);
}));
co_await cql_statement->check_access(*this, query_state.get_client_state());
auto m = co_await process_authorized_statement(std::move(cql_statement), query_state, options);
for (const auto& w : warnings) {
m->add_warning(w);
}
co_return std::move(m);
}

future<::shared_ptr<result_message>>
Expand All @@ -557,18 +526,17 @@ query_processor::execute_prepared_without_checking_exception_message(
bool needs_authorization) {

::shared_ptr<cql_statement> statement = prepared->statement;
co_return co_await execute_with_retry(statement, query_state,
coroutine::lambda([this, needs_authorization, statement, cache_key = std::move(cache_key), prepared = std::move(prepared), &options] (service::query_state& query_state) -> future<::shared_ptr<result_message>> {
if (needs_authorization) {
co_await statement->check_access(*this, query_state.get_client_state());
try {
co_await _authorized_prepared_cache.insert(*query_state.get_client_state().user(), cache_key, prepared);
} catch (...) {
log.error("failed to cache the entry: {}", std::current_exception());
}
}
co_return co_await process_authorized_statement(statement, query_state, options);
}));

if (needs_authorization) {
co_await statement->check_access(*this, query_state.get_client_state());
try {
co_await _authorized_prepared_cache.insert(*query_state.get_client_state().user(), std::move(cache_key), std::move(prepared));
} catch (...) {
log.error("failed to cache the entry: {}", std::current_exception());
}
}

co_return co_await process_authorized_statement(std::move(statement), query_state, options);
}

future<::shared_ptr<result_message>>
Expand Down Expand Up @@ -859,12 +827,8 @@ query_processor::execute_with_params(
service::query_state& query_state,
const std::initializer_list<data_value>& values) {
auto opts = make_internal_options(p, values, cl);
auto statement = p->statement;
auto msg = co_await execute_with_retry(statement, query_state,
coroutine::lambda([this, statement, &opts] (service::query_state& query_state) -> future<::shared_ptr<result_message>> {
statement->validate(*this, service::client_state::for_internal_calls());
co_return co_await statement->execute(*this, query_state, opts);
}));
p->statement->validate(*this, service::client_state::for_internal_calls());
auto msg = co_await p->statement->execute(*this, query_state, opts);
co_return ::make_shared<untyped_result_set>(msg);
}

Expand All @@ -874,9 +838,6 @@ query_processor::execute_batch_without_checking_exception_message(
service::query_state& query_state,
query_options& options,
std::unordered_map<prepared_cache_key_type, authorized_prepared_statements_cache::value_type> pending_authorization_entries) {
// We do not call batch->take_guard() here and do not have a retry loop since
// currently it is only needed for schema statements and schema statement cannot
// be part of a batch
co_await batch->check_access(*this, query_state.get_client_state());
co_await coroutine::parallel_for_each(pending_authorization_entries, [this, &query_state] (auto& e) -> future<> {
try {
Expand Down Expand Up @@ -910,16 +871,6 @@ query_processor::forward(query::forward_request req, tracing::trace_state_ptr tr
co_return co_await remote_.get().forwarder.dispatch(std::move(req), std::move(tr_state));
}

future<std::unique_ptr<statement_guard>> query_processor::take_alter_schema_guard() {
auto [remote_, holder] = remote();
if (this_shard_id() == 0) {
co_return std::make_unique<schema_altering_statement::guard>(co_await remote_.get().mm.start_group0_operation(), remote_.get().mm, std::move(holder));
} else {
// The command will be bounced to shard zero
co_return nullptr;
}
}

future<::shared_ptr<messages::result_message>>
query_processor::execute_schema_statement(const statements::schema_altering_statement& stmt, service::query_state& state, const query_options& options) {
::shared_ptr<cql_transport::event::schema_change> ce;
Expand All @@ -932,27 +883,33 @@ query_processor::execute_schema_statement(const statements::schema_altering_stat

cql3::cql_warnings_vec warnings;

if (!state.statement_guard) {
on_internal_error(log, "Guard must be present when executing DDL");
}
std::unique_ptr<statement_guard> guard_ptr = std::exchange(state.statement_guard, nullptr);
auto& guard = dynamic_cast<schema_altering_statement::guard&>(*guard_ptr);
auto group0_guard = std::move(guard.group0_guard);
auto [remote_, holder] = remote();
auto& mm = remote_.get().mm;
auto retries = mm.get_concurrent_ddl_retries();
while (true) {
try {
auto group0_guard = co_await mm.start_group0_operation();

auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, guard.mm, group0_guard.write_timestamp());
warnings = std::move(cql_warnings);
auto [ret, m, cql_warnings] = co_await stmt.prepare_schema_mutations(*this, mm, group0_guard.write_timestamp());
warnings = std::move(cql_warnings);

if (!m.empty()) {
auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement);
try {
co_await guard.mm.announce(std::move(m), std::move(group0_guard), description);
if (!m.empty()) {
auto description = format("CQL DDL statement: \"{}\"", stmt.raw_cql_statement);
co_await mm.announce(std::move(m), std::move(group0_guard), description);
}

ce = std::move(ret);
} catch (const service::group0_concurrent_modification&) {
std::throw_with_nested(retry_statement_execution_error{});
log.warn("Failed to execute DDL statement \"{}\" due to concurrent group 0 modification.{}.",
stmt.raw_cql_statement, retries ? " Retrying" : " Number of retries exceeded, giving up");
if (retries--) {
continue;
}
throw;
}
break;
}

ce = std::move(ret);

// If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change. To avoid doing
// extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600)
::shared_ptr<messages::result_message> result;
Expand Down
6 changes: 0 additions & 6 deletions cql3/query_processor.hh
Expand Up @@ -402,9 +402,6 @@ public:
future<query::forward_result>
forward(query::forward_request, tracing::trace_state_ptr);

struct retry_statement_execution_error : public std::exception {};

future<std::unique_ptr<statement_guard>> take_alter_schema_guard();
future<::shared_ptr<cql_transport::messages::result_message>>
execute_schema_statement(const statements::schema_altering_statement&, service::query_state& state, const query_options& options);

Expand Down Expand Up @@ -472,9 +469,6 @@ private:
*/
bool has_more_results(cql3::internal_query_state& state) const;

template<std::invocable<service::query_state&> F>
future<::shared_ptr<cql_transport::messages::result_message>> execute_with_retry(::shared_ptr<cql_statement> statement, service::query_state& query_state, const F& fn);

///
/// \tparam ResultMsgType type of the returned result message (CQL or Thrift)
/// \tparam PreparedKeyGenerator a function that generates the prepared statement cache key for given query and
Expand Down
6 changes: 0 additions & 6 deletions cql3/statements/schema_altering_statement.cc
Expand Up @@ -57,10 +57,6 @@ void schema_altering_statement::prepare_keyspace(const service::client_state& st
}
}

future<std::unique_ptr<statement_guard>> schema_altering_statement::take_guard(query_processor& qp) const {
return qp.take_alter_schema_guard();
}

future<::shared_ptr<messages::result_message>>
schema_altering_statement::execute(query_processor& qp, service::query_state& state, const query_options& options) const {
bool internal = state.get_client_state().is_internal();
Expand All @@ -87,8 +83,6 @@ schema_altering_statement::execute(query_processor& qp, service::query_state& st
});
}

schema_altering_statement::guard::guard(service::group0_guard&& g,service::migration_manager& mm_, gate::holder&& h)
: statement_guard(mm_.get_concurrent_ddl_retries()), group0_guard(std::move(g)), mm(mm_), mm_holder(std::move(h)) {}
}

}
10 changes: 0 additions & 10 deletions cql3/statements/schema_altering_statement.hh
Expand Up @@ -18,8 +18,6 @@

#include <seastar/core/shared_ptr.hh>

#include "service/raft/raft_group0_client.hh"

class mutation;

namespace service {
Expand Down Expand Up @@ -64,15 +62,7 @@ protected:
execute(query_processor& qp, service::query_state& state, const query_options& options) const override;

public:
struct guard : public statement_guard {
service::group0_guard group0_guard;
service::migration_manager& mm;
gate::holder mm_holder;
guard(service::group0_guard&& g,service::migration_manager& mm_, gate::holder&& h);
};

virtual future<std::tuple<::shared_ptr<cql_transport::event::schema_change>, std::vector<mutation>, cql3::cql_warnings_vec>> prepare_schema_mutations(query_processor& qp, service::migration_manager& mm, api::timestamp_type) const = 0;
virtual future<std::unique_ptr<statement_guard>> take_guard(query_processor& qp) const override;
};

}
Expand Down
3 changes: 0 additions & 3 deletions service/query_state.hh
Expand Up @@ -13,7 +13,6 @@
#include "service/client_state.hh"
#include "tracing/tracing.hh"
#include "service_permit.hh"
#include "cql3/cql_statement.hh"

namespace qos {
class service_level_controller;
Expand All @@ -27,8 +26,6 @@ private:
service_permit _permit;

public:
std::unique_ptr<cql3::statement_guard> statement_guard;

query_state(client_state& client_state, service_permit permit)
: _client_state(client_state)
, _trace_state_ptr(tracing::trace_state_ptr())
Expand Down

0 comments on commit 3d58e8e

Please sign in to comment.