Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions codex-rs/core/src/config/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,19 +282,33 @@ pub enum OtelHttpProtocol {
Json,
}

#[derive(Deserialize, Debug, Clone, PartialEq, Default)]
#[serde(rename_all = "kebab-case")]
pub struct OtelTlsConfig {
pub ca_certificate: Option<PathBuf>,
pub client_certificate: Option<PathBuf>,
pub client_private_key: Option<PathBuf>,
}

/// Which OTEL exporter to use.
#[derive(Deserialize, Debug, Clone, PartialEq)]
#[serde(rename_all = "kebab-case")]
pub enum OtelExporterKind {
None,
OtlpHttp {
endpoint: String,
#[serde(default)]
headers: HashMap<String, String>,
protocol: OtelHttpProtocol,
#[serde(default)]
tls: Option<OtelTlsConfig>,
},
OtlpGrpc {
endpoint: String,
#[serde(default)]
headers: HashMap<String, String>,
#[serde(default)]
tls: Option<OtelTlsConfig>,
},
}

Expand Down
18 changes: 17 additions & 1 deletion codex-rs/core/src/otel_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::default_client::originator;
use codex_otel::config::OtelExporter;
use codex_otel::config::OtelHttpProtocol;
use codex_otel::config::OtelSettings;
use codex_otel::config::OtelTlsConfig as OtelTlsSettings;
use codex_otel::otel_provider::OtelProvider;
use std::error::Error;

Expand All @@ -21,6 +22,7 @@ pub fn build_provider(
endpoint,
headers,
protocol,
tls,
} => {
let protocol = match protocol {
Protocol::Json => OtelHttpProtocol::Json,
Expand All @@ -34,14 +36,28 @@ pub fn build_provider(
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
protocol,
tls: tls.as_ref().map(|config| OtelTlsSettings {
ca_certificate: config.ca_certificate.clone(),
client_certificate: config.client_certificate.clone(),
client_private_key: config.client_private_key.clone(),
}),
}
}
Kind::OtlpGrpc { endpoint, headers } => OtelExporter::OtlpGrpc {
Kind::OtlpGrpc {
endpoint,
headers,
tls,
} => OtelExporter::OtlpGrpc {
endpoint: endpoint.clone(),
headers: headers
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect(),
tls: tls.as_ref().map(|config| OtelTlsSettings {
ca_certificate: config.ca_certificate.clone(),
client_certificate: config.client_certificate.clone(),
client_private_key: config.client_private_key.clone(),
}),
},
};

Expand Down
10 changes: 9 additions & 1 deletion codex-rs/otel/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,18 +27,26 @@ opentelemetry-otlp = { workspace = true, features = [
"grpc-tonic",
"http-proto",
"http-json",
"logs",
"reqwest",
"reqwest-rustls",
"tls",
"tls-roots",
], optional = true }
opentelemetry-semantic-conventions = { workspace = true }
opentelemetry_sdk = { workspace = true, features = [
"logs",
"rt-tokio",
], optional = true }
http = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
strum_macros = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true, optional = true }
tonic = { workspace = true, optional = true, features = [
"transport",
"tls-native-roots",
"tls-ring",
] }
tracing = { workspace = true }
9 changes: 9 additions & 0 deletions codex-rs/otel/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,25 @@ pub enum OtelHttpProtocol {
Json,
}

#[derive(Clone, Debug, Default)]
pub struct OtelTlsConfig {
pub ca_certificate: Option<PathBuf>,
pub client_certificate: Option<PathBuf>,
pub client_private_key: Option<PathBuf>,
}

