Skip to content

Commit

Permalink
enhancement(reduce transform): Add additional merge strategies (#8559)
Browse files Browse the repository at this point in the history
* Add additional merge strategies: longest, shortest, retain, unique

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Fix tests, fmt

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Fix tests, adjust float hashing

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Dependency fixes

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Error for shortest/longest when not array

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Revert dependency changes

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Review changes

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Revert nom change

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Revert build pin

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Rename strategy to indicate it is for array, handle infinity properly

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* fmt

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Fix clippy warnings by implementing PartialEq and using bits

Signed-off-by: dbcfd <bdbrowning2@gmail.com>

* Another clippy fix

Signed-off-by: dbcfd <bdbrowning2@gmail.com>
  • Loading branch information
dbcfd committed Sep 2, 2021
1 parent 91a4534 commit 0f95401
Show file tree
Hide file tree
Showing 3 changed files with 386 additions and 2 deletions.
131 changes: 130 additions & 1 deletion lib/vector-core/src/event/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use serde::{Deserialize, Serialize, Serializer};
use std::collections::BTreeMap;
use std::convert::{TryFrom, TryInto};
use std::fmt::Debug;
use std::hash::{Hash, Hasher};
use std::iter::FromIterator;
use toml::value::Value as TomlValue;

#[derive(PartialEq, PartialOrd, Debug, Clone, Deserialize)]
#[derive(PartialOrd, Debug, Clone, Deserialize)]
pub enum Value {
Bytes(Bytes),
Integer(i64),
Expand All @@ -22,6 +23,82 @@ pub enum Value {
Null,
}

impl Eq for Value {}

impl PartialEq<Value> for Value {
fn eq(&self, other: &Value) -> bool {
match (self, other) {
(Value::Array(a), Value::Array(b)) => a.eq(b),
(Value::Boolean(a), Value::Boolean(b)) => a.eq(b),
(Value::Bytes(a), Value::Bytes(b)) => a.eq(b),
(Value::Float(a), Value::Float(b)) => {
// This compares floats with the following rules:
// * NaNs compare as equal
// * Positive and negative infinity are not equal
// * -0 and +0 are not equal
// * Floats will compare using truncated portion
if a.is_sign_negative() == b.is_sign_negative() {
if a.is_finite() && b.is_finite() {
a.trunc().eq(&b.trunc())
} else {
a.is_finite() == b.is_finite()
}
} else {
false
}
}
(Value::Integer(a), Value::Integer(b)) => a.eq(b),
(Value::Map(a), Value::Map(b)) => a.eq(b),
(Value::Null, Value::Null) => true,
(Value::Timestamp(a), Value::Timestamp(b)) => a.eq(b),
_ => false,
}
}
}

impl Hash for Value {
fn hash<H: Hasher>(&self, state: &mut H) {
core::mem::discriminant(self).hash(state);
match self {
Value::Array(v) => {
v.hash(state);
}
Value::Boolean(v) => {
v.hash(state);
}
Value::Bytes(v) => {
v.hash(state);
}
Value::Float(v) => {
// This hashes floats with the following rules:
// * NaNs hash as equal (covered by above discriminant hash)
// * Positive and negative infinity has to different values
// * -0 and +0 hash to different values
// * otherwise transmute to u64 and hash
if v.is_finite() {
v.is_sign_negative().hash(state);
let trunc: u64 = unsafe { std::mem::transmute(v.trunc().to_bits()) };
trunc.hash(state);
} else if !v.is_nan() {
v.is_sign_negative().hash(state);
} //else covered by discriminant hash
}
Value::Integer(v) => {
v.hash(state);
}
Value::Map(v) => {
v.hash(state);
}
Value::Null => {
//covered by discriminant hash
}
Value::Timestamp(v) => {
v.hash(state);
}
}
}
}

impl ByteSizeOf for Value {
fn allocated_bytes(&self) -> usize {
match self {
Expand Down Expand Up @@ -1278,6 +1355,58 @@ mod test {
Ok(buf)
}

mod value_compare {
use super::*;

#[test]
fn compare_correctly() {
assert!(Value::Integer(0).eq(&Value::Integer(0)));
assert!(!Value::Integer(0).eq(&Value::Integer(1)));
assert!(!Value::Boolean(true).eq(&Value::Integer(2)));
assert!(Value::Float(1.2).eq(&Value::Float(1.4)));
assert!(!Value::Float(1.2).eq(&Value::Float(-1.2)));
assert!(!Value::Float(-0.0).eq(&Value::Float(0.0)));
assert!(!Value::Float(f64::NEG_INFINITY).eq(&Value::Float(f64::INFINITY)));
assert!(Value::Array(vec![Value::Integer(0), Value::Boolean(true)])
.eq(&Value::Array(vec![Value::Integer(0), Value::Boolean(true)])));
assert!(!Value::Array(vec![Value::Integer(0), Value::Boolean(true)])
.eq(&Value::Array(vec![Value::Integer(1), Value::Boolean(true)])));
}
}

mod value_hash {
use super::*;

fn hash(a: &Value) -> u64 {
let mut h = std::collections::hash_map::DefaultHasher::new();

a.hash(&mut h);
h.finish()
}

#[test]
fn hash_correctly() {
assert_eq!(hash(&Value::Integer(0)), hash(&Value::Integer(0)));
assert_ne!(hash(&Value::Integer(0)), hash(&Value::Integer(1)));
assert_ne!(hash(&Value::Boolean(true)), hash(&Value::Integer(2)));
assert_eq!(hash(&Value::Float(1.2)), hash(&Value::Float(1.4)));
assert_ne!(hash(&Value::Float(1.2)), hash(&Value::Float(-1.2)));
assert_ne!(hash(&Value::Float(-0.0)), hash(&Value::Float(0.0)));
assert_ne!(
hash(&Value::Float(f64::NEG_INFINITY)),
hash(&Value::Float(f64::INFINITY))
);
assert_eq!(
hash(&Value::Array(vec![Value::Integer(0), Value::Boolean(true)])),
hash(&Value::Array(vec![Value::Integer(0), Value::Boolean(true)]))
);
assert_ne!(
hash(&Value::Array(vec![Value::Integer(0), Value::Boolean(true)])),
hash(&Value::Array(vec![Value::Integer(1), Value::Boolean(true)]))
);
}
}

mod insert_get_remove {
use super::*;

Expand Down

0 comments on commit 0f95401

Please sign in to comment.