Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid storing ConsistencyBehaviour::SubchainOnMismatch config in the final tee transform #1061

Merged
merged 1 commit into from Mar 1, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 46 additions & 32 deletions shotover-proxy/src/transforms/tee.rs
Expand Up @@ -12,26 +12,30 @@ use tracing::trace;
#[derive(Clone)]
pub struct TeeBuilder {
pub tx: TransformChainBuilder,
pub mismatch_chain: Option<TransformChainBuilder>,
pub buffer_size: usize,
pub behavior: ConsistencyBehavior,
pub behavior: ConsistencyBehaviorBuilder,
pub timeout_micros: Option<u64>,
dropped_messages: Counter,
}

#[derive(Clone)]
pub enum ConsistencyBehaviorBuilder {
Ignore,
FailOnMismatch,
SubchainOnMismatch(TransformChainBuilder),
}

impl TeeBuilder {
pub fn new(
tx: TransformChainBuilder,
mismatch_chain: Option<TransformChainBuilder>,
buffer_size: usize,
behavior: ConsistencyBehavior,
behavior: ConsistencyBehaviorBuilder,
timeout_micros: Option<u64>,
) -> Self {
let dropped_messages = register_counter!("tee_dropped_messages", "chain" => "Tee");

TeeBuilder {
tx,
mismatch_chain,
buffer_size,
behavior,
timeout_micros,
Expand All @@ -44,12 +48,14 @@ impl TransformBuilder for TeeBuilder {
fn build(&self) -> Transforms {
Transforms::Tee(Tee {
tx: self.tx.build_buffered(self.buffer_size),
mismatch_chain: self
.mismatch_chain
.as_ref()
.map(|x| x.build_buffered(self.buffer_size)),
behavior: match &self.behavior {
ConsistencyBehaviorBuilder::Ignore => ConsistencyBehavior::Ignore,
ConsistencyBehaviorBuilder::FailOnMismatch => ConsistencyBehavior::FailOnMismatch,
ConsistencyBehaviorBuilder::SubchainOnMismatch(chain) => {
ConsistencyBehavior::SubchainOnMismatch(chain.build_buffered(self.buffer_size))
}
},
buffer_size: self.buffer_size,
behavior: self.behavior.clone(),
timeout_micros: self.timeout_micros,
dropped_messages: self.dropped_messages.clone(),
})
Expand All @@ -60,7 +66,7 @@ impl TransformBuilder for TeeBuilder {
}

fn validate(&self) -> Vec<String> {
if let Some(mismatch_chain) = &self.mismatch_chain {
if let ConsistencyBehaviorBuilder::SubchainOnMismatch(mismatch_chain) = &self.behavior {
let mut errors = mismatch_chain
.validate()
.iter()
Expand All @@ -80,44 +86,54 @@ impl TransformBuilder for TeeBuilder {

pub struct Tee {
pub tx: BufferedChain,
pub mismatch_chain: Option<BufferedChain>,
pub buffer_size: usize,
pub behavior: ConsistencyBehavior,
pub timeout_micros: Option<u64>,
dropped_messages: Counter,
}

#[derive(Deserialize, Debug, Clone)]
pub enum ConsistencyBehavior {
Ignore,
FailOnMismatch,
SubchainOnMismatch(Vec<TransformsConfig>),
SubchainOnMismatch(BufferedChain),
}

#[derive(Deserialize, Debug, Clone)]
pub struct TeeConfig {
pub behavior: Option<ConsistencyBehavior>,
pub behavior: Option<ConsistencyBehaviorConfig>,
pub timeout_micros: Option<u64>,
pub chain: Vec<TransformsConfig>,
pub buffer_size: Option<usize>,
}

#[derive(Deserialize, Debug, Clone)]
pub enum ConsistencyBehaviorConfig {
Ignore,
FailOnMismatch,
SubchainOnMismatch(Vec<TransformsConfig>),
}

impl TeeConfig {
pub async fn get_builder(&self) -> Result<Box<dyn TransformBuilder>> {
let buffer_size = self.buffer_size.unwrap_or(5);
let mismatch_chain =
if let Some(ConsistencyBehavior::SubchainOnMismatch(mismatch_chain)) = &self.behavior {
Some(build_chain_from_config("mismatch_chain".to_string(), mismatch_chain).await?)
} else {
None
};
let behavior = match &self.behavior {
Some(ConsistencyBehaviorConfig::Ignore) => ConsistencyBehaviorBuilder::Ignore,
Some(ConsistencyBehaviorConfig::FailOnMismatch) => {
ConsistencyBehaviorBuilder::FailOnMismatch
}
Some(ConsistencyBehaviorConfig::SubchainOnMismatch(mismatch_chain)) => {
ConsistencyBehaviorBuilder::SubchainOnMismatch(
build_chain_from_config("mismatch_chain".to_string(), mismatch_chain).await?,
)
}
None => ConsistencyBehaviorBuilder::Ignore,
};
let tee_chain = build_chain_from_config("tee_chain".to_string(), &self.chain).await?;

Ok(Box::new(TeeBuilder::new(
tee_chain,
mismatch_chain,
buffer_size,
self.behavior.clone().unwrap_or(ConsistencyBehavior::Ignore),
behavior,
self.timeout_micros,
)))
}
Expand All @@ -126,7 +142,7 @@ impl TeeConfig {
#[async_trait]
impl Transform for Tee {
async fn transform<'a>(&'a mut self, message_wrapper: Wrapper<'a>) -> ChainResponse {
match self.behavior {
match &mut self.behavior {
ConsistencyBehavior::Ignore => {
let (tee_result, chain_result) = tokio::join!(
self.tx
Expand Down Expand Up @@ -156,7 +172,7 @@ impl Transform for Tee {
}
Ok(chain_response)
}
ConsistencyBehavior::SubchainOnMismatch(_) => {
ConsistencyBehavior::SubchainOnMismatch(mismatch_chain) => {
let failed_message = message_wrapper.clone();
let (tee_result, chain_result) = tokio::join!(
self.tx
Expand All @@ -168,9 +184,7 @@ impl Transform for Tee {
let chain_response = chain_result?;

if !chain_response.eq(&tee_response) {
if let Some(mismatch_chain) = &mut self.mismatch_chain {
mismatch_chain.process_request(failed_message, None).await?;
}
mismatch_chain.process_request(failed_message, None).await?;
}

Ok(chain_response)
Expand All @@ -188,7 +202,7 @@ mod tests {
async fn test_validate_no_subchain() {
{
let config = TeeConfig {
behavior: Some(ConsistencyBehavior::Ignore),
behavior: Some(ConsistencyBehaviorConfig::Ignore),
timeout_micros: None,
chain: vec![TransformsConfig::NullSink],
buffer_size: None,
Expand All @@ -200,7 +214,7 @@ mod tests {

{
let config = TeeConfig {
behavior: Some(ConsistencyBehavior::FailOnMismatch),
behavior: Some(ConsistencyBehaviorConfig::FailOnMismatch),
timeout_micros: None,
chain: vec![TransformsConfig::NullSink],
buffer_size: None,
Expand All @@ -214,7 +228,7 @@ mod tests {
#[tokio::test]
async fn test_validate_invalid_chain() {
let config = TeeConfig {
behavior: Some(ConsistencyBehavior::SubchainOnMismatch(vec![
behavior: Some(ConsistencyBehaviorConfig::SubchainOnMismatch(vec![
TransformsConfig::NullSink,
TransformsConfig::NullSink,
])),
Expand All @@ -232,7 +246,7 @@ mod tests {
#[tokio::test]
async fn test_validate_valid_chain() {
let config = TeeConfig {
behavior: Some(ConsistencyBehavior::SubchainOnMismatch(vec![
behavior: Some(ConsistencyBehaviorConfig::SubchainOnMismatch(vec![
TransformsConfig::NullSink,
])),
timeout_micros: None,
Expand Down