Skip to content

Commit

Permalink
fix(journald source): Handle missing source timestamp (#2387)
Browse files Browse the repository at this point in the history
* fix(journald source): Handle missing source timestamp

If a source does not give journald a timestamp, the resulting record
will be missing the `_SOURCE_REALTIME_TIMESTAMP` field that this source
relies on. This patch adds a fallback to `__REALTIME_TIMESTAMP` if the
above is not available, which is always present.

Signed-off-by: Bruce Guenter <bruce@timber.io>
  • Loading branch information
bruceg authored and binarylogic committed Apr 29, 2020
1 parent 4d76e75 commit b1c8421
Showing 1 changed file with 30 additions and 4 deletions.
34 changes: 30 additions & 4 deletions src/sources/journald.rs
Expand Up @@ -33,7 +33,8 @@ lazy_static! {
static ref HOSTNAME: Atom = Atom::from("_HOSTNAME");
static ref MESSAGE: Atom = Atom::from("MESSAGE");
static ref SYSTEMD_UNIT: Atom = Atom::from("_SYSTEMD_UNIT");
static ref TIMESTAMP: Atom = Atom::from("_SOURCE_REALTIME_TIMESTAMP");
static ref SOURCE_TIMESTAMP: Atom = Atom::from("_SOURCE_REALTIME_TIMESTAMP");
static ref RECEIVED_TIMESTAMP: Atom = Atom::from("__REALTIME_TIMESTAMP");
static ref JOURNALCTL: PathBuf = "journalctl".into();
}

Expand Down Expand Up @@ -163,7 +164,10 @@ fn create_event(record: Record) -> Event {
log.insert(event::log_schema().host_key().clone(), host);
}
// Translate the timestamp, and so leave both old and new names.
if let Some(timestamp) = log.get(&TIMESTAMP) {
if let Some(timestamp) = log
.get(&SOURCE_TIMESTAMP)
.or_else(|| log.get(&RECEIVED_TIMESTAMP))
{
if let Value::Bytes(timestamp) = timestamp {
if let Ok(timestamp) = String::from_utf8_lossy(timestamp).parse::<u64>() {
let timestamp = chrono::Utc.timestamp(
Expand Down Expand Up @@ -459,6 +463,8 @@ mod tests {
const FAKE_JOURNAL: &str = r#"{"_SYSTEMD_UNIT":"sysinit.target","MESSAGE":"System Initialization","__CURSOR":"1","_SOURCE_REALTIME_TIMESTAMP":"1578529839140001"}
{"_SYSTEMD_UNIT":"unit.service","MESSAGE":"unit message","__CURSOR":"2","_SOURCE_REALTIME_TIMESTAMP":"1578529839140002"}
{"_SYSTEMD_UNIT":"badunit.service","MESSAGE":[194,191,72,101,108,108,111,63],"__CURSOR":"2","_SOURCE_REALTIME_TIMESTAMP":"1578529839140003"}
{"_SYSTEMD_UNIT":"stdout","MESSAGE":"Missing timestamp","__CURSOR":"3","__REALTIME_TIMESTAMP":"1578529839140004"}
{"_SYSTEMD_UNIT":"stdout","MESSAGE":"Different timestamps","__CURSOR":"4","_SOURCE_REALTIME_TIMESTAMP":"1578529839140005","__REALTIME_TIMESTAMP":"1578529839140004"}
"#;

struct FakeJournal {
Expand Down Expand Up @@ -527,7 +533,7 @@ mod tests {
#[test]
fn reads_journal() {
let received = run_journal(&[], None);
assert_eq!(received.len(), 3);
assert_eq!(received.len(), 5);
assert_eq!(
message(&received[0]),
Value::Bytes("System Initialization".into())
Expand All @@ -536,21 +542,25 @@ mod tests {
received[0].as_log()[event::log_schema().source_type_key()],
"journald".into()
);
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140001000));
assert_eq!(message(&received[1]), Value::Bytes("unit message".into()));
assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140002000));
}

#[test]
fn filters_units() {
let received = run_journal(&["unit.service"], None);
assert_eq!(received.len(), 1);
assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
}

#[test]
fn handles_checkpoint() {
let received = run_journal(&[], Some("1"));
assert_eq!(received.len(), 2);
assert_eq!(received.len(), 4);
assert_eq!(message(&received[0]), Value::Bytes("unit message".into()));
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140002000));
}

#[test]
Expand All @@ -560,7 +570,23 @@ mod tests {
assert_eq!(message(&received[0]), Value::Bytes("¿Hello?".into()));
}

#[test]
fn handles_missing_timestamp() {
let received = run_journal(&["stdout"], None);
assert_eq!(received.len(), 2);
assert_eq!(timestamp(&received[0]), value_ts(1578529839, 140004000));
assert_eq!(timestamp(&received[1]), value_ts(1578529839, 140005000));
}

fn message(event: &Event) -> Value {
event.as_log()[&event::log_schema().message_key()].clone()
}

fn timestamp(event: &Event) -> Value {
event.as_log()[&event::log_schema().timestamp_key()].clone()
}

fn value_ts(secs: i64, usecs: u32) -> Value {
Value::Timestamp(chrono::Utc.timestamp(secs, usecs))
}
}

0 comments on commit b1c8421

Please sign in to comment.