Skip to content

Commit

Permalink
rename ConsistentScatter -> TuneableConsistencyScatter (#1079)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 14, 2023
1 parent 7df6e52 commit 682af0b
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 80 deletions.
48 changes: 24 additions & 24 deletions docs/src/transforms.md
Expand Up @@ -24,27 +24,27 @@ Future transforms won't be added to the public API while in alpha. But in these

## Transforms

| Transform | Terminating | Implementation Status |
|-------------------------------------------------------|-------------|-----------------------|
| [CassandraSinkCluster](#cassandrasinkcluster) || Beta |
| [CassandraSinkSingle](#cassandrasinksingle) || Alpha |
| [CassandraPeersRewrite](#cassandrapeersrewrite) || Alpha |
| [Coalesce](#coalesce) || Alpha |
| [ConsistentScatter](#consistentscatter) || Alpha |
| [DebugPrinter](#debugprinter) || Alpha |
| [DebugReturner](#debugreturner) || Alpha |
| [NullSink](#NullSink) || Beta |
| [ParallelMap](#parallelmap) || Alpha |
| [Protect](#protect) || Alpha |
| [QueryCounter](#querycounter) || Alpha |
| [QueryTypeFilter](#querytypefilter) || Alpha |
| [RedisCache](#rediscache) || Alpha |
| [RedisClusterPortsRewrite](#redisclusterportsrewrite) || Beta |
| [RedisSinkCluster](#redissinkcluster) || Beta |
| [RedisSinkSingle](#redissinksingle) || Beta |
| [RedisTimestampTagger](#redistimestamptagger) || Alpha |
| [Tee](#tee) || Alpha |
| [RequestThrottling](#requestthrottling) || Alpha |
| Transform | Terminating | Implementation Status |
|----------------------------------------------------------|-------------|-----------------------|
| [CassandraSinkCluster](#cassandrasinkcluster) || Beta |
| [CassandraSinkSingle](#cassandrasinksingle) || Alpha |
| [CassandraPeersRewrite](#cassandrapeersrewrite) || Alpha |
| [Coalesce](#coalesce) || Alpha |
| [TuneableConsistencyScatter](#tuneableconsistencyscatter)|| Alpha |
| [DebugPrinter](#debugprinter) || Alpha |
| [DebugReturner](#debugreturner) || Alpha |
| [NullSink](#nullsink) || Beta |
| [ParallelMap](#parallelmap) || Alpha |
| [Protect](#protect) || Alpha |
| [QueryCounter](#querycounter) || Alpha |
| [QueryTypeFilter](#querytypefilter) || Alpha |
| [RedisCache](#rediscache) || Alpha |
| [RedisClusterPortsRewrite](#redisclusterportsrewrite) || Beta |
| [RedisSinkCluster](#redissinkcluster) || Beta |
| [RedisSinkSingle](#redissinksingle) || Beta |
| [RedisTimestampTagger](#redistimestamptagger) || Alpha |
| [Tee](#tee) || Alpha |
| [RequestThrottling](#requestthrottling) || Alpha |
<!--| [DebugRandomDelay](#debugrandomdelay) | ❌ | Alpha |-->

### CassandraSinkCluster
Expand Down Expand Up @@ -204,7 +204,7 @@ Validation will fail if none of the `flush_when_` fields are provided, as this w
flush_when_millis_since_last_flush: 10000
```

### ConsistentScatter
### TuneableConsistencyScatter

This transform implements a distributed eventual consistency mechanism between the set of defined sub-chains. This transform will wait for a user configurable number of chains to return an OK response before returning the value up-chain. This follows a similar model as used by Cassandra for its consistency model. Strong consistency can be achieved when W + R > RF. In this case RF is always the number of chains in the `route_map`.

Expand All @@ -213,7 +213,7 @@ No sharding occurs within this transform and all requests/messages are sent to a
Upon receiving the configured number of responses, the transform will attempt to resolve or unify the response based on metadata about the result. Currently it will try to return the newest response based on a metadata timestamp (last write wins) or it will simply return the largest response if no timestamp information is available.

```yaml
- ConsistentScatter:
- TuneableConsistencyScatter:
# The number of chains to wait for a "write" response on.
write_consistency: 2
# The number of chains to wait for a "read" response on.
Expand Down Expand Up @@ -503,7 +503,7 @@ This transfrom emits a metrics [counter](user-guide/observability.md#counter) na

A transform that wraps each Redis command in a Lua script that also fetches the key for the operations idletime. This is then used to build a last modified timestamp and insert it into a response's timestamp. The response from the Lua operation is unwrapped and returned to up-chain transforms looking like a normal Redis response.

This is mainly used in conjunction with the `ConsistentScatter` transform to enable a Cassandra style consistency model within Redis.
This is mainly used in conjunction with the `TuneableConsistencyScatter` transform to enable a Cassandra style consistency model within Redis.

```yaml
- RedisTimestampTagger
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/example-configs/redis-multi/topology.yaml
Expand Up @@ -5,7 +5,7 @@ sources:
listen_addr: "127.0.0.1:6379"
chain_config:
redis_chain:
- ConsistentScatter:
- TuneableConsistencyScatter:
write_consistency: 2
read_consistency: 2
route_map:
Expand Down
28 changes: 14 additions & 14 deletions shotover-proxy/src/config/topology.rs
Expand Up @@ -100,7 +100,7 @@ mod topology_tests {
use crate::{
sources::{redis::RedisConfig, Sources, SourcesConfig},
transforms::{
distributed::consistent_scatter::ConsistentScatterConfig,
distributed::tuneable_consistency_scatter::TuneableConsistencyScatterConfig,
parallel_map::ParallelMapConfig, redis::cache::RedisConfig as RedisCacheConfig,
TransformsConfig,
},
Expand Down Expand Up @@ -242,7 +242,7 @@ redis_chain:
}

#[tokio::test]
async fn test_validate_chain_valid_subchain_consistent_scatter() {
async fn test_validate_chain_valid_subchain_scatter() {
let subchain = vec![
TransformsConfig::DebugPrinter,
TransformsConfig::DebugPrinter,
Expand All @@ -255,7 +255,7 @@ redis_chain:
run_test_topology(vec![
TransformsConfig::DebugPrinter,
TransformsConfig::DebugPrinter,
TransformsConfig::ConsistentScatter(ConsistentScatterConfig {
TransformsConfig::TuneableConsistencyScatter(TuneableConsistencyScatterConfig {
route_map,
write_consistency: 1,
read_consistency: 1,
Expand All @@ -266,10 +266,10 @@ redis_chain:
}

#[tokio::test]
async fn test_validate_chain_invalid_subchain_consistent_scatter() {
async fn test_validate_chain_invalid_subchain_scatter() {
let expected = r#"Topology errors
redis_chain:
ConsistentScatter:
TuneableConsistencyScatter:
subchain-1:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
"#;
Expand All @@ -287,7 +287,7 @@ redis_chain:
let error = run_test_topology(vec![
TransformsConfig::DebugPrinter,
TransformsConfig::DebugPrinter,
TransformsConfig::ConsistentScatter(ConsistentScatterConfig {
TransformsConfig::TuneableConsistencyScatter(TuneableConsistencyScatterConfig {
route_map,
write_consistency: 1,
read_consistency: 1,
Expand Down Expand Up @@ -412,7 +412,7 @@ redis_chain:
async fn test_validate_chain_subchain_terminating_in_middle() {
let expected = r#"Topology errors
redis_chain:
ConsistentScatter:
TuneableConsistencyScatter:
subchain-1:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
"#;
Expand All @@ -430,7 +430,7 @@ redis_chain:
let error = run_test_topology(vec![
TransformsConfig::DebugPrinter,
TransformsConfig::DebugPrinter,
TransformsConfig::ConsistentScatter(ConsistentScatterConfig {
TransformsConfig::TuneableConsistencyScatter(TuneableConsistencyScatterConfig {
route_map,
write_consistency: 1,
read_consistency: 1,
Expand All @@ -447,7 +447,7 @@ redis_chain:
async fn test_validate_chain_subchain_non_terminating_at_end() {
let expected = r#"Topology errors
redis_chain:
ConsistentScatter:
TuneableConsistencyScatter:
subchain-1:
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
"#;
Expand All @@ -463,7 +463,7 @@ redis_chain:
let error = run_test_topology(vec![
TransformsConfig::DebugPrinter,
TransformsConfig::DebugPrinter,
TransformsConfig::ConsistentScatter(ConsistentScatterConfig {
TransformsConfig::TuneableConsistencyScatter(TuneableConsistencyScatterConfig {
route_map,
write_consistency: 1,
read_consistency: 1,
Expand All @@ -480,7 +480,7 @@ redis_chain:
async fn test_validate_chain_subchain_terminating_middle_non_terminating_at_end() {
let expected = r#"Topology errors
redis_chain:
ConsistentScatter:
TuneableConsistencyScatter:
subchain-1:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
Expand All @@ -498,7 +498,7 @@ redis_chain:
let error = run_test_topology(vec![
TransformsConfig::DebugPrinter,
TransformsConfig::DebugPrinter,
TransformsConfig::ConsistentScatter(ConsistentScatterConfig {
TransformsConfig::TuneableConsistencyScatter(TuneableConsistencyScatterConfig {
route_map,
write_consistency: 1,
read_consistency: 1,
Expand Down Expand Up @@ -528,14 +528,14 @@ a_first_chain:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
b_second_chain:
ConsistentScatter:
TuneableConsistencyScatter:
a_chain_1:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
Non-terminating transform "DebugPrinter" is last in chain. Last transform must be terminating.
b_chain_2:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
c_chain_3:
ConsistentScatter:
TuneableConsistencyScatter:
sub_chain_2:
Terminating transform "NullSink" is not last in chain. Terminating transform must be last in chain.
"#;
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/message/mod.rs
Expand Up @@ -64,7 +64,7 @@ pub struct Message {
/// The only reason it is an Option is to allow temporarily taking ownership of the value from an &mut T
inner: Option<MessageInner>,

// TODO: Not a fan of this field and we could get rid of it by making TimestampTagger an implicit part of ConsistentScatter
// TODO: Not a fan of this field and we could get rid of it by making TimestampTagger an implicit part of TuneableConsistencyScatter
// This metadata field is only used for communication between transforms and should not be touched by sinks or sources
pub meta_timestamp: Option<i64>,

Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/transforms/distributed/mod.rs
@@ -1 +1 @@
pub mod consistent_scatter;
pub mod tuneable_consistency_scatter;
Expand Up @@ -13,13 +13,13 @@ use std::collections::HashMap;
use tracing::{debug, error, trace, warn};

#[derive(Deserialize, Debug)]
pub struct ConsistentScatterConfig {
pub struct TuneableConsistencyScatterConfig {
pub route_map: HashMap<String, Vec<TransformsConfig>>,
pub write_consistency: i32,
pub read_consistency: i32,
}

impl ConsistentScatterConfig {
impl TuneableConsistencyScatterConfig {
pub async fn get_builder(&self) -> Result<Box<dyn TransformBuilder>> {
let mut route_map = Vec::with_capacity(self.route_map.len());
warn!("Using this transform is considered unstable - Does not work with REDIS pipelines");
Expand All @@ -29,7 +29,7 @@ impl ConsistentScatterConfig {
}
route_map.sort_by_key(|x| x.name.clone());

Ok(Box::new(ConsistentScatterBuilder {
Ok(Box::new(TuneableConsistencyScatterBuilder {
route_map,
write_consistency: self.write_consistency,
read_consistency: self.read_consistency,
Expand All @@ -38,15 +38,15 @@ impl ConsistentScatterConfig {
}

#[derive(Clone)]
pub struct ConsistentScatterBuilder {
pub struct TuneableConsistencyScatterBuilder {
route_map: Vec<TransformChainBuilder>,
write_consistency: i32,
read_consistency: i32,
}

impl TransformBuilder for ConsistentScatterBuilder {
impl TransformBuilder for TuneableConsistencyScatterBuilder {
fn build(&self) -> Transforms {
Transforms::ConsistentScatter(ConsistentScatter {
Transforms::TuneableConsistencyScatter(TuneableConsistentencyScatter {
route_map: self
.route_map
.iter()
Expand All @@ -58,7 +58,7 @@ impl TransformBuilder for ConsistentScatterBuilder {
}

fn get_name(&self) -> &'static str {
"ConsistentScatter"
"TuneableConsistencyScatter"
}

fn is_terminating(&self) -> bool {
Expand All @@ -85,7 +85,7 @@ impl TransformBuilder for ConsistentScatterBuilder {
}

#[derive(Clone)]
pub struct ConsistentScatter {
pub struct TuneableConsistentencyScatter {
route_map: Vec<BufferedChain>,
write_consistency: i32,
read_consistency: i32,
Expand Down Expand Up @@ -140,7 +140,7 @@ fn resolve_fragments(fragments: &mut Vec<Message>) -> Option<Message> {
}

#[async_trait]
impl Transform for ConsistentScatter {
impl Transform for TuneableConsistentencyScatter {
async fn transform<'a>(&'a mut self, mut message_wrapper: Wrapper<'a>) -> ChainResponse {
let required_successes: Vec<_> = message_wrapper
.messages
Expand Down Expand Up @@ -216,8 +216,8 @@ mod scatter_transform_tests {
use crate::transforms::chain::{BufferedChain, TransformChainBuilder};
use crate::transforms::debug::printer::DebugPrinter;
use crate::transforms::debug::returner::{DebugReturner, Response};
use crate::transforms::distributed::consistent_scatter::{
ConsistentScatter, ConsistentScatterBuilder,
use crate::transforms::distributed::tuneable_consistency_scatter::{
TuneableConsistencyScatterBuilder, TuneableConsistentencyScatter,
};
use crate::transforms::null::NullSink;
use crate::transforms::{TransformBuilder, Transforms, Wrapper};
Expand Down Expand Up @@ -270,11 +270,12 @@ mod scatter_transform_tests {
TransformChainBuilder::new(vec![err_repeat.clone()], "three".to_string()),
);

let mut tuneable_success_consistency = Transforms::ConsistentScatter(ConsistentScatter {
route_map: build_chains(two_of_three).await,
write_consistency: 2,
read_consistency: 2,
});
let mut tuneable_success_consistency =
Transforms::TuneableConsistencyScatter(TuneableConsistentencyScatter {
route_map: build_chains(two_of_three).await,
write_consistency: 2,
read_consistency: 2,
});

let test = tuneable_success_consistency
.transform(wrapper.clone())
Expand All @@ -297,11 +298,12 @@ mod scatter_transform_tests {
TransformChainBuilder::new(vec![err_repeat.clone()], "three".to_string()),
);

let mut tuneable_fail_consistency = Transforms::ConsistentScatter(ConsistentScatter {
route_map: build_chains(one_of_three).await,
write_consistency: 2,
read_consistency: 2,
});
let mut tuneable_fail_consistency =
Transforms::TuneableConsistencyScatter(TuneableConsistentencyScatter {
route_map: build_chains(one_of_three).await,
write_consistency: 2,
read_consistency: 2,
});

let response_fail = tuneable_fail_consistency
.transform(wrapper.clone())
Expand All @@ -323,7 +325,7 @@ mod scatter_transform_tests {
);
let chain_2 = TransformChainBuilder::new(vec![], "test-chain-2".to_string());

let transform = ConsistentScatterBuilder {
let transform = TuneableConsistencyScatterBuilder {
route_map: vec![chain_1, chain_2],
write_consistency: 1,
read_consistency: 1,
Expand All @@ -332,7 +334,7 @@ mod scatter_transform_tests {
assert_eq!(
transform.validate(),
vec![
"ConsistentScatter:",
"TuneableConsistencyScatter:",
" test-chain-2:",
" Chain cannot be empty"
]
Expand All @@ -358,7 +360,7 @@ mod scatter_transform_tests {
"test-chain-2".to_string(),
);

let transform = ConsistentScatterBuilder {
let transform = TuneableConsistencyScatterBuilder {
route_map: vec![chain_1, chain_2],
write_consistency: 1,
read_consistency: 1,
Expand Down

0 comments on commit 682af0b

Please sign in to comment.