Skip to content

Commit

Permalink
enhancement(remap): Add unnest function
Browse files Browse the repository at this point in the history
Part of #6330

This adds an `unnest` function to facilitate turning one event that has
a field with an array into an array of events. See documentation
examples.

This is slightly different than the proposed implementation in #7036,
but I think it results in a straightforward and easy to understand
transformation that works regardless of what type the array elements
are. The proposed implementation would have required them to be objects
to be able to merge into the parent object.

Merging the child element into the parent, if desired, will be possible
by stringing together another remap transform that would effectively do
`subfield = del(.subfield); . |= subfield`. Or presumably via iteration
/ mapping when we get to that.

Signed-off-by: Jesse Szwedko <jesse@szwedko.me>
  • Loading branch information
jszwedko committed May 11, 2021
1 parent b8a0cc0 commit 0ffd617
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 2 deletions.
78 changes: 78 additions & 0 deletions docs/reference/remap/functions/unnest.cue
@@ -0,0 +1,78 @@
package metadata

// Reference documentation for the `unnest` remap function: its single `path`
// argument, its failure reason, its return type, and two worked examples.
remap: functions: unnest: {
    category: "Object"
    description: """
        Unnest an array field from an object to create an array of objects using that field, keeping all other fields.
        Assigning the array result of this to `.` will result in multiple events being emitted from `remap`.
        This is also referred to as `explode`ing in some languages.
        """

    arguments: [
        {
            name: "path"
            description: "The path of the field to unnest."
            required: true
            type: ["string"]
        },
    ]
    internal_failure_reasons: [
        "The field referred to by `path` is not an array",
    ]
    notices: []
    return: {
        types: ["array"]
        rules: [
            "Returns an array of objects that match the original object, but each with the specified path replaced with a single element from the original path.",
        ]
    }

    examples: [
        {
            // The array lives directly on the root of the event.
            title: "Unnest an array field"
            input: log: {
                hostname: "localhost"
                messages: [
                    "message 1",
                    "message 2",
                ]
            }
            source: ". = unnest!(.messages)"
            output: [
                {log: {
                    hostname: "localhost"
                    messages: "message 1"
                }},
                {log: {
                    hostname: "localhost"
                    messages: "message 2"
                }},
            ]
        },
        {
            // The array lives one level down, under `event`.
            title: "Unnest a nested array field"
            input: log: {
                hostname: "localhost"
                event: {
                    messages: [
                        "message 1",
                        "message 2",
                    ]
                }
            }
            source: ". = unnest!(.event.messages)"
            output: [
                {log: {
                    hostname: "localhost"
                    event: messages: "message 1"
                }},
                {log: {
                    hostname: "localhost"
                    event: messages: "message 2"
                }},
            ]
        },
    ]
}
4 changes: 4 additions & 0 deletions lib/vrl/compiler/src/expression/query.rs
Expand Up @@ -23,6 +23,10 @@ impl Query {
&self.path
}

/// Returns a reference to the target this query reads from.
pub fn target(&self) -> &Target {
&self.target
}

/// Returns `true` when this query's target is `Target::External`.
pub fn is_external(&self) -> bool {
matches!(self.target, Target::External)
}
Expand Down
4 changes: 2 additions & 2 deletions lib/vrl/compiler/src/expression/variable.rs
Expand Up @@ -21,11 +21,11 @@ impl Variable {
Self { ident, value }
}

