From 9264e6cc564889e58870d6438ba2758aafa6046b Mon Sep 17 00:00:00 2001 From: prognant Date: Tue, 15 Jun 2021 15:41:35 +0200 Subject: [PATCH] enhancement(vrl): add generic `encode_key_value` Signed-off-by: prognant --- lib/vrl/stdlib/Cargo.toml | 4 +- lib/vrl/stdlib/src/encode_key_value.rs | 347 +++++++++++++++++++++++++ lib/vrl/stdlib/src/encode_logfmt.rs | 271 +------------------ lib/vrl/stdlib/src/lib.rs | 6 + 4 files changed, 368 insertions(+), 260 deletions(-) create mode 100644 lib/vrl/stdlib/src/encode_key_value.rs diff --git a/lib/vrl/stdlib/Cargo.toml b/lib/vrl/stdlib/Cargo.toml index e75b74a52fd3a8..a3ac219f540ecd 100644 --- a/lib/vrl/stdlib/Cargo.toml +++ b/lib/vrl/stdlib/Cargo.toml @@ -52,6 +52,7 @@ default = [ "downcase", "encode_base64", "encode_json", + "encode_key_value", "encode_logfmt", "ends_with", "exists", @@ -153,7 +154,8 @@ del = [] downcase = [] encode_base64 = ["base64"] encode_json = ["serde_json"] -encode_logfmt = [] +encode_key_value = [] +encode_logfmt = ["encode_key_value"] ends_with = [] exists = [] flatten = [] diff --git a/lib/vrl/stdlib/src/encode_key_value.rs b/lib/vrl/stdlib/src/encode_key_value.rs new file mode 100644 index 00000000000000..3d6a79576d6118 --- /dev/null +++ b/lib/vrl/stdlib/src/encode_key_value.rs @@ -0,0 +1,347 @@ +use std::collections::BTreeMap; +use std::fmt::Write; +use std::result::Result; +use vrl::prelude::*; +use Value::{Array, Object}; + +#[derive(Clone, Copy, Debug)] +pub struct EncodeKeyValue; + +impl Function for EncodeKeyValue { + fn identifier(&self) -> &'static str { + "encode_key_value" + } + + fn parameters(&self) -> &'static [Parameter] { + &[ + Parameter { + keyword: "value", + kind: kind::OBJECT, + required: true, + }, + Parameter { + keyword: "fields_ordering", + kind: kind::ARRAY, + required: false, + }, + Parameter { + keyword: "key_value_delimiter", + kind: kind::ANY, + required: false, + }, + Parameter { + keyword: "field_delimiter", + kind: kind::ANY, + required: false, + }, + ] + } + + fn compile(&self, mut arguments: ArgumentList) -> Compiled { + let value = arguments.required("value"); + let fields = arguments.optional("fields_ordering"); + + let key_value_delimiter = arguments + .optional("key_value_delimiter") + .unwrap_or_else(|| expr!("=")); + + let field_delimiter = arguments + .optional("field_delimiter") + .unwrap_or_else(|| expr!(" ")); + + Ok(Box::new(EncodeKeyValueFn { + value, + fields, + key_value_delimiter, + field_delimiter, + })) + } + + fn examples(&self) -> &'static [Example] { + &[ + Example { + title: "encode object", + source: r#"encode_key_value!({"lvl": "info", "msg": "This is a message", "log_id": 12345})"#, + result: Ok(r#"s'log_id=12345 lvl=info msg="This is a message"'"#), + }, + Example { + title: "encode object with fields ordering", + source: r#"encode_key_value!({"msg": "This is a message", "lvl": "info", "log_id": 12345}, ["lvl", "msg"])"#, + result: Ok(r#"s'lvl=info msg="This is a message" log_id=12345'"#), + }, + Example { + title: "custom delimiters", + source: r#"encode_key_value!({"start": "ool", "end": "kul", "stop1": "yyc", "stop2" : "gdx"}, key_value_delimiter: ":", field_delimiter: ",")"#, + result: Ok(r#"s'end:kul,start:ool,stop1:yyc,stop2:gdx'"#), + }, + ] + } +} + +#[derive(Clone, Debug)] +pub(crate) struct EncodeKeyValueFn { + pub(crate) value: Box, + pub(crate) fields: Option>, + pub(crate) key_value_delimiter: Box, + pub(crate) field_delimiter: Box, +} + +fn resolve_fields(fields: Value) -> Result, ExpressionError> { + let arr = fields.try_array()?; + arr.iter() + .enumerate() + .map(|(idx, v)| { + v.try_bytes_utf8_lossy() + .map(|v| v.to_string()) + .map_err(|e| format!("invalid field value type at index {}: {}", idx, e).into()) + }) + .collect() +} + +impl Expression for EncodeKeyValueFn { + fn resolve(&self, ctx: &mut Context) -> Resolved { + let value = self.value.resolve(ctx)?; + let fields = match &self.fields { + None => Ok(vec![]), + Some(expr) => { + let fields = expr.resolve(ctx)?; + resolve_fields(fields) + } + }?; + + let object = value.try_object()?; + + let value = self.key_value_delimiter.resolve(ctx)?; + let key_value_delimiter = value.try_bytes_utf8_lossy()?; + + let value = self.field_delimiter.resolve(ctx)?; + let field_delimiter = value.try_bytes_utf8_lossy()?; + + Ok(encode(object, &fields[..], &key_value_delimiter, &field_delimiter).into()) + } + + fn type_def(&self, _state: &state::Compiler) -> TypeDef { + TypeDef::new().bytes().fallible() + } +} + +fn encode_string(output: &mut String, str: &str) { + let needs_quoting = str.chars().any(char::is_whitespace); + + if needs_quoting { + output.write_char('"').unwrap(); + } + + for c in str.chars() { + match c { + '\\' => output.write_str(r#"\\"#).unwrap(), + '"' => output.write_str(r#"\""#).unwrap(), + '\n' => output.write_str(r#"\\n"#).unwrap(), + _ => output.write_char(c).unwrap(), + } + } + + if needs_quoting { + output.write_char('"').unwrap(); + } +} + +fn encode_value(output: &mut String, value: &Value) { + match value { + Value::Bytes(b) => { + let val = String::from_utf8_lossy(b); + encode_string(output, &val) + } + _ => encode_string(output, &value.to_string()), + } +} + +fn flatten<'a>( + input: impl IntoIterator + 'a, + parent_key: String, + separator: char, + depth: usize, +) -> Box + 'a> { + let iter = input.into_iter().map(move |(key, value)| { + let new_key = if depth > 0 { + format!("{}{}{}", parent_key, separator, key) + } else { + key + }; + + match value { + Object(map) => flatten(map, new_key, separator, depth + 1), + Array(array) => { + let array_map: BTreeMap<_, _> = array + .into_iter() + .enumerate() + .map(|(key, value)| (key.to_string(), value)) + .collect(); + flatten(array_map, new_key, separator, depth + 1) + } + _ => Box::new(std::iter::once((new_key, value))) + as Box>, + } + }); + + Box::new(iter.flatten()) +} + +fn encode_field<'a>(output: &mut String, key: &str, value: &Value, key_value_delimiter: &'a str) { + encode_string(output, key); + output.write_str(key_value_delimiter).unwrap(); + encode_value(output, value) +} + +pub fn encode<'a>( + input: BTreeMap, + fields: &[String], + key_value_delimiter: &'a str, + field_delimiter: &'a str, +) -> String { + let mut output = String::new(); + + let mut input: BTreeMap<_, _> = flatten(input, String::from(""), '.', 0).collect(); + + for field in fields.iter() { + if let Some(val) = input.remove(field) { + encode_field(&mut output, field, &val, key_value_delimiter); + output.write_str(field_delimiter).unwrap(); + } + } + + for (key, value) in input.iter() { + encode_field(&mut output, key, value, key_value_delimiter); + output.write_str(field_delimiter).unwrap(); + } + + if output.ends_with(field_delimiter) { + output.truncate(output.len() - field_delimiter.len()) + } + + output +} + +#[cfg(test)] +mod tests { + use super::*; + use shared::btreemap; + + test_function![ + encode_key_value => EncodeKeyValue; + + single_element { + args: func_args![value: + btreemap! { + "lvl" => "info" + } + ], + want: Ok("lvl=info"), + tdef: TypeDef::new().bytes().fallible(), + } + + multiple_elements { + args: func_args![value: + btreemap! { + "lvl" => "info", + "log_id" => 12345 + } + ], + want: Ok("log_id=12345 lvl=info"), + tdef: TypeDef::new().bytes().fallible(), + } + + string_with_spaces { + args: func_args![value: + btreemap! { + "lvl" => "info", + "msg" => "This is a log message" + }], + want: Ok(r#"lvl=info msg="This is a log message""#), + tdef: TypeDef::new().bytes().fallible(), + } + + string_with_characters_to_escape { + args: func_args![value: + btreemap! { + "lvl" => "info", + "msg" => r#"payload: {"code": 200}\n"#, + "another_field" => "some\nfield\\and things", + "space key" => "foo" + }], + want: Ok(r#"another_field="some\\nfield\\and things" lvl=info msg="payload: {\"code\": 200}\\n" "space key"=foo"#), + tdef: TypeDef::new().bytes().fallible(), + } + + nested_fields { + args: func_args![value: + btreemap! { + "log" => btreemap! { + "file" => btreemap! { + "path" => "encode_key_value.rs" + }, + }, + "agent" => btreemap! { + "name" => "vector", + "id" => 1234 + }, + "network" => btreemap! { + "ip" => value!([127, 0, 0, 1]), + "proto" => "tcp" + }, + "event" => "log" + }], + want: Ok("agent.id=1234 agent.name=vector event=log log.file.path=encode_key_value.rs network.ip.0=127 network.ip.1=0 network.ip.2=0 network.ip.3=1 network.proto=tcp"), + tdef: TypeDef::new().bytes().fallible(), + } + + fields_ordering { + args: func_args![value: + btreemap! { + "lvl" => "info", + "msg" => "This is a log message", + "log_id" => 12345, + }, + fields_ordering: value!(["lvl", "msg"]) + ], + want: Ok(r#"lvl=info msg="This is a log message" log_id=12345"#), + tdef: TypeDef::new().bytes().fallible(), + } + + nested_fields_ordering { + args: func_args![value: + btreemap! { + "log" => btreemap! { + "file" => btreemap! { + "path" => "encode_key_value.rs" + }, + }, + "agent" => btreemap! { + "name" => "vector", + }, + "event" => "log" + }, + fields_ordering: value!(["event", "log.file.path", "agent.name"]) + ], + want: Ok("event=log log.file.path=encode_key_value.rs agent.name=vector"), + tdef: TypeDef::new().bytes().fallible(), + } + + fields_ordering_invalid_field_type { + args: func_args![value: + btreemap! { + "lvl" => "info", + "msg" => "This is a log message", + "log_id" => 12345, + }, + fields_ordering: value!(["lvl", 2]) + ], + want: Err(format!(r"invalid field value type at index 1: {}", + value::Error::Expected { + got: Kind::Integer, + expected: Kind::Bytes + })), + tdef: TypeDef::new().bytes().fallible(), + } + ]; +} diff --git a/lib/vrl/stdlib/src/encode_logfmt.rs b/lib/vrl/stdlib/src/encode_logfmt.rs index 35d94100b1004f..1632bbcd6550c1 100644 --- a/lib/vrl/stdlib/src/encode_logfmt.rs +++ b/lib/vrl/stdlib/src/encode_logfmt.rs @@ -1,8 +1,5 @@ -use std::collections::BTreeMap; -use std::fmt::Write; -use std::result::Result; +use crate::encode_key_value::EncodeKeyValueFn; use vrl::prelude::*; -use Value::{Array, Object}; #[derive(Clone, Copy, Debug)] pub struct EncodeLogfmt; @@ -28,10 +25,20 @@ impl Function for EncodeLogfmt { } fn compile(&self, mut arguments: ArgumentList) -> Compiled { + // The encode_logfmt function is just an alias for `encode_key_value` with the following + // parameters for the delimiters. + let key_value_delimiter = expr!("="); + let field_delimiter = expr!(" "); + let value = arguments.required("value"); let fields = arguments.optional("fields_ordering"); - Ok(Box::new(EncodeLogfmtFn { value, fields })) + Ok(Box::new(EncodeKeyValueFn { + value, + fields, + key_value_delimiter, + field_delimiter, + })) } fn examples(&self) -> &'static [Example] { @@ -49,257 +56,3 @@ impl Function for EncodeLogfmt { ] } } - -#[derive(Clone, Debug)] -struct EncodeLogfmtFn { - value: Box, - fields: Option>, -} - -fn resolve_fields(fields: Value) -> Result, ExpressionError> { - let arr = fields.try_array()?; - arr.iter() - .enumerate() - .map(|(idx, v)| { - v.try_bytes_utf8_lossy() - .map(|v| v.to_string()) - .map_err(|e| format!("invalid field value type at index {}: {}", idx, e).into()) - }) - .collect() -} - -impl Expression for EncodeLogfmtFn { - fn resolve(&self, ctx: &mut Context) -> Resolved { - let value = self.value.resolve(ctx)?; - let fields = match &self.fields { - None => Ok(vec![]), - Some(expr) => { - let fields = expr.resolve(ctx)?; - resolve_fields(fields) - } - }?; - - let object = value.try_object()?; - Ok(encode(object, &fields[..]).into()) - } - - fn type_def(&self, _state: &state::Compiler) -> TypeDef { - TypeDef::new().bytes().fallible() - } -} - -fn encode_string(output: &mut String, str: &str) { - let needs_quoting = str.chars().any(char::is_whitespace); - - if needs_quoting { - output.write_char('"').unwrap(); - } - - for c in str.chars() { - match c { - '\\' => output.write_str(r#"\\"#).unwrap(), - '"' => output.write_str(r#"\""#).unwrap(), - '\n' => output.write_str(r#"\\n"#).unwrap(), - _ => output.write_char(c).unwrap(), - } - } - - if needs_quoting { - output.write_char('"').unwrap(); - } -} - -fn encode_value(output: &mut String, value: &Value) { - match value { - Value::Bytes(b) => { - let val = String::from_utf8_lossy(b); - encode_string(output, &val) - } - _ => encode_string(output, &value.to_string()), - } -} - -fn flatten<'a>( - input: impl IntoIterator + 'a, - parent_key: String, - separator: char, - depth: usize, -) -> Box + 'a> { - let iter = input.into_iter().map(move |(key, value)| { - let new_key = if depth > 0 { - format!("{}{}{}", parent_key, separator, key) - } else { - key - }; - - match value { - Object(map) => flatten(map, new_key, separator, depth + 1), - Array(array) => { - let array_map: BTreeMap<_, _> = array - .into_iter() - .enumerate() - .map(|(key, value)| (key.to_string(), value)) - .collect(); - flatten(array_map, new_key, separator, depth + 1) - } - _ => Box::new(std::iter::once((new_key, value))) - as Box>, - } - }); - - Box::new(iter.flatten()) -} - -fn encode_field(output: &mut String, key: &str, value: &Value) { - encode_string(output, key); - output.write_char('=').unwrap(); - encode_value(output, value) -} - -pub fn encode(input: BTreeMap, fields: &[String]) -> String { - let mut output = String::new(); - - let mut input: BTreeMap<_, _> = flatten(input, String::from(""), '.', 0).collect(); - - for field in fields.iter() { - if let Some(val) = input.remove(field) { - encode_field(&mut output, field, &val); - output.write_char(' ').unwrap(); - } - } - - for (key, value) in input.iter() { - encode_field(&mut output, key, value); - output.write_char(' ').unwrap(); - } - - if output.ends_with(' ') { - output.truncate(output.len() - 1) - } - - output -} - -#[cfg(test)] -mod tests { - use super::*; - use shared::btreemap; - - test_function![ - encode_logfmt => EncodeLogfmt; - - single_element { - args: func_args![value: - btreemap! { - "lvl" => "info" - } - ], - want: Ok("lvl=info"), - tdef: TypeDef::new().bytes().fallible(), - } - - multiple_elements { - args: func_args![value: - btreemap! { - "lvl" => "info", - "log_id" => 12345 - } - ], - want: Ok("log_id=12345 lvl=info"), - tdef: TypeDef::new().bytes().fallible(), - } - - string_with_spaces { - args: func_args![value: - btreemap! { - "lvl" => "info", - "msg" => "This is a log message" - }], - want: Ok(r#"lvl=info msg="This is a log message""#), - tdef: TypeDef::new().bytes().fallible(), - } - - string_with_characters_to_escape { - args: func_args![value: - btreemap! { - "lvl" => "info", - "msg" => r#"payload: {"code": 200}\n"#, - "another_field" => "some\nfield\\and things", - "space key" => "foo" - }], - want: Ok(r#"another_field="some\\nfield\\and things" lvl=info msg="payload: {\"code\": 200}\\n" "space key"=foo"#), - tdef: TypeDef::new().bytes().fallible(), - } - - nested_fields { - args: func_args![value: - btreemap! { - "log" => btreemap! { - "file" => btreemap! { - "path" => "encode_logfmt.rs" - }, - }, - "agent" => btreemap! { - "name" => "vector", - "id" => 1234 - }, - "network" => btreemap! { - "ip" => value!([127, 0, 0, 1]), - "proto" => "tcp" - }, - "event" => "log" - }], - want: Ok("agent.id=1234 agent.name=vector event=log log.file.path=encode_logfmt.rs network.ip.0=127 network.ip.1=0 network.ip.2=0 network.ip.3=1 network.proto=tcp"), - tdef: TypeDef::new().bytes().fallible(), - } - - fields_ordering { - args: func_args![value: - btreemap! { - "lvl" => "info", - "msg" => "This is a log message", - "log_id" => 12345, - }, - fields_ordering: value!(["lvl", "msg"]) - ], - want: Ok(r#"lvl=info msg="This is a log message" log_id=12345"#), - tdef: TypeDef::new().bytes().fallible(), - } - - nested_fields_ordering { - args: func_args![value: - btreemap! { - "log" => btreemap! { - "file" => btreemap! { - "path" => "encode_logfmt.rs" - }, - }, - "agent" => btreemap! { - "name" => "vector", - }, - "event" => "log" - }, - fields_ordering: value!(["event", "log.file.path", "agent.name"]) - ], - want: Ok("event=log log.file.path=encode_logfmt.rs agent.name=vector"), - tdef: TypeDef::new().bytes().fallible(), - } - - fields_ordering_invalid_field_type { - args: func_args![value: - btreemap! { - "lvl" => "info", - "msg" => "This is a log message", - "log_id" => 12345, - }, - fields_ordering: value!(["lvl", 2]) - ], - want: Err(format!(r"invalid field value type at index 1: {}", - value::Error::Expected { - got: Kind::Integer, - expected: Kind::Bytes - })), - tdef: TypeDef::new().bytes().fallible(), - } - ]; -} diff --git a/lib/vrl/stdlib/src/lib.rs b/lib/vrl/stdlib/src/lib.rs index 7ed158bebd19c2..f35df8376abe06 100644 --- a/lib/vrl/stdlib/src/lib.rs +++ b/lib/vrl/stdlib/src/lib.rs @@ -24,6 +24,8 @@ mod downcase; mod encode_base64; #[cfg(feature = "encode_json")] mod encode_json; +#[cfg(feature = "encode_key_value")] +mod encode_key_value; #[cfg(feature = "encode_logfmt")] mod encode_logfmt; #[cfg(feature = "ends_with")] @@ -237,6 +239,8 @@ pub use downcase::Downcase; pub use encode_base64::EncodeBase64; #[cfg(feature = "encode_json")] pub use encode_json::EncodeJson; +#[cfg(feature = "encode_key_value")] +pub use encode_key_value::EncodeKeyValue; #[cfg(feature = "encode_logfmt")] pub use encode_logfmt::EncodeLogfmt; #[cfg(feature = "ends_with")] @@ -436,6 +440,8 @@ pub fn all() -> Vec> { Box::new(EncodeBase64), #[cfg(feature = "encode_json")] Box::new(EncodeJson), + #[cfg(feature = "encode_key_value")] + Box::new(EncodeKeyValue), #[cfg(feature = "encode_logfmt")] Box::new(EncodeLogfmt), #[cfg(feature = "ends_with")]