Skip to content

Commit

Permalink
enhancement(vrl): add generic encode_key_value
Browse files Browse the repository at this point in the history
Signed-off-by: prognant <pierre.rognant@datadoghq.com>
  • Loading branch information
prognant committed Jun 15, 2021
1 parent c04924e commit 9264e6c
Show file tree
Hide file tree
Showing 4 changed files with 368 additions and 260 deletions.
4 changes: 3 additions & 1 deletion lib/vrl/stdlib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ default = [
"downcase",
"encode_base64",
"encode_json",
"encode_key_value",
"encode_logfmt",
"ends_with",
"exists",
Expand Down Expand Up @@ -153,7 +154,8 @@ del = []
downcase = []
encode_base64 = ["base64"]
encode_json = ["serde_json"]
encode_logfmt = []
encode_key_value = []
encode_logfmt = ["encode_key_value"]
ends_with = []
exists = []
flatten = []
Expand Down
347 changes: 347 additions & 0 deletions lib/vrl/stdlib/src/encode_key_value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,347 @@
use std::collections::BTreeMap;
use std::fmt::Write;
use std::result::Result;
use vrl::prelude::*;
use Value::{Array, Object};

#[derive(Clone, Copy, Debug)]
pub struct EncodeKeyValue;

impl Function for EncodeKeyValue {
fn identifier(&self) -> &'static str {
"encode_key_value"
}

fn parameters(&self) -> &'static [Parameter] {
&[
Parameter {
keyword: "value",
kind: kind::OBJECT,
required: true,
},
Parameter {
keyword: "fields_ordering",
kind: kind::ARRAY,
required: false,
},
Parameter {
keyword: "key_value_delimiter",
kind: kind::ANY,
required: false,
},
Parameter {
keyword: "field_delimiter",
kind: kind::ANY,
required: false,
},
]
}

fn compile(&self, mut arguments: ArgumentList) -> Compiled {
let value = arguments.required("value");
let fields = arguments.optional("fields_ordering");

let key_value_delimiter = arguments
.optional("key_value_delimiter")
.unwrap_or_else(|| expr!("="));

let field_delimiter = arguments
.optional("field_delimiter")
.unwrap_or_else(|| expr!(" "));

Ok(Box::new(EncodeKeyValueFn {
value,
fields,
key_value_delimiter,
field_delimiter,
}))
}

fn examples(&self) -> &'static [Example] {
&[
Example {
title: "encode object",
source: r#"encode_key_value!({"lvl": "info", "msg": "This is a message", "log_id": 12345})"#,
result: Ok(r#"s'log_id=12345 lvl=info msg="This is a message"'"#),
},
Example {
title: "encode object with fields ordering",
source: r#"encode_key_value!({"msg": "This is a message", "lvl": "info", "log_id": 12345}, ["lvl", "msg"])"#,
result: Ok(r#"s'lvl=info msg="This is a message" log_id=12345'"#),
},
Example {
title: "custom delimiters",
source: r#"encode_key_value!({"start": "ool", "end": "kul", "stop1": "yyc", "stop2" : "gdx"}, key_value_delimiter: ":", field_delimiter: ",")"#,
result: Ok(r#"s'end:kul,start:ool,stop1:yyc,stop2:gdx'"#),
},
]
}
}

#[derive(Clone, Debug)]
pub(crate) struct EncodeKeyValueFn {
pub(crate) value: Box<dyn Expression>,
pub(crate) fields: Option<Box<dyn Expression>>,
pub(crate) key_value_delimiter: Box<dyn Expression>,
pub(crate) field_delimiter: Box<dyn Expression>,
}

fn resolve_fields(fields: Value) -> Result<Vec<String>, ExpressionError> {
let arr = fields.try_array()?;
arr.iter()
.enumerate()
.map(|(idx, v)| {
v.try_bytes_utf8_lossy()
.map(|v| v.to_string())
.map_err(|e| format!("invalid field value type at index {}: {}", idx, e).into())
})
.collect()
}

impl Expression for EncodeKeyValueFn {
fn resolve(&self, ctx: &mut Context) -> Resolved {
let value = self.value.resolve(ctx)?;
let fields = match &self.fields {
None => Ok(vec![]),
Some(expr) => {
let fields = expr.resolve(ctx)?;
resolve_fields(fields)
}
}?;

let object = value.try_object()?;

let value = self.key_value_delimiter.resolve(ctx)?;
let key_value_delimiter = value.try_bytes_utf8_lossy()?;

let value = self.field_delimiter.resolve(ctx)?;
let field_delimiter = value.try_bytes_utf8_lossy()?;

Ok(encode(object, &fields[..], &key_value_delimiter, &field_delimiter).into())
}

fn type_def(&self, _state: &state::Compiler) -> TypeDef {
TypeDef::new().bytes().fallible()
}
}

fn encode_string(output: &mut String, str: &str) {
let needs_quoting = str.chars().any(char::is_whitespace);

if needs_quoting {
output.write_char('"').unwrap();
}

for c in str.chars() {
match c {
'\\' => output.write_str(r#"\\"#).unwrap(),
'"' => output.write_str(r#"\""#).unwrap(),
'\n' => output.write_str(r#"\\n"#).unwrap(),
_ => output.write_char(c).unwrap(),
}
}

if needs_quoting {
output.write_char('"').unwrap();
}
}

fn encode_value(output: &mut String, value: &Value) {
match value {
Value::Bytes(b) => {
let val = String::from_utf8_lossy(b);
encode_string(output, &val)
}
_ => encode_string(output, &value.to_string()),
}
}

fn flatten<'a>(
input: impl IntoIterator<Item = (String, Value)> + 'a,
parent_key: String,
separator: char,
depth: usize,
) -> Box<dyn Iterator<Item = (String, Value)> + 'a> {
let iter = input.into_iter().map(move |(key, value)| {
let new_key = if depth > 0 {
format!("{}{}{}", parent_key, separator, key)
} else {
key
};

match value {
Object(map) => flatten(map, new_key, separator, depth + 1),
Array(array) => {
let array_map: BTreeMap<_, _> = array
.into_iter()
.enumerate()
.map(|(key, value)| (key.to_string(), value))
.collect();
flatten(array_map, new_key, separator, depth + 1)
}
_ => Box::new(std::iter::once((new_key, value)))
as Box<dyn Iterator<Item = (std::string::String, vrl::Value)>>,
}
});

Box::new(iter.flatten())
}

fn encode_field<'a>(output: &mut String, key: &str, value: &Value, key_value_delimiter: &'a str) {
encode_string(output, key);
output.write_str(key_value_delimiter).unwrap();
encode_value(output, value)
}

pub fn encode<'a>(
input: BTreeMap<String, Value>,
fields: &[String],
key_value_delimiter: &'a str,
field_delimiter: &'a str,
) -> String {
let mut output = String::new();

let mut input: BTreeMap<_, _> = flatten(input, String::from(""), '.', 0).collect();

for field in fields.iter() {
if let Some(val) = input.remove(field) {
encode_field(&mut output, field, &val, key_value_delimiter);
output.write_str(field_delimiter).unwrap();
}
}

for (key, value) in input.iter() {
encode_field(&mut output, key, value, key_value_delimiter);
output.write_str(field_delimiter).unwrap();
}

if output.ends_with(field_delimiter) {
output.truncate(output.len() - field_delimiter.len())
}

output
}

