Skip to content

Commit

Permalink
Fix redis_int_tests::multi intermittent failure (#1080)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 15, 2023
1 parent 682af0b commit 0d03cec
Showing 1 changed file with 68 additions and 20 deletions.
@@ -1,4 +1,5 @@
use crate::error::ChainResponse;
use crate::frame::{Frame, RedisFrame};
use crate::message::{Message, QueryType};
use crate::transforms::chain::{BufferedChain, TransformChainBuilder};
use crate::transforms::{
Expand All @@ -10,7 +11,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::Deserialize;
use std::collections::HashMap;
use tracing::{debug, error, trace, warn};
use tracing::{error, trace, warn};

#[derive(Deserialize, Debug)]
pub struct TuneableConsistencyScatterConfig {
Expand Down Expand Up @@ -139,26 +140,75 @@ fn resolve_fragments(fragments: &mut Vec<Message>) -> Option<Message> {
}
}

fn resolve_fragments_max_integer(fragments: Vec<Message>) -> Option<Message> {
fragments
.into_iter()
.map(|mut m| {
let integer = if let Some(Frame::Redis(RedisFrame::Integer(i))) = m.frame() {
*i
} else {
// Not an expected value so prefer all other values over it
-1
};
(integer, m)
})
.max_by_key(|(key, _)| *key)
.map(|(_, m)| m)
}

fn get_upper_command_name(message: &mut Message) -> Vec<u8> {
if let Some(Frame::Redis(RedisFrame::Array(frames))) = message.frame() {
if let Some(RedisFrame::BulkString(bytes)) = frames.get(0) {
return bytes.to_ascii_uppercase();
}
}
vec![]
}

struct Consistency {
pub consistency: i32,
pub resolver: Resolver,
}

enum Resolver {
MaxInteger,
Standard,
}

#[async_trait]
impl Transform for TuneableConsistentencyScatter {
async fn transform<'a>(&'a mut self, mut message_wrapper: Wrapper<'a>) -> ChainResponse {
let required_successes: Vec<_> = message_wrapper
let consistency: Vec<_> = message_wrapper
.messages
.iter_mut()
.map(|m| {
if m.get_query_type() == QueryType::Read {
self.read_consistency
} else {
self.write_consistency
.map(|m| match get_upper_command_name(m).as_slice() {
b"DBSIZE" => Consistency {
consistency: self.route_map.len() as i32,
resolver: Resolver::MaxInteger,
},
b"FLUSHDB" => Consistency {
consistency: self.route_map.len() as i32,
resolver: Resolver::Standard,
},
_ => {
let consistency = if m.get_query_type() == QueryType::Read {
self.read_consistency
} else {
self.write_consistency
};
Consistency {
consistency,
resolver: Resolver::Standard,
}
}
})
.collect();
let max_required_successes = *required_successes
let max_required_successes = consistency
.iter()
.map(|x| x.consistency)
.max()
.unwrap_or(&self.write_consistency);
.unwrap_or(self.write_consistency);

// Bias towards the write_consistency value for everything else
let mut rec_fu = FuturesUnordered::new();

//TODO: FuturesUnordered does bias to polling the first submitted task - this will bias all requests
Expand All @@ -169,13 +219,8 @@ impl Transform for TuneableConsistentencyScatter {
let mut results = Vec::new();
while let Some(res) = rec_fu.next().await {
match res {
Ok(messages) => {
debug!("{:#?}", messages);
results.push(messages);
}
Err(e) => {
error!("failed response {}", e);
}
Ok(messages) => results.push(messages),
Err(e) => error!("failed response {}", e),
}
if results.len() >= max_required_successes as usize {
break;
Expand All @@ -191,16 +236,19 @@ impl Transform for TuneableConsistentencyScatter {
}
messages
} else {
required_successes
consistency
.into_iter()
.filter_map(|_required_successes| {
.filter_map(|consistency| {
let mut collated_results = vec![];
for res in &mut results {
if let Some(m) = res.pop() {
collated_results.push(m);
}
}
resolve_fragments(&mut collated_results)
match consistency.resolver {
Resolver::MaxInteger => resolve_fragments_max_integer(collated_results),
Resolver::Standard => resolve_fragments(&mut collated_results),
}
})
// We do this as we are pop'ing from the end of the results in the filter_map above
.rev()
Expand Down

0 comments on commit 0d03cec

Please sign in to comment.