pub(crate) fn ident(&self) -> &Ident {
pub fn ident(&self) -> &Ident {
&self.ident
}

pub(crate) fn value(&self) -> Option<&Value> {
pub fn value(&self) -> Option<&Value> {
self.value.as_ref()
}
}
Expand Down
2 changes: 2 additions & 0 deletions lib/vrl/stdlib/Cargo.toml
Expand Up @@ -133,6 +133,7 @@ default = [
"to_timestamp",
"to_unix_timestamp",
"truncate",
"unnest",
"upcase",
"uuid_v4",
]
Expand Down Expand Up @@ -230,6 +231,7 @@ to_syslog_severity = []
to_timestamp = ["shared/conversion", "chrono"]
to_unix_timestamp = ["chrono"]
truncate = []
unnest = []
upcase = []
uuid_v4 = ["bytes", "uuid"]

Expand Down
2 changes: 2 additions & 0 deletions lib/vrl/stdlib/benches/benches.rs
Expand Up @@ -95,6 +95,8 @@ criterion_group!(
to_timestamp,
to_unix_timestamp,
truncate,
// TODO: Cannot pass a Path to bench_function
//unnest
// TODO: value is dynamic so we cannot assert equality
//uuidv4,
upcase
Expand Down
6 changes: 6 additions & 0 deletions lib/vrl/stdlib/src/lib.rs
Expand Up @@ -194,6 +194,8 @@ mod to_timestamp;
mod to_unix_timestamp;
#[cfg(feature = "truncate")]
mod truncate;
#[cfg(feature = "unnest")]
mod unnest;
#[cfg(feature = "upcase")]
mod upcase;
#[cfg(feature = "uuid_v4")]
Expand Down Expand Up @@ -389,6 +391,8 @@ pub use to_timestamp::ToTimestamp;
pub use to_unix_timestamp::ToUnixTimestamp;
#[cfg(feature = "truncate")]
pub use truncate::Truncate;
#[cfg(feature = "unnest")]
pub use unnest::Unnest;
#[cfg(feature = "upcase")]
pub use upcase::Upcase;
#[cfg(feature = "uuid_v4")]
Expand Down Expand Up @@ -588,6 +592,8 @@ pub fn all() -> Vec<Box<dyn vrl::Function>> {
Box::new(ToUnixTimestamp),
#[cfg(feature = "truncate")]
Box::new(Truncate),
#[cfg(feature = "unnest")]
Box::new(Unnest),
#[cfg(feature = "upcase")]
Box::new(Upcase),
#[cfg(feature = "uuid_v4")]
Expand Down
160 changes: 160 additions & 0 deletions lib/vrl/stdlib/src/unnest.rs
@@ -0,0 +1,160 @@
use lookup::LookupBuf;
use vrl::prelude::*;

/// The `unnest` function: a unit type that carries the function's `Function`
/// implementation (identifier, parameters, examples and compilation).
#[derive(Clone, Copy, Debug)]
pub struct Unnest;

// Describes the `unnest` function and compiles calls to it into an `UnnestFn`.
impl Function for Unnest {
/// The name the function is called by.
fn identifier(&self) -> &'static str {
"unnest"
}

/// A single required parameter, `path`, declared with kind `ARRAY`.
fn parameters(&self) -> &'static [Parameter] {
&[Parameter {
keyword: "path",
kind: kind::ARRAY,
required: true,
}]
}

/// Two examples of the same transformation: one unnesting a field of the
/// external target (`.`), one unnesting a field of a variable (`foo`).
fn examples(&self) -> &'static [Example] {
&[
Example {
title: "external target",
source: indoc! {r#"
. = {"hostname": "localhost", "events": [{"message": "hello"}, {"message": "world"}]}
. = unnest!(.events)
"#},
result: Ok(
r#"[{"hostname": "localhost", "events": {"message": "hello"}}, {"hostname": "localhost", "events": {"message": "world"}}]"#,
),
},
Example {
title: "variable target",
source: indoc! {r#"
foo = {"hostname": "localhost", "events": [{"message": "hello"}, {"message": "world"}]}
foo = unnest!(foo.events)
"#},
result: Ok(
r#"[{"hostname": "localhost", "events": {"message": "hello"}}, {"hostname": "localhost", "events": {"message": "world"}}]"#,
),
},
]
}

/// Takes the `path` argument as a query (not as an evaluated value) and
/// wraps it in an `UnnestFn`; fails if `required_query` fails.
fn compile(&self, mut arguments: ArgumentList) -> Compiled {
let path = arguments.required_query("path")?;

Ok(Box::new(UnnestFn { path }))
}
}

/// Compiled form of an `unnest` call.
#[derive(Debug, Clone)]
struct UnnestFn {
// The query given as the `path` argument; its target supplies the root
// value and its path locates the array to unnest.
path: expression::Query,
}

impl UnnestFn {
    /// Test-only constructor: builds an `UnnestFn` that queries `path` on the
    /// external target.
    ///
    /// Panics if `path` cannot be parsed.
    #[cfg(test)]
    fn new(path: &str) -> Self {
        let lookup = path.parse().unwrap();
        let query = expression::Query::new(expression::Target::External, lookup);

        Self { path: query }
    }
}

impl Expression for UnnestFn {
    /// Expands the array found at the query's path into an array of values,
    /// one per element.
    ///
    /// The root value of the query's target is fetched, the value at the path
    /// is required to be an array, and for each element a clone of the root is
    /// produced with the path overwritten by that single element.
    ///
    /// # Errors
    ///
    /// Returns an error if resolving a container or function-call target
    /// fails, if fetching the root from the target fails, if nothing exists at
    /// the path (reported as expected array, got null), or if the value at the
    /// path is not an array.
    fn resolve(&self, ctx: &mut Context) -> Resolved {
        let path = self.path.path();

        // Declared up front so that a value produced by a container or
        // function-call target outlives the `target` borrow below.
        let value: Value;

        // A plain trait-object reference is enough here; there is no need to
        // heap-allocate a `Box` just to hold a borrow.
        let target: &dyn Target = match self.path.target() {
            expression::Target::External => ctx.target(),
            expression::Target::Internal(v) => {
                // A variable that is not set is treated as null.
                let v = ctx.state().variable(v.ident()).unwrap_or(&Value::Null);
                v as &dyn Target
            }
            expression::Target::Container(expr) => {
                value = expr.resolve(ctx)?;
                &value as &dyn Target
            }
            expression::Target::FunctionCall(expr) => {
                value = expr.resolve(ctx)?;
                &value as &dyn Target
            }
        };

        // The whole target value; every output element starts as a copy of it.
        let root = target.get(&LookupBuf::root())?.unwrap_or(Value::Null);

        let values = root
            .get_by_path(path)
            .cloned()
            // A missing path is reported the same way as a null value.
            .ok_or(value::Error::Expected {
                got: Kind::Null,
                expected: Kind::Array,
            })?
            .try_array()?;

        let events = values
            .into_iter()
            .map(|value| {
                let mut event = root.clone();
                // Overwrite the array with this single element.
                event.insert_by_path(path, value);
                event
            })
            .collect::<Vec<_>>();

        Ok(Value::Array(events))
    }

    /// Derives the result type from the type definition of the path query.
    ///
    /// NOTE(review): `resolve` requires the value at the path to be an array
    /// and only ever returns `Value::Array` on success, yet infallibility here
    /// is keyed on `Kind::Object` and null is added to the result kind.
    /// Confirm this is intended.
    fn type_def(&self, state: &state::Compiler) -> TypeDef {
        self.path
            .type_def(state)
            .fallible_unless(Kind::Object)
            .restrict_array()
            .add_null()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `UnnestFn` against one shared input event for three paths: an
    /// array field, a missing field, and a string field.
    #[test]
    fn unnest() {
        // Every case operates on the same input event.
        let event = || {
            value!({"hostname": "localhost", "events": [{"message": "hello"}, {"message": "world"}]})
        };

        let cases: Vec<(UnnestFn, Result<Value, String>)> = vec![
            // Array field: one output object per element.
            (
                UnnestFn::new("events"),
                Ok(
                    value!([{"hostname": "localhost", "events": {"message": "hello"}}, {"hostname": "localhost", "events": {"message": "world"}}]),
                ),
            ),
            // Missing field: reported as null where an array was expected.
            (
                UnnestFn::new("unknown"),
                Err(r#"expected "array", got "null""#.to_owned()),
            ),
            // Field of the wrong type.
            (
                UnnestFn::new("hostname"),
                Err(r#"expected "array", got "string""#.to_owned()),
            ),
        ];

        for (func, expected) in cases {
            let mut target: Value = event();
            let mut state = vrl::state::Runtime::default();
            let mut ctx = Context::new(&mut target, &mut state);

            let actual = func
                .resolve(&mut ctx)
                .map_err(|err| format!("{:#}", anyhow::anyhow!(err)));

            assert_eq!(actual, expected);
        }
    }
}

0 comments on commit 0ffd617

Please sign in to comment.