Skip to content

Commit

Permalink
Use cassandra_cpp async api (#865)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 20, 2022
1 parent b6edc8f commit 1d34bbb
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 79 deletions.
7 changes: 6 additions & 1 deletion shotover-proxy/benches/benches/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,12 @@ impl BenchResources {

let ca_cert = "example-configs/docker-images/cassandra-tls-4.0.6/certs/localhost_CA.crt";

let connection = CassandraConnection::new_tls("127.0.0.1", 9042, ca_cert, DRIVER);
let connection = futures::executor::block_on(CassandraConnection::new_tls(
"127.0.0.1",
9042,
ca_cert,
DRIVER,
));

let mut bench_resources = Self {
_compose: compose,
Expand Down
30 changes: 18 additions & 12 deletions shotover-proxy/tests/cassandra_int_tests/batch_statements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ use crate::helpers::cassandra::{assert_query_result, run_query, CassandraConnect
async fn use_statement(connection: &CassandraConnection) {
{
run_query(connection, "USE batch_keyspace;").await;
connection.execute_batch(vec![
"INSERT INTO batch_table (id, lastname, firstname) VALUES (0, 'text1', 'text2')".into(),
"INSERT INTO batch_table (id, lastname, firstname) VALUES (1, 'text1', 'text2')".into(),
]);
connection
.execute_batch(vec![
"INSERT INTO batch_table (id, lastname, firstname) VALUES (0, 'text1', 'text2')"
.into(),
"INSERT INTO batch_table (id, lastname, firstname) VALUES (1, 'text1', 'text2')"
.into(),
])
.await;
assert_query_result(
connection,
"SELECT id, lastname, firstname FROM batch_table;",
Expand All @@ -27,10 +31,12 @@ async fn use_statement(connection: &CassandraConnection) {
}

{
connection.execute_batch(vec![
"DELETE FROM batch_table WHERE id = 0;".into(),
"DELETE FROM batch_table WHERE id = 1;".into(),
]);
connection
.execute_batch(vec![
"DELETE FROM batch_table WHERE id = 0;".into(),
"DELETE FROM batch_table WHERE id = 1;".into(),
])
.await;
assert_query_result(connection, "SELECT * FROM batch_table;", &[]).await;
}
}
Expand All @@ -49,7 +55,7 @@ pub async fn test(connection: &CassandraConnection) {
for i in 0..2 {
batch.push(format!("INSERT INTO batch_keyspace.batch_table (id, lastname, firstname) VALUES ({}, 'text1', 'text2')", i));
}
connection.execute_batch(batch);
connection.execute_batch(batch).await;

assert_query_result(
connection,
Expand Down Expand Up @@ -78,7 +84,7 @@ pub async fn test(connection: &CassandraConnection) {
i
));
}
connection.execute_batch(batch);
connection.execute_batch(batch).await;

assert_query_result(
connection,
Expand Down Expand Up @@ -107,13 +113,13 @@ pub async fn test(connection: &CassandraConnection) {
i
));
}
connection.execute_batch(batch);
connection.execute_batch(batch).await;
assert_query_result(connection, "SELECT * FROM batch_keyspace.batch_table;", &[]).await;
}

{
let batch = vec![];
connection.execute_batch(batch);
connection.execute_batch(batch).await;
}

// test batch statements over QUERY PROTOCOL
Expand Down
42 changes: 22 additions & 20 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use cdrs_tokio::frame::events::{
SchemaChange, SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType, ServerEvent,
};
#[cfg(feature = "cassandra-cpp-driver-tests")]
use futures::future::{join_all, try_join_all};
use futures::future::join_all;
use futures::Future;
use metrics_util::debugging::DebuggingRecorder;
use rstest::rstest;
Expand Down Expand Up @@ -109,7 +109,8 @@ async fn test_source_tls_and_single_tls(#[case] driver: CassandraDriver) {

{
// Run a quick test straight to Cassandra to check our assumptions that Shotover and Cassandra TLS are behaving exactly the same
let direct_connection = CassandraConnection::new_tls("127.0.0.1", 9042, ca_cert, driver);
let direct_connection =
CassandraConnection::new_tls("127.0.0.1", 9042, ca_cert, driver).await;
assert_query_result(
&direct_connection,
"SELECT bootstrapped FROM system.local",
Expand All @@ -118,7 +119,7 @@ async fn test_source_tls_and_single_tls(#[case] driver: CassandraDriver) {
.await;
}

let connection = || async { CassandraConnection::new_tls("127.0.0.1", 9043, ca_cert, driver) };
let connection = || CassandraConnection::new_tls("127.0.0.1", 9043, ca_cert, driver);

standard_test_suite(&connection, driver).await;
}
Expand Down Expand Up @@ -260,7 +261,7 @@ async fn test_source_tls_and_cluster_tls(#[case] driver: CassandraDriver) {
{
// Run a quick test straight to Cassandra to check our assumptions that Shotover and Cassandra TLS are behaving exactly the same
let direct_connection =
CassandraConnection::new_tls("172.16.1.2", 9042, ca_cert, driver);
CassandraConnection::new_tls("172.16.1.2", 9042, ca_cert, driver).await;
assert_query_result(
&direct_connection,
"SELECT bootstrapped FROM system.local",
Expand All @@ -270,7 +271,8 @@ async fn test_source_tls_and_cluster_tls(#[case] driver: CassandraDriver) {
}

let connection = || async {
let mut connection = CassandraConnection::new_tls("127.0.0.1", 9042, ca_cert, driver);
let mut connection =
CassandraConnection::new_tls("127.0.0.1", 9042, ca_cert, driver).await;
connection
.enable_schema_awaiter("172.16.1.2:9042", Some(ca_cert))
.await;
Expand Down Expand Up @@ -471,7 +473,7 @@ async fn test_cassandra_peers_rewrite_cassandra3(#[case] driver: CassandraDriver
// Assert that the error cassandra gives because system.peers_v2 does not exist on cassandra v3
// is passed through shotover unchanged.
let statement = "SELECT data_center, native_port, rack FROM system.peers_v2;";
let result = connection.execute_expect_err(statement);
let result = connection.execute_expect_err(statement).await;
assert_eq!(
result,
ErrorBody {
Expand Down Expand Up @@ -503,25 +505,25 @@ async fn test_cassandra_request_throttling(#[case] driver: CassandraDriver) {

// these should all be let through the request throttling
{
let mut futures = vec![];
let mut future_list = vec![];
for _ in 0..25 {
futures.push(connection.execute_async(statement));
futures.push(connection_2.execute_async(statement));
future_list.push(connection.execute(statement));
future_list.push(connection_2.execute(statement));
}
try_join_all(futures).await.unwrap();
join_all(future_list).await;
}

// sleep to reset the window
std::thread::sleep(std::time::Duration::from_secs(1));
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

// only around half of these should be let through the request throttling
{
let mut futures = vec![];
let mut future_list = vec![];
for _ in 0..50 {
futures.push(connection.execute_async(statement));
futures.push(connection_2.execute_async(statement));
future_list.push(connection.execute_fallible(statement));
future_list.push(connection_2.execute_fallible(statement));
}
let mut results = join_all(futures).await;
let mut results = join_all(future_list).await;
results.retain(|result| match result {
Ok(_) => true,
Err(Error(
Expand All @@ -538,7 +540,7 @@ async fn test_cassandra_request_throttling(#[case] driver: CassandraDriver) {
assert!(50 < len && len <= 60, "got {len}");
}

std::thread::sleep(std::time::Duration::from_secs(1)); // sleep to reset the window
tokio::time::sleep(std::time::Duration::from_secs(1)).await; // sleep to reset the window

// setup keyspace and table for the batch statement tests
{
Expand All @@ -552,18 +554,18 @@ async fn test_cassandra_request_throttling(#[case] driver: CassandraDriver) {
for i in 0..25 {
queries.push(format!("INSERT INTO test_keyspace.my_table (id, lastname, firstname) VALUES ({}, 'text', 'text')", i));
}
connection.execute_batch(queries);
connection.execute_batch(queries).await;
}

std::thread::sleep(std::time::Duration::from_secs(1)); // sleep to reset the window
tokio::time::sleep(std::time::Duration::from_secs(1)).await; // sleep to reset the window

// this batch set should not be allowed through
{
let mut queries: Vec<String> = vec![];
for i in 0..60 {
queries.push(format!("INSERT INTO test_keyspace.my_table (id, lastname, firstname) VALUES ({}, 'text', 'text')", i));
}
let result = connection.execute_batch_expect_err(queries);
let result = connection.execute_batch_expect_err(queries).await;
assert_eq!(
result,
ErrorBody {
Expand All @@ -573,7 +575,7 @@ async fn test_cassandra_request_throttling(#[case] driver: CassandraDriver) {
);
}

std::thread::sleep(std::time::Duration::from_secs(1)); // sleep to reset the window
tokio::time::sleep(std::time::Duration::from_secs(1)).await; // sleep to reset the window

batch_statements::test(&connection).await;
}
Expand Down
43 changes: 26 additions & 17 deletions shotover-proxy/tests/cassandra_int_tests/prepared_statements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ use crate::helpers::cassandra::{
};

async fn delete(session: &CassandraConnection) {
let prepared = session.prepare("DELETE FROM test_prepare_statements.table_1 WHERE id = ?;");
let prepared = session
.prepare("DELETE FROM test_prepare_statements.table_1 WHERE id = ?;")
.await;

assert_eq!(
session.execute_prepared(&prepared, 1),
session.execute_prepared(&prepared, 1).await,
Vec::<Vec<ResultValue>>::new()
);

Expand All @@ -20,29 +22,33 @@ async fn delete(session: &CassandraConnection) {
.await;
}

fn insert(session: &CassandraConnection) {
let prepared = session.prepare("INSERT INTO test_prepare_statements.table_1 (id) VALUES (?);");
async fn insert(session: &CassandraConnection) {
let prepared = session
.prepare("INSERT INTO test_prepare_statements.table_1 (id) VALUES (?);")
.await;

assert_eq!(
session.execute_prepared(&prepared, 1),
session.execute_prepared(&prepared, 1).await,
Vec::<Vec<ResultValue>>::new()
);

assert_eq!(
session.execute_prepared(&prepared, 2),
session.execute_prepared(&prepared, 2).await,
Vec::<Vec<ResultValue>>::new()
);

assert_eq!(
session.execute_prepared(&prepared, 2),
session.execute_prepared(&prepared, 2).await,
Vec::<Vec<ResultValue>>::new()
);
}

fn select(session: &CassandraConnection) {
let prepared = session.prepare("SELECT id FROM test_prepare_statements.table_1 WHERE id = ?");
async fn select(session: &CassandraConnection) {
let prepared = session
.prepare("SELECT id FROM test_prepare_statements.table_1 WHERE id = ?")
.await;

let result_rows = session.execute_prepared(&prepared, 1);
let result_rows = session.execute_prepared(&prepared, 1).await;

assert_rows(result_rows, &[&[ResultValue::Int(1)]]);
}
Expand All @@ -55,25 +61,28 @@ async fn select_cross_connection<Fut>(
let connection_before = connection_creator().await;

// query is purposely slightly different to past queries to avoid being cached
let prepared =
connection.prepare("SELECT id, id FROM test_prepare_statements.table_1 WHERE id = ?");
let prepared = connection
.prepare("SELECT id, id FROM test_prepare_statements.table_1 WHERE id = ?")
.await;

let connection_after = connection_creator().await;

assert_rows(
connection_before.execute_prepared(&prepared, 1),
connection_before.execute_prepared(&prepared, 1).await,
&[&[ResultValue::Int(1), ResultValue::Int(1)]],
);
assert_rows(
connection_after.execute_prepared(&prepared, 1),
connection_after.execute_prepared(&prepared, 1).await,
&[&[ResultValue::Int(1), ResultValue::Int(1)]],
);
}

async fn use_statement(session: &CassandraConnection) {
// Create prepared command with the correct keyspace
run_query(session, "USE test_prepare_statements;").await;
let _prepared = session.prepare("INSERT INTO table_1 (id) VALUES (?);");
let _prepared = session
.prepare("INSERT INTO table_1 (id) VALUES (?);")
.await;

// change the keyspace to be incorrect
run_query(session, "USE test_prepare_statements_empty;").await;
Expand Down Expand Up @@ -107,8 +116,8 @@ where
)
.await;

insert(session);
select(session);
insert(session).await;
select(session).await;
select_cross_connection(session, connection_creator).await;
delete(session).await;
use_statement(session).await;
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/tests/cassandra_int_tests/protect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pub async fn test(shotover_session: &CassandraConnection, direct_session: &Cassa
shotover_session.execute_batch(vec![
"INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk2', 'cluster', 'encrypted2', 1, true)".into(),
"INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk3', 'cluster', 'encrypted3', 2, false)".into()
]);
]).await;

let insert_statement = "BEGIN BATCH
INSERT INTO test_protect_keyspace.test_table (pk, cluster, col1, col2, col3) VALUES ('pk4', 'cluster', 'encrypted4', 3, true);
Expand Down

0 comments on commit 1d34bbb

Please sign in to comment.