Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TuneableConsistencyScatter: filter out errors #1086

Merged
merged 2 commits into from Mar 17, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -11,7 +11,7 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::Deserialize;
use std::collections::HashMap;
use tracing::{error, trace, warn};
use tracing::{error, warn};

#[derive(Deserialize, Debug)]
pub struct TuneableConsistencyScatterConfig {
Expand Down Expand Up @@ -96,7 +96,7 @@ fn get_size(_message: &Message) -> usize {
4 // TODO: Implement. Old impl was just removed because it was broken anyway
}

fn resolve_fragments(fragments: &mut Vec<Message>) -> Option<Message> {
fn resolve_fragments(fragments: &mut Vec<Message>) -> Message {
let mut newest_fragment: Option<Message> = None;
let mut biggest_fragment: Option<Message> = None;

Expand Down Expand Up @@ -131,16 +131,11 @@ fn resolve_fragments(fragments: &mut Vec<Message>) -> Option<Message> {
}
}
}
trace!("fragments {:?}-{:?}", newest_fragment, biggest_fragment);
if newest_fragment.is_some() {
newest_fragment
} else {
// panic!("shouldn't happen");
biggest_fragment
}

newest_fragment.or(biggest_fragment).unwrap()
}

fn resolve_fragments_max_integer(fragments: Vec<Message>) -> Option<Message> {
fn resolve_fragments_max_integer(fragments: Vec<Message>) -> Message {
fragments
.into_iter()
.map(|mut m| {
Expand All @@ -154,6 +149,7 @@ fn resolve_fragments_max_integer(fragments: Vec<Message>) -> Option<Message> {
})
.max_by_key(|(key, _)| *key)
.map(|(_, m)| m)
.unwrap()
}

fn get_upper_command_name(message: &mut Message) -> Vec<u8> {
Expand All @@ -165,6 +161,15 @@ fn get_upper_command_name(message: &mut Message) -> Vec<u8> {
vec![]
}

fn is_error(message: &mut Message) -> bool {
match message.frame() {
Some(Frame::Redis(RedisFrame::Error(_))) => true,
// Also consider a failure to parse the message as an error message
None => true,
_ => false,
}
}

struct Consistency {
pub consistency: i32,
pub resolver: Resolver,
Expand Down Expand Up @@ -238,16 +243,30 @@ impl Transform for TuneableConsistentencyScatter {
} else {
consistency
.into_iter()
.filter_map(|consistency| {
.map(|consistency| {
// filter out errors, storing a single error in case we need it later
let mut collated_results = vec![];
let mut an_error = None;
for res in &mut results {
if let Some(m) = res.pop() {
collated_results.push(m);
if let Some(mut m) = res.pop() {
if is_error(&mut m) {
if an_error.is_none() {
an_error = Some(m);
}
} else {
collated_results.push(m);
}
}
}
match consistency.resolver {
Resolver::MaxInteger => resolve_fragments_max_integer(collated_results),
Resolver::Standard => resolve_fragments(&mut collated_results),

// If every message got filtered out then return an error, otherwise apply a resolver to figure out which message to keep
if collated_results.is_empty() {
an_error.unwrap()
} else {
match consistency.resolver {
Resolver::MaxInteger => resolve_fragments_max_integer(collated_results),
Resolver::Standard => resolve_fragments(&mut collated_results),
}
}
})
// We do this as we are pop'ing from the end of the results in the filter_map above
Expand Down