Skip to content

Commit

Permalink
rename TransformConfig::get_transform -> TransformConfig::get_builder (
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Feb 7, 2023
1 parent 2932326 commit 34a8764
Show file tree
Hide file tree
Showing 20 changed files with 50 additions and 54 deletions.
4 changes: 2 additions & 2 deletions shotover-proxy/benches/benches/chain.rs
Expand Up @@ -182,7 +182,7 @@ fn criterion_benchmark(c: &mut Criterion) {
// an absurdly large value is given so that all messages will pass through
max_requests_per_second: std::num::NonZeroU32::new(100_000_000).unwrap(),
}
.get_transform(),
.get_builder(),
)
.unwrap(),
TransformBuilder::NullSink(NullSink::default()),
Expand Down Expand Up @@ -290,7 +290,7 @@ fn criterion_benchmark(c: &mut Criterion) {
kek_id: "".to_string(),
},
}
.get_transform(),
.get_builder(),
)
.unwrap(),
TransformBuilder::NullSink(NullSink::default()),
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/cassandra/peers_rewrite.rs
Expand Up @@ -19,7 +19,7 @@ pub struct CassandraPeersRewriteConfig {
}

impl CassandraPeersRewriteConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::CassandraPeersRewrite(
CassandraPeersRewrite::new(self.port),
))
Expand Down
Expand Up @@ -79,7 +79,7 @@ pub struct CassandraSinkClusterConfig {
}

