diff --git a/alternator/auth.cc b/alternator/auth.cc index 98481a4dcc8b..75f6e05afb1a 100644 --- a/alternator/auth.cc +++ b/alternator/auth.cc @@ -129,8 +129,7 @@ future get_key_from_roles(cql3::query_processor& qp, std::string us auth::meta::roles_table::qualified_name, auth::meta::roles_table::role_col_name); auto cl = auth::password_authenticator::consistency_for_user(username); - auto& timeout = auth::internal_distributed_timeout_config(); - return qp.execute_internal(query, cl, timeout, {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr> f) { + return qp.execute_internal(query, cl, auth::internal_distributed_query_state(), {sstring(username)}, true).then_wrapped([username = std::move(username)] (future<::shared_ptr> f) { auto res = f.get0(); auto salted_hash = std::optional(); if (res->empty()) { diff --git a/alternator/executor.cc b/alternator/executor.cc index 41d633ca016b..fe6f0bf596c8 100644 --- a/alternator/executor.cc +++ b/alternator/executor.cc @@ -3214,7 +3214,7 @@ static future do_query(service::storage_proxy& pr auto query_state_ptr = std::make_unique(client_state, trace_state, std::move(permit)); command->slice.options.set(); - auto query_options = std::make_unique(cl, infinite_timeout_config, std::vector{}); + auto query_options = std::make_unique(cl, std::vector{}); query_options = std::make_unique(std::move(query_options), std::move(paging_state)); auto p = service::pager::query_pagers::pager(schema, selection, *query_state_ptr, *query_options, command, std::move(partition_ranges), nullptr); diff --git a/auth/common.cc b/auth/common.cc index b8e5f4c2e09a..25fdb9059ad5 100644 --- a/auth/common.cc +++ b/auth/common.cc @@ -108,7 +108,7 @@ future<> wait_for_schema_agreement(::service::migration_manager& mm, const datab }); } -const timeout_config& internal_distributed_timeout_config() noexcept { +::service::query_state& internal_distributed_query_state() noexcept { #ifdef DEBUG // Give the much slower debug tests more headroom for completing auth queries. static const auto t = 30s; @@ -116,7 +116,9 @@ const timeout_config& internal_distributed_timeout_config() noexcept { static const auto t = 5s; #endif static const timeout_config tc{t, t, t, t, t, t, t}; - return tc; + static thread_local ::service::client_state cs(::service::client_state::internal_tag{}, tc); + static thread_local ::service::query_state qs(cs, empty_service_permit()); + return qs; } } diff --git a/auth/common.hh b/auth/common.hh index 2ad7ff10a3cd..6839fc28e7fa 100644 --- a/auth/common.hh +++ b/auth/common.hh @@ -35,6 +35,7 @@ #include "log.hh" #include "seastarx.hh" #include "utils/exponential_backoff_retry.hh" +#include "service/query_state.hh" using namespace std::chrono_literals; @@ -87,6 +88,6 @@ future<> wait_for_schema_agreement(::service::migration_manager&, const database /// /// Time-outs for internal, non-local CQL queries. /// -const timeout_config& internal_distributed_timeout_config() noexcept; +::service::query_state& internal_distributed_query_state() noexcept; } diff --git a/auth/default_authorizer.cc b/auth/default_authorizer.cc index 5de2ad6d0e53..08cd1b8db092 100644 --- a/auth/default_authorizer.cc +++ b/auth/default_authorizer.cc @@ -103,7 +103,6 @@ future default_authorizer::any_granted() const { return _qp.execute_internal( query, db::consistency_level::LOCAL_ONE, - infinite_timeout_config, {}, true).then([this](::shared_ptr results) { return !results->empty(); @@ -116,8 +115,7 @@ future<> default_authorizer::migrate_legacy_metadata() const { return _qp.execute_internal( query, - db::consistency_level::LOCAL_ONE, - infinite_timeout_config).then([this](::shared_ptr results) { + db::consistency_level::LOCAL_ONE).then([this](::shared_ptr results) { return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) { return do_with( row.get_as("username"), @@ -197,7 +195,6 @@ default_authorizer::authorize(const role_or_anonymous& maybe_role, const resourc return _qp.execute_internal( query, db::consistency_level::LOCAL_ONE, - infinite_timeout_config, {*maybe_role.name, r.name()}).then([](::shared_ptr results) { if (results->empty()) { return permissions::NONE; @@ -226,7 +223,7 @@ default_authorizer::modify( return _qp.execute_internal( query, db::consistency_level::ONE, - internal_distributed_timeout_config(), + internal_distributed_query_state(), {permissions::to_strings(set), sstring(role_name), resource.name()}).discard_result(); }); } @@ -251,7 +248,7 @@ future> default_authorizer::list_all() const { return _qp.execute_internal( query, db::consistency_level::ONE, - internal_distributed_timeout_config(), + internal_distributed_query_state(), {}, true).then([](::shared_ptr results) { std::vector all_details; @@ -278,7 +275,7 @@ future<> default_authorizer::revoke_all(std::string_view role_name) const { return _qp.execute_internal( query, db::consistency_level::ONE, - internal_distributed_timeout_config(), + internal_distributed_query_state(), {sstring(role_name)}).discard_result().handle_exception([role_name](auto ep) { try { std::rethrow_exception(ep); @@ -298,7 +295,6 @@ future<> default_authorizer::revoke_all(const resource& resource) const { return _qp.execute_internal( query, db::consistency_level::LOCAL_ONE, - infinite_timeout_config, {resource.name()}).then_wrapped([this, resource](future<::shared_ptr> f) { try { auto res = f.get0(); @@ -315,7 +311,6 @@ future<> default_authorizer::revoke_all(const resource& resource) const { return _qp.execute_internal( query, db::consistency_level::LOCAL_ONE, - infinite_timeout_config, {r.get_as(ROLE_NAME), resource.name()}).discard_result().handle_exception( [resource](auto ep) { try { diff --git a/auth/password_authenticator.cc b/auth/password_authenticator.cc index b34912f791bd..2def2c054516 100644 --- a/auth/password_authenticator.cc +++ b/auth/password_authenticator.cc @@ -114,7 +114,7 @@ future<> password_authenticator::migrate_legacy_metadata() const { return _qp.execute_internal( query, db::consistency_level::QUORUM, - internal_distributed_timeout_config()).then([this](::shared_ptr results) { + internal_distributed_query_state()).then([this](::shared_ptr results) { return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) { auto username = row.get_as("username"); auto salted_hash = row.get_as(SALTED_HASH); @@ -122,7 +122,7 @@ future<> password_authenticator::migrate_legacy_metadata() const { return _qp.execute_internal( update_row_query(), consistency_for_user(username), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {std::move(salted_hash), username}).discard_result(); }).finally([results] {}); }).then([] { @@ -139,7 +139,7 @@ future<> password_authenticator::create_default_if_missing() const { return _qp.execute_internal( update_row_query(), db::consistency_level::QUORUM, - internal_distributed_timeout_config(), + internal_distributed_query_state(), {passwords::hash(DEFAULT_USER_PASSWORD, rng_for_salt), DEFAULT_USER_NAME}).then([](auto&&) { plogger.info("Created default superuser authentication record."); }); @@ -236,7 +236,7 @@ future password_authenticator::authenticate( return _qp.execute_internal( query, consistency_for_user(username), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {username}, true); }).then_wrapped([=](future<::shared_ptr> f) { @@ -270,7 +270,7 @@ future<> password_authenticator::create(std::string_view role_name, const authen return _qp.execute_internal( update_row_query(), consistency_for_user(role_name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result(); } @@ -287,7 +287,7 @@ future<> password_authenticator::alter(std::string_view role_name, const authent return _qp.execute_internal( query, consistency_for_user(role_name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {passwords::hash(*options.password, rng_for_salt), sstring(role_name)}).discard_result(); } @@ -299,7 +299,7 @@ future<> password_authenticator::drop(std::string_view name) const { return _qp.execute_internal( query, consistency_for_user(name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {sstring(name)}).discard_result(); } diff --git a/auth/roles-metadata.cc b/auth/roles-metadata.cc index 8869795f6cb5..a5e21a3884c4 100644 --- a/auth/roles-metadata.cc +++ b/auth/roles-metadata.cc @@ -68,14 +68,13 @@ future default_role_row_satisfies( return qp.execute_internal( query, db::consistency_level::ONE, - infinite_timeout_config, {meta::DEFAULT_SUPERUSER_NAME}, true).then([&qp, &p](::shared_ptr results) { if (results->empty()) { return qp.execute_internal( query, db::consistency_level::QUORUM, - internal_distributed_timeout_config(), + internal_distributed_query_state(), {meta::DEFAULT_SUPERUSER_NAME}, true).then([&p](::shared_ptr results) { if (results->empty()) { @@ -100,7 +99,7 @@ future any_nondefault_role_row_satisfies( return qp.execute_internal( query, db::consistency_level::QUORUM, - internal_distributed_timeout_config()).then([&p](::shared_ptr results) { + internal_distributed_query_state()).then([&p](::shared_ptr results) { if (results->empty()) { return false; } diff --git a/auth/service.cc b/auth/service.cc index 47c556854a2b..327a1e523673 100644 --- a/auth/service.cc +++ b/auth/service.cc @@ -210,7 +210,6 @@ future service::has_existing_legacy_users() const { return _qp.execute_internal( default_user_query, db::consistency_level::ONE, - infinite_timeout_config, {meta::DEFAULT_SUPERUSER_NAME}, true).then([this](auto results) { if (!results->empty()) { @@ -220,7 +219,6 @@ future service::has_existing_legacy_users() const { return _qp.execute_internal( default_user_query, db::consistency_level::QUORUM, - infinite_timeout_config, {meta::DEFAULT_SUPERUSER_NAME}, true).then([this](auto results) { if (!results->empty()) { @@ -229,8 +227,7 @@ future service::has_existing_legacy_users() const { return _qp.execute_internal( all_users_query, - db::consistency_level::QUORUM, - infinite_timeout_config).then([](auto results) { + db::consistency_level::QUORUM).then([](auto results) { return make_ready_future(!results->empty()); }); }); diff --git a/auth/standard_role_manager.cc b/auth/standard_role_manager.cc index fbb95c987327..57d253e844ac 100644 --- a/auth/standard_role_manager.cc +++ b/auth/standard_role_manager.cc @@ -86,7 +86,7 @@ static future> find_record(cql3::query_processor& qp, std: return qp.execute_internal( query, consistency_for_role(role_name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {sstring(role_name)}, true).then([](::shared_ptr results) { if (results->empty()) { @@ -165,7 +165,7 @@ future<> standard_role_manager::create_default_role_if_missing() const { return _qp.execute_internal( query, db::consistency_level::QUORUM, - internal_distributed_timeout_config(), + internal_distributed_query_state(), {meta::DEFAULT_SUPERUSER_NAME}).then([](auto&&) { log.info("Created default superuser role '{}'.", meta::DEFAULT_SUPERUSER_NAME); return make_ready_future<>(); @@ -192,7 +192,7 @@ future<> standard_role_manager::migrate_legacy_metadata() const { return _qp.execute_internal( query, db::consistency_level::QUORUM, - internal_distributed_timeout_config()).then([this](::shared_ptr results) { + internal_distributed_query_state()).then([this](::shared_ptr results) { return do_for_each(*results, [this](const cql3::untyped_result_set_row& row) { role_config config; config.is_superuser = row.get_or("super", false); @@ -253,7 +253,7 @@ future<> standard_role_manager::create_or_replace(std::string_view role_name, co return _qp.execute_internal( query, consistency_for_role(role_name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {sstring(role_name), c.is_superuser, c.can_login}, true).discard_result(); } @@ -296,7 +296,7 @@ standard_role_manager::alter(std::string_view role_name, const role_config_updat build_column_assignments(u), meta::roles_table::role_col_name), consistency_for_role(role_name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {sstring(role_name)}).discard_result(); }); } @@ -315,7 +315,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const { return _qp.execute_internal( query, consistency_for_role(role_name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {sstring(role_name)}).then([this, role_name](::shared_ptr members) { return parallel_for_each( members->begin(), @@ -354,7 +354,7 @@ future<> standard_role_manager::drop(std::string_view role_name) const { return _qp.execute_internal( query, consistency_for_role(role_name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {sstring(role_name)}).discard_result(); }; @@ -381,7 +381,7 @@ standard_role_manager::modify_membership( return _qp.execute_internal( query, consistency_for_role(grantee_name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {role_set{sstring(role_name)}, sstring(grantee_name)}).discard_result(); }; @@ -392,7 +392,7 @@ standard_role_manager::modify_membership( format("INSERT INTO {} (role, member) VALUES (?, ?)", meta::role_members_table::qualified_name), consistency_for_role(role_name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {sstring(role_name), sstring(grantee_name)}).discard_result(); case membership_change::remove: @@ -400,7 +400,7 @@ standard_role_manager::modify_membership( format("DELETE FROM {} WHERE role = ? AND member = ?", meta::role_members_table::qualified_name), consistency_for_role(role_name), - internal_distributed_timeout_config(), + internal_distributed_query_state(), {sstring(role_name), sstring(grantee_name)}).discard_result(); } @@ -503,7 +503,7 @@ future standard_role_manager::query_all() const { return _qp.execute_internal( query, db::consistency_level::QUORUM, - internal_distributed_timeout_config()).then([](::shared_ptr results) { + internal_distributed_query_state()).then([](::shared_ptr results) { role_set roles; std::transform( diff --git a/cql3/query_options.cc b/cql3/query_options.cc index d76dbc1821a3..9fc3391f96be 100644 --- a/cql3/query_options.cc +++ b/cql3/query_options.cc @@ -52,12 +52,11 @@ thread_local const query_options::specific_options query_options::specific_optio -1, {}, db::consistency_level::SERIAL, api::missing_timestamp}; thread_local query_options query_options::DEFAULT{default_cql_config, - db::consistency_level::ONE, infinite_timeout_config, std::nullopt, + db::consistency_level::ONE, std::nullopt, std::vector(), false, query_options::specific_options::DEFAULT, cql_serialization_format::latest()}; query_options::query_options(const cql_config& cfg, db::consistency_level consistency, - const ::timeout_config& timeout_config, std::optional> names, std::vector values, std::vector value_views, @@ -66,7 +65,6 @@ query_options::query_options(const cql_config& cfg, cql_serialization_format sf) : _cql_config(cfg) , _consistency(consistency) - , _timeout_config(timeout_config) , _names(std::move(names)) , _values(std::move(values)) , _value_views(value_views) @@ -78,7 +76,6 @@ query_options::query_options(const cql_config& cfg, query_options::query_options(const cql_config& cfg, db::consistency_level consistency, - const ::timeout_config& timeout_config, std::optional> names, std::vector values, bool skip_metadata, @@ -86,7 +83,6 @@ query_options::query_options(const cql_config& cfg, cql_serialization_format sf) : _cql_config(cfg) , _consistency(consistency) - , _timeout_config(timeout_config) , _names(std::move(names)) , _values(std::move(values)) , _value_views() @@ -99,7 +95,6 @@ query_options::query_options(const cql_config& cfg, query_options::query_options(const cql_config& cfg, db::consistency_level consistency, - const ::timeout_config& timeout_config, std::optional> names, std::vector value_views, bool skip_metadata, @@ -107,7 +102,6 @@ query_options::query_options(const cql_config& cfg, cql_serialization_format sf) : _cql_config(cfg) , _consistency(consistency) - , _timeout_config(timeout_config) , _names(std::move(names)) , _values() , _value_views(std::move(value_views)) @@ -117,12 +111,11 @@ query_options::query_options(const cql_config& cfg, { } -query_options::query_options(db::consistency_level cl, const ::timeout_config& timeout_config, std::vector values, +query_options::query_options(db::consistency_level cl, std::vector values, specific_options options) : query_options( default_cql_config, cl, - timeout_config, {}, std::move(values), false, @@ -135,7 +128,6 @@ query_options::query_options(db::consistency_level cl, const ::timeout_config& t query_options::query_options(std::unique_ptr qo, lw_shared_ptr paging_state) : query_options(qo->_cql_config, qo->_consistency, - qo->get_timeout_config(), std::move(qo->_names), std::move(qo->_values), std::move(qo->_value_views), @@ -148,7 +140,6 @@ query_options::query_options(std::unique_ptr qo, lw_shared_ptr qo, lw_shared_ptr paging_state, int32_t page_size) : query_options(qo->_cql_config, qo->_consistency, - qo->get_timeout_config(), std::move(qo->_names), std::move(qo->_values), std::move(qo->_value_views), @@ -160,7 +151,7 @@ query_options::query_options(std::unique_ptr qo, lw_shared_ptr values) : query_options( - db::consistency_level::ONE, infinite_timeout_config, std::move(values)) + db::consistency_level::ONE, std::move(values)) {} void query_options::prepare(const std::vector>& specs) diff --git a/cql3/query_options.hh b/cql3/query_options.hh index eb5fa54634ab..221b14526422 100644 --- a/cql3/query_options.hh +++ b/cql3/query_options.hh @@ -51,7 +51,6 @@ #include "cql3/column_identifier.hh" #include "cql3/values.hh" #include "cql_serialization_format.hh" -#include "timeout_config.hh" namespace cql3 { @@ -75,7 +74,6 @@ public: private: const cql_config& _cql_config; const db::consistency_level _consistency; - const timeout_config& _timeout_config; const std::optional> _names; std::vector _values; std::vector _value_views; @@ -118,7 +116,6 @@ public: explicit query_options(const cql_config& cfg, db::consistency_level consistency, - const timeout_config& timeouts, std::optional> names, std::vector values, bool skip_metadata, @@ -126,7 +123,6 @@ public: cql_serialization_format sf); explicit query_options(const cql_config& cfg, db::consistency_level consistency, - const timeout_config& timeouts, std::optional> names, std::vector values, std::vector value_views, @@ -135,7 +131,6 @@ public: cql_serialization_format sf); explicit query_options(const cql_config& cfg, db::consistency_level consistency, - const timeout_config& timeouts, std::optional> names, std::vector value_views, bool skip_metadata, @@ -167,13 +162,10 @@ public: // forInternalUse explicit query_options(std::vector values); - explicit query_options(db::consistency_level, const timeout_config& timeouts, - std::vector values, specific_options options = specific_options::DEFAULT); + explicit query_options(db::consistency_level, std::vector values, specific_options options = specific_options::DEFAULT); explicit query_options(std::unique_ptr, lw_shared_ptr paging_state); explicit query_options(std::unique_ptr, lw_shared_ptr paging_state, int32_t page_size); - const timeout_config& get_timeout_config() const { return _timeout_config; } - db::consistency_level get_consistency() const { return _consistency; } @@ -300,7 +292,7 @@ query_options::query_options(query_options&& o, std::vector tmp; tmp.reserve(values_ranges.size()); std::transform(values_ranges.begin(), values_ranges.end(), std::back_inserter(tmp), [this](auto& values_range) { - return query_options(_cql_config, _consistency, _timeout_config, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format); + return query_options(_cql_config, _consistency, {}, std::move(values_range), _skip_metadata, _options, _cql_serialization_format); }); _batch_options = std::move(tmp); } diff --git a/cql3/query_processor.cc b/cql3/query_processor.cc index 8f3aea5885ec..0ed159310da2 100644 --- a/cql3/query_processor.cc +++ b/cql3/query_processor.cc @@ -619,7 +619,6 @@ query_options query_processor::make_internal_options( const statements::prepared_statement::checked_weak_ptr& p, const std::initializer_list& values, db::consistency_level cl, - const timeout_config& timeout_config, int32_t page_size) const { if (p->bound_names.size() != values.size()) { throw std::invalid_argument( @@ -643,11 +642,10 @@ query_options query_processor::make_internal_options( api::timestamp_type ts = api::missing_timestamp; return query_options( cl, - timeout_config, bound_values, cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts}); } - return query_options(cl, timeout_config, bound_values); + return query_options(cl, bound_values); } statements::prepared_statement::checked_weak_ptr query_processor::prepare_internal(const sstring& query_string) { @@ -671,11 +669,10 @@ struct internal_query_state { ::shared_ptr query_processor::create_paged_state( const sstring& query_string, db::consistency_level cl, - const timeout_config& timeout_config, const std::initializer_list& values, int32_t page_size) { auto p = prepare_internal(query_string); - auto opts = make_internal_options(p, values, cl, timeout_config, page_size); + auto opts = make_internal_options(p, values, cl, page_size); ::shared_ptr res = ::make_shared( internal_query_state{ query_string, @@ -793,7 +790,16 @@ future<::shared_ptr> query_processor::execute_internal( const sstring& query_string, db::consistency_level cl, - const timeout_config& timeout_config, + const std::initializer_list& values, + bool cache) { + return execute_internal(query_string, cl, *_internal_state, values, cache); +} + +future<::shared_ptr> +query_processor::execute_internal( + const sstring& query_string, + db::consistency_level cl, + service::query_state& query_state, const std::initializer_list& values, bool cache) { @@ -801,13 +807,13 @@ query_processor::execute_internal( log.trace("execute_internal: {}\"{}\" ({})", cache ? "(cached) " : "", query_string, ::join(", ", values)); } if (cache) { - return execute_with_params(prepare_internal(query_string), cl, timeout_config, values); + return execute_with_params(prepare_internal(query_string), cl, query_state, values); } else { auto p = parse_statement(query_string)->prepare(_db, _cql_stats); p->statement->raw_cql_statement = query_string; p->statement->validate(_proxy, *_internal_state); auto checked_weak_ptr = p->checked_weak_from_this(); - return execute_with_params(std::move(checked_weak_ptr), cl, timeout_config, values).finally([p = std::move(p)] {}); + return execute_with_params(std::move(checked_weak_ptr), cl, query_state, values).finally([p = std::move(p)] {}); } } @@ -815,11 +821,11 @@ future<::shared_ptr> query_processor::execute_with_params( statements::prepared_statement::checked_weak_ptr p, db::consistency_level cl, - const timeout_config& timeout_config, + service::query_state& query_state, const std::initializer_list& values) { - auto opts = make_internal_options(p, values, cl, timeout_config); - return do_with(std::move(opts), [this, p = std::move(p)](auto & opts) { - return p->statement->execute(_proxy, *_internal_state, opts).then([](auto msg) { + auto opts = make_internal_options(p, values, cl); + return do_with(std::move(opts), [this, &query_state, p = std::move(p)](auto & opts) { + return p->statement->execute(_proxy, query_state, opts).then([](auto msg) { return make_ready_future<::shared_ptr>(::make_shared(msg)); }); }); @@ -942,17 +948,16 @@ bool query_processor::migration_subscriber::should_invalidate( future<> query_processor::query_internal( const sstring& query_string, db::consistency_level cl, - const timeout_config& timeout_config, const std::initializer_list& values, int32_t page_size, noncopyable_function(const cql3::untyped_result_set_row&)>&& f) { - return for_each_cql_result(create_paged_state(query_string, cl, timeout_config, values, page_size), std::move(f)); + return for_each_cql_result(create_paged_state(query_string, cl, values, page_size), std::move(f)); } future<> query_processor::query_internal( const sstring& query_string, noncopyable_function(const cql3::untyped_result_set_row&)>&& f) { - return query_internal(query_string, db::consistency_level::ONE, infinite_timeout_config, {}, 1000, std::move(f)); + return query_internal(query_string, db::consistency_level::ONE, {}, 1000, std::move(f)); } } diff --git a/cql3/query_processor.hh b/cql3/query_processor.hh index 89359fc4deb1..6ec848538529 100644 --- a/cql3/query_processor.hh +++ b/cql3/query_processor.hh @@ -215,8 +215,7 @@ public: // creating namespaces, etc) is explicitly forbidden via this interface. future<::shared_ptr> execute_internal(const sstring& query_string, const std::initializer_list& values = { }) { - return execute_internal(query_string, db::consistency_level::ONE, - infinite_timeout_config, values, true); + return execute_internal(query_string, db::consistency_level::ONE, values, true); } statements::prepared_statement::checked_weak_ptr prepare_internal(const sstring& query); @@ -234,7 +233,6 @@ public: return query_internal( "SELECT * from system.compaction_history", db::consistency_level::ONE, - infinite_timeout_config, {}, [&history] (const cql3::untyped_result_set::row& row) mutable { .... @@ -246,7 +244,6 @@ public: * * query_string - the cql string, can contain placeholders * cl - consistency level of the query - * timeout_config - timeout configuration * values - values to be substituted for the placeholders in the query * page_size - maximum page size * f - a function to be run on each row of the query result, @@ -255,7 +252,6 @@ public: future<> query_internal( const sstring& query_string, db::consistency_level cl, - const timeout_config& timeout_config, const std::initializer_list& values, int32_t page_size, noncopyable_function(const cql3::untyped_result_set_row&)>&& f); @@ -282,14 +278,19 @@ public: future<::shared_ptr> execute_internal( const sstring& query_string, db::consistency_level, - const timeout_config& timeout_config, + const std::initializer_list& = { }, + bool cache = false); + future<::shared_ptr> execute_internal( + const sstring& query_string, + db::consistency_level, + service::query_state& query_state, const std::initializer_list& = { }, bool cache = false); future<::shared_ptr> execute_with_params( statements::prepared_statement::checked_weak_ptr p, db::consistency_level, - const timeout_config& timeout_config, + service::query_state& query_state, const std::initializer_list& = { }); future<::shared_ptr> @@ -318,7 +319,6 @@ private: const statements::prepared_statement::checked_weak_ptr& p, const std::initializer_list&, db::consistency_level, - const timeout_config& timeout_config, int32_t page_size = -1) const; future<::shared_ptr> @@ -332,7 +332,6 @@ private: ::shared_ptr create_paged_state( const sstring& query_string, db::consistency_level, - const timeout_config&, const std::initializer_list&, int32_t page_size); diff --git a/db/query_context.hh b/db/query_context.hh index 994106c189db..46d706e66467 100644 --- a/db/query_context.hh +++ b/db/query_context.hh @@ -55,10 +55,18 @@ struct query_context { // let the `storage_proxy` time out the query down the call chain db::timeout_clock::duration::zero(); - return do_with(timeout_config{d, d, d, d, d, d, d}, [this, req = std::move(req), &args...] (auto& tcfg) { + struct timeout_context { + std::unique_ptr client_state; + service::query_state query_state; + timeout_context(db::timeout_clock::duration d) + : client_state(std::make_unique(service::client_state::internal_tag{}, timeout_config{d, d, d, d, d, d, d})) + , query_state(*client_state, empty_service_permit()) + {} + }; + return do_with(timeout_context(d), [this, req = std::move(req), &args...] (auto& tctx) { return _qp.local().execute_internal(req, cql3::query_options::DEFAULT.get_consistency(), - tcfg, + tctx.query_state, { data_value(std::forward(args))... }, true); }); diff --git a/db/schema_tables.cc b/db/schema_tables.cc index 839b145fe270..941c3ab1fa71 100644 --- a/db/schema_tables.cc +++ b/db/schema_tables.cc @@ -3131,7 +3131,6 @@ future get_column_mapping(utils::UUID table_id, table_schema_ver auto cm_fut = qctx->qp().execute_internal( GET_COLUMN_MAPPING_QUERY, db::consistency_level::LOCAL_ONE, - infinite_timeout_config, {table_id, version} ); return cm_fut.then([version] (shared_ptr results) { @@ -3174,7 +3173,6 @@ future column_mapping_exists(utils::UUID table_id, table_schema_version ve return qctx->qp().execute_internal( GET_COLUMN_MAPPING_QUERY, db::consistency_level::LOCAL_ONE, - infinite_timeout_config, {table_id, version} ).then([] (shared_ptr results) { return !results->empty(); @@ -3188,7 +3186,6 @@ future<> drop_column_mapping(utils::UUID table_id, table_schema_version version) return qctx->qp().execute_internal( DEL_COLUMN_MAPPING_QUERY, db::consistency_level::LOCAL_ONE, - infinite_timeout_config, {table_id, version}).discard_result(); } diff --git a/db/system_distributed_keyspace.cc b/db/system_distributed_keyspace.cc index 1d067bf770ce..dc2b6f892fcb 100644 --- a/db/system_distributed_keyspace.cc +++ b/db/system_distributed_keyspace.cc @@ -181,17 +181,20 @@ future<> system_distributed_keyspace::stop() { return make_ready_future<>(); } -static timeout_config get_timeout_config(db::timeout_clock::duration t) { - return timeout_config{ t, t, t, t, t, t, t }; -} - -static const timeout_config internal_distributed_timeout_config = get_timeout_config(std::chrono::seconds(10)); +static service::query_state& internal_distributed_query_state() { + using namespace std::chrono_literals; + const auto t = 10s; + static timeout_config tc{ t, t, t, t, t, t, t }; + static thread_local service::client_state cs(service::client_state::internal_tag{}, tc); + static thread_local service::query_state qs(cs, empty_service_permit()); + return qs; +}; future> system_distributed_keyspace::view_status(sstring ks_name, sstring view_name) const { return _qp.execute_internal( format("SELECT host_id, status FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS), db::consistency_level::ONE, - internal_distributed_timeout_config, + internal_distributed_query_state(), { std::move(ks_name), std::move(view_name) }, false).then([this] (::shared_ptr cql_result) { return boost::copy_range>(*cql_result @@ -208,7 +211,7 @@ future<> system_distributed_keyspace::start_view_build(sstring ks_name, sstring return _qp.execute_internal( format("INSERT INTO {}.{} (keyspace_name, view_name, host_id, status) VALUES (?, ?, ?, ?)", NAME, VIEW_BUILD_STATUS), db::consistency_level::ONE, - internal_distributed_timeout_config, + internal_distributed_query_state(), { std::move(ks_name), std::move(view_name), std::move(host_id), "STARTED" }, false).discard_result(); }); @@ -219,7 +222,7 @@ future<> system_distributed_keyspace::finish_view_build(sstring ks_name, sstring return _qp.execute_internal( format("UPDATE {}.{} SET status = ? WHERE keyspace_name = ? AND view_name = ? AND host_id = ?", NAME, VIEW_BUILD_STATUS), db::consistency_level::ONE, - internal_distributed_timeout_config, + internal_distributed_query_state(), { "SUCCESS", std::move(ks_name), std::move(view_name), std::move(host_id) }, false).discard_result(); }); @@ -229,7 +232,7 @@ future<> system_distributed_keyspace::remove_view(sstring ks_name, sstring view_ return _qp.execute_internal( format("DELETE FROM {}.{} WHERE keyspace_name = ? AND view_name = ?", NAME, VIEW_BUILD_STATUS), db::consistency_level::ONE, - internal_distributed_timeout_config, + internal_distributed_query_state(), { std::move(ks_name), std::move(view_name) }, false).discard_result(); } @@ -307,7 +310,7 @@ system_distributed_keyspace::insert_cdc_topology_description( return _qp.execute_internal( format("INSERT INTO {}.{} (time, description) VALUES (?,?)", NAME, CDC_TOPOLOGY_DESCRIPTION), quorum_if_many(ctx.num_token_owners), - internal_distributed_timeout_config, + internal_distributed_query_state(), { time, make_list_value(cdc_generation_description_type, prepare_cdc_generation_description(description)) }, false).discard_result(); } @@ -319,7 +322,7 @@ system_distributed_keyspace::read_cdc_topology_description( return _qp.execute_internal( format("SELECT description FROM {}.{} WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION), quorum_if_many(ctx.num_token_owners), - internal_distributed_timeout_config, + internal_distributed_query_state(), { time }, false ).then([] (::shared_ptr cql_result) -> std::optional { @@ -347,7 +350,7 @@ system_distributed_keyspace::expire_cdc_topology_description( return _qp.execute_internal( format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_TOPOLOGY_DESCRIPTION), quorum_if_many(ctx.num_token_owners), - internal_distributed_timeout_config, + internal_distributed_query_state(), { expiration_time, streams_ts }, false).discard_result(); } @@ -413,7 +416,7 @@ system_distributed_keyspace::create_cdc_desc( co_await _qp.execute_internal( format("INSERT INTO {}.{} (key, time) VALUES (?, ?)", NAME, CDC_TIMESTAMPS), quorum_if_many(ctx.num_token_owners), - internal_distributed_timeout_config, + internal_distributed_query_state(), { CDC_TIMESTAMPS_KEY, time }, false).discard_result(); } @@ -426,7 +429,7 @@ system_distributed_keyspace::expire_cdc_desc( return _qp.execute_internal( format("UPDATE {}.{} SET expired = ? WHERE time = ?", NAME, CDC_TIMESTAMPS), quorum_if_many(ctx.num_token_owners), - internal_distributed_timeout_config, + internal_distributed_query_state(), { expiration_time, streams_ts }, false).discard_result(); } @@ -471,7 +474,7 @@ system_distributed_keyspace::cdc_desc_exists( co_return co_await _qp.execute_internal( format("SELECT time FROM {}.{} WHERE key = ? AND time = ?", NAME, CDC_TIMESTAMPS), quorum_if_many(ctx.num_token_owners), - internal_distributed_timeout_config, + internal_distributed_query_state(), { CDC_TIMESTAMPS_KEY, streams_ts }, false ).then([] (::shared_ptr cql_result) -> bool { @@ -484,7 +487,7 @@ system_distributed_keyspace::cdc_get_versioned_streams(db_clock::time_point not_ auto timestamps_cql = co_await _qp.execute_internal( format("SELECT time FROM {}.{} WHERE key = ?", NAME, CDC_TIMESTAMPS), quorum_if_many(ctx.num_token_owners), - internal_distributed_timeout_config, + internal_distributed_query_state(), { CDC_TIMESTAMPS_KEY }, false); @@ -506,7 +509,7 @@ system_distributed_keyspace::cdc_get_versioned_streams(db_clock::time_point not_ auto streams_cql = co_await _qp.execute_internal( format("SELECT streams FROM {}.{} WHERE time = ?", NAME, CDC_DESC_V2), quorum_if_many(ctx.num_token_owners), - internal_distributed_timeout_config, + internal_distributed_query_state(), { ts }, false); @@ -526,11 +529,10 @@ future> system_distributed_keyspace::get_cdc_desc_v1_timestamps(context ctx) { std::vector res; co_await _qp.query_internal( - format("SELECT time FROM {}.{}", NAME, CDC_DESC_V1), - quorum_if_many(ctx.num_token_owners), // This is a long and expensive scan (mostly due to #8061). // Give it a bit more time than usual. - get_timeout_config(std::chrono::seconds(60)), + format("SELECT time FROM {}.{} USING TIMEOUT 60s", NAME, CDC_DESC_V1), + quorum_if_many(ctx.num_token_owners), {}, 1000, [&] (const cql3::untyped_result_set_row& r) { diff --git a/service/client_state.hh b/service/client_state.hh index 5537e0bd1ff6..0e87078d045a 100644 --- a/service/client_state.hh +++ b/service/client_state.hh @@ -198,12 +198,15 @@ public: return _timeout_config; } - client_state(internal_tag) + client_state(internal_tag) : client_state(internal_tag{}, infinite_timeout_config) + {} + + client_state(internal_tag, const timeout_config& config) : _keyspace("system") , _is_internal(true) , _is_thrift(false) - , _default_timeout_config(infinite_timeout_config) - , _timeout_config(infinite_timeout_config) + , _default_timeout_config(config) + , _timeout_config(config) {} client_state(const client_state&) = delete; diff --git a/service/raft/raft_sys_table_storage.cc b/service/raft/raft_sys_table_storage.cc index 0322d3e30b30..3e766f1bdd4a 100644 --- a/service/raft/raft_sys_table_storage.cc +++ b/service/raft/raft_sys_table_storage.cc @@ -184,7 +184,6 @@ future<> raft_sys_table_storage::do_store_log_entries(const std::vector{}, false, diff --git a/test/boost/cql_query_large_test.cc b/test/boost/cql_query_large_test.cc index 89637049ef76..94c7d6e7c87b 100644 --- a/test/boost/cql_query_large_test.cc +++ b/test/boost/cql_query_large_test.cc @@ -173,7 +173,7 @@ SEASTAR_TEST_CASE(test_insert_large_collection_values) { BOOST_REQUIRE_THROW(e.execute_cql(format("INSERT INTO tbl (pk, m) VALUES ('Golding', {{'{}': 'value'}});", long_value)).get(), std::exception); auto make_query_options = [] (cql_protocol_version_type version) { - return std::make_unique(cql3::default_cql_config, db::consistency_level::ONE, infinite_timeout_config, std::nullopt, + return std::make_unique(cql3::default_cql_config, db::consistency_level::ONE, std::nullopt, std::vector(), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format{version}); }; diff --git a/test/boost/cql_query_test.cc b/test/boost/cql_query_test.cc index c3cd97951595..9e21d5dd4214 100644 --- a/test/boost/cql_query_test.cc +++ b/test/boost/cql_query_test.cc @@ -385,7 +385,6 @@ SEASTAR_TEST_CASE(test_list_append_limit) { // append sequence, which will be exceeded it in this // test. auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, - infinite_timeout_config, std::vector{}, cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); auto cql = fmt::format("UPDATE t SET l = l + [{}] WHERE pk = 0;", value_list); @@ -3037,7 +3036,7 @@ SEASTAR_TEST_CASE(test_empty_partition_range_scan) { e.execute_cql("create table empty_partition_range_scan.tb (a int, b int, c int, val int, PRIMARY KEY ((a,b),c) );").get(); - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("select * from empty_partition_range_scan.tb where token (a,b) > 1 and token(a,b) <= 1;", std::move(qo)).get0(); assert_that(res).is_rows().is_empty(); @@ -4432,7 +4431,6 @@ static std::unique_ptr q_serial_opts( const auto& so = cql3::query_options::specific_options::DEFAULT; auto qo = std::make_unique( cl, - infinite_timeout_config, values, // Ensure (optional) serial consistency is always specified. cql3::query_options::specific_options{ @@ -4647,7 +4645,7 @@ SEASTAR_THREAD_TEST_CASE(test_query_limit) { const auto select_query = format("SELECT * FROM test WHERE pk = {} ORDER BY ck {};", pk, is_reversed ? "DESC" : "ASC"); int32_t page_size = is_paged ? 10000 : -1; - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{page_size, nullptr, {}, api::new_timestamp()}); const auto* expected_rows = is_reversed ? &reversed_rows : &normal_rows; diff --git a/test/boost/filtering_test.cc b/test/boost/filtering_test.cc index ce6597e64785..a0081e18d535 100644 --- a/test/boost/filtering_test.cc +++ b/test/boost/filtering_test.cc @@ -831,14 +831,14 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) { { int32_type->decompose(6), boolean_type->decompose(false)}, }); - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0(); assert_that(msg).is_rows().with_rows({ { int32_type->decompose(3), boolean_type->decompose(true)}, }); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 5 ALLOW FILTERING;", std::move(qo)).get0(); assert_that(msg).is_rows().with_rows({ @@ -849,7 +849,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) { { int32_type->decompose(6), boolean_type->decompose(false)}, }); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 2 ALLOW FILTERING;", std::move(qo)).get0(); assert_that(msg).is_rows().with_rows({ @@ -857,7 +857,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) { { int32_type->decompose(2), boolean_type->decompose(false)} }); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); assert_that(msg).is_rows().with_rows({ @@ -866,7 +866,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) { { int32_type->decompose(4), boolean_type->decompose(false)} }); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); auto paging_state = extract_paging_state(msg); @@ -877,7 +877,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) { // Some pages might be empty and in such case we should continue querying size_t rows_fetched = 0; while (rows_fetched == 0) { - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); rows_fetched = count_rows_fetched(msg); @@ -889,7 +889,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) { rows_fetched = 0; while (rows_fetched == 0) { - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); rows_fetched = count_rows_fetched(msg); @@ -905,7 +905,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_limit) { rows_fetched = 0; uint64_t remaining = 1; while (remaining > 0) { - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false LIMIT 3 ALLOW FILTERING;", std::move(qo)).get0(); rows_fetched += count_rows_fetched(msg); @@ -964,7 +964,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) { { int32_type->decompose(1), boolean_type->decompose(false)}, }); - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=true PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0(); assert_that(msg).is_rows().with_rows({ @@ -972,7 +972,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) { { int32_type->decompose(3), boolean_type->decompose(true)}, }); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline PER PARTITION LIMIT 1;", std::move(qo)).get0(); assert_that(msg).is_rows().with_rows({ @@ -983,7 +983,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) { // Some pages might be empty and in such case we should continue querying size_t rows_fetched = 0; while (rows_fetched == 0) { - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT 1 ALLOW FILTERING;", std::move(qo)).get0(); rows_fetched = count_rows_fetched(msg); @@ -1001,7 +1001,7 @@ SEASTAR_TEST_CASE(test_allow_filtering_per_partition_limit) { rows_fetched = 0; uint64_t remaining = 1; while (remaining > 0) { - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{pg, paging_state, {}, api::new_timestamp()}); sstring query = allow_filtering ? fmt::format("SELECT c, liked FROM timeline WHERE liked=false PER PARTITION LIMIT {} ALLOW FILTERING;", ppl) : diff --git a/test/boost/index_with_paging_test.cc b/test/boost/index_with_paging_test.cc index 087e97940c1f..ff39c2f43e0e 100644 --- a/test/boost/index_with_paging_test.cc +++ b/test/boost/index_with_paging_test.cc @@ -36,7 +36,7 @@ SEASTAR_TEST_CASE(test_index_with_paging) { } eventually([&] { - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{4321, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); assert_that(res).is_rows().with_size(4321); diff --git a/test/boost/large_paging_state_test.cc b/test/boost/large_paging_state_test.cc index 059c790cd8da..3b3ef198961f 100644 --- a/test/boost/large_paging_state_test.cc +++ b/test/boost/large_paging_state_test.cc @@ -63,7 +63,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state) { e.execute_prepared(id, {cql3_pk, cql3_ck}).get(); } - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()}); auto msg = e.execute_cql("select * from test;", std::move(qo)).get0(); auto paging_state = extract_paging_state(msg); @@ -75,7 +75,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state) { paging_state->set_remaining(test_remaining); while (has_more_pages(msg)) { - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT * FROM test;", std::move(qo)).get0(); rows_fetched = count_rows_fetched(msg); @@ -101,7 +101,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state_filtering e.execute_prepared(id, {cql3_pk, cql3_ck}).get(); } - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{5, nullptr, {}, api::new_timestamp()}); auto msg = e.execute_cql("select * from test where ck > 10;", std::move(qo)).get0(); auto paging_state = extract_paging_state(msg); @@ -113,7 +113,7 @@ SEASTAR_TEST_CASE(test_use_high_bits_of_remaining_rows_in_paging_state_filtering paging_state->set_remaining(test_remaining); while (has_more_pages(msg)) { - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{5, paging_state, {}, api::new_timestamp()}); msg = e.execute_cql("SELECT * FROM test where ck > 10;", std::move(qo)).get0(); rows_fetched = count_rows_fetched(msg); diff --git a/test/boost/query_processor_test.cc b/test/boost/query_processor_test.cc index ce088689875f..18265a1de41f 100644 --- a/test/boost/query_processor_test.cc +++ b/test/boost/query_processor_test.cc @@ -209,8 +209,7 @@ std::unordered_map get_query_metrics() { /// Creates query_options with cl, infinite timeout, and no named values. auto make_options(clevel cl) { - return std::make_unique( - cl, infinite_timeout_config, std::vector()); + return std::make_unique(cl, std::vector()); } } // anonymous namespace diff --git a/test/boost/restrictions_test.cc b/test/boost/restrictions_test.cc index adc373bc19d8..5e2bad035680 100644 --- a/test/boost/restrictions_test.cc +++ b/test/boost/restrictions_test.cc @@ -45,7 +45,7 @@ std::unique_ptr to_options( static auto& d = cql3::query_options::DEFAULT; return std::make_unique( cfg, - d.get_consistency(), d.get_timeout_config(), std::move(names), std::move(values), d.skip_metadata(), + d.get_consistency(), std::move(names), std::move(values), d.skip_metadata(), d.get_specific_options(), d.get_cql_serialization_format()); } diff --git a/test/boost/secondary_index_test.cc b/test/boost/secondary_index_test.cc index 18473d2e0675..6f0ddcc0d04b 100644 --- a/test/boost/secondary_index_test.cc +++ b/test/boost/secondary_index_test.cc @@ -427,7 +427,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) { } eventually([&] { - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{101, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); assert_that(res).is_rows().with_size(101); @@ -439,7 +439,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) { }); eventually([&] { - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE pk2 = 1", std::move(qo)).get0(); assert_that(res).is_rows().with_rows({{ @@ -449,7 +449,7 @@ SEASTAR_TEST_CASE(test_index_on_pk_ck_with_paging) { }); eventually([&] { - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{100, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE ck2 = 'world8'", std::move(qo)).get0(); assert_that(res).is_rows().with_rows({{ @@ -485,7 +485,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { }; eventually([&] { - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); auto paging_state = extract_paging_state(res); @@ -495,7 +495,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, }}); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); expect_more_pages(res, true); @@ -505,7 +505,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { {int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)}, }}); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); paging_state = extract_paging_state(res); @@ -520,7 +520,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { try { expect_more_pages(res, false); } catch (...) { - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); assert_that(res).is_rows().with_size(0); @@ -530,7 +530,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { }); eventually([&] { - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0(); auto paging_state = extract_paging_state(res); @@ -539,7 +539,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { {int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)}, }}); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0(); @@ -549,7 +549,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { }); { - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0(); auto paging_state = extract_paging_state(res); @@ -566,7 +566,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { paging_state->get_last_replicas(), paging_state->get_query_read_repair_decision(), paging_state->get_rows_fetched_for_last_partition()); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); res = e.execute_cql("SELECT * FROM tab WHERE c = 2", std::move(qo)).get0(); @@ -578,7 +578,7 @@ SEASTAR_TEST_CASE(test_simple_index_paging) { // not to return rows (since no row matches an empty partition key) auto paging_state = make_lw_shared(partition_key::make_empty(), std::nullopt, 1, utils::make_random_uuid(), service::pager::paging_state::replicas_per_token_range{}, std::nullopt, 1); - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE v = 1", std::move(qo)).get0(); @@ -817,7 +817,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) { }; eventually([&] { - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0(); auto paging_state = extract_paging_state(res); @@ -826,7 +826,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) { {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(1)}, }}); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and v = 1", std::move(qo)).get0(); @@ -836,7 +836,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) { }); eventually([&] { - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, nullptr, {}, api::new_timestamp()}); auto res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0(); auto paging_state = extract_paging_state(res); @@ -845,7 +845,7 @@ SEASTAR_TEST_CASE(test_local_index_paging) { {int32_type->decompose(1)}, {int32_type->decompose(1)}, {int32_type->decompose(2)}, {int32_type->decompose(1)}, }}); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{1, paging_state, {}, api::new_timestamp()}); res = e.execute_cql("SELECT * FROM tab WHERE p = 1 and c2 = 2", std::move(qo)).get0(); @@ -1173,7 +1173,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) { } eventually([&] { - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()}); auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa WHERE v = 0;", std::move(qo)); // Even though we set up paging, we still expect a single result from an aggregation function. @@ -1188,7 +1188,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) { { int32_type->decompose(row_count * row_count / 4 + row_count / 2)}, }); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()}); msg = cquery_nofail(e, "SELECT avg(id) FROM fpa WHERE v = 1;", std::move(qo)); assert_that(msg).is_rows().with_rows({ @@ -1206,7 +1206,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) { cquery_nofail(e, format("INSERT INTO fpa2 (id, c1, c2) VALUES ({}, {}, {})", i + 1, i + 1, i % 2).c_str()); } - auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + auto qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{2, nullptr, {}, api::new_timestamp()}); auto msg = cquery_nofail(e, "SELECT sum(id) FROM fpa2 WHERE c2 = 0;", std::move(qo)); // Even though we set up paging, we still expect a single result from an aggregation function @@ -1214,7 +1214,7 @@ SEASTAR_TEST_CASE(test_indexing_paging_and_aggregation) { { int32_type->decompose(row_count * row_count / 4)}, }); - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{3, nullptr, {}, api::new_timestamp()}); msg = cquery_nofail(e, "SELECT avg(id) FROM fpa2 WHERE c2 = 1;", std::move(qo)); assert_that(msg).is_rows().with_rows({ diff --git a/test/lib/cql_test_env.cc b/test/lib/cql_test_env.cc index a25d1e288c53..e03ebda02964 100644 --- a/test/lib/cql_test_env.cc +++ b/test/lib/cql_test_env.cc @@ -190,7 +190,7 @@ class single_node_cql_env : public cql_test_env { db::consistency_level cl = db::consistency_level::ONE) override { const auto& so = cql3::query_options::specific_options::DEFAULT; - auto options = std::make_unique(cl, infinite_timeout_config, + auto options = std::make_unique(cl, std::move(values), cql3::query_options::specific_options{ so.page_size, so.state, @@ -227,7 +227,7 @@ class single_node_cql_env : public cql_test_env { throw std::runtime_error(format("get_stmt_mutations: not a modification statement: {}", text)); } auto& qo = cql3::query_options::DEFAULT; - auto timeout = db::timeout_clock::now() + qo.get_timeout_config().write_timeout; + auto timeout = db::timeout_clock::now() + qs->get_client_state().get_timeout_config().write_timeout; return modif_stmt->get_mutations(local_qp().proxy(), qo, timeout, false, qo.get_timestamp(*qs), *qs) .finally([qs, modif_stmt = std::move(modif_stmt)] {}); diff --git a/test/manual/enormous_table_scan_test.cc b/test/manual/enormous_table_scan_test.cc index 2ba3ea7da2d1..5b061afc416e 100644 --- a/test/manual/enormous_table_scan_test.cc +++ b/test/manual/enormous_table_scan_test.cc @@ -229,7 +229,7 @@ SEASTAR_TEST_CASE(scan_enormous_table_test) { std::unique_ptr qo; uint64_t fetched_rows_log_counter = 1e7; do { - qo = std::make_unique(db::consistency_level::LOCAL_ONE, infinite_timeout_config, std::vector{}, + qo = std::make_unique(db::consistency_level::LOCAL_ONE, std::vector{}, cql3::query_options::specific_options{10000, paging_state, {}, api::new_timestamp()}); msg = e.execute_cql("select * from enormous_table;", std::move(qo)).get0(); rows_fetched += count_rows_fetched(msg); diff --git a/test/tools/cql_repl.cc b/test/tools/cql_repl.cc index 4f4bf7425789..aa494d39b31e 100644 --- a/test/tools/cql_repl.cc +++ b/test/tools/cql_repl.cc @@ -105,7 +105,6 @@ std::unique_ptr repl_options() { const auto& so = cql3::query_options::specific_options::DEFAULT; auto qo = std::make_unique( db::consistency_level::ONE, - infinite_timeout_config, std::vector{}, // Ensure (optional) serial consistency is always specified. cql3::query_options::specific_options{ diff --git a/thrift/handler.cc b/thrift/handler.cc index 9a79d6197808..68d40b517d48 100644 --- a/thrift/handler.cc +++ b/thrift/handler.cc @@ -976,7 +976,7 @@ class thrift_handler : public CassandraCobSvIf { throw make_exception("Compressed query strings are not supported"); } auto& qp = _query_processor.local(); - auto opts = std::make_unique(qp.get_cql_config(), cl_from_thrift(consistency), _timeout_config, std::nullopt, std::vector(), + auto opts = std::make_unique(qp.get_cql_config(), cl_from_thrift(consistency), std::nullopt, std::vector(), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); auto f = qp.execute_direct(query, _query_state, *opts); return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) { @@ -1056,7 +1056,7 @@ class thrift_handler : public CassandraCobSvIf { return cql3::raw_value::make_value(to_bytes(s)); }); auto& qp = _query_processor.local(); - auto opts = std::make_unique(qp.get_cql_config(), cl_from_thrift(consistency), _timeout_config, std::nullopt, std::move(bytes_values), + auto opts = std::make_unique(qp.get_cql_config(), cl_from_thrift(consistency), std::nullopt, std::move(bytes_values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); auto f = qp.execute_prepared(std::move(prepared), std::move(cache_key), _query_state, *opts, needs_authorization); return f.then([cob = std::move(cob), opts = std::move(opts)](auto&& ret) { diff --git a/tracing/trace_keyspace_helper.cc b/tracing/trace_keyspace_helper.cc index f7c323df4fb1..06b992d880d0 100644 --- a/tracing/trace_keyspace_helper.cc +++ b/tracing/trace_keyspace_helper.cc @@ -64,9 +64,13 @@ const sstring trace_keyspace_helper::EVENTS("events"); const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG("node_slow_log"); const sstring trace_keyspace_helper::NODE_SLOW_QUERY_LOG_TIME_IDX("node_slow_log_time_idx"); -timeout_config tracing_db_timeout_config { - 5s, 5s, 5s, 5s, 5s, 5s, 5s, -}; +static service::client_state& tracing_client_state() { + static timeout_config tracing_db_timeout_config { + 5s, 5s, 5s, 5s, 5s, 5s, 5s, + }; + static thread_local service::client_state s(service::client_state::internal_tag{}, tracing_db_timeout_config); + return s; +} struct trace_keyspace_backend_sesssion_state final : public backend_session_state_base { int64_t last_nanos = 0; @@ -76,7 +80,7 @@ struct trace_keyspace_backend_sesssion_state final : public backend_session_stat trace_keyspace_helper::trace_keyspace_helper(tracing& tr) : i_tracing_backend_helper(tr) - , _dummy_query_state(service::client_state::for_internal_calls(), empty_service_permit()) + , _dummy_query_state(tracing_client_state(), empty_service_permit()) , _sessions(KEYSPACE_NAME, SESSIONS, sprint("CREATE TABLE IF NOT EXISTS %s.%s (" "session_id uuid," @@ -314,7 +318,7 @@ cql3::query_options trace_keyspace_helper::make_session_mutation_data(const one_ }; return cql3::query_options(cql3::default_cql_config, - db::consistency_level::ANY, tracing_db_timeout_config, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + db::consistency_level::ANY, std::move(names), std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(const one_session_records& session_records) { @@ -332,7 +336,7 @@ cql3::query_options trace_keyspace_helper::make_session_time_idx_mutation_data(c }; return cql3::query_options(cql3::default_cql_config, - db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) { @@ -375,7 +379,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_mutation_data(const o }); return cql3::query_options(cql3::default_cql_config, - db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_data(const one_session_records& session_records, const utils::UUID& start_time_id) { @@ -396,7 +400,7 @@ cql3::query_options trace_keyspace_helper::make_slow_query_time_idx_mutation_dat }); return cql3::query_options(cql3::default_cql_config, - db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); + db::consistency_level::ANY, std::nullopt, std::move(values), false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()); } std::vector trace_keyspace_helper::make_event_mutation_data(one_session_records& session_records, const event_record& record) { @@ -432,7 +436,7 @@ future<> trace_keyspace_helper::apply_events_mutation(cql3::query_processor& qp, std::for_each(events_records.begin(), events_records.end(), [&values, all_records = records, this] (event_record& one_event_record) { values.emplace_back(make_event_mutation_data(*all_records, one_event_record)); }); return do_with( - cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, tracing_db_timeout_config, std::nullopt, std::vector{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)), + cql3::query_options::make_batch_options(cql3::query_options(cql3::default_cql_config, db::consistency_level::ANY, std::nullopt, std::vector{}, false, cql3::query_options::specific_options::DEFAULT, cql_serialization_format::latest()), std::move(values)), cql3::statements::batch_statement(cql3::statements::batch_statement::type::UNLOGGED, std::move(modifications), cql3::attributes::none(), qp.get_cql_stats()), [this] (auto& batch_options, auto& batch) { return batch.execute(service::get_storage_proxy().local(), _dummy_query_state, batch_options).then([] (shared_ptr res) { return now(); }); diff --git a/transport/request.hh b/transport/request.hh index 21446409da3b..7d9ea04a78c7 100644 --- a/transport/request.hh +++ b/transport/request.hh @@ -219,10 +219,10 @@ private: options_flag::NAMES_FOR_VALUES >; public: - std::unique_ptr read_options(uint8_t version, cql_serialization_format cql_ser_format, const timeout_config& timeouts, const cql3::cql_config& cql_config) { + std::unique_ptr read_options(uint8_t version, cql_serialization_format cql_ser_format, const cql3::cql_config& cql_config) { auto consistency = read_consistency(); if (version == 1) { - return std::make_unique(cql_config, consistency, timeouts, std::nullopt, std::vector{}, + return std::make_unique(cql_config, consistency, std::nullopt, std::vector{}, false, cql3::query_options::specific_options::DEFAULT, cql_ser_format); } @@ -270,11 +270,11 @@ public: if (!names.empty()) { onames = std::move(names); } - options = std::make_unique(cql_config, consistency, timeouts, std::move(onames), std::move(values), skip_metadata, + options = std::make_unique(cql_config, consistency, std::move(onames), std::move(values), skip_metadata, cql3::query_options::specific_options{page_size, std::move(paging_state), serial_consistency, ts}, cql_ser_format); } else { - options = std::make_unique(cql_config, consistency, timeouts, std::nullopt, std::move(values), skip_metadata, + options = std::make_unique(cql_config, consistency, std::nullopt, std::move(values), skip_metadata, cql3::query_options::specific_options::DEFAULT, cql_ser_format); } diff --git a/transport/server.cc b/transport/server.cc index 72771610b0bc..761e67131f7d 100644 --- a/transport/server.cc +++ b/transport/server.cc @@ -943,7 +943,7 @@ cql_server::connection::process_on_shard(unsigned shard, uint16_t stream, fragme (bytes_ostream& linearization_buffer, service::client_state& client_state) mutable { request_reader in(is, linearization_buffer); return process_fn(client_state, server._query_processor, in, stream, _version, _cql_serialization_format, - server.timeout_config(), /* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) { + /* FIXME */empty_service_permit(), std::move(trace_state), false).then([] (auto msg) { // result here has to be foreign ptr return std::get>>(std::move(msg)); }); @@ -958,7 +958,7 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli fragmented_temporary_buffer::istream is = in.get_stream(); return process_fn(client_state, _server._query_processor, in, stream, - _version, _cql_serialization_format, _server.timeout_config(), permit, trace_state, true) + _version, _cql_serialization_format, permit, trace_state, true) .then([stream, &client_state, this, is, permit, process_fn, trace_state] (std::variant>, unsigned> msg) mutable { unsigned* shard = std::get_if(&msg); @@ -972,12 +972,11 @@ cql_server::connection::process(uint16_t stream, request_reader in, service::cli static future>, unsigned>> process_query_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - const ::timeout_config& timeout_config, service_permit permit, tracing::trace_state_ptr trace_state, - bool init_trace) { + service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) { auto query = in.read_long_string_view(); auto q_state = std::make_unique(client_state, trace_state, std::move(permit)); auto& query_state = q_state->query_state; - q_state->options = in.read_options(version, serialization_format, timeout_config, qp.local().get_cql_config()); + q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config()); auto& options = *q_state->options; auto skip_metadata = options.skip_metadata(); @@ -1040,8 +1039,7 @@ future> cql_server::connection::process_pr static future>, unsigned>> process_execute_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - const ::timeout_config& timeout_config, service_permit permit, - tracing::trace_state_ptr trace_state, bool init_trace) { + service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) { cql3::prepared_cache_key_type cache_key(in.read_short_bytes()); auto& id = cql3::prepared_cache_key_type::cql_id(cache_key); bool needs_authorization = false; @@ -1064,10 +1062,10 @@ process_execute_internal(service::client_state& client_state, distributed values; in.read_value_view_list(version, values); auto consistency = in.read_consistency(); - q_state->options = std::make_unique(qp.local().get_cql_config(), consistency, timeout_config, std::nullopt, values, false, + q_state->options = std::make_unique(qp.local().get_cql_config(), consistency, std::nullopt, values, false, cql3::query_options::specific_options::DEFAULT, serialization_format); } else { - q_state->options = in.read_options(version, serialization_format, timeout_config, qp.local().get_cql_config()); + q_state->options = in.read_options(version, serialization_format, qp.local().get_cql_config()); } auto& options = *q_state->options; auto skip_metadata = options.skip_metadata(); @@ -1120,8 +1118,7 @@ future>> cql_server::connectio static future>, unsigned>> process_batch_internal(service::client_state& client_state, distributed& qp, request_reader in, uint16_t stream, cql_protocol_version_type version, cql_serialization_format serialization_format, - const ::timeout_config& timeout_config, service_permit permit, - tracing::trace_state_ptr trace_state, bool init_trace) { + service_permit permit, tracing::trace_state_ptr trace_state, bool init_trace) { if (version == 1) { throw exceptions::protocol_exception("BATCH messages are not support in version 1 of the protocol"); } @@ -1210,7 +1207,7 @@ process_batch_internal(service::client_state& client_state, distributedquery_state; // #563. CQL v2 encodes query_options in v1 format for batch requests. q_state->options = std::make_unique(cql3::query_options::make_batch_options(std::move(*in.read_options(version < 3 ? 1 : version, serialization_format, - timeout_config, qp.local().get_cql_config())), std::move(values))); + qp.local().get_cql_config())), std::move(values))); auto& options = *q_state->options; if (init_trace) {