diff --git a/docs/src/transforms.md b/docs/src/transforms.md index 7ab5d7832..cd32c00b8 100644 --- a/docs/src/transforms.md +++ b/docs/src/transforms.md @@ -24,27 +24,27 @@ Future transforms won't be added to the public API while in alpha. But in these ## Transforms -| Transform | Terminating | Implementation Status | -|-------------------------------------------------------|-------------|-----------------------| -| [CassandraSinkCluster](#cassandrasinkcluster) | ✅ | Beta | -| [CassandraSinkSingle](#cassandrasinksingle) | ✅ | Alpha | -| [CassandraPeersRewrite](#cassandrapeersrewrite) | ❌ | Alpha | -| [Coalesce](#coalesce) | ❌ | Alpha | -| [ConsistentScatter](#consistentscatter) | ✅ | Alpha | -| [DebugPrinter](#debugprinter) | ❌ | Alpha | -| [DebugReturner](#debugreturner) | ✅ | Alpha | -| [NullSink](#NullSink) | ✅ | Beta | -| [ParallelMap](#parallelmap) | ✅ | Alpha | -| [Protect](#protect) | ❌ | Alpha | -| [QueryCounter](#querycounter) | ❌ | Alpha | -| [QueryTypeFilter](#querytypefilter) | ❌ | Alpha | -| [RedisCache](#rediscache) | ❌ | Alpha | -| [RedisClusterPortsRewrite](#redisclusterportsrewrite) | ❌ | Beta | -| [RedisSinkCluster](#redissinkcluster) | ✅ | Beta | -| [RedisSinkSingle](#redissinksingle) | ✅ | Beta | -| [RedisTimestampTagger](#redistimestamptagger) | ❌ | Alpha | -| [Tee](#tee) | ✅ | Alpha | -| [RequestThrottling](#requestthrottling) |❌ | Alpha | +| Transform | Terminating | Implementation Status | +|----------------------------------------------------------|-------------|-----------------------| +| [CassandraSinkCluster](#cassandrasinkcluster) | ✅ | Beta | +| [CassandraSinkSingle](#cassandrasinksingle) | ✅ | Alpha | +| [CassandraPeersRewrite](#cassandrapeersrewrite) | ❌ | Alpha | +| [Coalesce](#coalesce) | ❌ | Alpha | +| [TuneableConsistencyScatter](#tuneableconsistencyscatter)| ✅ | Alpha | +| [DebugPrinter](#debugprinter) | ❌ | Alpha | +| [DebugReturner](#debugreturner) | ✅ | Alpha | +| [NullSink](#nullsink) | ✅ | Beta | +| [ParallelMap](#parallelmap) | ✅ | Alpha | +| [Protect](#protect) | ❌ | Alpha | +| [QueryCounter](#querycounter) | ❌ | Alpha | +| [QueryTypeFilter](#querytypefilter) | ❌ | Alpha | +| [RedisCache](#rediscache) | ❌ | Alpha | +| [RedisClusterPortsRewrite](#redisclusterportsrewrite) | ❌ | Beta | +| [RedisSinkCluster](#redissinkcluster) | ✅ | Beta | +| [RedisSinkSingle](#redissinksingle) | ✅ | Beta | +| [RedisTimestampTagger](#redistimestamptagger) | ❌ | Alpha | +| [Tee](#tee) | ✅ | Alpha | +| [RequestThrottling](#requestthrottling) |❌ | Alpha | ### CassandraSinkCluster @@ -204,7 +204,7 @@ Validation will fail if none of the `flush_when_` fields are provided, as this w flush_when_millis_since_last_flush: 10000 ``` -### ConsistentScatter +### TuneableConsistencyScatter This transform implements a distributed eventual consistency mechanism between the set of defined sub-chains. This transform will wait for a user configurable number of chains to return an OK response before returning the value up-chain. This follows a similar model as used by Cassandra for its consistency model. Strong consistency can be achieved when W + R > RF. In this case RF is always the number of chains in the `route_map`. @@ -213,7 +213,7 @@ No sharding occurs within this transform and all requests/messages are sent to a Upon receiving the configured number of responses, the transform will attempt to resolve or unify the response based on metadata about the result. Currently it will try to return the newest response based on a metadata timestamp (last write wins) or it will simply return the largest response if no timestamp information is available. ```yaml -- ConsistentScatter: +- TuneableConsistencyScatter: # The number of chains to wait for a "write" response on. write_consistency: 2 # The number of chains to wait for a "read" response on. @@ -503,7 +503,7 @@ This transfrom emits a metrics [counter](user-guide/observability.md#counter) na A transform that wraps each Redis command in a Lua script that also fetches the key for the operations idletime. This is then used to build a last modified timestamp and insert it into a response's timestamp. The response from the Lua operation is unwrapped and returned to up-chain transforms looking like a normal Redis response. -This is mainly used in conjunction with the `ConsistentScatter` transform to enable a Cassandra style consistency model within Redis. +This is mainly used in conjunction with the `TuneableConsistencyScatter` transform to enable a Cassandra style consistency model within Redis. ```yaml - RedisTimestampTagger diff --git a/shotover-proxy/example-configs/redis-multi/topology.yaml b/shotover-proxy/example-configs/redis-multi/topology.yaml index 2cfe139e5..fb56db211 100644 --- a/shotover-proxy/example-configs/redis-multi/topology.yaml +++ b/shotover-proxy/example-configs/redis-multi/topology.yaml @@ -5,7 +5,7 @@ sources: listen_addr: "127.0.0.1:6379" chain_config: redis_chain: - - ConsistentScatter: + - TuneableConsistencyScatter: write_consistency: 2 read_consistency: 2 route_map: diff --git a/shotover-proxy/src/config/topology.rs b/shotover-proxy/src/config/topology.rs index 3911e6b8b..edce2b34e 100644 --- a/shotover-proxy/src/config/topology.rs +++ b/shotover-proxy/src/config/topology.rs @@ -100,7 +100,7 @@ mod topology_tests { use crate::{ sources::{redis::RedisConfig, Sources, SourcesConfig}, transforms::{ - distributed::consistent_scatter::ConsistentScatterConfig, + distributed::tuneable_consistency_scatter::TuneableConsistencyScatterConfig, parallel_map::ParallelMapConfig, redis::cache::RedisConfig as RedisCacheConfig, TransformsConfig, }, @@ -242,7 +242,7 @@ redis_chain: } #[tokio::test] - async fn test_validate_chain_valid_subchain_consistent_scatter() { + async fn test_validate_chain_valid_subchain_scatter() { let subchain = vec![ TransformsConfig::DebugPrinter, TransformsConfig::DebugPrinter, @@ -255,7 +255,7 @@ redis_chain: run_test_topology(vec![ TransformsConfig::DebugPrinter, TransformsConfig::DebugPrinter, - TransformsConfig::ConsistentScatter(ConsistentScatterConfig { + TransformsConfig::TuneableConsistencyScatter(TuneableConsistencyScatterConfig { route_map, write_consistency: 1, read_consistency: 1, @@ -266,10 +266,10 @@ redis_chain: } #[tokio::test] - async fn test_validate_chain_invalid_subchain_consistent_scatter() { + async fn test_validate_chain_invalid_subchain_scatter() { let expected = r#"Topology errors redis_chain: - ConsistentScatter: + TuneableConsistencyScatter: subchain-1: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. "#; @@ -287,7 +287,7 @@ redis_chain: let error = run_test_topology(vec![ TransformsConfig::DebugPrinter, TransformsConfig::DebugPrinter, - TransformsConfig::ConsistentScatter(ConsistentScatterConfig { + TransformsConfig::TuneableConsistencyScatter(TuneableConsistencyScatterConfig { route_map, write_consistency: 1, read_consistency: 1, @@ -412,7 +412,7 @@ redis_chain: async fn test_validate_chain_subchain_terminating_in_middle() { let expected = r#"Topology errors redis_chain: - ConsistentScatter: + TuneableConsistencyScatter: subchain-1: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. "#; @@ -430,7 +430,7 @@ redis_chain: let error = run_test_topology(vec![ TransformsConfig::DebugPrinter, TransformsConfig::DebugPrinter, - TransformsConfig::ConsistentScatter(ConsistentScatterConfig { + TransformsConfig::TuneableConsistencyScatter(TuneableConsistencyScatterConfig { route_map, write_consistency: 1, read_consistency: 1, @@ -447,7 +447,7 @@ redis_chain: async fn test_validate_chain_subchain_non_terminating_at_end() { let expected = r#"Topology errors redis_chain: - ConsistentScatter: + TuneableConsistencyScatter: subchain-1: Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating. "#; @@ -463,7 +463,7 @@ redis_chain: let error = run_test_topology(vec![ TransformsConfig::DebugPrinter, TransformsConfig::DebugPrinter, - TransformsConfig::ConsistentScatter(ConsistentScatterConfig { + TransformsConfig::TuneableConsistencyScatter(TuneableConsistencyScatterConfig { route_map, write_consistency: 1, read_consistency: 1, @@ -480,7 +480,7 @@ redis_chain: async fn test_validate_chain_subchain_terminating_middle_non_terminating_at_end() { let expected = r#"Topology errors redis_chain: - ConsistentScatter: + TuneableConsistencyScatter: subchain-1: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating. @@ -498,7 +498,7 @@ redis_chain: let error = run_test_topology(vec![ TransformsConfig::DebugPrinter, TransformsConfig::DebugPrinter, - TransformsConfig::ConsistentScatter(ConsistentScatterConfig { + TransformsConfig::TuneableConsistencyScatter(TuneableConsistencyScatterConfig { route_map, write_consistency: 1, read_consistency: 1, @@ -528,14 +528,14 @@ a_first_chain: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating. b_second_chain: - ConsistentScatter: + TuneableConsistencyScatter: a_chain_1: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating. b_chain_2: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. c_chain_3: - ConsistentScatter: + TuneableConsistencyScatter: sub_chain_2: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. "#; diff --git a/shotover-proxy/src/message/mod.rs b/shotover-proxy/src/message/mod.rs index 859eba5e9..4d184d7eb 100644 --- a/shotover-proxy/src/message/mod.rs +++ b/shotover-proxy/src/message/mod.rs @@ -64,7 +64,7 @@ pub struct Message { /// The only reason it is an Option is to allow temporarily taking ownership of the value from an &mut T inner: Option, - // TODO: Not a fan of this field and we could get rid of it by making TimestampTagger an implicit part of ConsistentScatter + // TODO: Not a fan of this field and we could get rid of it by making TimestampTagger an implicit part of TuneableConsistencyScatter // This metadata field is only used for communication between transforms and should not be touched by sinks or sources pub meta_timestamp: Option, diff --git a/shotover-proxy/src/transforms/distributed/mod.rs b/shotover-proxy/src/transforms/distributed/mod.rs index 5371e67df..d6065e321 100644 --- a/shotover-proxy/src/transforms/distributed/mod.rs +++ b/shotover-proxy/src/transforms/distributed/mod.rs @@ -1 +1 @@ -pub mod consistent_scatter; +pub mod tuneable_consistency_scatter; diff --git a/shotover-proxy/src/transforms/distributed/consistent_scatter.rs b/shotover-proxy/src/transforms/distributed/tuneable_consistency_scatter.rs similarity index 89% rename from shotover-proxy/src/transforms/distributed/consistent_scatter.rs rename to shotover-proxy/src/transforms/distributed/tuneable_consistency_scatter.rs index 55eb0fe9d..f7557458a 100644 --- a/shotover-proxy/src/transforms/distributed/consistent_scatter.rs +++ b/shotover-proxy/src/transforms/distributed/tuneable_consistency_scatter.rs @@ -13,13 +13,13 @@ use std::collections::HashMap; use tracing::{debug, error, trace, warn}; #[derive(Deserialize, Debug)] -pub struct ConsistentScatterConfig { +pub struct TuneableConsistencyScatterConfig { pub route_map: HashMap>, pub write_consistency: i32, pub read_consistency: i32, } -impl ConsistentScatterConfig { +impl TuneableConsistencyScatterConfig { pub async fn get_builder(&self) -> Result> { let mut route_map = Vec::with_capacity(self.route_map.len()); warn!("Using this transform is considered unstable - Does not work with REDIS pipelines"); @@ -29,7 +29,7 @@ impl ConsistentScatterConfig { } route_map.sort_by_key(|x| x.name.clone()); - Ok(Box::new(ConsistentScatterBuilder { + Ok(Box::new(TuneableConsistencyScatterBuilder { route_map, write_consistency: self.write_consistency, read_consistency: self.read_consistency, @@ -38,15 +38,15 @@ impl ConsistentScatterConfig { } #[derive(Clone)] -pub struct ConsistentScatterBuilder { +pub struct TuneableConsistencyScatterBuilder { route_map: Vec, write_consistency: i32, read_consistency: i32, } -impl TransformBuilder for ConsistentScatterBuilder { +impl TransformBuilder for TuneableConsistencyScatterBuilder { fn build(&self) -> Transforms { - Transforms::ConsistentScatter(ConsistentScatter { + Transforms::TuneableConsistencyScatter(TuneableConsistentencyScatter { route_map: self .route_map .iter() @@ -58,7 +58,7 @@ impl TransformBuilder for ConsistentScatterBuilder { } fn get_name(&self) -> &'static str { - "ConsistentScatter" + "TuneableConsistencyScatter" } fn is_terminating(&self) -> bool { @@ -85,7 +85,7 @@ impl TransformBuilder for ConsistentScatterBuilder { } #[derive(Clone)] -pub struct ConsistentScatter { +pub struct TuneableConsistentencyScatter { route_map: Vec, write_consistency: i32, read_consistency: i32, @@ -140,7 +140,7 @@ fn resolve_fragments(fragments: &mut Vec) -> Option { } #[async_trait] -impl Transform for ConsistentScatter { +impl Transform for TuneableConsistentencyScatter { async fn transform<'a>(&'a mut self, mut message_wrapper: Wrapper<'a>) -> ChainResponse { let required_successes: Vec<_> = message_wrapper .messages @@ -216,8 +216,8 @@ mod scatter_transform_tests { use crate::transforms::chain::{BufferedChain, TransformChainBuilder}; use crate::transforms::debug::printer::DebugPrinter; use crate::transforms::debug::returner::{DebugReturner, Response}; - use crate::transforms::distributed::consistent_scatter::{ - ConsistentScatter, ConsistentScatterBuilder, + use crate::transforms::distributed::tuneable_consistency_scatter::{ + TuneableConsistencyScatterBuilder, TuneableConsistentencyScatter, }; use crate::transforms::null::NullSink; use crate::transforms::{TransformBuilder, Transforms, Wrapper}; @@ -270,11 +270,12 @@ mod scatter_transform_tests { TransformChainBuilder::new(vec![err_repeat.clone()], "three".to_string()), ); - let mut tuneable_success_consistency = Transforms::ConsistentScatter(ConsistentScatter { - route_map: build_chains(two_of_three).await, - write_consistency: 2, - read_consistency: 2, - }); + let mut tuneable_success_consistency = + Transforms::TuneableConsistencyScatter(TuneableConsistentencyScatter { + route_map: build_chains(two_of_three).await, + write_consistency: 2, + read_consistency: 2, + }); let test = tuneable_success_consistency .transform(wrapper.clone()) @@ -297,11 +298,12 @@ mod scatter_transform_tests { TransformChainBuilder::new(vec![err_repeat.clone()], "three".to_string()), ); - let mut tuneable_fail_consistency = Transforms::ConsistentScatter(ConsistentScatter { - route_map: build_chains(one_of_three).await, - write_consistency: 2, - read_consistency: 2, - }); + let mut tuneable_fail_consistency = + Transforms::TuneableConsistencyScatter(TuneableConsistentencyScatter { + route_map: build_chains(one_of_three).await, + write_consistency: 2, + read_consistency: 2, + }); let response_fail = tuneable_fail_consistency .transform(wrapper.clone()) @@ -323,7 +325,7 @@ mod scatter_transform_tests { ); let chain_2 = TransformChainBuilder::new(vec![], "test-chain-2".to_string()); - let transform = ConsistentScatterBuilder { + let transform = TuneableConsistencyScatterBuilder { route_map: vec![chain_1, chain_2], write_consistency: 1, read_consistency: 1, @@ -332,7 +334,7 @@ mod scatter_transform_tests { assert_eq!( transform.validate(), vec![ - "ConsistentScatter:", + "TuneableConsistencyScatter:", " test-chain-2:", " Chain cannot be empty" ] @@ -358,7 +360,7 @@ mod scatter_transform_tests { "test-chain-2".to_string(), ); - let transform = ConsistentScatterBuilder { + let transform = TuneableConsistencyScatterBuilder { route_map: vec![chain_1, chain_2], write_consistency: 1, read_consistency: 1, diff --git a/shotover-proxy/src/transforms/mod.rs b/shotover-proxy/src/transforms/mod.rs index 0c211d326..8d9fa29c0 100644 --- a/shotover-proxy/src/transforms/mod.rs +++ b/shotover-proxy/src/transforms/mod.rs @@ -15,8 +15,8 @@ use crate::transforms::debug::force_parse::{DebugForceEncodeConfig, DebugForcePa use crate::transforms::debug::printer::DebugPrinter; use crate::transforms::debug::random_delay::DebugRandomDelay; use crate::transforms::debug::returner::{DebugReturner, DebugReturnerConfig}; -use crate::transforms::distributed::consistent_scatter::{ - ConsistentScatter, ConsistentScatterConfig, +use crate::transforms::distributed::tuneable_consistency_scatter::{ + TuneableConsistencyScatterConfig, TuneableConsistentencyScatter, }; use crate::transforms::filter::{QueryTypeFilter, QueryTypeFilterConfig}; use crate::transforms::kafka::sink_single::KafkaSinkSingle; @@ -116,7 +116,7 @@ pub enum Transforms { NullSink(NullSink), Loopback(Loopback), Protect(Box), - ConsistentScatter(ConsistentScatter), + TuneableConsistencyScatter(TuneableConsistentencyScatter), RedisTimestampTagger(RedisTimestampTagger), RedisSinkCluster(RedisSinkCluster), RedisClusterPortsRewrite(RedisClusterPortsRewrite), @@ -154,7 +154,7 @@ impl Transforms { Transforms::Protect(p) => p.transform(message_wrapper).await, Transforms::DebugReturner(p) => p.transform(message_wrapper).await, Transforms::DebugRandomDelay(p) => p.transform(message_wrapper).await, - Transforms::ConsistentScatter(tc) => tc.transform(message_wrapper).await, + Transforms::TuneableConsistencyScatter(tc) => tc.transform(message_wrapper).await, Transforms::RedisSinkSingle(r) => r.transform(message_wrapper).await, Transforms::RedisTimestampTagger(r) => r.transform(message_wrapper).await, Transforms::RedisClusterPortsRewrite(r) => r.transform(message_wrapper).await, @@ -183,7 +183,9 @@ impl Transforms { Transforms::Protect(p) => p.transform_pushed(message_wrapper).await, Transforms::DebugReturner(p) => p.transform_pushed(message_wrapper).await, Transforms::DebugRandomDelay(p) => p.transform_pushed(message_wrapper).await, - Transforms::ConsistentScatter(tc) => tc.transform_pushed(message_wrapper).await, + Transforms::TuneableConsistencyScatter(tc) => { + tc.transform_pushed(message_wrapper).await + } Transforms::RedisSinkSingle(r) => r.transform_pushed(message_wrapper).await, Transforms::RedisTimestampTagger(r) => r.transform_pushed(message_wrapper).await, Transforms::RedisClusterPortsRewrite(r) => r.transform_pushed(message_wrapper).await, @@ -210,7 +212,9 @@ impl Transforms { Transforms::RedisCache(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::Tee(t) => t.set_pushed_messages_tx(pushed_messages_tx), Transforms::RedisSinkSingle(r) => r.set_pushed_messages_tx(pushed_messages_tx), - Transforms::ConsistentScatter(c) => c.set_pushed_messages_tx(pushed_messages_tx), + Transforms::TuneableConsistencyScatter(c) => { + c.set_pushed_messages_tx(pushed_messages_tx) + } Transforms::RedisTimestampTagger(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::RedisClusterPortsRewrite(r) => r.set_pushed_messages_tx(pushed_messages_tx), Transforms::DebugPrinter(p) => p.set_pushed_messages_tx(pushed_messages_tx), @@ -243,7 +247,7 @@ pub enum TransformsConfig { CassandraPeersRewrite(CassandraPeersRewriteConfig), RedisCache(RedisConfig), Tee(TeeConfig), - ConsistentScatter(ConsistentScatterConfig), + TuneableConsistencyScatter(TuneableConsistencyScatterConfig), RedisSinkCluster(RedisSinkClusterConfig), RedisClusterPortsRewrite(RedisClusterPortsRewriteConfig), RedisTimestampTagger, @@ -278,7 +282,7 @@ impl TransformsConfig { TransformsConfig::RedisCache(r) => r.get_builder().await, TransformsConfig::Tee(t) => t.get_builder().await, TransformsConfig::RedisSinkSingle(r) => r.get_builder(chain_name).await, - TransformsConfig::ConsistentScatter(c) => c.get_builder().await, + TransformsConfig::TuneableConsistencyScatter(c) => c.get_builder().await, TransformsConfig::RedisTimestampTagger => { Ok(Box::new(RedisTimestampTagger::new()) as Box) } diff --git a/shotover-proxy/src/transforms/redis/timestamp_tagging.rs b/shotover-proxy/src/transforms/redis/timestamp_tagging.rs index 09029072b..c52031521 100644 --- a/shotover-proxy/src/transforms/redis/timestamp_tagging.rs +++ b/shotover-proxy/src/transforms/redis/timestamp_tagging.rs @@ -130,7 +130,7 @@ fn unwrap_response(message: &mut Message) -> Result<()> { } } - todo!("ConsistentScatter isnt built to handle multiple timestamps yet: {timestamps:?} {results:?}",) + todo!("TuneableConsistencyScatter isnt built to handle multiple timestamps yet: {timestamps:?} {results:?}",) } else if values.len() == 2 { let timestamp = values.pop().unwrap(); let frame = values.pop().unwrap(); diff --git a/shotover-proxy/tests/runner/runner_int_tests.rs b/shotover-proxy/tests/runner/runner_int_tests.rs index 1eb7b4dff..ba6f59a84 100644 --- a/shotover-proxy/tests/runner/runner_int_tests.rs +++ b/shotover-proxy/tests/runner/runner_int_tests.rs @@ -99,19 +99,19 @@ a_first_chain: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating. b_second_chain: - ConsistentScatter: + TuneableConsistencyScatter: a_chain_1: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating. b_chain_2: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. c_chain_3: - ConsistentScatter: + TuneableConsistencyScatter: sub_chain_2: Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain. "#), EventMatcher::new().with_level(Level::Warn) - .with_target("shotover_proxy::transforms::distributed::consistent_scatter") + .with_target("shotover_proxy::transforms::distributed::tuneable_consistency_scatter") .with_message("Using this transform is considered unstable - Does not work with REDIS pipelines") .with_count(Count::Times(2)), // TODO: Investigate these @@ -119,7 +119,7 @@ b_second_chain: .with_message("failed response Couldn't send message to wrapped chain SendError(BufferedChainMessages { local_addr: 127.0.0.1:10000, messages: [], flush: true, return_chan: Some(Sender { inner: Some(Inner { state: State { is_complete: false, is_closed: false, is_rx_task_set: false, is_tx_task_set: false } }) }) })") .with_count(Count::Any), EventMatcher::new().with_level(Level::Error) - .with_target("shotover_proxy::transforms::distributed::consistent_scatter") + .with_target("shotover_proxy::transforms::distributed::tuneable_consistency_scatter") .with_message("failed response channel closed") .with_count(Count::Any), ], diff --git a/shotover-proxy/tests/test-configs/invalid_subchains.yaml b/shotover-proxy/tests/test-configs/invalid_subchains.yaml index eac7267cf..61fb296f4 100644 --- a/shotover-proxy/tests/test-configs/invalid_subchains.yaml +++ b/shotover-proxy/tests/test-configs/invalid_subchains.yaml @@ -10,7 +10,7 @@ chain_config: - DebugPrinter b_second_chain: - DebugPrinter - - ConsistentScatter: + - TuneableConsistencyScatter: read_consistency: 1 write_consistency: 1 route_map: @@ -21,7 +21,7 @@ chain_config: - NullSink - NullSink c_chain_3: - - ConsistentScatter: + - TuneableConsistencyScatter: read_consistency: 1 write_consistency: 1 route_map: