Skip to content

Commit

Permalink
enhancement(kubernetes_logs source) Add PodIPs into Pod Metadata events
Browse files Browse the repository at this point in the history
Signed-off-by: Ian Henry <ianjhenry00@gmail.com>
  • Loading branch information
eeyun committed Nov 5, 2020
1 parent eac3dba commit ede8ca7
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -178,7 +178,7 @@ pulsar = { version = "1.0.0", default-features = false, features = ["tokio-runti
task-compat = "0.1"
cidr-utils = "0.4.2"
pin-project = "1.0.1"
k8s-openapi = { version = "0.9", features = ["v1_15"], optional = true }
k8s-openapi = { version = "0.9", features = ["v1_16"], optional = true }
portpicker = "0.1.0"
sha-1 = "0.9"
sha2 = "0.9"
Expand Down
2 changes: 1 addition & 1 deletion lib/k8s-e2e-tests/Cargo.toml
Expand Up @@ -7,7 +7,7 @@ description = "End-to-end tests of Vector in the Kubernetes environment"

[dependencies]
futures = "0.3"
k8s-openapi = { version = "0.9", default-features = false, features = ["v1_15"] }
k8s-openapi = { version = "0.9", default-features = false, features = ["v1_16"] }
k8s-test-framework = { version = "0.1", path = "../k8s-test-framework" }
serde_json = "1"
tokio = { version = "0.2", features = ["macros", "rt-threaded", "time"] }
Expand Down
2 changes: 1 addition & 1 deletion lib/k8s-test-framework/Cargo.toml
Expand Up @@ -6,7 +6,7 @@ edition = "2018"
description = "Kubernetes Test Framework used to test Vector in Kubernetes"

[dependencies]
k8s-openapi = { version = "0.9", default-features = false, features = ["v1_15"] }
k8s-openapi = { version = "0.9", default-features = false, features = ["v1_16"] }
serde_json = "1"
tempfile = "3"
once_cell = "1"
Expand Down
79 changes: 78 additions & 1 deletion src/sources/kubernetes_logs/pod_metadata_annotator.rs
Expand Up @@ -9,8 +9,9 @@ use crate::{
};
use evmap::ReadHandle;
use k8s_openapi::{
api::core::v1::{Container, Pod, PodSpec},
api::core::v1::{Container, Pod, PodSpec, PodStatus},
apimachinery::pkg::apis::meta::v1::ObjectMeta,
k8s_if_ge_1_15, k8s_if_le_1_16,
};
use serde::{Deserialize, Serialize};

Expand All @@ -20,6 +21,7 @@ pub struct FieldsSpec {
pub pod_name: String,
pub pod_namespace: String,
pub pod_uid: String,
pub pod_ips: String,
pub pod_labels: String,
pub pod_node_name: String,
pub container_name: String,
Expand All @@ -32,6 +34,7 @@ impl Default for FieldsSpec {
pod_name: "kubernetes.pod_name".to_owned(),
pod_namespace: "kubernetes.pod_namespace".to_owned(),
pod_uid: "kubernetes.pod_uid".to_owned(),
pod_ips: "kubernetes.pod_ips".to_owned(),
pod_labels: "kubernetes.pod_labels".to_owned(),
pod_node_name: "kubernetes.pod_node_name".to_owned(),
container_name: "kubernetes.container_name".to_owned(),
Expand Down Expand Up @@ -72,6 +75,9 @@ impl PodMetadataAnnotator {

annotate_from_file_info(log, &self.fields_spec, &file_info);
annotate_from_metadata(log, &self.fields_spec, &pod.metadata);
if let Some(ref pod_status) = pod.status {
annotate_from_pod_status(log, &self.fields_spec, pod_status);
}
if let Some(ref pod_spec) = pod.spec {
annotate_from_pod_spec(log, &self.fields_spec, pod_spec);

Expand Down Expand Up @@ -130,6 +136,24 @@ fn annotate_from_pod_spec(log: &mut LogEvent, fields_spec: &FieldsSpec, pod_spec
}
}

fn annotate_from_pod_status(log: &mut LogEvent, fields_spec: &FieldsSpec, pod_status: &PodStatus) {
k8s_if_le_1_16! {
for (ref key, ref val) in [(&fields_spec.pod_ips, &pod_status.pod_ip)].iter() {
if let Some(val) = val {
log.insert(key, val.to_owned());
}
}
}
k8s_if_ge_1_15! {
for (ref key, ref val) in [(&fields_spec.pod_ips, &pod_status.pod_ips)].iter() {
if let Some(val) = val {
let inner: Vec<String> = val.iter().filter_map(|v| v.ip.clone()).collect::<Vec<String>>();
log.insert(key, format!("{:?}", inner));
}
}
}
}

fn annotate_from_container(log: &mut LogEvent, fields_spec: &FieldsSpec, container: &Container) {
for (ref key, ref val) in [(&fields_spec.container_image, &container.image)].iter() {
if let Some(val) = val {
Expand All @@ -141,6 +165,7 @@ fn annotate_from_container(log: &mut LogEvent, fields_spec: &FieldsSpec, contain
#[cfg(test)]
mod tests {
use super::*;
use k8s_openapi::api::core::v1::PodIP;

#[test]
fn test_annotate_from_metadata() {
Expand Down Expand Up @@ -323,6 +348,58 @@ mod tests {
}
}

#[test]
fn test_annotate_from_pod_status() {
let cases = vec![
(
FieldsSpec::default(),
PodStatus::default(),
LogEvent::default(),
),
(
FieldsSpec::default(),
PodStatus {
pod_ip: Some("192.168.1.2".to_owned()),
..Default::default()
},
{
let mut log = LogEvent::default();
log.insert("kubernetes.pod_ips", "192.168.1.2");
log
},
),
(
FieldsSpec {
pod_node_name: "node_name".to_owned(),
..Default::default()
},
PodStatus {
pod_ip: Some("192.168.1.2".to_owned()),
pod_ips: Some(vec![
PodIP {
ip: Some("192.168.1.2".to_owned()),
},
PodIP {
ip: Some("192.168.1.3".to_owned()),
},
]),
..Default::default()
},
{
let mut log = LogEvent::default();
log.insert("kubernetes.pod_ips", "[\"192.168.1.2\", \"192.168.1.3\"]");
log
},
),
];

for (fields_spec, pod_status, expected) in cases.into_iter() {
let mut log = LogEvent::default();
annotate_from_pod_status(&mut log, &fields_spec, &pod_status);
assert_eq!(log, expected);
}
}

#[test]
fn test_annotate_from_container() {
let cases = vec![
Expand Down

0 comments on commit ede8ca7

Please sign in to comment.