Skip to content

Commit

Permalink
cassandra_int_tests add case that forces cassandra message encoding (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Sep 29, 2022
1 parent 9c03153 commit 3955be9
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
sources:
cassandra_prod:
Cassandra:
listen_addr: "127.0.0.1:9042"
chain_config:
main_chain:
- DebugForceEncode:
encode_requests: true
encode_responses: true
- CassandraSinkSingle:
remote_address: "127.0.0.1:9043"
source_to_chain_mapping:
cassandra_prod: main_chain
49 changes: 40 additions & 9 deletions shotover-proxy/src/transforms/debug/force_parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,40 @@ use anyhow::Result;
use async_trait::async_trait;
use serde::Deserialize;

/// Messages that pass through this transform will be parsed.
/// Must be individually enabled at the request or response level.
#[derive(Deserialize, Debug, Clone)]
pub struct DebugForceParseConfig {
parse_requests: Option<bool>,
parse_responses: Option<bool>,
parse_requests: bool,
parse_responses: bool,
}

impl DebugForceParseConfig {
pub async fn get_transform(&self) -> Result<Transforms> {
Ok(Transforms::DebugForceParse(DebugForceParse {
parse_requests: self.parse_requests.unwrap_or(true),
parse_responses: self.parse_responses.unwrap_or(true),
parse_requests: self.parse_requests,
parse_responses: self.parse_responses,
encode_requests: false,
encode_responses: false,
}))
}
}

/// Messages that pass through this transform will be parsed and then reencoded.
/// Must be individually enabled at the request or response level.
#[derive(Deserialize, Debug, Clone)]
pub struct DebugForceEncodeConfig {
encode_requests: bool,
encode_responses: bool,
}

impl DebugForceEncodeConfig {
pub async fn get_transform(&self) -> Result<Transforms> {
Ok(Transforms::DebugForceParse(DebugForceParse {
parse_requests: self.encode_requests,
parse_responses: self.encode_responses,
encode_requests: self.encode_requests,
encode_responses: self.encode_responses,
}))
}
}
Expand All @@ -29,24 +52,32 @@ impl DebugForceParseConfig {
pub struct DebugForceParse {
parse_requests: bool,
parse_responses: bool,
encode_requests: bool,
encode_responses: bool,
}

#[async_trait]
impl Transform for DebugForceParse {
async fn transform<'a>(&'a mut self, mut message_wrapper: Wrapper<'a>) -> ChainResponse {
if self.parse_requests {
for message in &mut message_wrapper.messages {
for message in &mut message_wrapper.messages {
if self.parse_requests {
message.frame();
}
if self.encode_requests {
message.invalidate_cache();
}
}

let mut response = message_wrapper.call_next_transform().await;

if self.parse_responses {
if let Ok(response) = response.as_mut() {
for message in response {
if let Ok(response) = response.as_mut() {
for message in response {
if self.parse_responses {
message.frame();
}
if self.encode_responses {
message.invalidate_cache();
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion shotover-proxy/src/transforms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use crate::transforms::chain::TransformChain;
use crate::transforms::coalesce::{Coalesce, CoalesceConfig};
use crate::transforms::debug::force_parse::DebugForceParse;
#[cfg(feature = "alpha-transforms")]
use crate::transforms::debug::force_parse::DebugForceParseConfig;
use crate::transforms::debug::force_parse::{DebugForceEncodeConfig, DebugForceParseConfig};
use crate::transforms::debug::printer::DebugPrinter;
use crate::transforms::debug::random_delay::DebugRandomDelay;
use crate::transforms::debug::returner::{DebugReturner, DebugReturnerConfig};
Expand Down Expand Up @@ -310,6 +310,8 @@ pub enum TransformsConfig {
Protect(ProtectConfig),
#[cfg(feature = "alpha-transforms")]
DebugForceParse(DebugForceParseConfig),
#[cfg(feature = "alpha-transforms")]
DebugForceEncode(DebugForceEncodeConfig),
ParallelMap(ParallelMapConfig),
//PoolConnections(ConnectionBalanceAndPoolConfig),
Coalesce(CoalesceConfig),
Expand Down Expand Up @@ -343,6 +345,8 @@ impl TransformsConfig {
TransformsConfig::Protect(p) => p.get_transform().await,
#[cfg(feature = "alpha-transforms")]
TransformsConfig::DebugForceParse(d) => d.get_transform().await,
#[cfg(feature = "alpha-transforms")]
TransformsConfig::DebugForceEncode(d) => d.get_transform().await,
TransformsConfig::RedisSinkCluster(r) => r.get_transform(chain_name).await,
TransformsConfig::ParallelMap(s) => s.get_transform().await,
//TransformsConfig::PoolConnections(s) => s.get_transform().await,
Expand Down
17 changes: 17 additions & 0 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,23 @@ async fn test_passthrough(#[case] driver: CassandraDriver) {
standard_test_suite(&connection, driver).await;
}

#[rstest]
#[case(CdrsTokio)]
#[cfg_attr(feature = "cassandra-cpp-driver-tests", case(Datastax))]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_passthrough_encode(#[case] driver: CassandraDriver) {
let _compose = DockerCompose::new("example-configs/cassandra-passthrough/docker-compose.yml");

let _shotover_manager = ShotoverManager::from_topology_file(
"example-configs/cassandra-passthrough/topology-encode.yaml",
);

let connection = CassandraConnection::new("127.0.0.1", 9042, driver).await;

standard_test_suite(&connection, driver).await;
}

#[cfg(feature = "cassandra-cpp-driver-tests")]
#[rstest]
//#[case(CdrsTokio)] // TODO
Expand Down

0 comments on commit 3955be9

Please sign in to comment.