Skip to content

Commit

Permalink
protocol level instead of transform
Browse files Browse the repository at this point in the history
  • Loading branch information
conorbros committed Jan 30, 2023
1 parent 6f319a2 commit e9c409f
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 138 deletions.
11 changes: 11 additions & 0 deletions shotover-proxy/src/codec/cassandra.rs
Expand Up @@ -6,6 +6,7 @@ use anyhow::{anyhow, Result};
use bytes::{Buf, BufMut, BytesMut};
use cassandra_protocol::compression::Compression;
use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType};
use cassandra_protocol::frame::message_supported::BodyResSupported;
use cassandra_protocol::frame::{
CheckEnvelopeSizeError, Envelope as RawCassandraFrame, Opcode, Version,
};
Expand Down Expand Up @@ -80,6 +81,16 @@ impl Decoder for CassandraCodec {
return Err(reject_compression(frame.stream_id, compression));
}
}

if let CassandraOperation::Supported(BodyResSupported { data }) =
&mut frame.operation
{
if let Some(value) = data.get_mut("COMPRESSION") {
*value = vec![];
}

message.invalidate_cache();
}
}

if let Ok(Metadata::Cassandra(CassandraMetadata {
Expand Down
1 change: 0 additions & 1 deletion shotover-proxy/src/transforms/cassandra/mod.rs
@@ -1,5 +1,4 @@
mod connection;
pub mod options_rewrite;
pub mod peers_rewrite;
pub mod sink_cluster;
pub mod sink_single;
64 changes: 0 additions & 64 deletions shotover-proxy/src/transforms/cassandra/options_rewrite.rs

This file was deleted.

14 changes: 0 additions & 14 deletions shotover-proxy/src/transforms/mod.rs
@@ -1,8 +1,5 @@
use crate::error::ChainResponse;
use crate::message::Messages;
use crate::transforms::cassandra::options_rewrite::{
CassandraOptionsRewrite, CassandraOptionsRewriteConfig,
};
use crate::transforms::cassandra::peers_rewrite::{
CassandraPeersRewrite, CassandraPeersRewriteConfig,
};
Expand Down Expand Up @@ -84,7 +81,6 @@ pub enum TransformBuilder {
CassandraSinkCluster(Box<CassandraSinkClusterBuilder>),
RedisSinkSingle(RedisSinkSingle),
CassandraPeersRewrite(CassandraPeersRewrite),
CassandraOptionsRewrite(CassandraOptionsRewrite),
RedisCache(SimpleRedisCacheBuilder),
Tee(Tee),
Null(Null),
Expand Down Expand Up @@ -115,7 +111,6 @@ impl TransformBuilder {
Transforms::CassandraSinkCluster(t.build())
}
TransformBuilder::CassandraPeersRewrite(t) => Transforms::CassandraPeersRewrite(t),
TransformBuilder::CassandraOptionsRewrite(t) => Transforms::CassandraOptionsRewrite(t),
TransformBuilder::RedisCache(t) => Transforms::RedisCache(t.build()),
TransformBuilder::Tee(t) => Transforms::Tee(t),
TransformBuilder::RedisSinkSingle(t) => Transforms::RedisSinkSingle(t),
Expand Down Expand Up @@ -151,7 +146,6 @@ impl TransformBuilder {
TransformBuilder::CassandraSinkSingle(c) => c.validate(),
TransformBuilder::CassandraSinkCluster(c) => c.validate(),
TransformBuilder::CassandraPeersRewrite(c) => c.validate(),
TransformBuilder::CassandraOptionsRewrite(c) => c.validate(),
TransformBuilder::RedisCache(r) => r.validate(),
TransformBuilder::Tee(t) => t.validate(),
TransformBuilder::RedisSinkSingle(r) => r.validate(),
Expand Down Expand Up @@ -181,7 +175,6 @@ impl TransformBuilder {
TransformBuilder::CassandraSinkSingle(c) => c.is_terminating(),
TransformBuilder::CassandraSinkCluster(c) => c.is_terminating(),
TransformBuilder::CassandraPeersRewrite(c) => c.is_terminating(),
TransformBuilder::CassandraOptionsRewrite(c) => c.is_terminating(),
TransformBuilder::RedisCache(r) => r.is_terminating(),
TransformBuilder::Tee(t) => t.is_terminating(),
TransformBuilder::RedisSinkSingle(r) => r.is_terminating(),
Expand Down Expand Up @@ -223,7 +216,6 @@ pub enum Transforms {
CassandraSinkCluster(Box<CassandraSinkCluster>),
RedisSinkSingle(RedisSinkSingle),
CassandraPeersRewrite(CassandraPeersRewrite),
CassandraOptionsRewrite(CassandraOptionsRewrite),
RedisCache(SimpleRedisCache),
Tee(Tee),
Null(Null),
Expand Down Expand Up @@ -258,7 +250,6 @@ impl Transforms {
Transforms::CassandraSinkSingle(c) => c.transform(message_wrapper).await,
Transforms::CassandraSinkCluster(c) => c.transform(message_wrapper).await,
Transforms::CassandraPeersRewrite(c) => c.transform(message_wrapper).await,
Transforms::CassandraOptionsRewrite(c) => c.transform(message_wrapper).await,
Transforms::RedisCache(r) => r.transform(message_wrapper).await,
Transforms::Tee(m) => m.transform(message_wrapper).await,
Transforms::DebugPrinter(p) => p.transform(message_wrapper).await,
Expand Down Expand Up @@ -287,7 +278,6 @@ impl Transforms {
match self {
Transforms::CassandraSinkSingle(c) => c.transform_pushed(message_wrapper).await,
Transforms::CassandraSinkCluster(c) => c.transform_pushed(message_wrapper).await,
Transforms::CassandraOptionsRewrite(c) => c.transform_pushed(message_wrapper).await,
Transforms::CassandraPeersRewrite(c) => c.transform_pushed(message_wrapper).await,
Transforms::RedisCache(r) => r.transform_pushed(message_wrapper).await,
Transforms::Tee(m) => m.transform_pushed(message_wrapper).await,
Expand Down Expand Up @@ -322,7 +312,6 @@ impl Transforms {
Transforms::CassandraSinkSingle(a) => a.prep_transform_chain(t).await,
Transforms::CassandraSinkCluster(a) => a.prep_transform_chain(t).await,
Transforms::CassandraPeersRewrite(c) => c.prep_transform_chain(t).await,
Transforms::CassandraOptionsRewrite(c) => c.prep_transform_chain(t).await,
Transforms::RedisSinkSingle(a) => a.prep_transform_chain(t).await,
Transforms::RedisCache(a) => a.prep_transform_chain(t).await,
Transforms::Tee(a) => a.prep_transform_chain(t).await,
Expand Down Expand Up @@ -352,7 +341,6 @@ impl Transforms {
Transforms::CassandraSinkSingle(c) => c.set_pushed_messages_tx(pushed_messages_tx),
Transforms::CassandraSinkCluster(c) => c.set_pushed_messages_tx(pushed_messages_tx),
Transforms::CassandraPeersRewrite(c) => c.set_pushed_messages_tx(pushed_messages_tx),
Transforms::CassandraOptionsRewrite(c) => c.set_pushed_messages_tx(pushed_messages_tx),
Transforms::RedisCache(r) => r.set_pushed_messages_tx(pushed_messages_tx),
Transforms::Tee(t) => t.set_pushed_messages_tx(pushed_messages_tx),
Transforms::RedisSinkSingle(r) => r.set_pushed_messages_tx(pushed_messages_tx),
Expand Down Expand Up @@ -386,7 +374,6 @@ pub enum TransformsConfig {
CassandraSinkCluster(CassandraSinkClusterConfig),
RedisSinkSingle(RedisSinkSingleConfig),
CassandraPeersRewrite(CassandraPeersRewriteConfig),
CassandraOptionsRewrite(CassandraOptionsRewriteConfig),
RedisCache(RedisConfig),
Tee(TeeConfig),
ConsistentScatter(ConsistentScatterConfig),
Expand Down Expand Up @@ -420,7 +407,6 @@ impl TransformsConfig {
TransformsConfig::CassandraSinkSingle(c) => c.get_transform(chain_name).await,
TransformsConfig::CassandraSinkCluster(c) => c.get_transform(chain_name).await,
TransformsConfig::CassandraPeersRewrite(c) => c.get_transform().await,
TransformsConfig::CassandraOptionsRewrite(c) => c.get_transform().await,
TransformsConfig::RedisCache(r) => r.get_transform().await,
TransformsConfig::Tee(t) => t.get_transform().await,
TransformsConfig::RedisSinkSingle(r) => r.get_transform(chain_name).await,
Expand Down
59 changes: 0 additions & 59 deletions shotover-proxy/tests/cassandra_int_tests/mod.rs
@@ -1,14 +1,8 @@
use crate::helpers::ShotoverManager;
use cassandra_protocol::frame::message_error::{ErrorBody, ErrorType};
use cdrs_tokio::cluster::send_envelope::send_envelope;
use cdrs_tokio::frame::events::{
SchemaChange, SchemaChangeOptions, SchemaChangeTarget, SchemaChangeType, ServerEvent,
};
use cdrs_tokio::frame::message_response::ResponseBody;
use cdrs_tokio::frame::message_supported::BodyResSupported;
use cdrs_tokio::frame::Envelope;
use cdrs_tokio::frame::Version;
use cdrs_tokio::retry::DefaultRetrySession;
use futures::future::join_all;
use futures::Future;
use metrics_util::debugging::DebuggingRecorder;
Expand Down Expand Up @@ -582,59 +576,6 @@ async fn peers_rewrite_v3(#[case] driver: CassandraDriver) {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[rstest]
#[case::cdrs(CdrsTokio)]
#[tokio::test(flavor = "multi_thread")]
#[serial]
async fn test_options_rewrite(#[case] driver: CassandraDriver) {
let _docker_compose =
DockerCompose::new("tests/test-configs/cassandra-options-rewrite/docker-compose.yaml");

let _shotover_manager = ShotoverManager::from_topology_file(
"tests/test-configs/cassandra-options-rewrite/topology.yaml",
);

let envelope = Envelope::new_req_options(Version::V4);

let normal_connection = CassandraConnection::new("127.0.0.1", 9043, driver).await;
let normal_query_plan = normal_connection.as_cdrs().query_plan(None);
let normal_result = send_envelope(
normal_query_plan.into_iter(),
&envelope,
false,
Box::<DefaultRetrySession>::default(),
)
.await
.unwrap()
.unwrap()
.response_body()
.unwrap();
if let ResponseBody::Supported(BodyResSupported { data }) = normal_result {
assert_eq!(*data.get("CQL_VERSION").unwrap(), vec!["3.4.5".to_string()]);
}

let options_rewrite_connection = CassandraConnection::new("127.0.0.1", 9044, driver).await;
let options_rewrite_query_plan = options_rewrite_connection.as_cdrs().query_plan(None);
let options_rewrite_result = send_envelope(
options_rewrite_query_plan.into_iter(),
&envelope,
false,
Box::<DefaultRetrySession>::default(),
)
.await
.unwrap()
.unwrap()
.response_body()
.unwrap();

if let ResponseBody::Supported(BodyResSupported { data }) = options_rewrite_result {
assert_eq!(
*data.get("CQL_VERSION").unwrap(),
vec!["changed".to_string()]
);
}
}

#[rstest]
//#[case::cdrs(CdrsTokio)] // TODO: cdrs-tokio seems to be sending extra messages triggering the rate limiter
#[case::scylla(Scylla)]
Expand Down
1 change: 1 addition & 0 deletions test-helpers/src/connection/cassandra.rs
Expand Up @@ -179,6 +179,7 @@ impl CassandraConnection {
)
.user("cassandra", "cassandra")
.default_consistency(Consistency::One)
.compression(Some(scylla::transport::Compression::Snappy))
.build()
.await
.unwrap();
Expand Down

0 comments on commit e9c409f

Please sign in to comment.