#[derive(Clone, Debug)]
pub enum OtelExporter {
None,
OtlpGrpc {
endpoint: String,
headers: HashMap<String, String>,
tls: Option<OtelTlsConfig>,
},
OtlpHttp {
endpoint: String,
headers: HashMap<String, String>,
protocol: OtelHttpProtocol,
tls: Option<OtelTlsConfig>,
},
}
179 changes: 173 additions & 6 deletions codex-rs/otel/src/otel_provider.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,37 @@
use crate::config::OtelExporter;
use crate::config::OtelHttpProtocol;
use crate::config::OtelSettings;
use crate::config::OtelTlsConfig;
use http::Uri;
use opentelemetry::KeyValue;
use opentelemetry_otlp::LogExporter;
use opentelemetry_otlp::OTEL_EXPORTER_OTLP_LOGS_TIMEOUT;
use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT;
use opentelemetry_otlp::OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT;
use opentelemetry_otlp::Protocol;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_otlp::WithHttpConfig;
use opentelemetry_otlp::WithTonicConfig;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use opentelemetry_semantic_conventions as semconv;
use reqwest::Certificate as ReqwestCertificate;
use reqwest::Identity as ReqwestIdentity;
use reqwest::header::HeaderMap;
use reqwest::header::HeaderName;
use reqwest::header::HeaderValue;
use std::env;
use std::error::Error;
use std::fs;
use std::io::ErrorKind;
use std::io::{self};
use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use tonic::metadata::MetadataMap;
use tonic::transport::Certificate as TonicCertificate;
use tonic::transport::ClientTlsConfig;
use tonic::transport::Identity as TonicIdentity;
use tracing::debug;