#[cfg(test)]
mod tests {
use super::*;
use shared::btreemap;

test_function![
encode_key_value => EncodeKeyValue;

single_element {
args: func_args![value:
btreemap! {
"lvl" => "info"
}
],
want: Ok("lvl=info"),
tdef: TypeDef::new().bytes().fallible(),
}

multiple_elements {
args: func_args![value:
btreemap! {
"lvl" => "info",
"log_id" => 12345
}
],
want: Ok("log_id=12345 lvl=info"),
tdef: TypeDef::new().bytes().fallible(),
}

string_with_spaces {
args: func_args![value:
btreemap! {
"lvl" => "info",
"msg" => "This is a log message"
}],
want: Ok(r#"lvl=info msg="This is a log message""#),
tdef: TypeDef::new().bytes().fallible(),
}

string_with_characters_to_escape {
args: func_args![value:
btreemap! {
"lvl" => "info",
"msg" => r#"payload: {"code": 200}\n"#,
"another_field" => "some\nfield\\and things",
"space key" => "foo"
}],
want: Ok(r#"another_field="some\\nfield\\and things" lvl=info msg="payload: {\"code\": 200}\\n" "space key"=foo"#),
tdef: TypeDef::new().bytes().fallible(),
}

nested_fields {
args: func_args![value:
btreemap! {
"log" => btreemap! {
"file" => btreemap! {
"path" => "encode_key_value.rs"
},
},
"agent" => btreemap! {
"name" => "vector",
"id" => 1234
},
"network" => btreemap! {
"ip" => value!([127, 0, 0, 1]),
"proto" => "tcp"
},
"event" => "log"
}],
want: Ok("agent.id=1234 agent.name=vector event=log log.file.path=encode_key_value.rs network.ip.0=127 network.ip.1=0 network.ip.2=0 network.ip.3=1 network.proto=tcp"),
tdef: TypeDef::new().bytes().fallible(),
}

fields_ordering {
args: func_args![value:
btreemap! {
"lvl" => "info",
"msg" => "This is a log message",
"log_id" => 12345,
},
fields_ordering: value!(["lvl", "msg"])
],
want: Ok(r#"lvl=info msg="This is a log message" log_id=12345"#),
tdef: TypeDef::new().bytes().fallible(),
}

nested_fields_ordering {
args: func_args![value:
btreemap! {
"log" => btreemap! {
"file" => btreemap! {
"path" => "encode_key_value.rs"
},
},
"agent" => btreemap! {
"name" => "vector",
},
"event" => "log"
},
fields_ordering: value!(["event", "log.file.path", "agent.name"])
],
want: Ok("event=log log.file.path=encode_key_value.rs agent.name=vector"),
tdef: TypeDef::new().bytes().fallible(),
}

fields_ordering_invalid_field_type {
args: func_args![value:
btreemap! {
"lvl" => "info",
"msg" => "This is a log message",
"log_id" => 12345,
},
fields_ordering: value!(["lvl", 2])
],
want: Err(format!(r"invalid field value type at index 1: {}",
value::Error::Expected {
got: Kind::Integer,
expected: Kind::Bytes
})),
tdef: TypeDef::new().bytes().fallible(),
}
];
}
Loading

0 comments on commit 9264e6c

Please sign in to comment.