diff --git a/Makefile b/Makefile index 15d7d333..c644cdf8 100644 --- a/Makefile +++ b/Makefile @@ -262,7 +262,7 @@ build-examples: _update-rust-tooling: @echo "Run rustup update" - @rustup update + @rustup update stable check-cargo: install-cargo-if-missing _update-rust-tooling @echo "Running \"cargo check\" in ./scylla-rust-wrapper" diff --git a/scylla-rust-wrapper/src/api.rs b/scylla-rust-wrapper/src/api.rs index 220e4d3e..5a21166b 100644 --- a/scylla-rust-wrapper/src/api.rs +++ b/scylla-rust-wrapper/src/api.rs @@ -387,7 +387,7 @@ pub mod future { pub mod statement { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::statement::{ + pub use crate::statements::statement::{ CassStatement, cass_statement_bind_bool, cass_statement_bind_bool_by_name, @@ -483,7 +483,7 @@ pub mod statement { pub mod prepared { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::prepared::{ + pub use crate::statements::prepared::{ CassPrepared, cass_prepared_bind, cass_prepared_free, @@ -497,7 +497,7 @@ pub mod prepared { pub mod batch { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::batch::{ + pub use crate::statements::batch::{ CassBatch, CassBatchType, cass_batch_add_statement, @@ -526,7 +526,7 @@ pub mod batch { pub mod data_type { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::cass_types::{ + pub use crate::cql_types::data_type::{ CassDataType, cass_data_sub_type_count, cass_data_type_add_sub_type, @@ -562,7 +562,7 @@ pub mod data_type { pub mod collection { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::collection::{ + pub use crate::cql_types::collection::{ CassCollection, cass_collection_append_bool, cass_collection_append_bytes, @@ -594,7 +594,7 @@ pub mod collection { pub mod tuple { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::tuple::{ + pub use crate::cql_types::tuple::{ CassTuple, cass_tuple_data_type, cass_tuple_free, @@ -627,7 +627,7 @@ pub mod tuple { pub mod user_type { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::user_type::{ + pub use crate::cql_types::user_type::{ CassUserType, cass_user_type_data_type, cass_user_type_free, @@ -832,7 +832,7 @@ pub mod value { pub mod uuid_gen { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::uuid::{ + pub use crate::cql_types::uuid::{ CassUuidGen, cass_uuid_gen_free, cass_uuid_gen_from_time, @@ -846,7 +846,8 @@ pub mod uuid_gen { pub mod uuid { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::uuid::{ + pub use crate::cql_types::uuid::{ + CassUuid, cass_uuid_from_string, cass_uuid_from_string_n, cass_uuid_max_from_time, @@ -895,16 +896,16 @@ pub mod custom_payload { pub mod consistency { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::misc::{ + pub use crate::cql_types::{ CassConsistency, - cass_consistency_string + cass_consistency_string, }; } pub mod write_type { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::misc::{ + pub use crate::cql_types::{ CassWriteType, cass_write_type_string }; @@ -927,7 +928,7 @@ pub mod log { pub mod inet { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::inet::{ + pub use crate::cql_types::inet::{ CassInet, cass_inet_from_string, cass_inet_from_string_n, @@ -940,7 +941,7 @@ pub mod inet { pub mod date_time { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::date_time::{ + pub use crate::cql_types::date_time::{ cass_date_from_epoch, cass_date_time_to_epoch, cass_time_from_epoch @@ -955,7 +956,7 @@ pub mod alloc { pub mod integration_testing { // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::integration_testing::{ + pub use crate::testing::integration::{ IgnoringRetryPolicy, testing_batch_set_sleeping_history_listener, testing_cluster_get_connect_timeout, @@ -974,7 +975,7 @@ pub mod integration_testing { /// and at the same time the functions are not yet implemented in the wrapper. // Disabling rustfmt to have one item per line for better readability. #[rustfmt::skip] - pub use crate::integration_testing::stubs::{ + pub use crate::testing::integration::stubs::{ CassAggregateMeta, CassAuthenticator, CassCustomPayload, diff --git a/scylla-rust-wrapper/src/argconv.rs b/scylla-rust-wrapper/src/argconv.rs index 3c9a9849..887be340 100644 --- a/scylla-rust-wrapper/src/argconv.rs +++ b/scylla-rust-wrapper/src/argconv.rs @@ -91,7 +91,7 @@ mod sealed { /// There is no way to obtain a mutable reference from such pointer. /// /// In some cases, we need to be able to mutate the data behind a shared pointer. -/// There is an example of such use case - namely [`crate::cass_types::CassDataType`]. +/// There is an example of such use case - namely [`crate::cql_types::data_type::CassDataType`]. /// argconv API does not provide a way to mutate such pointer - one can only convert the pointer /// to [`Arc`] or &. It is the API user's responsibility to implement sound interior mutability /// pattern in such case. This is what we currently do - CassDataType wraps CassDataTypeInner @@ -626,7 +626,7 @@ impl BoxFFI for T where T: FFI {} /// C API user should be responsible for freeing (decreasing reference count of) /// associated memory manually via corresponding API call. /// -/// An example of such implementor would be [`CassDataType`](crate::cass_types::CassDataType): +/// An example of such implementor would be [`CassDataType`](crate::cql_types::data_type::CassDataType): /// - it is allocated on the heap via [`Arc::new`] /// - there are multiple owners of the shared CassDataType object /// - some API functions require to increase a reference count of the object diff --git a/scylla-rust-wrapper/src/binding.rs b/scylla-rust-wrapper/src/binding.rs index 2574c28a..c4eb1acf 100644 --- a/scylla-rust-wrapper/src/binding.rs +++ b/scylla-rust-wrapper/src/binding.rs @@ -59,7 +59,7 @@ macro_rules! make_index_binder { ) -> CassError { // For some reason detected as unused, which is not true #[allow(unused_imports)] - use crate::value::CassCqlValue::*; + use crate::cql_types::value::CassCqlValue::*; let Some(this) = BoxFFI::as_mut_ref(this) else { tracing::error!("Provided null pointer to {}!", stringify!($fn_by_idx)); return CassError::CASS_ERROR_LIB_BAD_PARAMS; @@ -83,7 +83,7 @@ macro_rules! make_name_binder { ) -> CassError { // For some reason detected as unused, which is not true #[allow(unused_imports)] - use crate::value::CassCqlValue::*; + use crate::cql_types::value::CassCqlValue::*; let Some(this) = BoxFFI::as_mut_ref(this) else { tracing::error!("Provided null pointer to {}!", stringify!($fn_by_name)); return CassError::CASS_ERROR_LIB_BAD_PARAMS; @@ -109,7 +109,7 @@ macro_rules! make_name_n_binder { ) -> CassError { // For some reason detected as unused, which is not true #[allow(unused_imports)] - use crate::value::CassCqlValue::*; + use crate::cql_types::value::CassCqlValue::*; let Some(this) = BoxFFI::as_mut_ref(this) else { tracing::error!("Provided null pointer to {}!", stringify!($fn_by_name_n)); return CassError::CASS_ERROR_LIB_BAD_PARAMS; @@ -133,7 +133,7 @@ macro_rules! make_appender { ) -> CassError { // For some reason detected as unused, which is not true #[allow(unused_imports)] - use crate::value::CassCqlValue::*; + use crate::cql_types::value::CassCqlValue::*; let Some(this) = BoxFFI::as_mut_ref(this) else { tracing::error!("Provided null pointer to {}!", stringify!($fn_append)); return CassError::CASS_ERROR_LIB_BAD_PARAMS; @@ -263,8 +263,8 @@ macro_rules! invoke_binder_maker_macro_with_type { $this, $consume_v, $fn, - |v: crate::uuid::CassUuid| Ok(Some(Uuid(v.into()))), - [v @ crate::uuid::CassUuid] + |v: crate::cql_types::uuid::CassUuid| Ok(Some(Uuid(v.into()))), + [v @ crate::cql_types::uuid::CassUuid] ); }; (inet, $macro_name:ident, $this:ty, $consume_v:expr, $fn:ident) => { @@ -272,7 +272,7 @@ macro_rules! invoke_binder_maker_macro_with_type { $this, $consume_v, $fn, - |v: crate::inet::CassInet| { + |v: crate::cql_types::inet::CassInet| { // Err if length in struct is invalid. // cppdriver doesn't check this - it encodes any length given to it // but it doesn't seem like something we wanna do. Also, rust driver can't @@ -282,7 +282,7 @@ macro_rules! invoke_binder_maker_macro_with_type { Err(_) => Err(CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE), } }, - [v @ crate::inet::CassInet] + [v @ crate::cql_types::inet::CassInet] ); }; (duration, $macro_name:ident, $this:ty, $consume_v:expr, $fn:ident) => { @@ -318,10 +318,10 @@ macro_rules! invoke_binder_maker_macro_with_type { $this, $consume_v, $fn, - |p: CassBorrowedSharedPtr| { + |p: CassBorrowedSharedPtr| { Ok(Some(std::convert::Into::into(BoxFFI::as_ref(p).unwrap()))) }, - [p @ CassBorrowedSharedPtr] + [p @ CassBorrowedSharedPtr] ); }; (tuple, $macro_name:ident, $this:ty, $consume_v:expr, $fn:ident) => { @@ -329,10 +329,10 @@ macro_rules! invoke_binder_maker_macro_with_type { $this, $consume_v, $fn, - |p: CassBorrowedSharedPtr| { + |p: CassBorrowedSharedPtr| { Ok(Some(BoxFFI::as_ref(p).unwrap().into())) }, - [p @ CassBorrowedSharedPtr] + [p @ CassBorrowedSharedPtr] ); }; (user_type, $macro_name:ident, $this:ty, $consume_v:expr, $fn:ident) => { @@ -340,10 +340,10 @@ macro_rules! invoke_binder_maker_macro_with_type { $this, $consume_v, $fn, - |p: CassBorrowedSharedPtr| { + |p: CassBorrowedSharedPtr| { Ok(Some(BoxFFI::as_ref(p).unwrap().into())) }, - [p @ CassBorrowedSharedPtr] + [p @ CassBorrowedSharedPtr] ); }; } diff --git a/scylla-rust-wrapper/src/cass_error.rs b/scylla-rust-wrapper/src/cass_error.rs index fb05a183..74eeecac 100644 --- a/scylla-rust-wrapper/src/cass_error.rs +++ b/scylla-rust-wrapper/src/cass_error.rs @@ -1,6 +1,5 @@ use crate::argconv::*; -use crate::cass_types::CassConsistency; -use crate::misc::CassWriteType; +use crate::cql_types::{CassConsistency, CassWriteType}; use crate::types::*; use libc::c_char; use scylla::deserialize::DeserializationError; @@ -15,7 +14,7 @@ use thiserror::Error; // Re-export error types. pub use crate::cass_error_types::{CassError, CassErrorSource}; -use crate::statement::UnknownNamedParameterError; +use crate::statements::statement::UnknownNamedParameterError; pub(crate) trait ToCassError { fn to_cass_error(&self) -> CassError; diff --git a/scylla-rust-wrapper/src/cluster.rs b/scylla-rust-wrapper/src/cluster.rs index 8a2f9169..5f3cd233 100644 --- a/scylla-rust-wrapper/src/cluster.rs +++ b/scylla-rust-wrapper/src/cluster.rs @@ -1,7 +1,8 @@ use crate::argconv::*; use crate::cass_error::CassError; -use crate::cass_types::CassConsistency; use crate::config_value::MaybeUnsetConfig; +use crate::cql_types::CassConsistency; +use crate::cql_types::uuid::CassUuid; use crate::exec_profile::{CassExecProfile, ExecProfileName, exec_profile_builder_modify}; use crate::load_balancing::{ CassHostFilter, DcRestriction, LoadBalancingConfig, LoadBalancingKind, @@ -11,7 +12,6 @@ use crate::runtime::{RUNTIMES, Runtime}; use crate::ssl::CassSsl; use crate::timestamp_generator::CassTimestampGen; use crate::types::*; -use crate::uuid::CassUuid; use openssl::ssl::SslContextBuilder; use openssl_sys::SSL_CTX_up_ref; use scylla::client::execution_profile::ExecutionProfileBuilder; @@ -1617,7 +1617,7 @@ pub unsafe extern "C" fn cass_cluster_set_metadata_request_serverside_timeout( #[cfg(test)] mod tests { - use crate::testing::{assert_cass_error_eq, setup_tracing}; + use crate::testing::utils::{assert_cass_error_eq, setup_tracing}; use super::*; use crate::{ diff --git a/scylla-rust-wrapper/src/config_value.rs b/scylla-rust-wrapper/src/config_value.rs index 73c30b61..3549d7cf 100644 --- a/scylla-rust-wrapper/src/config_value.rs +++ b/scylla-rust-wrapper/src/config_value.rs @@ -5,7 +5,7 @@ use scylla::{ statement::{Consistency, SerialConsistency}, }; -use crate::{cass_types::CassConsistency, retry_policy::CassRetryPolicy, types::cass_uint64_t}; +use crate::{cql_types::CassConsistency, retry_policy::CassRetryPolicy, types::cass_uint64_t}; /// Represents a configuration value that may or may not be set. /// If a configuration value is unset, it means that the default value diff --git a/scylla-rust-wrapper/src/collection.rs b/scylla-rust-wrapper/src/cql_types/collection.rs similarity index 97% rename from scylla-rust-wrapper/src/collection.rs rename to scylla-rust-wrapper/src/cql_types/collection.rs index 7daffb54..c61d5bed 100644 --- a/scylla-rust-wrapper/src/collection.rs +++ b/scylla-rust-wrapper/src/cql_types/collection.rs @@ -1,9 +1,9 @@ +use crate::argconv::*; use crate::cass_collection_types::CassCollectionType; use crate::cass_error::CassError; -use crate::cass_types::{CassDataType, CassDataTypeInner, MapDataType}; +use crate::cql_types::data_type::{CassDataType, CassDataTypeInner, MapDataType}; +use crate::cql_types::value::{self, CassCqlValue}; use crate::types::*; -use crate::value::CassCqlValue; -use crate::{argconv::*, value}; use std::convert::TryFrom; use std::sync::Arc; use std::sync::LazyLock; @@ -281,14 +281,17 @@ mod tests { use crate::{ argconv::ArcFFI, cass_error::CassError, - cass_types::{ - CassDataType, CassDataTypeInner, CassValueType, MapDataType, - cass_data_type_add_sub_type, cass_data_type_free, cass_data_type_new, - }, - collection::{ - cass_collection_append_double, cass_collection_append_float, cass_collection_free, + cql_types::{ + CassValueType, + collection::{ + cass_collection_append_double, cass_collection_append_float, cass_collection_free, + }, + data_type::{ + CassDataType, CassDataTypeInner, MapDataType, cass_data_type_add_sub_type, + cass_data_type_free, cass_data_type_new, + }, }, - testing::assert_cass_error_eq, + testing::utils::assert_cass_error_eq, }; use super::{ diff --git a/scylla-rust-wrapper/src/cass_types.rs b/scylla-rust-wrapper/src/cql_types/data_type.rs similarity index 94% rename from scylla-rust-wrapper/src/cass_types.rs rename to scylla-rust-wrapper/src/cql_types/data_type.rs index b2ec9e17..a9ffa06c 100644 --- a/scylla-rust-wrapper/src/cass_types.rs +++ b/scylla-rust-wrapper/src/cql_types/data_type.rs @@ -1,5 +1,7 @@ use crate::argconv::*; use crate::cass_error::CassError; +use crate::cql_types::CassValueType; +use crate::statements::batch::CassBatchType; use crate::types::*; use scylla::cluster::metadata::{CollectionType, NativeType}; use scylla::frame::response::result::ColumnType; @@ -8,10 +10,6 @@ use std::cell::UnsafeCell; use std::os::raw::c_char; use std::sync::Arc; -pub use crate::cass_batch_types::CassBatchType; -pub use crate::cass_consistency_types::CassConsistency; -pub use crate::cass_data_types::CassValueType; - #[derive(Clone, Debug)] #[cfg_attr(test, derive(PartialEq, Eq))] pub(crate) struct UdtDataType { @@ -324,10 +322,7 @@ impl CassDataTypeInner { fn add_sub_data_type(&mut self, sub_type: Arc) -> Result<(), CassError> { match self { CassDataTypeInner::List { typ, .. } | CassDataTypeInner::Set { typ, .. } => match typ { - Some(_) => { - tracing::error!("Trying to add sub-type to already typed list/set!"); - Err(CassError::CASS_ERROR_LIB_BAD_PARAMS) - } + Some(_) => Err(CassError::CASS_ERROR_LIB_BAD_PARAMS), None => { *typ = Some(sub_type); Ok(()) @@ -336,10 +331,7 @@ impl CassDataTypeInner { CassDataTypeInner::Map { typ: MapDataType::KeyAndValue(_, _), .. - } => { - tracing::error!("Trying to add sub-type to already fully typed map!"); - Err(CassError::CASS_ERROR_LIB_BAD_PARAMS) - } + } => Err(CassError::CASS_ERROR_LIB_BAD_PARAMS), CassDataTypeInner::Map { typ: MapDataType::Key(k), frozen, @@ -364,10 +356,7 @@ impl CassDataTypeInner { types.push(sub_type); Ok(()) } - _ => { - tracing::error!("Trying to add sub-type to non-collection/tuple data type!"); - Err(CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE) - } + _ => Err(CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE), } } @@ -560,10 +549,7 @@ pub unsafe extern "C" fn cass_data_type_type_name( unsafe { write_str_to_c(name, type_name, type_name_length) }; CassError::CASS_OK } - _ => { - tracing::error!("Trying to get type name from non-UDT data type!"); - CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE - } + _ => CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE, } } @@ -595,10 +581,7 @@ pub unsafe extern "C" fn cass_data_type_set_type_name_n( udt_data_type.name = type_name_string; CassError::CASS_OK } - _ => { - tracing::error!("Trying to set type name on non-UDT data type!"); - CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE - } + _ => CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE, } } @@ -618,10 +601,7 @@ pub unsafe extern "C" fn cass_data_type_keyspace( unsafe { write_str_to_c(name, keyspace, keyspace_length) }; CassError::CASS_OK } - _ => { - tracing::error!("Trying to get keyspace from non-UDT data type!"); - CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE - } + _ => CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE, } } @@ -653,10 +633,7 @@ pub unsafe extern "C" fn cass_data_type_set_keyspace_n( udt_data_type.keyspace = keyspace_string; CassError::CASS_OK } - _ => { - tracing::error!("Trying to set keyspace on non-UDT data type!"); - CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE - } + _ => CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE, } } @@ -676,10 +653,7 @@ pub unsafe extern "C" fn cass_data_type_class_name( unsafe { write_str_to_c(name, class_name, class_name_length) }; CassError::CASS_OK } - _ => { - tracing::error!("Trying to get class name from non-Custom data type!"); - CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE - } + _ => CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE, } } @@ -710,10 +684,7 @@ pub unsafe extern "C" fn cass_data_type_set_class_name_n( *name = class_string; CassError::CASS_OK } - _ => { - tracing::error!("Trying to set class name on non-Custom data type!"); - CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE - } + _ => CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE, } } @@ -820,10 +791,7 @@ pub unsafe extern "C" fn cass_data_type_sub_type_name( CassError::CASS_OK } }, - _ => { - tracing::error!("Trying to get sub-type name from non-UDT data type!"); - CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE - } + _ => CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE, } } @@ -887,10 +855,7 @@ pub unsafe extern "C" fn cass_data_type_add_sub_type_by_name_n( udt_data_type.field_types.push((name_string, sub_data_type)); CassError::CASS_OK } - _ => { - tracing::error!("Trying to add sub-type by name to non-UDT data type!"); - CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE - } + _ => CassError::CASS_ERROR_LIB_INVALID_VALUE_TYPE, } } diff --git a/scylla-rust-wrapper/src/date_time.rs b/scylla-rust-wrapper/src/cql_types/date_time.rs similarity index 100% rename from scylla-rust-wrapper/src/date_time.rs rename to scylla-rust-wrapper/src/cql_types/date_time.rs diff --git a/scylla-rust-wrapper/src/inet.rs b/scylla-rust-wrapper/src/cql_types/inet.rs similarity index 100% rename from scylla-rust-wrapper/src/inet.rs rename to scylla-rust-wrapper/src/cql_types/inet.rs diff --git a/scylla-rust-wrapper/src/misc.rs b/scylla-rust-wrapper/src/cql_types/mod.rs similarity index 83% rename from scylla-rust-wrapper/src/misc.rs rename to scylla-rust-wrapper/src/cql_types/mod.rs index ef61f0ef..36732950 100644 --- a/scylla-rust-wrapper/src/misc.rs +++ b/scylla-rust-wrapper/src/cql_types/mod.rs @@ -1,6 +1,17 @@ -use std::ffi::{CStr, c_char}; +pub(crate) mod collection; +pub(crate) mod data_type; +pub(crate) mod date_time; +pub(crate) mod inet; +pub(crate) mod tuple; +pub(crate) mod user_type; +pub(crate) mod uuid; +pub(crate) mod value; + +pub use crate::cass_consistency_types::CassConsistency; +pub use crate::cass_data_types::CassValueType; +pub use crate::cass_error_types::CassWriteType; -pub use crate::{cass_error_types::CassWriteType, cass_types::CassConsistency}; +use std::ffi::{CStr, c_char}; impl CassConsistency { pub(crate) fn as_cstr(&self) -> &'static CStr { diff --git a/scylla-rust-wrapper/src/tuple.rs b/scylla-rust-wrapper/src/cql_types/tuple.rs similarity index 94% rename from scylla-rust-wrapper/src/tuple.rs rename to scylla-rust-wrapper/src/cql_types/tuple.rs index e22a8866..610d7d06 100644 --- a/scylla-rust-wrapper/src/tuple.rs +++ b/scylla-rust-wrapper/src/cql_types/tuple.rs @@ -1,10 +1,9 @@ use crate::argconv::*; use crate::cass_error::CassError; -use crate::cass_types::CassDataType; -use crate::cass_types::CassDataTypeInner; +use crate::cql_types::data_type::{CassDataType, CassDataTypeInner}; +use crate::cql_types::value; +use crate::cql_types::value::CassCqlValue; use crate::types::*; -use crate::value; -use crate::value::CassCqlValue; use std::sync::Arc; use std::sync::LazyLock; @@ -136,8 +135,11 @@ make_binders!(user_type, cass_tuple_set_user_type); #[cfg(test)] mod tests { - use crate::cass_types::{ - CassValueType, cass_data_type_add_sub_type, cass_data_type_free, cass_data_type_new, + use crate::{ + cql_types::CassValueType, + cql_types::data_type::{ + cass_data_type_add_sub_type, cass_data_type_free, cass_data_type_new, + }, }; use super::{cass_tuple_data_type, cass_tuple_new}; diff --git a/scylla-rust-wrapper/src/user_type.rs b/scylla-rust-wrapper/src/cql_types/user_type.rs similarity index 97% rename from scylla-rust-wrapper/src/user_type.rs rename to scylla-rust-wrapper/src/cql_types/user_type.rs index 946ab101..8ab8e79a 100644 --- a/scylla-rust-wrapper/src/user_type.rs +++ b/scylla-rust-wrapper/src/cql_types/user_type.rs @@ -1,8 +1,8 @@ +use crate::argconv::*; use crate::cass_error::CassError; -use crate::cass_types::{CassDataType, CassDataTypeInner}; +use crate::cql_types::data_type::{CassDataType, CassDataTypeInner}; +use crate::cql_types::value::{self, CassCqlValue}; use crate::types::*; -use crate::value::CassCqlValue; -use crate::{argconv::*, value}; use std::os::raw::c_char; use std::sync::Arc; diff --git a/scylla-rust-wrapper/src/uuid.rs b/scylla-rust-wrapper/src/cql_types/uuid.rs similarity index 99% rename from scylla-rust-wrapper/src/uuid.rs rename to scylla-rust-wrapper/src/cql_types/uuid.rs index 7439cb32..91d20234 100644 --- a/scylla-rust-wrapper/src/uuid.rs +++ b/scylla-rust-wrapper/src/cql_types/uuid.rs @@ -10,7 +10,7 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; -pub(crate) use crate::cass_uuid_types::CassUuid; +pub use crate::cass_uuid_types::CassUuid; pub struct CassUuidGen { pub(crate) clock_seq_and_node: cass_uint64_t, diff --git a/scylla-rust-wrapper/src/value.rs b/scylla-rust-wrapper/src/cql_types/value.rs similarity index 99% rename from scylla-rust-wrapper/src/value.rs rename to scylla-rust-wrapper/src/cql_types/value.rs index 06933778..3a8bfe46 100644 --- a/scylla-rust-wrapper/src/value.rs +++ b/scylla-rust-wrapper/src/cql_types/value.rs @@ -11,7 +11,8 @@ use scylla::serialize::writers::{CellWriter, WrittenCellProof}; use scylla::value::{CqlDate, CqlDecimal, CqlDuration}; use uuid::Uuid; -use crate::cass_types::{CassDataType, CassValueType}; +use crate::cql_types::CassValueType; +use crate::cql_types::data_type::CassDataType; /// A narrower version of rust driver's CqlValue. /// @@ -416,8 +417,11 @@ mod tests { use scylla::value::{CqlDate, CqlDecimal, CqlDuration}; use crate::{ - cass_types::{CassDataType, CassDataTypeInner, CassValueType, MapDataType, UdtDataType}, - value::{CassCqlValue, is_type_compatible}, + cql_types::CassValueType, + cql_types::{ + data_type::{CassDataType, CassDataTypeInner, MapDataType, UdtDataType}, + value::{CassCqlValue, is_type_compatible}, + }, }; fn all_value_data_types() -> Vec { diff --git a/scylla-rust-wrapper/src/exec_profile.rs b/scylla-rust-wrapper/src/exec_profile.rs index 337704f6..f560b437 100644 --- a/scylla-rust-wrapper/src/exec_profile.rs +++ b/scylla-rust-wrapper/src/exec_profile.rs @@ -18,17 +18,17 @@ use crate::argconv::{ ArcFFI, BoxFFI, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr, CassOwnedExclusivePtr, FFI, FromBox, ptr_to_cstr_n, strlen, }; -use crate::batch::CassBatch; use crate::cass_error::CassError; -use crate::cass_types::CassConsistency; use crate::cluster::{ set_load_balance_dc_aware_n, set_load_balance_rack_aware_n, update_comma_delimited_list, }; use crate::config_value::{MaybeUnsetConfig, RequestTimeout}; +use crate::cql_types::CassConsistency; use crate::load_balancing::{LoadBalancingConfig, LoadBalancingKind}; use crate::retry_policy::CassRetryPolicy; use crate::session::CassConnectedSession; -use crate::statement::CassStatement; +use crate::statements::batch::CassBatch; +use crate::statements::statement::CassStatement; use crate::types::{ cass_bool_t, cass_double_t, cass_int32_t, cass_int64_t, cass_uint32_t, cass_uint64_t, size_t, }; @@ -869,15 +869,17 @@ mod tests { use super::*; use crate::argconv::CassPtr; + use crate::cql_types::CassConsistency; use crate::retry_policy::{ cass_retry_policy_downgrading_consistency_new, cass_retry_policy_free, }; - use crate::testing::{assert_cass_error_eq, setup_tracing}; + use crate::testing::utils::{assert_cass_error_eq, setup_tracing}; use crate::{ argconv::{make_c_str, str_to_c_str_n}, - batch::{cass_batch_add_statement, cass_batch_free, cass_batch_new}, - cass_types::CassBatchType, - statement::{cass_statement_free, cass_statement_new}, + statements::batch::{ + CassBatchType, cass_batch_add_statement, cass_batch_free, cass_batch_new, + }, + statements::statement::{cass_statement_free, cass_statement_new}, }; use assert_matches::assert_matches; diff --git a/scylla-rust-wrapper/src/future.rs b/scylla-rust-wrapper/src/future.rs index 971bf95c..a39f6fd7 100644 --- a/scylla-rust-wrapper/src/future.rs +++ b/scylla-rust-wrapper/src/future.rs @@ -1,10 +1,10 @@ use crate::argconv::*; use crate::cass_error::{CassError, CassErrorMessage, CassErrorResult, ToCassError as _}; -use crate::prepared::CassPrepared; +use crate::cql_types::uuid::CassUuid; use crate::query_result::{CassNode, CassResult}; use crate::runtime::Runtime; +use crate::statements::prepared::CassPrepared; use crate::types::*; -use crate::uuid::CassUuid; use futures::future; use std::future::Future; use std::mem; @@ -84,7 +84,7 @@ struct ResolvableFuture { wait_for_value: Condvar, #[cfg(cpp_integration_testing)] - recording_listener: Option>, + recording_listener: Option>, } pub struct CassFuture { @@ -119,7 +119,7 @@ impl CassFuture { runtime: Arc, fut: impl Future + Send + 'static, #[cfg(cpp_integration_testing)] recording_listener: Option< - Arc, + Arc, >, ) -> CassOwnedSharedPtr { Self::new_from_future( @@ -135,7 +135,7 @@ impl CassFuture { runtime: Arc, fut: impl Future + Send + 'static, #[cfg(cpp_integration_testing)] recording_listener: Option< - Arc, + Arc, >, ) -> Arc { let cass_fut = Arc::new(CassFuture { @@ -675,7 +675,7 @@ pub unsafe extern "C" fn cass_future_coordinator( #[cfg(test)] mod tests { - use crate::testing::{assert_cass_error_eq, assert_cass_future_error_message_eq}; + use crate::testing::utils::{assert_cass_error_eq, assert_cass_future_error_message_eq}; use super::*; use std::{ diff --git a/scylla-rust-wrapper/src/iterator.rs b/scylla-rust-wrapper/src/iterator.rs index e19bb807..636c8302 100644 --- a/scylla-rust-wrapper/src/iterator.rs +++ b/scylla-rust-wrapper/src/iterator.rs @@ -6,7 +6,8 @@ use crate::argconv::{ CassOwnedExclusivePtr, FFI, FromBox, RefFFI, write_str_to_c, }; use crate::cass_error::CassError; -use crate::cass_types::{CassDataType, CassDataTypeInner, CassValueType, MapDataType}; +use crate::cql_types::CassValueType; +use crate::cql_types::data_type::{CassDataType, CassDataTypeInner, MapDataType}; use crate::metadata::{ CassColumnMeta, CassKeyspaceMeta, CassMaterializedViewMeta, CassSchemaMeta, CassTableMeta, }; diff --git a/scylla-rust-wrapper/src/lib.rs b/scylla-rust-wrapper/src/lib.rs index af0665a0..096f7c7d 100644 --- a/scylla-rust-wrapper/src/lib.rs +++ b/scylla-rust-wrapper/src/lib.rs @@ -10,39 +10,25 @@ mod binding; pub mod api; // pub, because doctests defined in `argconv` module need to access it. pub mod argconv; -pub(crate) mod batch; pub(crate) mod cass_error; -pub(crate) mod cass_types; pub(crate) mod cluster; -pub(crate) mod collection; pub(crate) mod config_value; -pub(crate) mod date_time; +pub(crate) mod cql_types; pub(crate) mod exec_profile; pub(crate) mod future; -pub(crate) mod inet; -#[cfg(cpp_integration_testing)] -pub(crate) mod integration_testing; pub(crate) mod iterator; mod load_balancing; mod logging; pub(crate) mod metadata; -pub(crate) mod misc; -pub(crate) mod prepared; pub(crate) mod query_result; pub(crate) mod retry_policy; pub(crate) mod runtime; -#[cfg(test)] -mod ser_de_tests; pub(crate) mod session; pub(crate) mod ssl; -pub(crate) mod statement; -#[cfg(test)] +pub(crate) mod statements; +#[cfg(any(test, cpp_integration_testing))] pub(crate) mod testing; pub(crate) mod timestamp_generator; -pub(crate) mod tuple; -pub(crate) mod user_type; -pub(crate) mod uuid; -pub(crate) mod value; /// Includes a file generated by bindgen called `filename`. macro_rules! include_bindgen_generated { diff --git a/scylla-rust-wrapper/src/metadata.rs b/scylla-rust-wrapper/src/metadata.rs index ca68a71e..667434f4 100644 --- a/scylla-rust-wrapper/src/metadata.rs +++ b/scylla-rust-wrapper/src/metadata.rs @@ -1,7 +1,6 @@ use crate::argconv::*; use crate::cass_column_types::CassColumnType; -use crate::cass_types::CassDataType; -use crate::cass_types::get_column_type; +use crate::cql_types::data_type::{CassDataType, get_column_type}; use crate::types::*; use scylla::cluster::metadata::{ColumnKind, Table}; use std::collections::HashMap; diff --git a/scylla-rust-wrapper/src/query_result.rs b/scylla-rust-wrapper/src/query_result.rs index 4c6615bf..7bce5280 100644 --- a/scylla-rust-wrapper/src/query_result.rs +++ b/scylla-rust-wrapper/src/query_result.rs @@ -1,14 +1,14 @@ use crate::argconv::*; use crate::cass_error::CassErrorResult; use crate::cass_error::{CassError, ToCassError}; -pub use crate::cass_types::CassValueType; -use crate::cass_types::{ +pub use crate::cql_types::CassValueType; +use crate::cql_types::data_type::{ CassColumnSpec, CassDataType, CassDataTypeInner, MapDataType, cass_data_type_type, get_column_type, }; -use crate::inet::CassInet; +use crate::cql_types::inet::CassInet; +use crate::cql_types::uuid::CassUuid; use crate::types::*; -use crate::uuid::CassUuid; use cass_raw_value::CassRawValue; use row_with_self_borrowed_result_data::RowWithSelfBorrowedResultData; use scylla::cluster::metadata::{ColumnType, NativeType}; @@ -1178,11 +1178,11 @@ mod tests { use scylla::response::query_result::ColumnSpecs; use crate::argconv::{CConst, CassBorrowedSharedPtr, ptr_to_cstr_n}; - use crate::cass_types::{CassDataType, CassDataTypeInner}; + use crate::cql_types::data_type::{CassDataType, CassDataTypeInner}; use crate::{ argconv::{ArcFFI, RefFFI}, cass_error::CassError, - cass_types::CassValueType, + cql_types::CassValueType, query_result::{ cass_result_column_data_type, cass_result_column_name, cass_result_first_row, size_t, }, diff --git a/scylla-rust-wrapper/src/retry_policy.rs b/scylla-rust-wrapper/src/retry_policy.rs index 2a85a128..364a602c 100644 --- a/scylla-rust-wrapper/src/retry_policy.rs +++ b/scylla-rust-wrapper/src/retry_policy.rs @@ -69,7 +69,7 @@ pub enum CassRetryPolicy { DowngradingConsistency(Arc), Logging(Arc), #[cfg(cpp_integration_testing)] - Ignoring(Arc), + Ignoring(Arc), } impl RetryPolicy for CassRetryPolicy { diff --git a/scylla-rust-wrapper/src/runtime.rs b/scylla-rust-wrapper/src/runtime.rs index 20f92b9d..343eea33 100644 --- a/scylla-rust-wrapper/src/runtime.rs +++ b/scylla-rust-wrapper/src/runtime.rs @@ -136,7 +136,7 @@ mod tests { cluster::{cass_cluster_free, cass_cluster_new, cass_cluster_set_contact_points_n}, future::cass_future_free, session::{cass_session_close, cass_session_connect, cass_session_free, cass_session_new}, - testing::{ + testing::utils::{ assert_cass_error_eq, cass_future_wait_check_and_free, mock_init_rules, rusty_fork_test_with_proxy, setup_tracing, test_with_one_proxy_at_ip, }, diff --git a/scylla-rust-wrapper/src/session.rs b/scylla-rust-wrapper/src/session.rs index 19a1aa3a..a208d99e 100644 --- a/scylla-rust-wrapper/src/session.rs +++ b/scylla-rust-wrapper/src/session.rs @@ -1,19 +1,19 @@ use crate::argconv::*; -use crate::batch::CassBatch; use crate::cass_error::*; use crate::cass_metrics_types::CassMetrics; -use crate::cass_types::get_column_type; use crate::cluster::CassCluster; +use crate::cql_types::data_type::get_column_type; +use crate::cql_types::uuid::CassUuid; use crate::exec_profile::{CassExecProfile, ExecProfileName, PerStatementExecProfile}; use crate::future::{CassFuture, CassFutureResult, CassResultValue}; use crate::metadata::create_table_metadata; use crate::metadata::{CassKeyspaceMeta, CassMaterializedViewMeta, CassSchemaMeta}; -use crate::prepared::CassPrepared; use crate::query_result::{CassResult, CassResultKind, CassResultMetadata}; use crate::runtime::Runtime; -use crate::statement::{BoundStatement, CassStatement, SimpleQueryRowSerializer}; +use crate::statements::batch::CassBatch; +use crate::statements::prepared::CassPrepared; +use crate::statements::statement::{BoundStatement, CassStatement, SimpleQueryRowSerializer}; use crate::types::size_t; -use crate::uuid::CassUuid; use scylla::client::execution_profile::ExecutionProfileHandle; use scylla::client::session::Session; use scylla::client::session_builder::SessionBuilder; @@ -389,7 +389,7 @@ pub unsafe extern "C" fn cass_session_execute( #[cfg(cpp_integration_testing)] let recording_listener = statement_opt.record_hosts.then(|| { let recording_listener = - Arc::new(crate::integration_testing::RecordingHistoryListener::new()); + Arc::new(crate::testing::integration::RecordingHistoryListener::new()); match statement { BoundStatement::Simple(ref mut unprepared) => { unprepared @@ -876,52 +876,32 @@ pub unsafe extern "C" fn cass_session_get_metrics( #[cfg(test)] mod tests { - use rusty_fork::rusty_fork_test; - use scylla::errors::DbError; - use scylla::frame::types::Consistency; use scylla_proxy::{Condition, RequestOpcode, RequestReaction, RequestRule, RunningProxy}; use tracing::instrument::WithSubscriber; use super::*; use crate::{ argconv::make_c_str, - batch::{ - cass_batch_add_statement, cass_batch_free, cass_batch_new, cass_batch_set_retry_policy, - }, - cass_types::CassBatchType, cluster::{ - cass_cluster_free, cass_cluster_new, cass_cluster_set_client_id, - cass_cluster_set_contact_points_n, cass_cluster_set_execution_profile, - cass_cluster_set_latency_aware_routing, cass_cluster_set_retry_policy, + cass_cluster_free, cass_cluster_new, cass_cluster_set_contact_points_n, + cass_cluster_set_execution_profile, }, exec_profile::{ ExecProfileName, cass_batch_set_execution_profile, cass_batch_set_execution_profile_n, cass_execution_profile_free, cass_execution_profile_new, - cass_execution_profile_set_latency_aware_routing, - cass_execution_profile_set_retry_policy, cass_statement_set_execution_profile, - cass_statement_set_execution_profile_n, - }, - future::{ - cass_future_error_code, cass_future_free, cass_future_set_callback, cass_future_wait, + cass_statement_set_execution_profile, cass_statement_set_execution_profile_n, }, - retry_policy::{ - CassRetryPolicy, cass_retry_policy_default_new, cass_retry_policy_fallthrough_new, + future::cass_future_error_code, + statements::batch::{ + CassBatchType, cass_batch_add_statement, cass_batch_free, cass_batch_new, }, - statement::{cass_statement_free, cass_statement_new, cass_statement_set_retry_policy}, - testing::{ + statements::statement::{cass_statement_free, cass_statement_new}, + testing::utils::{ assert_cass_error_eq, cass_future_wait_check_and_free, generic_drop_queries_rules, - handshake_rules, mock_init_rules, setup_tracing, test_with_one_proxy, + handshake_rules, setup_tracing, test_with_one_proxy, }, - types::cass_bool_t, - }; - use std::{ - collections::HashSet, - convert::{TryFrom, TryInto}, - ffi::{CStr, c_void}, - iter, - net::SocketAddr, - sync::atomic::{AtomicUsize, Ordering}, }; + use std::{collections::HashSet, iter, net::SocketAddr}; #[tokio::test] #[ntest::timeout(5000)] @@ -1304,658 +1284,4 @@ mod tests { } proxy } - - #[tokio::test] - #[ntest::timeout(30000)] - async fn retry_policy_on_statement_and_batch_is_handled_properly() { - setup_tracing(); - test_with_one_proxy( - retry_policy_on_statement_and_batch_is_handled_properly_do, - retry_policy_on_statement_and_batch_is_handled_properly_rules(), - ) - .with_current_subscriber() - .await; - } - - fn retry_policy_on_statement_and_batch_is_handled_properly_rules() - -> impl IntoIterator { - handshake_rules() - .into_iter() - .chain(iter::once(RequestRule( - Condition::RequestOpcode(RequestOpcode::Query) - .or(Condition::RequestOpcode(RequestOpcode::Batch)) - .and(Condition::BodyContainsCaseInsensitive(Box::new( - *b"SELECT host_id FROM system.", - ))) - // this 1 differentiates Fallthrough and Default retry policies. - .and(Condition::TrueForLimitedTimes(1)), - // We simulate the read timeout error in order to trigger DefaultRetryPolicy's - // retry on the same node. - // We don't use the example ReadTimeout error that is included in proxy, - // because in order to trigger a retry we need data_present=false. - RequestReaction::forge_with_error(DbError::ReadTimeout { - consistency: Consistency::All, - received: 1, - required: 1, - data_present: false, - }), - ))) - .chain(iter::once(RequestRule( - Condition::RequestOpcode(RequestOpcode::Query) - .or(Condition::RequestOpcode(RequestOpcode::Batch)) - .and(Condition::BodyContainsCaseInsensitive(Box::new( - *b"SELECT host_id FROM system.", - ))), - // We make the second attempt return a hard, nonrecoverable error. - RequestReaction::forge().read_failure(), - ))) - .chain(generic_drop_queries_rules()) - } - - // This test aims to verify that the retry policy emulation works properly, - // in any sequence of actions mutating the retry policy for a query. - // - // Below, the consecutive states of the test case are illustrated: - // Retry policy set on: ('F' - Fallthrough, 'D' - Default, '-' - no policy set) - // session default exec profile: F F F F F F F F F F F F F F - // per stmt/batch exec profile: - D - - D D D D D - - - D D - // stmt/batch (emulated): - - - F F - F D F F - D D - - fn retry_policy_on_statement_and_batch_is_handled_properly_do( - node_addr: SocketAddr, - mut proxy: RunningProxy, - ) -> RunningProxy { - unsafe { - let mut cluster_raw = cass_cluster_new(); - let ip = node_addr.ip().to_string(); - let (c_ip, c_ip_len) = str_to_c_str_n(ip.as_str()); - - assert_cass_error_eq!( - cass_cluster_set_contact_points_n(cluster_raw.borrow_mut(), c_ip, c_ip_len,), - CassError::CASS_OK - ); - - let fallthrough_policy = cass_retry_policy_fallthrough_new(); - let default_policy = cass_retry_policy_default_new(); - cass_cluster_set_retry_policy(cluster_raw.borrow_mut(), fallthrough_policy.borrow()); - - let session_raw = cass_session_new(); - - let mut profile_raw = cass_execution_profile_new(); - // A name of a profile that will have been registered in the Cluster. - let profile_name_c_str = make_c_str!("profile"); - - assert_cass_error_eq!( - cass_execution_profile_set_retry_policy( - profile_raw.borrow_mut(), - default_policy.borrow() - ), - CassError::CASS_OK - ); - - let query = make_c_str!("SELECT host_id FROM system.local WHERE key='local'"); - let mut statement_raw = cass_statement_new(query, 0); - let mut batch_raw = cass_batch_new(CassBatchType::CASS_BATCH_TYPE_LOGGED); - assert_cass_error_eq!( - cass_batch_add_statement(batch_raw.borrow_mut(), statement_raw.borrow()), - CassError::CASS_OK - ); - - assert_cass_error_eq!( - cass_cluster_set_execution_profile( - cluster_raw.borrow_mut(), - profile_name_c_str, - profile_raw.borrow_mut(), - ), - CassError::CASS_OK - ); - - cass_future_wait_check_and_free(cass_session_connect( - session_raw.borrow(), - cluster_raw.borrow().into_c_const(), - )); - { - unsafe fn execute_query( - session_raw: CassBorrowedSharedPtr, - statement_raw: CassBorrowedSharedPtr, - ) -> CassError { - unsafe { - cass_future_error_code( - cass_session_execute(session_raw, statement_raw).borrow(), - ) - } - } - unsafe fn execute_batch( - session_raw: CassBorrowedSharedPtr, - batch_raw: CassBorrowedSharedPtr, - ) -> CassError { - unsafe { - cass_future_error_code( - cass_session_execute_batch(session_raw, batch_raw).borrow(), - ) - } - } - - fn reset_proxy_rules(proxy: &mut RunningProxy) { - proxy.running_nodes[0].change_request_rules(Some( - retry_policy_on_statement_and_batch_is_handled_properly_rules() - .into_iter() - .collect(), - )) - } - - unsafe fn assert_query_with_fallthrough_policy( - proxy: &mut RunningProxy, - session_raw: CassBorrowedSharedPtr, - statement_raw: CassBorrowedSharedPtr, - batch_raw: CassBorrowedSharedPtr, - ) { - reset_proxy_rules(&mut *proxy); - unsafe { - assert_cass_error_eq!( - execute_query(session_raw.borrow(), statement_raw), - CassError::CASS_ERROR_SERVER_READ_TIMEOUT, - ); - reset_proxy_rules(&mut *proxy); - assert_cass_error_eq!( - execute_batch(session_raw, batch_raw), - CassError::CASS_ERROR_SERVER_READ_TIMEOUT, - ); - } - } - - unsafe fn assert_query_with_default_policy( - proxy: &mut RunningProxy, - session_raw: CassBorrowedSharedPtr, - statement_raw: CassBorrowedSharedPtr, - batch_raw: CassBorrowedSharedPtr, - ) { - reset_proxy_rules(&mut *proxy); - unsafe { - assert_cass_error_eq!( - execute_query(session_raw.borrow(), statement_raw), - CassError::CASS_ERROR_SERVER_READ_FAILURE - ); - reset_proxy_rules(&mut *proxy); - assert_cass_error_eq!( - execute_batch(session_raw, batch_raw), - CassError::CASS_ERROR_SERVER_READ_FAILURE - ); - } - } - - unsafe fn set_provided_exec_profile( - name: *const i8, - statement_raw: CassBorrowedExclusivePtr, - batch_raw: CassBorrowedExclusivePtr, - ) { - // Set statement/batch exec profile. - unsafe { - assert_cass_error_eq!( - cass_statement_set_execution_profile(statement_raw, name,), - CassError::CASS_OK - ); - assert_cass_error_eq!( - cass_batch_set_execution_profile(batch_raw, name,), - CassError::CASS_OK - ); - } - } - unsafe fn set_exec_profile( - profile_name_c_str: *const c_char, - statement_raw: CassBorrowedExclusivePtr, - batch_raw: CassBorrowedExclusivePtr, - ) { - unsafe { - set_provided_exec_profile(profile_name_c_str, statement_raw, batch_raw) - }; - } - unsafe fn unset_exec_profile( - statement_raw: CassBorrowedExclusivePtr, - batch_raw: CassBorrowedExclusivePtr, - ) { - unsafe { - set_provided_exec_profile(std::ptr::null::(), statement_raw, batch_raw) - }; - } - unsafe fn set_retry_policy_on_stmt( - policy: CassBorrowedSharedPtr, - statement_raw: CassBorrowedExclusivePtr, - batch_raw: CassBorrowedExclusivePtr, - ) { - unsafe { - assert_cass_error_eq!( - cass_statement_set_retry_policy(statement_raw, policy.borrow()), - CassError::CASS_OK - ); - assert_cass_error_eq!( - cass_batch_set_retry_policy(batch_raw, policy,), - CassError::CASS_OK - ); - } - } - unsafe fn unset_retry_policy_on_stmt( - statement_raw: CassBorrowedExclusivePtr, - batch_raw: CassBorrowedExclusivePtr, - ) { - unsafe { set_retry_policy_on_stmt(ArcFFI::null(), statement_raw, batch_raw) }; - } - - // ### START TESTING - - // With no exec profile nor retry policy set on statement/batch, - // the default cluster-wide retry policy should be used: in this case, fallthrough. - - // F - - - assert_query_with_fallthrough_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F D - - set_exec_profile( - profile_name_c_str, - statement_raw.borrow_mut(), - batch_raw.borrow_mut(), - ); - assert_query_with_default_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F - - - unset_exec_profile(statement_raw.borrow_mut(), batch_raw.borrow_mut()); - assert_query_with_fallthrough_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F - F - set_retry_policy_on_stmt( - fallthrough_policy.borrow(), - statement_raw.borrow_mut(), - batch_raw.borrow_mut(), - ); - assert_query_with_fallthrough_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F D F - set_exec_profile( - profile_name_c_str, - statement_raw.borrow_mut(), - batch_raw.borrow_mut(), - ); - assert_query_with_fallthrough_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F D - - unset_retry_policy_on_stmt(statement_raw.borrow_mut(), batch_raw.borrow_mut()); - assert_query_with_default_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F D F - set_retry_policy_on_stmt( - fallthrough_policy.borrow(), - statement_raw.borrow_mut(), - batch_raw.borrow_mut(), - ); - assert_query_with_fallthrough_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F D D - set_retry_policy_on_stmt( - default_policy.borrow(), - statement_raw.borrow_mut(), - batch_raw.borrow_mut(), - ); - assert_query_with_default_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F D F - set_retry_policy_on_stmt( - fallthrough_policy.borrow(), - statement_raw.borrow_mut(), - batch_raw.borrow_mut(), - ); - assert_query_with_fallthrough_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F - F - unset_exec_profile(statement_raw.borrow_mut(), batch_raw.borrow_mut()); - assert_query_with_fallthrough_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F - - - unset_retry_policy_on_stmt(statement_raw.borrow_mut(), batch_raw.borrow_mut()); - assert_query_with_fallthrough_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F - D - set_retry_policy_on_stmt( - default_policy.borrow(), - statement_raw.borrow_mut(), - batch_raw.borrow_mut(), - ); - assert_query_with_default_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F D D - set_exec_profile( - profile_name_c_str, - statement_raw.borrow_mut(), - batch_raw.borrow_mut(), - ); - assert_query_with_default_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - - // F D - - unset_retry_policy_on_stmt(statement_raw.borrow_mut(), batch_raw.borrow_mut()); - assert_query_with_default_policy( - &mut proxy, - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - batch_raw.borrow().into_c_const(), - ); - } - - cass_future_wait_check_and_free(cass_session_close(session_raw.borrow())); - cass_execution_profile_free(profile_raw); - cass_statement_free(statement_raw); - cass_batch_free(batch_raw); - cass_session_free(session_raw); - cass_cluster_free(cluster_raw); - } - proxy - } - - #[test] - #[ntest::timeout(5000)] - fn session_with_latency_aware_load_balancing_does_not_panic() { - unsafe { - let mut cluster_raw = cass_cluster_new(); - - // An IP with very little chance of having a ScyllaDB node listening - let ip = "127.0.1.231"; - let (c_ip, c_ip_len) = str_to_c_str_n(ip); - - assert_cass_error_eq!( - cass_cluster_set_contact_points_n(cluster_raw.borrow_mut(), c_ip, c_ip_len), - CassError::CASS_OK - ); - cass_cluster_set_latency_aware_routing(cluster_raw.borrow_mut(), true as cass_bool_t); - let session_raw = cass_session_new(); - let mut profile_raw = cass_execution_profile_new(); - assert_cass_error_eq!( - cass_execution_profile_set_latency_aware_routing( - profile_raw.borrow_mut(), - true as cass_bool_t - ), - CassError::CASS_OK - ); - let profile_name = make_c_str!("latency_aware"); - cass_cluster_set_execution_profile( - cluster_raw.borrow_mut(), - profile_name, - profile_raw.borrow_mut(), - ); - { - let cass_future = - cass_session_connect(session_raw.borrow(), cluster_raw.borrow().into_c_const()); - cass_future_wait(cass_future.borrow()); - // The exact outcome is not important, we only test that we don't panic. - } - cass_execution_profile_free(profile_raw); - cass_session_free(session_raw); - cass_cluster_free(cluster_raw); - } - } - - rusty_fork_test! { - #![rusty_fork(timeout_ms = 1000)] - #[test] - fn cluster_is_not_referenced_by_session_connect_future() { - // An IP with very little chance of having a ScyllaDB node listening - let ip = "127.0.1.231"; - let (c_ip, c_ip_len) = str_to_c_str_n(ip); - let profile_name = make_c_str!("latency_aware"); - - unsafe { - let mut cluster_raw = cass_cluster_new(); - - assert_cass_error_eq!( - cass_cluster_set_contact_points_n(cluster_raw.borrow_mut(), c_ip, c_ip_len), - CassError::CASS_OK - ); - cass_cluster_set_latency_aware_routing(cluster_raw.borrow_mut(), true as cass_bool_t); - let session_raw = cass_session_new(); - let mut profile_raw = cass_execution_profile_new(); - assert_cass_error_eq!( - cass_execution_profile_set_latency_aware_routing(profile_raw.borrow_mut(), true as cass_bool_t), - CassError::CASS_OK - ); - cass_cluster_set_execution_profile(cluster_raw.borrow_mut(), profile_name, profile_raw.borrow_mut()); - { - let cass_future = cass_session_connect(session_raw.borrow(), cluster_raw.borrow().into_c_const()); - - // This checks that we don't use-after-free the cluster inside the future. - cass_cluster_free(cluster_raw); - - cass_future_wait(cass_future.borrow()); - // The exact outcome is not important, we only test that we don't segfault. - } - cass_execution_profile_free(profile_raw); - cass_session_free(session_raw); - } - } - } - - #[tokio::test] - #[ntest::timeout(5000)] - async fn test_cass_session_get_client_id_on_disconnected_session() { - setup_tracing(); - test_with_one_proxy( - |node_addr: SocketAddr, proxy: RunningProxy| unsafe { - let session_raw = cass_session_new(); - - // Check that we can get a client ID from a disconnected session. - let _random_client_id = cass_session_get_client_id(session_raw.borrow()); - - let mut cluster_raw = cass_cluster_new(); - let ip = node_addr.ip().to_string(); - let (c_ip, c_ip_len) = str_to_c_str_n(ip.as_str()); - assert_cass_error_eq!( - cass_cluster_set_contact_points_n(cluster_raw.borrow_mut(), c_ip, c_ip_len), - CassError::CASS_OK - ); - - let cluster_client_id = CassUuid { - time_and_version: 2137, - clock_seq_and_node: 7312, - }; - cass_cluster_set_client_id(cluster_raw.borrow_mut(), cluster_client_id); - - let connect_fut = - cass_session_connect(session_raw.borrow(), cluster_raw.borrow().into_c_const()); - assert_cass_error_eq!(cass_future_error_code(connect_fut), CassError::CASS_OK); - - // Verify that the session inherits the client ID from the cluster. - let session_client_id = cass_session_get_client_id(session_raw.borrow()); - assert_eq!(session_client_id, cluster_client_id); - - // Verify that we can still get a client ID after disconnecting. - let session_client_id = cass_session_get_client_id(session_raw.borrow()); - assert_eq!(session_client_id, cluster_client_id); - - cass_session_free(session_raw); - cass_cluster_free(cluster_raw); - - proxy - }, - mock_init_rules(), - ) - .with_current_subscriber() - .await; - } - - #[tokio::test] - #[ntest::timeout(5000)] - async fn session_free_waits_for_requests_to_complete() { - setup_tracing(); - test_with_one_proxy( - session_free_waits_for_requests_to_complete_do, - mock_init_rules(), - ) - .with_current_subscriber() - .await; - } - - fn session_free_waits_for_requests_to_complete_do( - node_addr: SocketAddr, - proxy: RunningProxy, - ) -> RunningProxy { - unsafe { - let mut cluster_raw = cass_cluster_new(); - let ip = node_addr.ip().to_string(); - let (c_ip, c_ip_len) = str_to_c_str_n(ip.as_str()); - - assert_cass_error_eq!( - cass_cluster_set_contact_points_n(cluster_raw.borrow_mut(), c_ip, c_ip_len), - CassError::CASS_OK - ); - let session_raw = cass_session_new(); - cass_future_wait_check_and_free(cass_session_connect( - session_raw.borrow(), - cluster_raw.borrow().into_c_const(), - )); - - tracing::debug!("Session connected, starting to execute requests..."); - - let statement = c"SELECT host_id FROM system.local WHERE key='local'" as *const CStr - as *const c_char; - let statement_raw = cass_statement_new(statement, 0); - - let mut batch_raw = cass_batch_new(CassBatchType::CASS_BATCH_TYPE_LOGGED); - // This batch is obviously invalid, because it contains a SELECT statement. This is OK for us, - // because we anyway expect the batch to fail. The goal is to have the future set, no matter if it's - // set with a success or an error. - cass_batch_add_statement(batch_raw.borrow_mut(), statement_raw.borrow()); - - let finished_executions = AtomicUsize::new(0); - unsafe extern "C" fn finished_execution_callback( - _future_raw: CassBorrowedSharedPtr, - data: *mut c_void, - ) { - let finished_executions = unsafe { &*(data as *const AtomicUsize) }; - finished_executions.fetch_add(1, Ordering::SeqCst); - } - - const ITERATIONS: usize = 1; - const EXECUTIONS: usize = 3 * ITERATIONS; // One prepare, one statement and one batch per iteration. - - let futures = (0..ITERATIONS) - .flat_map(|_| { - // Prepare a statement - let prepare_fut = cass_session_prepare(session_raw.borrow(), statement); - - // Execute a statement - let statement_fut = cass_session_execute( - session_raw.borrow(), - statement_raw.borrow().into_c_const(), - ); - - // Execute a batch - let batch_fut = cass_session_execute_batch( - session_raw.borrow(), - batch_raw.borrow().into_c_const(), - ); - for fut in [ - prepare_fut.borrow(), - statement_fut.borrow(), - batch_fut.borrow(), - ] { - cass_future_set_callback( - fut, - Some(finished_execution_callback), - std::ptr::addr_of!(finished_executions) as _, - ); - } - - [prepare_fut, statement_fut, batch_fut] - }) - .collect::>(); - - tracing::debug!("Started all requests. Now, freeing statements and session..."); - - // Free the statement - cass_statement_free(statement_raw); - // Free the batch - cass_batch_free(batch_raw); - - // Session is freed, but the requests may still be in-flight. - cass_session_free(session_raw); - - tracing::debug!("Session freed."); - - // Assert that the session awaited completion of all requests. - let actually_finished_executions = finished_executions.load(Ordering::SeqCst); - assert_eq!( - actually_finished_executions, EXECUTIONS, - "Expected {} requests to complete before the session was freed, but only {} did.", - EXECUTIONS, actually_finished_executions - ); - - futures.into_iter().for_each(|fut| { - // As per cassandra.h, "a future can be freed anytime". - cass_future_free(fut); - }); - - cass_cluster_free(cluster_raw); - } - proxy - } } diff --git a/scylla-rust-wrapper/src/batch.rs b/scylla-rust-wrapper/src/statements/batch.rs similarity index 97% rename from scylla-rust-wrapper/src/batch.rs rename to scylla-rust-wrapper/src/statements/batch.rs index cf1fba17..d490cdf0 100644 --- a/scylla-rust-wrapper/src/batch.rs +++ b/scylla-rust-wrapper/src/statements/batch.rs @@ -2,15 +2,16 @@ use crate::argconv::{ ArcFFI, BoxFFI, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr, CassOwnedExclusivePtr, FFI, FromBox, }; +pub use crate::cass_batch_types::CassBatchType; use crate::cass_error::CassError; -pub use crate::cass_types::CassBatchType; -use crate::cass_types::{CassConsistency, make_batch_type}; use crate::config_value::{MaybeUnsetConfig, RequestTimeout}; +use crate::cql_types::CassConsistency; +use crate::cql_types::data_type::make_batch_type; +use crate::cql_types::value::CassCqlValue; use crate::exec_profile::PerStatementExecProfile; use crate::retry_policy::CassRetryPolicy; -use crate::statement::{BoundStatement, CassStatement}; +use crate::statements::statement::{BoundStatement, CassStatement}; use crate::types::*; -use crate::value::CassCqlValue; use scylla::statement::batch::Batch; use scylla::statement::{Consistency, SerialConsistency}; use scylla::value::MaybeUnset; diff --git a/scylla-rust-wrapper/src/statements/mod.rs b/scylla-rust-wrapper/src/statements/mod.rs new file mode 100644 index 00000000..be8a6349 --- /dev/null +++ b/scylla-rust-wrapper/src/statements/mod.rs @@ -0,0 +1,3 @@ +pub(crate) mod batch; +pub(crate) mod prepared; +pub(crate) mod statement; diff --git a/scylla-rust-wrapper/src/prepared.rs b/scylla-rust-wrapper/src/statements/prepared.rs similarity index 98% rename from scylla-rust-wrapper/src/prepared.rs rename to scylla-rust-wrapper/src/statements/prepared.rs index 06ef9e0d..c40497ef 100644 --- a/scylla-rust-wrapper/src/prepared.rs +++ b/scylla-rust-wrapper/src/statements/prepared.rs @@ -4,9 +4,9 @@ use std::{os::raw::c_char, sync::Arc}; use crate::{ argconv::*, cass_error::CassError, - cass_types::{CassDataType, get_column_type}, + cql_types::data_type::{CassDataType, get_column_type}, query_result::CassResultMetadata, - statement::{BoundPreparedStatement, CassStatement}, + statements::statement::{BoundPreparedStatement, CassStatement}, types::size_t, }; use scylla::statement::prepared::PreparedStatement; diff --git a/scylla-rust-wrapper/src/statement.rs b/scylla-rust-wrapper/src/statements/statement.rs similarity index 99% rename from scylla-rust-wrapper/src/statement.rs rename to scylla-rust-wrapper/src/statements/statement.rs index 7378b79c..643c8c1b 100644 --- a/scylla-rust-wrapper/src/statement.rs +++ b/scylla-rust-wrapper/src/statements/statement.rs @@ -1,14 +1,14 @@ +use crate::argconv::*; use crate::cass_error::CassError; -use crate::cass_types::CassConsistency; use crate::config_value::{MaybeUnsetConfig, RequestTimeout}; +use crate::cql_types::CassConsistency; +use crate::cql_types::inet::CassInet; +use crate::cql_types::value::{self, CassCqlValue}; use crate::exec_profile::PerStatementExecProfile; -use crate::inet::CassInet; -use crate::prepared::CassPrepared; use crate::query_result::{CassNode, CassResult}; use crate::retry_policy::CassRetryPolicy; +use crate::statements::prepared::CassPrepared; use crate::types::*; -use crate::value::CassCqlValue; -use crate::{argconv::*, value}; use scylla::frame::types::Consistency; use scylla::policies::load_balancing::{NodeIdentifier, SingleTargetLoadBalancingPolicy}; use scylla::response::{PagingState, PagingStateResponse}; @@ -875,11 +875,11 @@ mod tests { use crate::argconv::{BoxFFI, RefFFI}; use crate::cass_error::CassError; - use crate::inet::CassInet; - use crate::statement::{ + use crate::cql_types::inet::CassInet; + use crate::statements::statement::{ cass_statement_set_host, cass_statement_set_host_inet, cass_statement_set_node, }; - use crate::testing::assert_cass_error_eq; + use crate::testing::utils::assert_cass_error_eq; use super::{cass_statement_free, cass_statement_new}; diff --git a/scylla-rust-wrapper/src/integration_testing.rs b/scylla-rust-wrapper/src/testing/integration.rs similarity index 99% rename from scylla-rust-wrapper/src/integration_testing.rs rename to scylla-rust-wrapper/src/testing/integration.rs index c5cd3056..d350b7fb 100644 --- a/scylla-rust-wrapper/src/integration_testing.rs +++ b/scylla-rust-wrapper/src/testing/integration.rs @@ -12,13 +12,13 @@ use crate::argconv::{ ArcFFI, BoxFFI, CConst, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr, CassOwnedSharedPtr, }; -use crate::batch::CassBatch; use crate::cluster::CassCluster; use crate::future::{CassFuture, CassResultValue}; use crate::retry_policy::CassRetryPolicy; #[cfg(test)] use crate::runtime::Runtime; -use crate::statement::{BoundStatement, CassStatement}; +use crate::statements::batch::CassBatch; +use crate::statements::statement::{BoundStatement, CassStatement}; use crate::types::{cass_bool_t, cass_int32_t, cass_uint16_t, cass_uint64_t, size_t}; #[unsafe(no_mangle)] diff --git a/scylla-rust-wrapper/src/testing/mod.rs b/scylla-rust-wrapper/src/testing/mod.rs new file mode 100644 index 00000000..fee08a88 --- /dev/null +++ b/scylla-rust-wrapper/src/testing/mod.rs @@ -0,0 +1,8 @@ +#[cfg(cpp_integration_testing)] +pub(crate) mod integration; + +#[cfg(test)] +pub(crate) mod utils; + +#[cfg(test)] +mod ser_de_tests; diff --git a/scylla-rust-wrapper/src/ser_de_tests.rs b/scylla-rust-wrapper/src/testing/ser_de_tests.rs similarity index 99% rename from scylla-rust-wrapper/src/ser_de_tests.rs rename to scylla-rust-wrapper/src/testing/ser_de_tests.rs index 6102b555..d74e6a58 100644 --- a/scylla-rust-wrapper/src/ser_de_tests.rs +++ b/scylla-rust-wrapper/src/testing/ser_de_tests.rs @@ -26,8 +26,9 @@ use crate::argconv::{ CConst, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr, CassOwnedExclusivePtr, RefFFI, }; use crate::cass_error::CassError; -use crate::cass_types::get_column_type; -use crate::inet::CassInet; +use crate::cql_types::data_type::get_column_type; +use crate::cql_types::inet::CassInet; +use crate::cql_types::uuid::CassUuid; use crate::iterator::{ CassIterator, CassIteratorType, cass_iterator_fields_from_user_type, cass_iterator_free, cass_iterator_from_collection, cass_iterator_from_map, cass_iterator_from_tuple, @@ -42,9 +43,8 @@ use crate::query_result::{ cass_value_get_int8, cass_value_get_int16, cass_value_get_int32, cass_value_get_int64, cass_value_get_string, cass_value_get_uuid, cass_value_is_null, cass_value_item_count, }; -use crate::testing::{assert_cass_error_eq, setup_tracing}; +use crate::testing::utils::{assert_cass_error_eq, setup_tracing}; use crate::types::size_t; -use crate::uuid::CassUuid; fn do_serialize(t: T, typ: &ColumnType) -> Vec { let mut ret = Vec::new(); diff --git a/scylla-rust-wrapper/src/testing.rs b/scylla-rust-wrapper/src/testing/utils.rs similarity index 100% rename from scylla-rust-wrapper/src/testing.rs rename to scylla-rust-wrapper/src/testing/utils.rs diff --git a/scylla-rust-wrapper/tests/integration/consistency.rs b/scylla-rust-wrapper/tests/integration/consistency.rs index 4d2374f3..53b8d79f 100644 --- a/scylla-rust-wrapper/tests/integration/consistency.rs +++ b/scylla-rust-wrapper/tests/integration/consistency.rs @@ -34,7 +34,6 @@ use scylla_cpp_driver::api::statement::{ cass_statement_set_execution_profile, cass_statement_set_serial_consistency, }; use scylla_cpp_driver::argconv::{CConst, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr}; -use scylla_proxy::ShardAwareness; use scylla_proxy::{ Condition, ProxyError, Reaction, RequestFrame, RequestOpcode, RequestReaction, RequestRule, TargetShard, WorkerError, @@ -611,8 +610,8 @@ fn check_for_all_consistencies_and_setting_options( async fn consistency_is_correctly_set_in_cql_requests() { setup_tracing(); let res = test_with_3_node_dry_mode_cluster( - ShardAwareness::QueryNode, - |proxy_uris, mut running_proxy| async move { + || None, + |proxy_uris, mut running_proxy| { let request_rules = |tx| { handshake_rules() .into_iter() @@ -662,13 +661,7 @@ async fn consistency_is_correctly_set_in_cql_requests() { running_node.change_request_rules(Some(request_rules(request_tx.clone()))); } - // The test must be executed in a blocking context, because otherwise the tokio runtime - // will panic on blocking operations that C API performs. - tokio::task::spawn_blocking(move || { - check_for_all_consistencies_and_setting_options(request_rx, proxy_uris) - }) - .await - .unwrap(); + check_for_all_consistencies_and_setting_options(request_rx, proxy_uris); running_proxy }, diff --git a/scylla-rust-wrapper/tests/integration/main.rs b/scylla-rust-wrapper/tests/integration/main.rs index 1a5c3dda..7186cbab 100644 --- a/scylla-rust-wrapper/tests/integration/main.rs +++ b/scylla-rust-wrapper/tests/integration/main.rs @@ -1,2 +1,3 @@ mod consistency; +mod session; mod utils; diff --git a/scylla-rust-wrapper/tests/integration/session.rs b/scylla-rust-wrapper/tests/integration/session.rs new file mode 100644 index 00000000..006e0ee1 --- /dev/null +++ b/scylla-rust-wrapper/tests/integration/session.rs @@ -0,0 +1,734 @@ +use std::{ + ffi::{CStr, c_void}, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use libc::c_char; +use rusty_fork::rusty_fork_test; +use scylla::errors::DbError; +use scylla_cpp_driver::{ + api::{ + batch::{ + CassBatch, CassBatchType, cass_batch_add_statement, cass_batch_free, cass_batch_new, + cass_batch_set_execution_profile, cass_batch_set_retry_policy, + }, + cluster::{ + cass_cluster_free, cass_cluster_new, cass_cluster_set_client_id, + cass_cluster_set_contact_points, cass_cluster_set_contact_points_n, + cass_cluster_set_execution_profile, cass_cluster_set_latency_aware_routing, + cass_cluster_set_retry_policy, + }, + error::CassError, + execution_profile::{ + cass_execution_profile_free, cass_execution_profile_new, + cass_execution_profile_set_latency_aware_routing, + cass_execution_profile_set_retry_policy, + }, + future::{ + CassFuture, cass_future_error_code, cass_future_free, cass_future_set_callback, + cass_future_wait, + }, + retry_policy::{ + CassRetryPolicy, cass_retry_policy_default_new, cass_retry_policy_fallthrough_new, + }, + session::{ + CassSession, cass_session_close, cass_session_connect, cass_session_execute, + cass_session_execute_batch, cass_session_free, cass_session_get_client_id, + cass_session_new, cass_session_prepare, + }, + statement::{ + CassStatement, cass_statement_free, cass_statement_new, + cass_statement_set_execution_profile, cass_statement_set_retry_policy, + }, + uuid::CassUuid, + }, + argconv::{ArcFFI, CConst, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr}, + types::cass_bool_t, +}; +use scylla_cql::Consistency; +use scylla_proxy::{ + Condition, ProxyError, RequestOpcode, RequestReaction, RequestRule, RunningProxy, WorkerError, +}; +use tracing::instrument::WithSubscriber as _; + +use crate::utils::{ + assert_cass_error_eq, cass_future_wait_check_and_free, generic_drop_queries_rules, + handshake_rules, make_c_str, mock_init_rules, proxy_uris_to_contact_points, setup_tracing, + str_to_c_str_n, test_with_3_node_dry_mode_cluster, +}; + +#[tokio::test] +#[ntest::timeout(30000)] +async fn retry_policy_on_statement_and_batch_is_handled_properly() { + setup_tracing(); + let res = test_with_3_node_dry_mode_cluster( + retry_policy_on_statement_and_batch_is_handled_properly_rules, + retry_policy_on_statement_and_batch_is_handled_properly_do, + ) + .with_current_subscriber() + .await; + + match res { + Ok(()) => (), + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), + Err(err) => panic!("{}", err), + } +} + +fn retry_policy_on_statement_and_batch_is_handled_properly_rules() +-> impl IntoIterator { + handshake_rules() + .into_iter() + .chain(std::iter::once(RequestRule( + Condition::RequestOpcode(RequestOpcode::Query) + .or(Condition::RequestOpcode(RequestOpcode::Batch)) + .and(Condition::BodyContainsCaseInsensitive(Box::new( + *b"SELECT host_id FROM system.", + ))) + // this 1 differentiates Fallthrough and Default retry policies. + .and(Condition::TrueForLimitedTimes(1)), + // We simulate the read timeout error in order to trigger DefaultRetryPolicy's + // retry on the same node. + // We don't use the example ReadTimeout error that is included in proxy, + // because in order to trigger a retry we need data_present=false. + RequestReaction::forge_with_error(DbError::ReadTimeout { + consistency: Consistency::All, + received: 1, + required: 1, + data_present: false, + }), + ))) + .chain(std::iter::once(RequestRule( + Condition::RequestOpcode(RequestOpcode::Query) + .or(Condition::RequestOpcode(RequestOpcode::Batch)) + .and(Condition::BodyContainsCaseInsensitive(Box::new( + *b"SELECT host_id FROM system.", + ))), + // We make the second attempt return a hard, nonrecoverable error. + RequestReaction::forge().read_failure(), + ))) + .chain(generic_drop_queries_rules()) +} + +// This test aims to verify that the retry policy emulation works properly, +// in any sequence of actions mutating the retry policy for a query. +// +// Below, the consecutive states of the test case are illustrated: +// Retry policy set on: ('F' - Fallthrough, 'D' - Default, '-' - no policy set) +// session default exec profile: F F F F F F F F F F F F F F +// per stmt/batch exec profile: - D - - D D D D D - - - D D +// stmt/batch (emulated): - - - F F - F D F F - D D - +fn retry_policy_on_statement_and_batch_is_handled_properly_do( + proxy_uris: [String; 3], + mut proxy: RunningProxy, +) -> RunningProxy { + unsafe { + let mut cluster_raw = cass_cluster_new(); + let contact_points = proxy_uris_to_contact_points(proxy_uris); + + assert_cass_error_eq( + cass_cluster_set_contact_points(cluster_raw.borrow_mut(), contact_points.as_ptr()), + CassError::CASS_OK, + ); + + let fallthrough_policy = cass_retry_policy_fallthrough_new(); + let default_policy = cass_retry_policy_default_new(); + cass_cluster_set_retry_policy(cluster_raw.borrow_mut(), fallthrough_policy.borrow()); + + let session_raw = cass_session_new(); + + let mut profile_raw = cass_execution_profile_new(); + // A name of a profile that will have been registered in the Cluster. + let profile_name_c_str = make_c_str!("profile"); + + assert_cass_error_eq( + cass_execution_profile_set_retry_policy( + profile_raw.borrow_mut(), + default_policy.borrow(), + ), + CassError::CASS_OK, + ); + + let query = make_c_str!("SELECT host_id FROM system.local WHERE key='local'"); + let mut statement_raw = cass_statement_new(query, 0); + let mut batch_raw = cass_batch_new(CassBatchType::CASS_BATCH_TYPE_LOGGED); + assert_cass_error_eq( + cass_batch_add_statement(batch_raw.borrow_mut(), statement_raw.borrow()), + CassError::CASS_OK, + ); + + assert_cass_error_eq( + cass_cluster_set_execution_profile( + cluster_raw.borrow_mut(), + profile_name_c_str, + profile_raw.borrow_mut(), + ), + CassError::CASS_OK, + ); + + cass_future_wait_check_and_free(cass_session_connect( + session_raw.borrow(), + cluster_raw.borrow().into_c_const(), + )); + { + unsafe fn execute_query( + session_raw: CassBorrowedSharedPtr, + statement_raw: CassBorrowedSharedPtr, + ) -> CassError { + unsafe { + cass_future_error_code( + cass_session_execute(session_raw, statement_raw).borrow(), + ) + } + } + unsafe fn execute_batch( + session_raw: CassBorrowedSharedPtr, + batch_raw: CassBorrowedSharedPtr, + ) -> CassError { + unsafe { + cass_future_error_code( + cass_session_execute_batch(session_raw, batch_raw).borrow(), + ) + } + } + + fn reset_proxy_rules(proxy: &mut RunningProxy) { + proxy.running_nodes.iter_mut().for_each(|node| { + node.change_request_rules(Some( + retry_policy_on_statement_and_batch_is_handled_properly_rules() + .into_iter() + .collect(), + )) + }) + } + + unsafe fn assert_query_with_fallthrough_policy( + proxy: &mut RunningProxy, + session_raw: CassBorrowedSharedPtr, + statement_raw: CassBorrowedSharedPtr, + batch_raw: CassBorrowedSharedPtr, + ) { + reset_proxy_rules(&mut *proxy); + unsafe { + assert_cass_error_eq( + execute_query(session_raw.borrow(), statement_raw), + CassError::CASS_ERROR_SERVER_READ_TIMEOUT, + ); + reset_proxy_rules(&mut *proxy); + assert_cass_error_eq( + execute_batch(session_raw, batch_raw), + CassError::CASS_ERROR_SERVER_READ_TIMEOUT, + ); + } + } + + unsafe fn assert_query_with_default_policy( + proxy: &mut RunningProxy, + session_raw: CassBorrowedSharedPtr, + statement_raw: CassBorrowedSharedPtr, + batch_raw: CassBorrowedSharedPtr, + ) { + reset_proxy_rules(&mut *proxy); + unsafe { + assert_cass_error_eq( + execute_query(session_raw.borrow(), statement_raw), + CassError::CASS_ERROR_SERVER_READ_FAILURE, + ); + reset_proxy_rules(&mut *proxy); + assert_cass_error_eq( + execute_batch(session_raw, batch_raw), + CassError::CASS_ERROR_SERVER_READ_FAILURE, + ); + } + } + + unsafe fn set_provided_exec_profile( + name: *const i8, + statement_raw: CassBorrowedExclusivePtr, + batch_raw: CassBorrowedExclusivePtr, + ) { + // Set statement/batch exec profile. + unsafe { + assert_cass_error_eq( + cass_statement_set_execution_profile(statement_raw, name), + CassError::CASS_OK, + ); + assert_cass_error_eq( + cass_batch_set_execution_profile(batch_raw, name), + CassError::CASS_OK, + ); + } + } + unsafe fn set_exec_profile( + profile_name_c_str: *const c_char, + statement_raw: CassBorrowedExclusivePtr, + batch_raw: CassBorrowedExclusivePtr, + ) { + unsafe { set_provided_exec_profile(profile_name_c_str, statement_raw, batch_raw) }; + } + unsafe fn unset_exec_profile( + statement_raw: CassBorrowedExclusivePtr, + batch_raw: CassBorrowedExclusivePtr, + ) { + unsafe { + set_provided_exec_profile(std::ptr::null::(), statement_raw, batch_raw) + }; + } + unsafe fn set_retry_policy_on_stmt( + policy: CassBorrowedSharedPtr, + statement_raw: CassBorrowedExclusivePtr, + batch_raw: CassBorrowedExclusivePtr, + ) { + unsafe { + assert_cass_error_eq( + cass_statement_set_retry_policy(statement_raw, policy.borrow()), + CassError::CASS_OK, + ); + assert_cass_error_eq( + cass_batch_set_retry_policy(batch_raw, policy), + CassError::CASS_OK, + ); + } + } + unsafe fn unset_retry_policy_on_stmt( + statement_raw: CassBorrowedExclusivePtr, + batch_raw: CassBorrowedExclusivePtr, + ) { + unsafe { set_retry_policy_on_stmt(ArcFFI::null(), statement_raw, batch_raw) }; + } + + // ### START TESTING + + // With no exec profile nor retry policy set on statement/batch, + // the default cluster-wide retry policy should be used: in this case, fallthrough. + + // F - - + assert_query_with_fallthrough_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F D - + set_exec_profile( + profile_name_c_str, + statement_raw.borrow_mut(), + batch_raw.borrow_mut(), + ); + assert_query_with_default_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F - - + unset_exec_profile(statement_raw.borrow_mut(), batch_raw.borrow_mut()); + assert_query_with_fallthrough_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F - F + set_retry_policy_on_stmt( + fallthrough_policy.borrow(), + statement_raw.borrow_mut(), + batch_raw.borrow_mut(), + ); + assert_query_with_fallthrough_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F D F + set_exec_profile( + profile_name_c_str, + statement_raw.borrow_mut(), + batch_raw.borrow_mut(), + ); + assert_query_with_fallthrough_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F D - + unset_retry_policy_on_stmt(statement_raw.borrow_mut(), batch_raw.borrow_mut()); + assert_query_with_default_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F D F + set_retry_policy_on_stmt( + fallthrough_policy.borrow(), + statement_raw.borrow_mut(), + batch_raw.borrow_mut(), + ); + assert_query_with_fallthrough_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F D D + set_retry_policy_on_stmt( + default_policy.borrow(), + statement_raw.borrow_mut(), + batch_raw.borrow_mut(), + ); + assert_query_with_default_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F D F + set_retry_policy_on_stmt( + fallthrough_policy.borrow(), + statement_raw.borrow_mut(), + batch_raw.borrow_mut(), + ); + assert_query_with_fallthrough_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F - F + unset_exec_profile(statement_raw.borrow_mut(), batch_raw.borrow_mut()); + assert_query_with_fallthrough_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F - - + unset_retry_policy_on_stmt(statement_raw.borrow_mut(), batch_raw.borrow_mut()); + assert_query_with_fallthrough_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F - D + set_retry_policy_on_stmt( + default_policy.borrow(), + statement_raw.borrow_mut(), + batch_raw.borrow_mut(), + ); + assert_query_with_default_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F D D + set_exec_profile( + profile_name_c_str, + statement_raw.borrow_mut(), + batch_raw.borrow_mut(), + ); + assert_query_with_default_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + + // F D - + unset_retry_policy_on_stmt(statement_raw.borrow_mut(), batch_raw.borrow_mut()); + assert_query_with_default_policy( + &mut proxy, + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + batch_raw.borrow().into_c_const(), + ); + } + + cass_future_wait_check_and_free(cass_session_close(session_raw.borrow())); + cass_execution_profile_free(profile_raw); + cass_statement_free(statement_raw); + cass_batch_free(batch_raw); + cass_session_free(session_raw); + cass_cluster_free(cluster_raw); + } + + proxy +} + +#[test] +#[ntest::timeout(5000)] +fn session_with_latency_aware_load_balancing_does_not_panic() { + unsafe { + let mut cluster_raw = cass_cluster_new(); + + // An IP with very little chance of having a ScyllaDB node listening + let ip = "127.0.1.231"; + let (c_ip, c_ip_len) = str_to_c_str_n(ip); + + assert_cass_error_eq( + cass_cluster_set_contact_points_n(cluster_raw.borrow_mut(), c_ip, c_ip_len), + CassError::CASS_OK, + ); + cass_cluster_set_latency_aware_routing(cluster_raw.borrow_mut(), true as cass_bool_t); + let session_raw = cass_session_new(); + let mut profile_raw = cass_execution_profile_new(); + assert_cass_error_eq( + cass_execution_profile_set_latency_aware_routing( + profile_raw.borrow_mut(), + true as cass_bool_t, + ), + CassError::CASS_OK, + ); + let profile_name = make_c_str!("latency_aware"); + cass_cluster_set_execution_profile( + cluster_raw.borrow_mut(), + profile_name, + profile_raw.borrow_mut(), + ); + { + let cass_future = + cass_session_connect(session_raw.borrow(), cluster_raw.borrow().into_c_const()); + cass_future_wait(cass_future.borrow()); + // The exact outcome is not important, we only test that we don't panic. + } + cass_execution_profile_free(profile_raw); + cass_session_free(session_raw); + cass_cluster_free(cluster_raw); + } +} + +rusty_fork_test! { + #![rusty_fork(timeout_ms = 1000)] + #[test] + fn cluster_is_not_referenced_by_session_connect_future() { + // An IP with very little chance of having a ScyllaDB node listening + let ip = "127.0.1.231"; + let (c_ip, c_ip_len) = str_to_c_str_n(ip); + let profile_name = make_c_str!("latency_aware"); + + unsafe { + let mut cluster_raw = cass_cluster_new(); + + assert_cass_error_eq( + cass_cluster_set_contact_points_n(cluster_raw.borrow_mut(), c_ip, c_ip_len), + CassError::CASS_OK + ); + cass_cluster_set_latency_aware_routing(cluster_raw.borrow_mut(), true as cass_bool_t); + let session_raw = cass_session_new(); + let mut profile_raw = cass_execution_profile_new(); + assert_cass_error_eq( + cass_execution_profile_set_latency_aware_routing(profile_raw.borrow_mut(), true as cass_bool_t), + CassError::CASS_OK + ); + cass_cluster_set_execution_profile(cluster_raw.borrow_mut(), profile_name, profile_raw.borrow_mut()); + { + let cass_future = cass_session_connect(session_raw.borrow(), cluster_raw.borrow().into_c_const()); + + // This checks that we don't use-after-free the cluster inside the future. + cass_cluster_free(cluster_raw); + + cass_future_wait(cass_future.borrow()); + // The exact outcome is not important, we only test that we don't segfault. + } + cass_execution_profile_free(profile_raw); + cass_session_free(session_raw); + } + } +} + +#[tokio::test] +#[ntest::timeout(5000)] +async fn test_cass_session_get_client_id_on_disconnected_session() { + setup_tracing(); + let res = test_with_3_node_dry_mode_cluster( + mock_init_rules, + |proxy_uris: [String; 3], proxy: RunningProxy| { + unsafe { + let session_raw = cass_session_new(); + + // Check that we can get a client ID from a disconnected session. + let _random_client_id = cass_session_get_client_id(session_raw.borrow()); + + let mut cluster_raw = cass_cluster_new(); + let contact_points = proxy_uris_to_contact_points(proxy_uris); + assert_cass_error_eq( + cass_cluster_set_contact_points( + cluster_raw.borrow_mut(), + contact_points.as_ptr(), + ), + CassError::CASS_OK, + ); + + let cluster_client_id = CassUuid { + time_and_version: 2137, + clock_seq_and_node: 7312, + }; + cass_cluster_set_client_id(cluster_raw.borrow_mut(), cluster_client_id); + + let connect_fut = + cass_session_connect(session_raw.borrow(), cluster_raw.borrow().into_c_const()); + assert_cass_error_eq(cass_future_error_code(connect_fut), CassError::CASS_OK); + + // Verify that the session inherits the client ID from the cluster. + let session_client_id = cass_session_get_client_id(session_raw.borrow()); + assert_eq!(session_client_id, cluster_client_id); + + // Verify that we can still get a client ID after disconnecting. + let session_client_id = cass_session_get_client_id(session_raw.borrow()); + assert_eq!(session_client_id, cluster_client_id); + + cass_session_free(session_raw); + cass_cluster_free(cluster_raw); + } + + proxy + }, + ) + .with_current_subscriber() + .await; + + match res { + Ok(()) => (), + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), + Err(err) => panic!("{}", err), + } +} + +#[tokio::test] +#[ntest::timeout(50000)] +async fn session_free_waits_for_requests_to_complete() { + setup_tracing(); + let res = test_with_3_node_dry_mode_cluster( + mock_init_rules, + session_free_waits_for_requests_to_complete_do, + ) + .with_current_subscriber() + .await; + + match res { + Ok(()) => (), + Err(ProxyError::Worker(WorkerError::DriverDisconnected(_))) => (), + Err(err) => panic!("{}", err), + } +} + +fn session_free_waits_for_requests_to_complete_do( + proxy_uris: [String; 3], + proxy: RunningProxy, +) -> RunningProxy { + unsafe { + let mut cluster_raw = cass_cluster_new(); + let contact_points = proxy_uris_to_contact_points(proxy_uris); + + assert_cass_error_eq( + cass_cluster_set_contact_points(cluster_raw.borrow_mut(), contact_points.as_ptr()), + CassError::CASS_OK, + ); + let session_raw = cass_session_new(); + cass_future_wait_check_and_free(cass_session_connect( + session_raw.borrow(), + cluster_raw.borrow().into_c_const(), + )); + + tracing::debug!("Session connected, starting to execute requests..."); + + let statement = + c"SELECT host_id FROM system.local WHERE key='local'" as *const CStr as *const c_char; + let statement_raw = cass_statement_new(statement, 0); + + let mut batch_raw = cass_batch_new(CassBatchType::CASS_BATCH_TYPE_LOGGED); + // This batch is obviously invalid, because it contains a SELECT statement. This is OK for us, + // because we anyway expect the batch to fail. The goal is to have the future set, no matter if it's + // set with a success or an error. + cass_batch_add_statement(batch_raw.borrow_mut(), statement_raw.borrow()); + + let finished_executions = AtomicUsize::new(0); + unsafe extern "C" fn finished_execution_callback( + _future_raw: CassBorrowedSharedPtr, + data: *mut c_void, + ) { + let finished_executions = unsafe { &*(data as *const AtomicUsize) }; + finished_executions.fetch_add(1, Ordering::SeqCst); + } + + const ITERATIONS: usize = 1; + const EXECUTIONS: usize = 3 * ITERATIONS; // One prepare, one statement and one batch per iteration. + + let futures = (0..ITERATIONS) + .flat_map(|_| { + // Prepare a statement + let prepare_fut = cass_session_prepare(session_raw.borrow(), statement); + + // Execute a statement + let statement_fut = cass_session_execute( + session_raw.borrow(), + statement_raw.borrow().into_c_const(), + ); + + // Execute a batch + let batch_fut = cass_session_execute_batch( + session_raw.borrow(), + batch_raw.borrow().into_c_const(), + ); + for fut in [ + prepare_fut.borrow(), + statement_fut.borrow(), + batch_fut.borrow(), + ] { + cass_future_set_callback( + fut, + Some(finished_execution_callback), + std::ptr::addr_of!(finished_executions) as _, + ); + } + + [prepare_fut, statement_fut, batch_fut] + }) + .collect::>(); + + tracing::debug!("Started all requests. Now, freeing statements and session..."); + + // Free the statement + cass_statement_free(statement_raw); + // Free the batch + cass_batch_free(batch_raw); + + // Session is freed, but the requests may still be in-flight. + cass_session_free(session_raw); + + tracing::debug!("Session freed."); + + // Assert that the session awaited completion of all requests. + let actually_finished_executions = finished_executions.load(Ordering::SeqCst); + assert_eq!( + actually_finished_executions, EXECUTIONS, + "Expected {} requests to complete before the session was freed, but only {} did.", + EXECUTIONS, actually_finished_executions + ); + + futures.into_iter().for_each(|fut| { + // As per cassandra.h, "a future can be freed anytime". + cass_future_free(fut); + }); + + cass_cluster_free(cluster_raw); + } + + proxy +} diff --git a/scylla-rust-wrapper/tests/integration/utils.rs b/scylla-rust-wrapper/tests/integration/utils.rs index 091bd8fa..87e2d7f7 100644 --- a/scylla-rust-wrapper/tests/integration/utils.rs +++ b/scylla-rust-wrapper/tests/integration/utils.rs @@ -1,5 +1,4 @@ use bytes::BytesMut; -use futures::Future; use libc::c_char; use scylla_cpp_driver::api::error::{CassError, cass_error_desc}; use scylla_cpp_driver::api::future::{ @@ -16,7 +15,7 @@ use std::sync::Arc; use scylla_proxy::{ Condition, Node, Proxy, ProxyError, Reaction as _, RequestFrame, RequestOpcode, - RequestReaction, RequestRule, ResponseFrame, RunningProxy, ShardAwareness, + RequestReaction, RequestRule, ResponseFrame, RunningProxy, }; pub(crate) fn setup_tracing() { @@ -26,13 +25,37 @@ pub(crate) fn setup_tracing() { .try_init(); } -pub(crate) async fn test_with_3_node_dry_mode_cluster( - shard_awareness: ShardAwareness, +unsafe fn write_str_to_c(s: &str, c_str: *mut *const c_char, c_strlen: *mut size_t) { + unsafe { + *c_str = s.as_ptr() as *const c_char; + *c_strlen = s.len() as u64; + } +} + +pub(crate) fn str_to_c_str_n(s: &str) -> (*const c_char, size_t) { + let mut c_str = std::ptr::null(); + let mut c_strlen = size_t::default(); + + // SAFETY: The pointers that are passed to `write_str_to_c` are compile-checked references. + unsafe { write_str_to_c(s, &mut c_str, &mut c_strlen) }; + + (c_str, c_strlen) +} + +macro_rules! make_c_str { + ($str:literal) => { + concat!($str, "\0").as_ptr() as *const c_char + }; +} +pub(crate) use make_c_str; + +pub(crate) async fn test_with_3_node_dry_mode_cluster( + initial_request_rules: impl Fn() -> I, test: F, ) -> Result<(), ProxyError> where - F: FnOnce([String; 3], RunningProxy) -> Fut, - Fut: Future, + I: IntoIterator, + F: FnOnce([String; 3], RunningProxy) -> RunningProxy + Send + 'static, { let proxy1_uri = format!("{}:9042", scylla_proxy::get_exclusive_local_address()); let proxy2_uri = format!("{}:9042", scylla_proxy::get_exclusive_local_address()); @@ -45,17 +68,21 @@ where let proxy = Proxy::new([proxy1_addr, proxy2_addr, proxy3_addr].map(|proxy_addr| { Node::builder() .proxy_address(proxy_addr) - .shard_awareness(shard_awareness) + .request_rules(Vec::from_iter(initial_request_rules())) .build_dry_mode() })); let running_proxy = proxy.run().await.unwrap(); - let running_proxy = test([proxy1_uri, proxy2_uri, proxy3_uri], running_proxy).await; + let running_proxy = + tokio::task::spawn_blocking(|| test([proxy1_uri, proxy2_uri, proxy3_uri], running_proxy)) + .await + .unwrap(); running_proxy.finish().await } +#[track_caller] pub(crate) fn assert_cass_error_eq(errcode1: CassError, errcode2: CassError) { unsafe { assert_eq!( @@ -128,6 +155,33 @@ pub(crate) fn handshake_rules() -> impl IntoIterator { ] } +// As these are very generic, they should be put last in the rules Vec. +pub(crate) fn generic_drop_queries_rules() -> impl IntoIterator { + [RequestRule( + Condition::RequestOpcode(RequestOpcode::Query), + // We won't respond to any queries (including metadata fetch), + // but the driver will manage to continue with dummy metadata. + RequestReaction::forge().server_error(), + )] +} + +/// A set of rules that are needed to finish session initialization. +// They are used in tests that require a session to be connected. +// All connections are successfully negotiated. +// All requests are replied with a server error. +pub(crate) fn mock_init_rules() -> impl IntoIterator { + handshake_rules() + .into_iter() + .chain(std::iter::once(RequestRule( + Condition::RequestOpcode(RequestOpcode::Query) + .or(Condition::RequestOpcode(RequestOpcode::Prepare)) + .or(Condition::RequestOpcode(RequestOpcode::Batch)), + // We won't respond to any queries (including metadata fetch), + // but the driver will manage to continue with dummy metadata. + RequestReaction::forge().server_error(), + ))) +} + pub(crate) fn drop_metadata_queries_rules() -> impl IntoIterator { [RequestRule( Condition::ConnectionRegisteredAnyEvent.and(Condition::or(