const ENV_ATTRIBUTE: &str = "env";
Expand Down Expand Up @@ -47,8 +64,12 @@ impl OtelProvider {
debug!("No exporter enabled in OTLP settings.");
return Ok(None);
}
OtelExporter::OtlpGrpc { endpoint, headers } => {
debug!("Using OTLP Grpc exporter: {}", endpoint);
OtelExporter::OtlpGrpc {
endpoint,
headers,
tls,
} => {
debug!("Using OTLP Grpc exporter: {endpoint}");

let mut header_map = HeaderMap::new();
for (key, value) in headers {
Expand All @@ -59,10 +80,25 @@ impl OtelProvider {
}
}

let base_tls_config = ClientTlsConfig::new()
.with_enabled_roots()
.assume_http2(true);

let tls_config = match tls.as_ref() {
Some(tls) => build_grpc_tls_config(
endpoint,
base_tls_config,
tls,
settings.codex_home.as_path(),
)?,
None => base_tls_config,
};

let exporter = LogExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.with_metadata(MetadataMap::from_headers(header_map))
.with_tls_config(tls_config)
.build()?;

builder = builder.with_batch_exporter(exporter);
Expand All @@ -71,20 +107,27 @@ impl OtelProvider {
endpoint,
headers,
protocol,
tls,
} => {
debug!("Using OTLP Http exporter: {}", endpoint);
debug!("Using OTLP Http exporter: {endpoint}");

let protocol = match protocol {
OtelHttpProtocol::Binary => Protocol::HttpBinary,
OtelHttpProtocol::Json => Protocol::HttpJson,
};

let exporter = LogExporter::builder()
let mut exporter_builder = LogExporter::builder()
.with_http()
.with_endpoint(endpoint)
.with_protocol(protocol)
.with_headers(headers.clone())
.build()?;
.with_headers(headers.clone());

if let Some(tls) = tls.as_ref() {
let client = build_http_client(tls, settings.codex_home.as_path())?;
exporter_builder = exporter_builder.with_http_client(client);
}

let exporter = exporter_builder.build()?;

builder = builder.with_batch_exporter(exporter);
}
Expand All @@ -101,3 +144,127 @@ impl Drop for OtelProvider {
let _ = self.logger.shutdown();
}
}

fn build_grpc_tls_config(
endpoint: &str,
tls_config: ClientTlsConfig,
tls: &OtelTlsConfig,
codex_home: &Path,
) -> Result<ClientTlsConfig, Box<dyn Error>> {
let uri: Uri = endpoint.parse()?;
let host = uri.host().ok_or_else(|| {
config_error(format!(
"OTLP gRPC endpoint {endpoint} does not include a host"
))
})?;

let mut config = tls_config.domain_name(host.to_owned());

if let Some(path) = tls.ca_certificate.as_ref() {
let (pem, _) = read_bytes(codex_home, path)?;
config = config.ca_certificate(TonicCertificate::from_pem(pem));
}

match (&tls.client_certificate, &tls.client_private_key) {
(Some(cert_path), Some(key_path)) => {
let (cert_pem, _) = read_bytes(codex_home, cert_path)?;
let (key_pem, _) = read_bytes(codex_home, key_path)?;
config = config.identity(TonicIdentity::from_pem(cert_pem, key_pem));
}
(Some(_), None) | (None, Some(_)) => {
return Err(config_error(
"client_certificate and client_private_key must both be provided for mTLS",
));
}
(None, None) => {}
}

Ok(config)
}

fn build_http_client(
tls: &OtelTlsConfig,
codex_home: &Path,
) -> Result<reqwest::Client, Box<dyn Error>> {
let mut builder =
reqwest::Client::builder().timeout(resolve_otlp_timeout(OTEL_EXPORTER_OTLP_LOGS_TIMEOUT));

if let Some(path) = tls.ca_certificate.as_ref() {
let (pem, location) = read_bytes(codex_home, path)?;
let certificate = ReqwestCertificate::from_pem(pem.as_slice()).map_err(|error| {
config_error(format!(
"failed to parse certificate {}: {error}",
location.display()
))
})?;
builder = builder.add_root_certificate(certificate);
}

match (&tls.client_certificate, &tls.client_private_key) {
(Some(cert_path), Some(key_path)) => {
let (mut cert_pem, cert_location) = read_bytes(codex_home, cert_path)?;
let (key_pem, key_location) = read_bytes(codex_home, key_path)?;
cert_pem.extend_from_slice(key_pem.as_slice());
let identity = ReqwestIdentity::from_pem(cert_pem.as_slice()).map_err(|error| {
config_error(format!(
"failed to parse client identity using {} and {}: {error}",
cert_location.display(),
key_location.display()
))
})?;
builder = builder.identity(identity);
}
(Some(_), None) | (None, Some(_)) => {
return Err(config_error(
"client_certificate and client_private_key must both be provided for mTLS",
));
}
(None, None) => {}
}

builder
.build()
.map_err(|error| Box::new(error) as Box<dyn Error>)
}

fn resolve_otlp_timeout(signal_var: &str) -> Duration {
if let Some(timeout) = read_timeout_env(signal_var) {
return timeout;
}
if let Some(timeout) = read_timeout_env(OTEL_EXPORTER_OTLP_TIMEOUT) {
return timeout;
}
OTEL_EXPORTER_OTLP_TIMEOUT_DEFAULT
}

fn read_timeout_env(var: &str) -> Option<Duration> {
let value = env::var(var).ok()?;
let parsed = value.parse::<i64>().ok()?;
if parsed < 0 {
return None;
}
Some(Duration::from_millis(parsed as u64))
}

fn read_bytes(base: &Path, provided: &PathBuf) -> Result<(Vec<u8>, PathBuf), Box<dyn Error>> {
let resolved = resolve_config_path(base, provided);
match fs::read(&resolved) {
Ok(bytes) => Ok((bytes, resolved)),
Err(error) => Err(Box::new(io::Error::new(
error.kind(),
format!("failed to read {}: {error}", resolved.display()),
))),
}
}

fn resolve_config_path(base: &Path, provided: &PathBuf) -> PathBuf {
if provided.is_absolute() {
provided.clone()
} else {
base.join(provided)
}
}

fn config_error(message: impl Into<String>) -> Box<dyn Error> {
Box::new(io::Error::new(ErrorKind::InvalidData, message.into()))
}
Loading
Loading