Skip to content

Commit

Permalink
fix(reduce transform): improve array handling (vectordotdev#3076)
Browse files Browse the repository at this point in the history
Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
Signed-off-by: Brian Menges <brian.menges@anaplan.com>
  • Loading branch information
lukesteensen authored and Brian Menges committed Dec 9, 2020
1 parent cb59472 commit ced0bb8
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 14 deletions.
52 changes: 44 additions & 8 deletions src/transforms/reduce/merge_strategy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,16 +74,45 @@ impl ReduceValueMerger for ConcatMerger {
//------------------------------------------------------------------------------

#[derive(Debug, Clone)]
struct ArrayMerger {
struct ConcatArrayMerger {
v: Vec<Value>,
}

impl ArrayMerger {
impl ConcatArrayMerger {
fn new(v: Vec<Value>) -> Self {
Self { v }
}
}

impl ReduceValueMerger for ConcatArrayMerger {
fn add(&mut self, v: Value) -> Result<(), String> {
if let Value::Array(a) = v {
self.v.extend_from_slice(&a);
} else {
self.v.push(v);
}
Ok(())
}

fn insert_into(self: Box<Self>, k: String, v: &mut LogEvent) -> Result<(), String> {
v.insert(k, Value::Array(self.v));
Ok(())
}
}

//------------------------------------------------------------------------------

#[derive(Debug, Clone)]
struct ArrayMerger {
v: Vec<Value>,
}

impl ArrayMerger {
fn new(v: Value) -> Self {
Self { v: vec![v] }
}
}

impl ReduceValueMerger for ArrayMerger {
fn add(&mut self, v: Value) -> Result<(), String> {
self.v.push(v);
Expand Down Expand Up @@ -370,15 +399,13 @@ pub fn get_value_merger(v: Value, m: &MergeStrategy) -> Result<Box<dyn ReduceVal
},
MergeStrategy::Concat => match v {
Value::Bytes(b) => Ok(Box::new(ConcatMerger::new(b))),
Value::Array(a) => Ok(Box::new(ConcatArrayMerger::new(a))),
_ => Err(format!(
"expected string value, found: '{}'",
"expected string or array value, found: '{}'",
v.to_string_lossy()
)),
},
MergeStrategy::Array => match v {
Value::Array(a) => Ok(Box::new(ArrayMerger::new(a))),
_ => Ok(Box::new(ArrayMerger::new(vec![v]))),
},
MergeStrategy::Array => Ok(Box::new(ArrayMerger::new(v))),
MergeStrategy::Discard => Ok(Box::new(DiscardMerger::new(v))),
}
}
Expand Down Expand Up @@ -432,7 +459,7 @@ mod test {
assert!(get_value_merger(json!([]).into(), &MergeStrategy::Max).is_err());
assert!(get_value_merger(json!([]).into(), &MergeStrategy::Min).is_err());
assert!(get_value_merger(json!([]).into(), &MergeStrategy::Array).is_ok());
assert!(get_value_merger(json!([]).into(), &MergeStrategy::Concat).is_err());
assert!(get_value_merger(json!([]).into(), &MergeStrategy::Concat).is_ok());

assert!(get_value_merger(json!({}).into(), &MergeStrategy::Discard).is_ok());
assert!(get_value_merger(json!({}).into(), &MergeStrategy::Sum).is_err());
Expand Down Expand Up @@ -512,6 +539,15 @@ mod test {
merge(4.3.into(), 4.2.into(), &MergeStrategy::Min),
Ok(4.2.into())
);

assert_eq!(
merge(json!([4]).into(), json!([2]).into(), &MergeStrategy::Concat),
Ok(json!([4, 2]).into())
);
assert_eq!(
merge(json!([]).into(), 42.into(), &MergeStrategy::Concat),
Ok(json!([42]).into())
);
}

fn merge(initial: Value, additional: Value, strategy: &MergeStrategy) -> Result<Value, String> {
Expand Down
94 changes: 88 additions & 6 deletions src/transforms/reduce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,33 +77,32 @@ impl ReduceState {
fn new(e: LogEvent, strategies: &IndexMap<String, MergeStrategy>) -> Self {
Self {
stale_since: Instant::now(),
// TODO: all_fields alternative that consumes
fields: e
.all_fields()
.into_iter()
.filter_map(|(k, v)| {
if let Some(strat) = strategies.get(&k) {
match get_value_merger(v.clone(), strat) {
match get_value_merger(v, strat) {
Ok(m) => Some((k, m)),
Err(err) => {
warn!("failed to create merger for field '{}': {}", k, err);
None
}
}
} else {
Some((k, v.clone().into()))
Some((k, v.into()))
}
})
.collect(),
}
}

fn add_event(&mut self, e: LogEvent, strategies: &IndexMap<String, MergeStrategy>) {
for (k, v) in e.all_fields() {
for (k, v) in e.into_iter() {
let strategy = strategies.get(&k);
match self.fields.entry(k) {
hash_map::Entry::Vacant(entry) => {
if let Some(strat) = strategy {
match get_value_merger(v.clone(), strat) {
match get_value_merger(v, strat) {
Ok(m) => {
entry.insert(m);
}
Expand Down Expand Up @@ -291,6 +290,7 @@ mod test {
topology::config::{TransformConfig, TransformContext},
Event,
};
use serde_json::json;

#[test]
fn reduce_from_condition() {
Expand Down Expand Up @@ -493,4 +493,86 @@ identifier_fields = [ "request_id" ]
Value::from(7)
);
}

#[test]
fn arrays() {
let mut reduce = toml::from_str::<ReduceConfig>(
r#"
identifier_fields = [ "request_id" ]
merge_strategies.foo = "array"
merge_strategies.bar = "concat"
[ends_when]
"test_end.exists" = true
"#,
)
.unwrap()
.build(TransformContext::new_test())
.unwrap();

let mut outputs = Vec::new();

let mut e = Event::from("test message 1");
e.as_mut_log().insert("foo", json!([1, 3]));
e.as_mut_log().insert("bar", json!([1, 3]));
e.as_mut_log().insert("request_id", "1");
reduce.transform_into(&mut outputs, e);

let mut e = Event::from("test message 2");
e.as_mut_log().insert("foo", json!([2, 4]));
e.as_mut_log().insert("bar", json!([2, 4]));
e.as_mut_log().insert("request_id", "2");
reduce.transform_into(&mut outputs, e);

let mut e = Event::from("test message 3");
e.as_mut_log().insert("foo", json!([5, 7]));
e.as_mut_log().insert("bar", json!([5, 7]));
e.as_mut_log().insert("request_id", "1");
reduce.transform_into(&mut outputs, e);

let mut e = Event::from("test message 4");
e.as_mut_log().insert("foo", json!("done"));
e.as_mut_log().insert("bar", json!("done"));
e.as_mut_log().insert("request_id", "1");
e.as_mut_log().insert("test_end", "yep");
reduce.transform_into(&mut outputs, e);

assert_eq!(outputs.len(), 1);
assert_eq!(
outputs.first().unwrap().as_log()[&"foo".into()],
json!([[1, 3], [5, 7], "done"]).into()
);

assert_eq!(outputs.len(), 1);
assert_eq!(
outputs.first().unwrap().as_log()[&"bar".into()],
json!([1, 3, 5, 7, "done"]).into()
);

outputs.clear();

let mut e = Event::from("test message 5");
e.as_mut_log().insert("foo", json!([6, 8]));
e.as_mut_log().insert("bar", json!([6, 8]));
e.as_mut_log().insert("request_id", "2");
reduce.transform_into(&mut outputs, e);

let mut e = Event::from("test message 6");
e.as_mut_log().insert("foo", json!("done"));
e.as_mut_log().insert("bar", json!("done"));
e.as_mut_log().insert("request_id", "2");
e.as_mut_log().insert("test_end", "yep");
reduce.transform_into(&mut outputs, e);

assert_eq!(outputs.len(), 1);
assert_eq!(
outputs.first().unwrap().as_log()[&"foo".into()],
json!([[2, 4], [6, 8], "done"]).into()
);
assert_eq!(
outputs.first().unwrap().as_log()[&"bar".into()],
json!([2, 4, 6, 8, "done"]).into()
);
}
}

0 comments on commit ced0bb8

Please sign in to comment.