From 0d03cecc63832b3045357783849b9504425d03e4 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Wed, 15 Mar 2023 12:00:15 +1100 Subject: [PATCH] Fix redis_int_tests::multi intermittent failure (#1080) --- .../tuneable_consistency_scatter.rs | 88 ++++++++++++++----- 1 file changed, 68 insertions(+), 20 deletions(-) diff --git a/shotover-proxy/src/transforms/distributed/tuneable_consistency_scatter.rs b/shotover-proxy/src/transforms/distributed/tuneable_consistency_scatter.rs index f7557458a..5c40a1c05 100644 --- a/shotover-proxy/src/transforms/distributed/tuneable_consistency_scatter.rs +++ b/shotover-proxy/src/transforms/distributed/tuneable_consistency_scatter.rs @@ -1,4 +1,5 @@ use crate::error::ChainResponse; +use crate::frame::{Frame, RedisFrame}; use crate::message::{Message, QueryType}; use crate::transforms::chain::{BufferedChain, TransformChainBuilder}; use crate::transforms::{ @@ -10,7 +11,7 @@ use futures::stream::FuturesUnordered; use futures::StreamExt; use serde::Deserialize; use std::collections::HashMap; -use tracing::{debug, error, trace, warn}; +use tracing::{error, trace, warn}; #[derive(Deserialize, Debug)] pub struct TuneableConsistencyScatterConfig { @@ -139,26 +140,75 @@ fn resolve_fragments(fragments: &mut Vec) -> Option { } } +fn resolve_fragments_max_integer(fragments: Vec) -> Option { + fragments + .into_iter() + .map(|mut m| { + let integer = if let Some(Frame::Redis(RedisFrame::Integer(i))) = m.frame() { + *i + } else { + // Not an expected value so prefer all other values over it + -1 + }; + (integer, m) + }) + .max_by_key(|(key, _)| *key) + .map(|(_, m)| m) +} + +fn get_upper_command_name(message: &mut Message) -> Vec { + if let Some(Frame::Redis(RedisFrame::Array(frames))) = message.frame() { + if let Some(RedisFrame::BulkString(bytes)) = frames.get(0) { + return bytes.to_ascii_uppercase(); + } + } + vec![] +} + +struct Consistency { + pub consistency: i32, + pub resolver: Resolver, +} + +enum Resolver { + MaxInteger, + Standard, +} + #[async_trait] impl Transform for TuneableConsistentencyScatter { async fn transform<'a>(&'a mut self, mut message_wrapper: Wrapper<'a>) -> ChainResponse { - let required_successes: Vec<_> = message_wrapper + let consistency: Vec<_> = message_wrapper .messages .iter_mut() - .map(|m| { - if m.get_query_type() == QueryType::Read { - self.read_consistency - } else { - self.write_consistency + .map(|m| match get_upper_command_name(m).as_slice() { + b"DBSIZE" => Consistency { + consistency: self.route_map.len() as i32, + resolver: Resolver::MaxInteger, + }, + b"FLUSHDB" => Consistency { + consistency: self.route_map.len() as i32, + resolver: Resolver::Standard, + }, + _ => { + let consistency = if m.get_query_type() == QueryType::Read { + self.read_consistency + } else { + self.write_consistency + }; + Consistency { + consistency, + resolver: Resolver::Standard, + } } }) .collect(); - let max_required_successes = *required_successes + let max_required_successes = consistency .iter() + .map(|x| x.consistency) .max() - .unwrap_or(&self.write_consistency); + .unwrap_or(self.write_consistency); - // Bias towards the write_consistency value for everything else let mut rec_fu = FuturesUnordered::new(); //TODO: FuturesUnordered does bias to polling the first submitted task - this will bias all requests @@ -169,13 +219,8 @@ impl Transform for TuneableConsistentencyScatter { let mut results = Vec::new(); while let Some(res) = rec_fu.next().await { match res { - Ok(messages) => { - debug!("{:#?}", messages); - results.push(messages); - } - Err(e) => { - error!("failed response {}", e); - } + Ok(messages) => results.push(messages), + Err(e) => error!("failed response {}", e), } if results.len() >= max_required_successes as usize { break; @@ -191,16 +236,19 @@ impl Transform for TuneableConsistentencyScatter { } messages } else { - required_successes + consistency .into_iter() - .filter_map(|_required_successes| { + .filter_map(|consistency| { let mut collated_results = vec![]; for res in &mut results { if let Some(m) = res.pop() { collated_results.push(m); } } - resolve_fragments(&mut collated_results) + match consistency.resolver { + Resolver::MaxInteger => resolve_fragments_max_integer(collated_results), + Resolver::Standard => resolve_fragments(&mut collated_results), + } }) // We do this as we are pop'ing from the end of the results in the filter_map above .rev()