Skip to content

Commit

Permalink
bug(lua transform): Emit events with the source_id set (#17870)
Browse files Browse the repository at this point in the history
By default, the `lua` transform creates output events with no source id.
This is, however, the only component that does that. This change causes
all output events to have a source ID:

1. All events resulting from processing an event (either the default v1
path or the v2 `process` hook) will copy the source ID from the incoming
event.
2. All other events will set the source ID to the fixed string `lua`.
  • Loading branch information
bruceg committed Jul 6, 2023
1 parent 1260c83 commit bc1b83a
Show file tree
Hide file tree
Showing 5 changed files with 190 additions and 193 deletions.
11 changes: 2 additions & 9 deletions build.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
use std::{
collections::HashSet,
env,
fs::File,
io::Write,
path::{Path, PathBuf},
process::Command,
};
use std::{collections::HashSet, env, fs::File, io::Write, path::Path, process::Command};

struct TrackedEnv {
tracked: HashSet<String>,
Expand Down Expand Up @@ -137,7 +130,7 @@ fn main() {
// in a type-safe way, which is necessary for incrementally building certain payloads, like
// the ones generated in the `datadog_metrics` sink.
let protobuf_fds_path =
PathBuf::from(std::env::var("OUT_DIR").expect("OUT_DIR environment variable not set"))
Path::new(&std::env::var("OUT_DIR").expect("OUT_DIR environment variable not set"))
.join("protobuf-fds.bin");

let mut prost_build = prost_build::Config::new();
Expand Down
10 changes: 7 additions & 3 deletions src/transforms/lua/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ pub mod v1;
pub mod v2;

use vector_config::configurable_component;
use vector_core::config::LogNamespace;
use vector_core::config::{ComponentKey, LogNamespace};

use crate::{
config::{GenerateConfig, Input, OutputId, TransformConfig, TransformContext, TransformOutput},
Expand Down Expand Up @@ -89,10 +89,14 @@ impl GenerateConfig for LuaConfig {
#[async_trait::async_trait]
#[typetag::serde(name = "lua")]
impl TransformConfig for LuaConfig {
async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
let key = context
.key
.as_ref()
.map_or_else(|| ComponentKey::from("lua"), Clone::clone);
match self {
LuaConfig::V1(v1) => v1.config.build(),
LuaConfig::V2(v2) => v2.config.build(),
LuaConfig::V2(v2) => v2.config.build(key),
}
}

Expand Down
160 changes: 74 additions & 86 deletions src/transforms/lua/v1/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ impl Lua {
}

fn process(&mut self, event: Event) -> Result<Option<Event>, mlua::Error> {
let source_id = event.source_id().cloned();
let lua = &self.lua;
let globals = lua.globals();

Expand All @@ -156,7 +157,15 @@ impl Lua {

let result = globals
.raw_get::<_, Option<LuaEvent>>("event")
.map(|option| option.map(|lua_event| lua_event.inner));
.map(|option| {
option.map(|lua_event| {
let mut event = lua_event.inner;
if let Some(source_id) = source_id {
event.set_source_id(source_id);
}
event
})
});

self.invocations_after_gc += 1;
if self.invocations_after_gc % GC_INTERVAL == 0 {
Expand Down Expand Up @@ -302,167 +311,130 @@ pub fn format_error(error: &mlua::Error) -> String {

#[cfg(test)]
mod tests {
use std::sync::Arc;

use super::*;
use crate::event::{Event, LogEvent, Value};
use crate::{config::ComponentKey, test_util};

#[test]
fn lua_add_field() {
crate::test_util::trace_init();
let mut transform = Lua::new(
let event = transform_one(
r#"
event["hello"] = "goodbye"
"#
.to_string(),
vec![],
"#,
LogEvent::from("program me"),
)
.unwrap();

let event = Event::Log(LogEvent::from("program me"));

let event = transform.transform_one(event).unwrap();

assert_eq!(event.as_log()["hello"], "goodbye".into());
}

#[test]
fn lua_read_field() {
crate::test_util::trace_init();
let mut transform = Lua::new(
let event = transform_one(
r#"
_, _, name = string.find(event["message"], "Hello, my name is (%a+).")
event["name"] = name
"#
.to_string(),
vec![],
"#,
LogEvent::from("Hello, my name is Bob."),
)
.unwrap();

let event = Event::Log(LogEvent::from("Hello, my name is Bob."));

let event = transform.transform_one(event).unwrap();

assert_eq!(event.as_log()["name"], "Bob".into());
}

#[test]
fn lua_remove_field() {
crate::test_util::trace_init();
let mut transform = Lua::new(
let mut log = LogEvent::default();
log.insert("name", "Bob");
let event = transform_one(
r#"
event["name"] = nil
"#
.to_string(),
vec![],
"#,
log,
)
.unwrap();

let mut log = LogEvent::default();
log.insert("name", "Bob");
let event = transform.transform_one(log.into()).unwrap();

assert!(event.as_log().get("name").is_none());
}

#[test]
fn lua_drop_event() {
let mut transform = Lua::new(
r#"
event = nil
"#
.to_string(),
vec![],
)
.unwrap();

let mut log = LogEvent::default();
log.insert("name", "Bob");
let event = transform.transform_one(log.into());
let event = transform_one(
r#"
event = nil
"#,
log,
);

assert!(event.is_none());
}

#[test]
fn lua_read_empty_field() {
crate::test_util::trace_init();
let mut transform = Lua::new(
let event = transform_one(
r#"
if event["non-existant"] == nil then
event["result"] = "empty"
else
event["result"] = "found"
end
"#
.to_string(),
vec![],
"#,
LogEvent::default(),
)
.unwrap();

let event = transform.transform_one(LogEvent::default().into()).unwrap();

assert_eq!(event.as_log()["result"], "empty".into());
}

#[test]
fn lua_integer_value() {
crate::test_util::trace_init();
let mut transform = Lua::new(
let event = transform_one(
r#"
event["number"] = 3
"#
.to_string(),
vec![],
"#,
LogEvent::default(),
)
.unwrap();

let event = transform.transform_one(LogEvent::default().into()).unwrap();
assert_eq!(event.as_log()["number"], Value::Integer(3));
}

#[test]
fn lua_numeric_value() {
crate::test_util::trace_init();
let mut transform = Lua::new(
let event = transform_one(
r#"
event["number"] = 3.14159
"#
.to_string(),
vec![],
"#,
LogEvent::default(),
)
.unwrap();

let event = transform.transform_one(LogEvent::default().into()).unwrap();
assert_eq!(event.as_log()["number"], Value::from(3.14159));
}

#[test]
fn lua_boolean_value() {
crate::test_util::trace_init();
let mut transform = Lua::new(
let event = transform_one(
r#"
event["bool"] = true
"#
.to_string(),
vec![],
"#,
LogEvent::default(),
)
.unwrap();

let event = transform.transform_one(LogEvent::default().into()).unwrap();
assert_eq!(event.as_log()["bool"], Value::Boolean(true));
}

#[test]
fn lua_non_coercible_value() {
crate::test_util::trace_init();
let mut transform = Lua::new(
let event = transform_one(
r#"
event["junk"] = {"asdf"}
"#
.to_string(),
vec![],
"#,
LogEvent::default(),
)
.unwrap();

let event = transform.transform_one(LogEvent::default().into()).unwrap();
assert_eq!(event.as_log().get("junk"), None);
}

Expand Down Expand Up @@ -573,32 +545,48 @@ mod tests {

let mut transform =
Lua::new(source, vec![dir.path().to_string_lossy().into_owned()]).unwrap();
let event = LogEvent::default().into();
let event = transform.transform_one(event).unwrap();
let event = transform.transform_one(LogEvent::default().into()).unwrap();
assert_eq!(event.as_log()["\"new field\""], "new value".into());
}

#[test]
fn lua_pairs() {
crate::test_util::trace_init();
let mut transform = Lua::new(
let mut event = LogEvent::default();
event.insert("name", "Bob");
event.insert("friend", "Alice");

let event = transform_one(
r#"
for k,v in pairs(event) do
event[k] = k .. v
end
"#
.to_string(),
vec![],
"#,
event,
)
.unwrap();

let mut event = LogEvent::default();
event.insert("name", "Bob");
event.insert("friend", "Alice");

let event = transform.transform_one(event.into()).unwrap();

assert_eq!(event.as_log()["name"], "nameBob".into());
assert_eq!(event.as_log()["friend"], "friendAlice".into());
}

fn transform_one(transform: &str, event: impl Into<Event>) -> Option<Event> {
crate::test_util::trace_init();

let source = source_id();
let mut event = event.into();
event.set_source_id(Arc::clone(&source));

let mut transform = Lua::new(transform.to_string(), vec![]).unwrap();
let event = transform.transform_one(event);

if let Some(event) = &event {
assert_eq!(event.source_id(), Some(&source));
}

event
}

fn source_id() -> Arc<ComponentKey> {
Arc::new(ComponentKey::from(test_util::random_string(16)))
}
}

0 comments on commit bc1b83a

Please sign in to comment.