Skip to content

Commit

Permalink
add new logging and tracing code for pod-init (#186)
Browse files Browse the repository at this point in the history
* Add in tracing code to pod-init

* adding telemetry code
  • Loading branch information
nhudson authored Aug 3, 2023
1 parent b99b1dd commit dbda757
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 73 deletions.
13 changes: 9 additions & 4 deletions tembo-pod-init/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ name = "tembo-pod-init"
version = "0.1.0"
edition = "2021"

[registries.crates-io]
protocol = "sparse"

[[bin]]
doc = false
name = "tembo-pod-init"
Expand All @@ -15,9 +12,13 @@ path = "src/main.rs"
name = "tembo_pod_init"
path = "src/lib.rs"

[features]
default = []
telemetry = ["tonic", "opentelemetry-otlp"]

[dependencies]
actix-web = { version = "4.3", features = ["openssl"] }
tokio = { version = "1.28", features = ["rt"] }
tokio = { version = "1.29", features = ["rt"] }
serde = { version = "1.0", features = ["derive"]}
k8s-openapi = { version = "0.18.0", features = ["v1_25", "schemars"], default-features = false }
serde_json = "1.0"
Expand All @@ -29,6 +30,10 @@ tracing-log = "0.1"
tracing-subscriber = { version = "0.3", features = ["fmt", "env-filter"]}
openssl = { version = "0.10", features = ["vendored"] }
controller = {path = "../tembo-operator", package = "controller"}
tracing-opentelemetry = "0.19.0"
opentelemetry = { version = "0.19.0", features = ["trace", "rt-tokio"] }
opentelemetry-otlp = { version = "0.12.0", features = ["tokio"], optional = true }
tonic = { version = "0.8.0", optional = true } # 0.9 blocked on opentelemetry-otlp release

[dependencies.kube]
features = ["admission","runtime", "client", "derive", "ws"]
Expand Down
2 changes: 1 addition & 1 deletion tembo-pod-init/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ COPY tembo-operator ./tembo-operator
WORKDIR /tembo-pod-init
COPY Cargo.toml .
COPY src/ ./src/
RUN cargo build --release
RUN cargo build --release --features=telemetry

FROM debian:bookworm-slim
RUN set -eux; \
Expand Down
4 changes: 4 additions & 0 deletions tembo-pod-init/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub struct Config {
pub init_container_name: String,
pub tls_cert: String,
pub tls_key: String,
pub opentelemetry_endpoint_url: String,
}

impl Default for Config {
Expand Down Expand Up @@ -41,6 +42,9 @@ impl Default for Config {
tls_key: from_env_or_default("TLS_KEY", "/certs/tls.key")
.parse()
.unwrap(),
opentelemetry_endpoint_url: from_env_or_default("OPENTELEMETRY_ENDPOINT_URL", "")
.parse()
.unwrap(),
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion tembo-pod-init/src/container.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use controller::cloudnativepg::clusters::Cluster;
use k8s_openapi::api::core::v1::{Capabilities, Container, SecurityContext, VolumeMount};
use kube::{Api, Client};
use tracing::debug;
use tracing::*;

use crate::config::Config;

// Create a Container object that will be injected into the Pod
#[instrument(skip(client))]
pub async fn create_init_container(
config: &Config,
client: &Client,
Expand Down
2 changes: 1 addition & 1 deletion tembo-pod-init/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
pub mod config;
pub mod container;
pub mod health;
pub mod log;
pub mod mutate;
pub mod telemetry;
pub mod watcher;
24 changes: 0 additions & 24 deletions tembo-pod-init/src/log.rs

This file was deleted.

65 changes: 26 additions & 39 deletions tembo-pod-init/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@ use kube::Client;
use openssl::ssl::{SslAcceptor, SslFiletype, SslMethod};
use parking_lot::Mutex;
use std::sync::Arc;
use tembo_pod_init::{config::Config, health::*, log, mutate::mutate, watcher::NamespaceWatcher};

#[macro_use]
extern crate tracing;
use tembo_pod_init::{
config::Config, health::*, mutate::mutate, telemetry, watcher::NamespaceWatcher,
};
use tracing::*;

#[instrument(fields(trace_id))]
#[actix_web::main]
async fn main() -> std::io::Result<()> {
let config = Config::default();

// Initialize logging
if let Err(e) = log::init(&config) {
error!("Failed to initialize logging: {}", e);
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Logger initialization failed",
));
}
telemetry::init(&config).await;

// Set trace_id for logging
let trace_id = telemetry::get_trace_id();
Span::current().record("trace_id", &field::display(&trace_id));

let stop_handle = web::Data::new(StopHandle::default());

Expand All @@ -34,34 +33,7 @@ async fn main() -> std::io::Result<()> {
// Start watching namespaces in a seperate tokio task thread
let watcher = NamespaceWatcher::new(Arc::new(kube_client.clone()), config.clone());
let namespaces = watcher.get_namespaces();
tokio::spawn(async move {
loop {
match watcher.watch().await {
Ok(_) => {
info!("Namespace watcher finished, restarting.");
}
Err(e) => {
error!("Namespace watcher failed, restarting: {}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
}
});

// Print out the namespaces we are currently watching every 10 seconds
//if tracing::Level::DEBUG >= *tracing::level_filters::RECORDED_LEVEL {
// let debug_namespaces = Arc::clone(&namespaces);
// tokio::spawn(async move {
// loop {
// let stored_namespaces = debug_namespaces.read().await;
// debug!(
// "Namespaces currently being tracked: {:?}",
// *stored_namespaces
// );
// tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; // adjust the delay as needed
// }
// });
//}
tokio::spawn(watch_namespaces(watcher));

// Load the TLS certificate and key
let mut tls_config = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
Expand Down Expand Up @@ -121,3 +93,18 @@ impl StopHandle {
*self.inner.lock() = Some(handle);
}
}

#[instrument(skip(watcher))]
async fn watch_namespaces(watcher: NamespaceWatcher) {
loop {
match watcher.watch().await {
Ok(_) => {
info!("Namespace watcher finished, restarting.");
}
Err(e) => {
error!("Namespace watcher failed, restarting: {}", e);
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
}
}
}
}
5 changes: 3 additions & 2 deletions tembo-pod-init/src/mutate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use serde_json::json;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, error};
use tracing::*;

use crate::{config::Config, container::*};

#[instrument(skip(client), fields(trace_id))]
#[post("/mutate")]
async fn mutate(
body: web::Json<AdmissionReview<Pod>>,
Expand Down Expand Up @@ -185,7 +186,7 @@ async fn mutate(

add_volume_mounts(postgres_container, volume_mount);
} else {
error!("Postgres container not found");
warn!("Postgres container not found");
}
}

Expand Down
95 changes: 95 additions & 0 deletions tembo-pod-init/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use crate::config::Config;
use opentelemetry::trace::TraceId;
use tracing_subscriber::{prelude::*, EnvFilter, Registry};

#[cfg(feature = "telemetry")]
use opentelemetry::{
global,
sdk::{propagation::TraceContextPropagator, trace, trace::Sampler, Resource},
KeyValue,
};

/// Fetch an opentelemetry::trace::TraceId as hex through the full tracing stack
pub fn get_trace_id() -> TraceId {
use opentelemetry::trace::TraceContextExt as _; // opentelemetry::Context -> opentelemetry::trace::Span
use tracing_opentelemetry::OpenTelemetrySpanExt as _; // tracing::Span to opentelemetry::Context

tracing::Span::current()
.context()
.span()
.span_context()
.trace_id()
}

#[cfg(feature = "telemetry")]
async fn init_tracer(config: &Config) -> opentelemetry::sdk::trace::Tracer {
global::set_text_map_propagator(TraceContextPropagator::new());
let otlp_endpoint = &config.opentelemetry_endpoint_url;

if otlp_endpoint.is_empty() {
panic!("OPENTELEMETRY_ENDPOINT_URL is not set");
}

let channel = tonic::transport::Channel::from_shared(otlp_endpoint.to_string())
.unwrap()
.connect()
.await
.unwrap();

opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_channel(channel),
)
.with_trace_config(
trace::config()
.with_resource(Resource::new(vec![KeyValue::new(
"service.name",
"tembo-pod-init",
)]))
.with_sampler(Sampler::AlwaysOn),
)
.install_batch(opentelemetry::runtime::Tokio)
.unwrap()
}

/// Initialize tracing
pub async fn init(config: &Config) {
// Setup tracing layers
#[cfg(feature = "telemetry")]
let telemetry = tracing_opentelemetry::layer().with_tracer(init_tracer(config).await);
let logger = tracing_subscriber::fmt::layer().compact();
let env_filter = EnvFilter::new(&config.log_level);

// Decide on layers
#[cfg(feature = "telemetry")]
let collector = Registry::default()
.with(telemetry)
.with(logger)
.with(env_filter);
#[cfg(not(feature = "telemetry"))]
let collector = Registry::default().with(logger).with(env_filter);

// Initialize tracing
tracing::subscriber::set_global_default(collector).unwrap();
}

#[cfg(test)]
mod test {
// This test only works when telemetry is initialized fully
// and requires OPENTELEMETRY_ENDPOINT_URL pointing to a valid server
#[cfg(feature = "telemetry")]
#[tokio::test]
#[ignore = "requires a trace exporter"]
async fn get_trace_id_returns_valid_traces() {
use super::*;
super::init().await;
#[tracing::instrument(name = "test_span")] // need to be in an instrumented fn
fn test_trace_id() -> TraceId {
get_trace_id()
}
assert_ne!(test_trace_id(), TraceId::INVALID, "valid trace");
}
}
3 changes: 2 additions & 1 deletion tembo-pod-init/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use kube::Client;
use std::collections::HashSet;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::debug;
use tracing::*;

use crate::config::Config;

Expand All @@ -24,6 +24,7 @@ impl NamespaceWatcher {
}
}

#[instrument(skip(self), fields(trace_id))]
pub async fn watch(&self) -> Result<(), kube::Error> {
let namespaces = self.namespaces.clone();
let client = self.client.clone();
Expand Down

0 comments on commit dbda757

Please sign in to comment.