Skip to content

Commit

Permalink
TuneableConsistencyScatter: filter out errors
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Mar 16, 2023
1 parent 0d03cec commit 06f1caa
Showing 1 changed file with 35 additions and 16 deletions.
Expand Up @@ -11,7 +11,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::Deserialize;
use std::collections::HashMap;
use tracing::{error, trace, warn};
use tracing::{error, warn};

#[derive(Deserialize, Debug)]
pub struct TuneableConsistencyScatterConfig {
Expand Down Expand Up @@ -96,7 +96,7 @@ fn get_size(_message: &Message) -> usize {
4 // TODO: Implement. Old impl was just removed because it was broken anyway
}

fn resolve_fragments(fragments: &mut Vec<Message>) -> Option<Message> {
fn resolve_fragments(fragments: &mut Vec<Message>) -> Message {
let mut newest_fragment: Option<Message> = None;
let mut biggest_fragment: Option<Message> = None;

Expand Down Expand Up @@ -131,16 +131,11 @@ fn resolve_fragments(fragments: &mut Vec<Message>) -> Option<Message> {
}
}
}
trace!("fragments {:?}-{:?}", newest_fragment, biggest_fragment);
if newest_fragment.is_some() {
newest_fragment
} else {
// panic!("shouldn't happen");
biggest_fragment
}

newest_fragment.or(biggest_fragment).unwrap()
}

fn resolve_fragments_max_integer(fragments: Vec<Message>) -> Option<Message> {
fn resolve_fragments_max_integer(fragments: Vec<Message>) -> Message {
fragments
.into_iter()
.map(|mut m| {
Expand All @@ -154,6 +149,7 @@ fn resolve_fragments_max_integer(fragments: Vec<Message>) -> Option<Message> {
})
.max_by_key(|(key, _)| *key)
.map(|(_, m)| m)
.unwrap()
}

fn get_upper_command_name(message: &mut Message) -> Vec<u8> {
Expand All @@ -165,6 +161,15 @@ fn get_upper_command_name(message: &mut Message) -> Vec<u8> {
vec![]
}

fn is_error(message: &mut Message) -> bool {
match message.frame() {
Some(Frame::Redis(RedisFrame::Error(_))) => true,
// Also consider a failure to parse the message as an error message
None => true,
_ => false,
}
}

struct Consistency {
pub consistency: i32,
pub resolver: Resolver,
Expand Down Expand Up @@ -238,16 +243,30 @@ impl Transform for TuneableConsistentencyScatter {
} else {
consistency
.into_iter()
.filter_map(|consistency| {
.map(|consistency| {
// filter out errors, storing a single error in case we need it later
let mut collated_results = vec![];
let mut an_error = None;
for res in &mut results {
if let Some(m) = res.pop() {
collated_results.push(m);
if let Some(mut m) = res.pop() {
if is_error(&mut m) {
if an_error.is_none() {
an_error = Some(m);
}
} else {
collated_results.push(m);
}
}
}
match consistency.resolver {
Resolver::MaxInteger => resolve_fragments_max_integer(collated_results),
Resolver::Standard => resolve_fragments(&mut collated_results),

// If every message got filtered out then return an error, otherwise apply a resolver to figure out which message to keep
if collated_results.is_empty() {
an_error.unwrap()
} else {
match consistency.resolver {
Resolver::MaxInteger => resolve_fragments_max_integer(collated_results),
Resolver::Standard => resolve_fragments(&mut collated_results),
}
}
})
// We do this as we are pop'ing from the end of the results in the filter_map above
Expand Down

0 comments on commit 06f1caa

Please sign in to comment.