impl CassandraSinkClusterConfig {
pub async fn get_transform(&self, chain_name: String) -> Result<TransformBuilder> {
pub async fn get_builder(&self, chain_name: String) -> Result<TransformBuilder> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
let mut shotover_nodes = self.shotover_nodes.clone();
let index = self
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/cassandra/sink_single.rs
Expand Up @@ -26,7 +26,7 @@ pub struct CassandraSinkSingleConfig {
}

impl CassandraSinkSingleConfig {
pub async fn get_transform(&self, chain_name: String) -> Result<TransformBuilder> {
pub async fn get_builder(&self, chain_name: String) -> Result<TransformBuilder> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
Ok(TransformBuilder::CassandraSinkSingle(
CassandraSinkSingle::new(
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/coalesce.rs
Expand Up @@ -21,7 +21,7 @@ pub struct CoalesceConfig {
}

impl CoalesceConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::Coalesce(Coalesce {
buffer: Vec::with_capacity(self.flush_when_buffered_message_count.unwrap_or(0)),
flush_when_buffered_message_count: self.flush_when_buffered_message_count,
Expand Down
4 changes: 2 additions & 2 deletions shotover-proxy/src/transforms/debug/force_parse.rs
Expand Up @@ -19,7 +19,7 @@ pub struct DebugForceParseConfig {
}

impl DebugForceParseConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::DebugForceParse(DebugForceParse {
parse_requests: self.parse_requests,
parse_responses: self.parse_responses,
Expand All @@ -38,7 +38,7 @@ pub struct DebugForceEncodeConfig {
}

impl DebugForceEncodeConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::DebugForceParse(DebugForceParse {
parse_requests: self.encode_requests,
parse_responses: self.encode_responses,
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/debug/returner.rs
Expand Up @@ -12,7 +12,7 @@ pub struct DebugReturnerConfig {
}

impl DebugReturnerConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::DebugReturner(DebugReturner::new(
self.response.clone(),
)))
Expand Down
Expand Up @@ -27,7 +27,7 @@ pub struct ConsistentScatterConfig {
}

impl ConsistentScatterConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
let mut route_map = Vec::with_capacity(self.route_map.len());
warn!("Using this transform is considered unstable - Does not work with REDIS pipelines");

Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/filter.rs
Expand Up @@ -19,7 +19,7 @@ pub struct QueryTypeFilterConfig {
}

impl QueryTypeFilterConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::QueryTypeFilter(QueryTypeFilter {
filter: self.filter.clone(),
}))
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/load_balance.rs
Expand Up @@ -17,7 +17,7 @@ pub struct ConnectionBalanceAndPoolConfig {
}

impl ConnectionBalanceAndPoolConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
let chain = build_chain_from_config(self.name.clone(), &self.chain).await?;

Ok(TransformBuilder::PoolConnections(
Expand Down
54 changes: 25 additions & 29 deletions shotover-proxy/src/transforms/mod.rs
Expand Up @@ -401,40 +401,39 @@ pub enum TransformsConfig {

impl TransformsConfig {
#[async_recursion]
/// Return a new instance of the transform that the config is specifying.
pub async fn get_transform(&self, chain_name: String) -> Result<TransformBuilder> {
pub async fn get_builder(&self, chain_name: String) -> Result<TransformBuilder> {
match self {
TransformsConfig::CassandraSinkSingle(c) => c.get_transform(chain_name).await,
TransformsConfig::CassandraSinkCluster(c) => c.get_transform(chain_name).await,
TransformsConfig::CassandraPeersRewrite(c) => c.get_transform().await,
TransformsConfig::RedisCache(r) => r.get_transform().await,
TransformsConfig::Tee(t) => t.get_transform().await,
TransformsConfig::RedisSinkSingle(r) => r.get_transform(chain_name).await,
TransformsConfig::ConsistentScatter(c) => c.get_transform().await,
TransformsConfig::CassandraSinkSingle(c) => c.get_builder(chain_name).await,
TransformsConfig::CassandraSinkCluster(c) => c.get_builder(chain_name).await,
TransformsConfig::CassandraPeersRewrite(c) => c.get_builder().await,
TransformsConfig::RedisCache(r) => r.get_builder().await,
TransformsConfig::Tee(t) => t.get_builder().await,
TransformsConfig::RedisSinkSingle(r) => r.get_builder(chain_name).await,
TransformsConfig::ConsistentScatter(c) => c.get_builder().await,
TransformsConfig::RedisTimestampTagger => Ok(TransformBuilder::RedisTimestampTagger(
RedisTimestampTagger::new(),
)),
TransformsConfig::RedisClusterPortsRewrite(r) => r.get_transform().await,
TransformsConfig::RedisClusterPortsRewrite(r) => r.get_builder().await,
TransformsConfig::DebugPrinter => {
Ok(TransformBuilder::DebugPrinter(DebugPrinter::new()))
}
TransformsConfig::DebugReturner(d) => d.get_transform().await,
TransformsConfig::DebugReturner(d) => d.get_builder().await,
TransformsConfig::NullSink => Ok(TransformBuilder::NullSink(NullSink::default())),
#[cfg(test)]
TransformsConfig::Loopback => Ok(TransformBuilder::Loopback(Loopback::default())),
#[cfg(feature = "alpha-transforms")]
TransformsConfig::Protect(p) => p.get_transform().await,
TransformsConfig::Protect(p) => p.get_builder().await,
#[cfg(feature = "alpha-transforms")]
TransformsConfig::DebugForceParse(d) => d.get_transform().await,
TransformsConfig::DebugForceParse(d) => d.get_builder().await,
#[cfg(feature = "alpha-transforms")]
TransformsConfig::DebugForceEncode(d) => d.get_transform().await,
TransformsConfig::RedisSinkCluster(r) => r.get_transform(chain_name).await,
TransformsConfig::ParallelMap(s) => s.get_transform().await,
//TransformsConfig::PoolConnections(s) => s.get_transform().await,
TransformsConfig::Coalesce(s) => s.get_transform().await,
TransformsConfig::QueryTypeFilter(s) => s.get_transform().await,
TransformsConfig::QueryCounter(s) => s.get_transform().await,
TransformsConfig::RequestThrottling(s) => s.get_transform().await,
TransformsConfig::DebugForceEncode(d) => d.get_builder().await,
TransformsConfig::RedisSinkCluster(r) => r.get_builder(chain_name).await,
TransformsConfig::ParallelMap(s) => s.get_builder().await,
//TransformsConfig::PoolConnections(s) => s.get_builder().await,
TransformsConfig::Coalesce(s) => s.get_builder().await,
TransformsConfig::QueryTypeFilter(s) => s.get_builder().await,
TransformsConfig::QueryCounter(s) => s.get_builder().await,
TransformsConfig::RequestThrottling(s) => s.get_builder().await,
}
}
}
Expand All @@ -445,7 +444,7 @@ pub async fn build_chain_from_config(
) -> Result<TransformChainBuilder> {
let mut transforms: Vec<TransformBuilder> = Vec::new();
for tc in transform_configs {
transforms.push(tc.get_transform(name.clone()).await?)
transforms.push(tc.get_builder(name.clone()).await?)
}
Ok(TransformChainBuilder::new(transforms, name))
}
Expand Down Expand Up @@ -633,21 +632,18 @@ impl<'a> Wrapper<'a> {
/// The trait has one method where you implement the majority of your logic [Transform::transform],
/// however it also includes a setup and naming method.
///
/// Transforms are cloned on a per TCP connection basis from a copy of the struct originally created
/// by the call to the `get_transform` method on each transform's config struct.
/// Transforms are created on a per TCP connection basis by calling `TransformBuilder::build()`.
/// This means that each member of your struct that implements this trait can be considered private for
/// each TCP connection or connected client. If you wish to share data between all copies of your struct
/// then wrapping a member in an [`Arc<Mutex<_>>`](std::sync::Mutex) will achieve that.
///
/// Changing the clone behavior of this struct can also control this behavior.
/// then wrapping a member in an [`Arc<Mutex<_>>`](std::sync::Mutex) will achieve that,
/// but make sure to copy the value from the TransformBuilder to ensure all instances are referring to the same value.
///
/// Once you have created your [`Transform`], you will need to create
/// new enum variants in [Transforms] and [TransformsConfig] to make them configurable in Shotover.
/// new enum variants in [Transforms], [TransformBuilder] and [TransformsConfig] to make them configurable in Shotover.
/// Shotover uses a concept called enum dispatch to provide dynamic configuration of transform chains
/// with minimal impact on performance.
///
/// Implementing this trait is usually done using `#[async_trait]` macros.
///
#[async_trait]
pub trait Transform: Send {
/// This method should be implemented by your transform. The wrapper object contains the queries/
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/parallel_map.rs
Expand Up @@ -74,7 +74,7 @@ pub struct ParallelMapConfig {
}

impl ParallelMapConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
let chain = build_chain_from_config("parallel_map_chain".into(), &self.chain).await?;

Ok(TransformBuilder::ParallelMap(ParallelMapBuilder {
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/protect/mod.rs
Expand Up @@ -26,7 +26,7 @@ pub struct ProtectConfig {
}

impl ProtectConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::Protect(Box::new(Protect {
keyspace_table_columns: self
.keyspace_table_columns
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/query_counter.rs
Expand Up @@ -72,7 +72,7 @@ fn get_redis_query_type(frame: &RedisFrame) -> Option<String> {
}

impl QueryCounterConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::QueryCounter(QueryCounter::new(
self.name.clone(),
)))
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/redis/cache.rs
Expand Up @@ -82,7 +82,7 @@ pub struct RedisConfig {
}

impl RedisConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
let missed_requests = register_counter!("cache_miss");

let caching_schema: HashMap<FQName, TableCacheSchema> = self
Expand Down
Expand Up @@ -14,7 +14,7 @@ pub struct RedisClusterPortsRewriteConfig {
}

impl RedisClusterPortsRewriteConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::RedisClusterPortsRewrite(
RedisClusterPortsRewrite {
new_port: self.new_port,
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/redis/sink_cluster.rs
Expand Up @@ -43,7 +43,7 @@ pub struct RedisSinkClusterConfig {
}

impl RedisSinkClusterConfig {
pub async fn get_transform(&self, chain_name: String) -> Result<TransformBuilder> {
pub async fn get_builder(&self, chain_name: String) -> Result<TransformBuilder> {
let mut cluster = RedisSinkCluster::new(
self.first_contact_points.clone(),
self.direct_destination.clone(),
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/redis/sink_single.rs
Expand Up @@ -29,7 +29,7 @@ pub struct RedisSinkSingleConfig {
}

impl RedisSinkSingleConfig {
pub async fn get_transform(&self, chain_name: String) -> Result<TransformBuilder> {
pub async fn get_builder(&self, chain_name: String) -> Result<TransformBuilder> {
let tls = self.tls.clone().map(TlsConnector::new).transpose()?;
Ok(TransformBuilder::RedisSinkSingle(RedisSinkSingle::new(
self.address.clone(),
Expand Down
10 changes: 5 additions & 5 deletions shotover-proxy/src/transforms/tee.rs
Expand Up @@ -50,7 +50,7 @@ pub struct TeeConfig {
}

impl TeeConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
let buffer_size = self.buffer_size.unwrap_or(5);
let mismatch_chain =
if let Some(ConsistencyBehavior::SubchainOnMismatch(mismatch_chain)) = &self.behavior {
Expand Down Expand Up @@ -190,7 +190,7 @@ mod tests {
chain: vec![TransformsConfig::NullSink],
buffer_size: None,
};
let transform = config.get_transform().await.unwrap();
let transform = config.get_builder().await.unwrap();
let result = transform.validate();
assert_eq!(result, Vec::<String>::new());
}
Expand All @@ -202,7 +202,7 @@ mod tests {
chain: vec![TransformsConfig::NullSink],
buffer_size: None,
};
let transform = config.get_transform().await.unwrap();
let transform = config.get_builder().await.unwrap();
let result = transform.validate();
assert_eq!(result, Vec::<String>::new());
}
Expand All @@ -220,7 +220,7 @@ mod tests {
buffer_size: None,
};

let transform = config.get_transform().await.unwrap();
let transform = config.get_builder().await.unwrap();
let result = transform.validate();
let expected = vec!["Tee:", " mismatch_chain:", " Terminating transform \"NullSink\" is not last in chain. Terminating transform must be last in chain."];
assert_eq!(result, expected);
Expand All @@ -237,7 +237,7 @@ mod tests {
buffer_size: None,
};

let transform = config.get_transform().await.unwrap();
let transform = config.get_builder().await.unwrap();
let result = transform.validate();
assert_eq!(result, Vec::<String>::new());
}
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/throttling.rs
Expand Up @@ -22,7 +22,7 @@ pub struct RequestThrottlingConfig {
}

impl RequestThrottlingConfig {
pub async fn get_transform(&self) -> Result<TransformBuilder> {
pub async fn get_builder(&self) -> Result<TransformBuilder> {
Ok(TransformBuilder::RequestThrottling(RequestThrottling {
limiter: Arc::new(RateLimiter::direct(Quota::per_second(
self.max_requests_per_second,
Expand Down

0 comments on commit 34a8764

Please sign in to comment.