From 3e840149f59769e34f57522d1193b2aca926d1f8 Mon Sep 17 00:00:00 2001 From: streamkit-devin Date: Sat, 30 May 2026 16:19:02 +0000 Subject: [PATCH 1/5] feat(oneshot): add bounded service label to oneshot_pipeline.duration Source a bounded `service` label ({tts,stt,other}) from the trusted X-StreamKit-Service request header so TTS, STT, and other oneshot requests are distinguishable without leaking arbitrary user-submitted pipeline names into metric cardinality. Signed-off-by: streamkit-devin --- apps/skit/src/server/oneshot.rs | 54 +++++++++++++++++++++++++++++---- 1 file changed, 48 insertions(+), 6 deletions(-) diff --git a/apps/skit/src/server/oneshot.rs b/apps/skit/src/server/oneshot.rs index eb3f657df..1b4a2f6cc 100644 --- a/apps/skit/src/server/oneshot.rs +++ b/apps/skit/src/server/oneshot.rs @@ -43,6 +43,20 @@ struct HttpInputBinding { required: bool, } +/// Trusted header naming the service behind a oneshot request. +const SERVICE_HEADER: &str = "X-StreamKit-Service"; + +/// Map the service header to a bounded label so user-submitted pipeline names +/// (arbitrary, high-cardinality) never reach metrics. Anything outside the +/// allowlist — including a missing header — collapses to `other`. +fn classify_service(header: Option<&str>) -> &'static str { + match header.map(|h| h.trim().to_ascii_lowercase()).as_deref() { + Some("tts") => "tts", + Some("stt") => "stt", + _ => "other", + } +} + /// Extract content-type header and multipart boundary from request headers. fn extract_multipart_boundary(headers: &HeaderMap) -> Result { let ct_header = headers @@ -399,6 +413,7 @@ fn build_streaming_response( pipeline_result: streamkit_engine::OneshotPipelineResult, start_time: Instant, duration_histogram: opentelemetry::metrics::Histogram, + service: &'static str, ) -> Response { tracing::debug!( "Creating streaming response with content type: {}", @@ -406,7 +421,7 @@ fn build_streaming_response( ); let stream = ReceiverStream::new(pipeline_result.data_stream).map(Ok::<_, Infallible>); - let stream = InstrumentedOneshotStream::new(stream, start_time, duration_histogram); + let stream = InstrumentedOneshotStream::new(stream, start_time, duration_histogram, service); let body = Body::from_stream(stream); let mut headers = HeaderMap::new(); @@ -436,6 +451,7 @@ struct InstrumentedOneshotStream { start_time: Instant, recorded: bool, duration_histogram: opentelemetry::metrics::Histogram, + service: &'static str, } impl InstrumentedOneshotStream { @@ -443,8 +459,9 @@ impl InstrumentedOneshotStream { inner: S, start_time: Instant, duration_histogram: opentelemetry::metrics::Histogram, + service: &'static str, ) -> Self { - Self { inner, start_time, recorded: false, duration_histogram } + Self { inner, start_time, recorded: false, duration_histogram, service } } fn record(&mut self, status: &'static str) { @@ -452,7 +469,7 @@ impl InstrumentedOneshotStream { return; } self.recorded = true; - let labels = [KeyValue::new("status", status)]; + let labels = [KeyValue::new("status", status), KeyValue::new("service", self.service)]; self.duration_histogram.record(self.start_time.elapsed().as_secs_f64(), &labels); } } @@ -492,6 +509,7 @@ pub(super) async fn process_oneshot_pipeline_handler( tracing::info!("Processing multipart request"); let headers = req.headers().clone(); + let service = classify_service(headers.get(SERVICE_HEADER).and_then(|v| v.to_str().ok())); let (role_name, perms) = crate::role_extractor::get_role_and_permissions(&headers, &app_state); if !perms.create_sessions { return Err(AppError::Forbidden( @@ -652,7 +670,7 @@ pub(super) async fn process_oneshot_pipeline_handler( result }, Err(e) => { - let labels = [KeyValue::new("status", "error")]; + let labels = [KeyValue::new("status", "error"), KeyValue::new("service", service)]; oneshot_duration_histogram.record(oneshot_start_time.elapsed().as_secs_f64(), &labels); cancel_token.cancel(); return Err(e.into()); @@ -662,7 +680,7 @@ pub(super) async fn process_oneshot_pipeline_handler( match parse_done_rx.await { Ok(Ok(())) => {}, Ok(Err(err)) => { - let labels = [KeyValue::new("status", "error")]; + let labels = [KeyValue::new("status", "error"), KeyValue::new("service", service)]; oneshot_duration_histogram.record(oneshot_start_time.elapsed().as_secs_f64(), &labels); cancel_token.cancel(); return Err(err); @@ -674,7 +692,12 @@ pub(super) async fn process_oneshot_pipeline_handler( } let _ = routing_task.await; - Ok(build_streaming_response(pipeline_result, oneshot_start_time, oneshot_duration_histogram)) + Ok(build_streaming_response( + pipeline_result, + oneshot_start_time, + oneshot_duration_histogram, + service, + )) } #[cfg(test)] @@ -767,6 +790,25 @@ mod tests { assert!(msg.to_lowercase().contains("multipart"), "msg should mention multipart: {msg}"); } + #[test] + fn classify_service_recognizes_allowlist() { + assert_eq!(classify_service(Some("tts")), "tts"); + assert_eq!(classify_service(Some("stt")), "stt"); + } + + #[test] + fn classify_service_normalizes_case_and_whitespace() { + assert_eq!(classify_service(Some(" TTS ")), "tts"); + assert_eq!(classify_service(Some("Stt")), "stt"); + } + + #[test] + fn classify_service_unknown_and_absent_fall_back_to_other() { + assert_eq!(classify_service(Some("kokoro")), "other"); + assert_eq!(classify_service(Some("")), "other"); + assert_eq!(classify_service(None), "other"); + } + #[test] fn boundary_extracted_from_simple_content_type() { let value = From b08ba47b19a3601d3590e66968a91484c9980df3 Mon Sep 17 00:00:00 2001 From: streamkit-devin Date: Sat, 30 May 2026 16:40:43 +0000 Subject: [PATCH 2/5] refactor(metrics): make request-metric service labels config-driven Replace the hardcoded {tts,stt,other} allowlist with a general, operator-configurable [server.metrics.request_labels] facility: each label is sourced from a trusted request header, bounded by a configured allowlist, with a fallback (default "other"). A reusable resolver applies it to both oneshot_pipeline.duration and the shared http.server.* request metrics, so new dimensions need only a config edit, not a recompile. Defaults preserve the {tts,stt,other} service label. Signed-off-by: streamkit-devin --- apps/skit/src/config.rs | 54 ++++++++++++++++++ apps/skit/src/lib.rs | 1 + apps/skit/src/main.rs | 1 + apps/skit/src/metrics_labels.rs | 97 +++++++++++++++++++++++++++++++++ apps/skit/src/server/mod.rs | 15 ++++- apps/skit/src/server/oneshot.rs | 61 +++++++-------------- 6 files changed, 185 insertions(+), 44 deletions(-) create mode 100644 apps/skit/src/metrics_labels.rs diff --git a/apps/skit/src/config.rs b/apps/skit/src/config.rs index c4944b2ad..fa56ca580 100644 --- a/apps/skit/src/config.rs +++ b/apps/skit/src/config.rs @@ -231,6 +231,56 @@ impl Default for CorsConfig { } } +fn default_label_fallback() -> String { + "other".to_string() +} + +fn default_request_labels() -> Vec { + // Ships the `service` dimension oneshot dashboards build against; operators + // can extend or replace it without a recompile. + vec![RequestLabelConfig { + name: "service".to_string(), + header: "X-StreamKit-Service".to_string(), + allowed: vec!["tts".to_string(), "stt".to_string()], + fallback: default_label_fallback(), + }] +} + +/// A bounded metric label sourced from a trusted request header. +/// +/// The header value is trimmed and lowercased, then matched against `allowed`; +/// anything not in the allowlist (or a missing header) collapses to `fallback`, +/// so client-supplied headers can never inflate metric cardinality. +#[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)] +pub struct RequestLabelConfig { + /// Metric label key (e.g. `service`). + pub name: String, + /// Trusted request header to read the value from (e.g. `X-StreamKit-Service`). + pub header: String, + /// Permitted values, matched case-insensitively after trimming. + #[serde(default)] + pub allowed: Vec, + /// Value emitted when the header is absent or its value is not in `allowed`. + #[serde(default = "default_label_fallback")] + pub fallback: String, +} + +/// Configuration for request-scoped metric labeling. +#[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)] +pub struct MetricsConfig { + /// Bounded labels attached to request metrics, each sourced from a trusted + /// request header. Applied to all HTTP request metrics and to oneshot + /// pipeline metrics. + #[serde(default = "default_request_labels")] + pub request_labels: Vec, +} + +impl Default for MetricsConfig { + fn default() -> Self { + Self { request_labels: default_request_labels() } + } +} + /// Telemetry and observability configuration (OpenTelemetry, tokio-console). #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)] pub struct TelemetryConfig { @@ -322,6 +372,9 @@ pub struct ServerConfig { /// CORS configuration for cross-origin requests #[serde(default)] pub cors: CorsConfig, + /// Bounded request-metric labeling configuration. + #[serde(default)] + pub metrics: MetricsConfig, #[cfg(feature = "moq")] pub moq_address: Option, /// TLS certificate for the MoQ WebTransport listener. @@ -350,6 +403,7 @@ impl Default for ServerConfig { max_body_size: default_max_body_size(), base_path: None, cors: CorsConfig::default(), + metrics: MetricsConfig::default(), #[cfg(feature = "moq")] moq_address: None, #[cfg(feature = "moq")] diff --git a/apps/skit/src/lib.rs b/apps/skit/src/lib.rs index dab5e0e62..0e2ef6bfa 100644 --- a/apps/skit/src/lib.rs +++ b/apps/skit/src/lib.rs @@ -14,6 +14,7 @@ pub mod marketplace_installer; pub mod marketplace_security; #[cfg(feature = "mcp")] pub mod mcp; +pub mod metrics_labels; #[cfg(feature = "moq")] pub mod moq_gateway; pub mod mse_gateway; diff --git a/apps/skit/src/main.rs b/apps/skit/src/main.rs index c9a20a94e..64dac17ce 100644 --- a/apps/skit/src/main.rs +++ b/apps/skit/src/main.rs @@ -35,6 +35,7 @@ mod marketplace_installer; mod marketplace_security; #[cfg(feature = "mcp")] mod mcp; +mod metrics_labels; #[cfg(feature = "moq")] mod moq_gateway; mod mse_gateway; diff --git a/apps/skit/src/metrics_labels.rs b/apps/skit/src/metrics_labels.rs new file mode 100644 index 000000000..b93021a97 --- /dev/null +++ b/apps/skit/src/metrics_labels.rs @@ -0,0 +1,97 @@ +// SPDX-FileCopyrightText: © 2025 StreamKit Contributors +// +// SPDX-License-Identifier: MPL-2.0 + +//! Resolve bounded metric labels from trusted request headers. +//! +//! The values are constrained to operator-configured allowlists so +//! client-supplied headers can never inflate metric cardinality. + +use axum::http::HeaderMap; +use opentelemetry::KeyValue; + +use crate::config::RequestLabelConfig; + +fn normalize(value: &str) -> String { + value.trim().to_ascii_lowercase() +} + +/// Constrain a header value to an allowlist, falling back when it is absent or +/// unrecognized. Matching is case-insensitive after trimming. +fn classify(value: Option<&str>, allowed: &[String], fallback: &str) -> String { + match value.map(normalize) { + Some(v) if allowed.iter().any(|a| normalize(a) == v) => v, + _ => fallback.to_string(), + } +} + +/// Resolve configured request labels into bounded metric key-values. +pub fn resolve_request_labels(labels: &[RequestLabelConfig], headers: &HeaderMap) -> Vec { + labels + .iter() + .map(|label| { + let value = headers.get(label.header.as_str()).and_then(|v| v.to_str().ok()); + KeyValue::new(label.name.clone(), classify(value, &label.allowed, &label.fallback)) + }) + .collect() +} + +#[cfg(test)] +mod tests { + use super::*; + use axum::http::HeaderValue; + + fn label(name: &str, header: &str, allowed: &[&str]) -> RequestLabelConfig { + RequestLabelConfig { + name: name.to_string(), + header: header.to_string(), + allowed: allowed.iter().map(|s| (*s).to_string()).collect(), + fallback: "other".to_string(), + } + } + + #[test] + fn classify_allows_listed_values() { + let allowed = vec!["tts".to_string(), "stt".to_string()]; + assert_eq!(classify(Some("tts"), &allowed, "other"), "tts"); + assert_eq!(classify(Some("stt"), &allowed, "other"), "stt"); + } + + #[test] + fn classify_normalizes_case_and_whitespace() { + let allowed = vec!["tts".to_string()]; + assert_eq!(classify(Some(" TTS "), &allowed, "other"), "tts"); + } + + #[test] + fn classify_unknown_empty_and_absent_fall_back() { + let allowed = vec!["tts".to_string()]; + assert_eq!(classify(Some("kokoro"), &allowed, "other"), "other"); + assert_eq!(classify(Some(""), &allowed, "other"), "other"); + assert_eq!(classify(None, &allowed, "other"), "other"); + } + + #[test] + fn classify_empty_allowlist_always_falls_back() { + assert_eq!(classify(Some("tts"), &[], "other"), "other"); + } + + #[test] + fn resolve_emits_one_keyvalue_per_label() { + let labels = vec![label("service", "X-StreamKit-Service", &["tts", "stt"])]; + let mut headers = HeaderMap::new(); + headers.insert("X-StreamKit-Service", HeaderValue::from_static("STT")); + + let resolved = resolve_request_labels(&labels, &headers); + assert_eq!(resolved.len(), 1); + assert_eq!(resolved[0].key.as_str(), "service"); + assert_eq!(resolved[0].value.as_str(), "stt"); + } + + #[test] + fn resolve_falls_back_when_header_missing() { + let labels = vec![label("service", "X-StreamKit-Service", &["tts", "stt"])]; + let resolved = resolve_request_labels(&labels, &HeaderMap::new()); + assert_eq!(resolved[0].value.as_str(), "other"); + } +} diff --git a/apps/skit/src/server/mod.rs b/apps/skit/src/server/mod.rs index 1ac8da177..d3986efc5 100644 --- a/apps/skit/src/server/mod.rs +++ b/apps/skit/src/server/mod.rs @@ -1562,13 +1562,21 @@ async fn static_handler( } } -async fn metrics_middleware(req: axum::http::Request, next: Next) -> Response { +async fn metrics_middleware( + State(app_state): State>, + req: axum::http::Request, + next: Next, +) -> Response { let start = Instant::now(); let method = req.method().clone(); let path = req.extensions().get::().map_or_else( || req.uri().path().to_owned(), |matched_path| matched_path.as_str().to_owned(), ); + let configured_labels = crate::metrics_labels::resolve_request_labels( + &app_state.config.server.metrics.request_labels, + req.headers(), + ); let response = next.run(req).await; @@ -1590,11 +1598,12 @@ async fn metrics_middleware(req: axum::http::Request, next: Next) -> Respo }) .clone(); - let labels = [ + let mut labels = vec![ KeyValue::new("http.method", method.to_string()), KeyValue::new("http.route", path), KeyValue::new("http.status_code", status), ]; + labels.extend(configured_labels); counter.add(1, &labels); histogram.record(latency, &labels); @@ -1958,7 +1967,7 @@ pub fn create_app( .on_response(DefaultOnResponse::new().level(tracing::Level::DEBUG)) .on_failure(DefaultOnFailure::new().level(tracing::Level::WARN)), )) - .layer(middleware::from_fn(metrics_middleware)) + .layer(middleware::from_fn_with_state(Arc::clone(&app_state), metrics_middleware)) .layer(SetResponseHeaderLayer::if_not_present( header::X_CONTENT_TYPE_OPTIONS, header::HeaderValue::from_static("nosniff"), diff --git a/apps/skit/src/server/oneshot.rs b/apps/skit/src/server/oneshot.rs index 1b4a2f6cc..a1ad3453f 100644 --- a/apps/skit/src/server/oneshot.rs +++ b/apps/skit/src/server/oneshot.rs @@ -43,18 +43,12 @@ struct HttpInputBinding { required: bool, } -/// Trusted header naming the service behind a oneshot request. -const SERVICE_HEADER: &str = "X-StreamKit-Service"; - -/// Map the service header to a bounded label so user-submitted pipeline names -/// (arbitrary, high-cardinality) never reach metrics. Anything outside the -/// allowlist — including a missing header — collapses to `other`. -fn classify_service(header: Option<&str>) -> &'static str { - match header.map(|h| h.trim().to_ascii_lowercase()).as_deref() { - Some("tts") => "tts", - Some("stt") => "stt", - _ => "other", - } +/// Combine the per-request `status` with the resolved bounded labels. +fn duration_labels(status: &'static str, extra: &[KeyValue]) -> Vec { + let mut labels = Vec::with_capacity(extra.len() + 1); + labels.push(KeyValue::new("status", status)); + labels.extend_from_slice(extra); + labels } /// Extract content-type header and multipart boundary from request headers. @@ -413,7 +407,7 @@ fn build_streaming_response( pipeline_result: streamkit_engine::OneshotPipelineResult, start_time: Instant, duration_histogram: opentelemetry::metrics::Histogram, - service: &'static str, + metric_labels: Vec, ) -> Response { tracing::debug!( "Creating streaming response with content type: {}", @@ -421,7 +415,8 @@ fn build_streaming_response( ); let stream = ReceiverStream::new(pipeline_result.data_stream).map(Ok::<_, Infallible>); - let stream = InstrumentedOneshotStream::new(stream, start_time, duration_histogram, service); + let stream = + InstrumentedOneshotStream::new(stream, start_time, duration_histogram, metric_labels); let body = Body::from_stream(stream); let mut headers = HeaderMap::new(); @@ -451,7 +446,7 @@ struct InstrumentedOneshotStream { start_time: Instant, recorded: bool, duration_histogram: opentelemetry::metrics::Histogram, - service: &'static str, + metric_labels: Vec, } impl InstrumentedOneshotStream { @@ -459,9 +454,9 @@ impl InstrumentedOneshotStream { inner: S, start_time: Instant, duration_histogram: opentelemetry::metrics::Histogram, - service: &'static str, + metric_labels: Vec, ) -> Self { - Self { inner, start_time, recorded: false, duration_histogram, service } + Self { inner, start_time, recorded: false, duration_histogram, metric_labels } } fn record(&mut self, status: &'static str) { @@ -469,7 +464,7 @@ impl InstrumentedOneshotStream { return; } self.recorded = true; - let labels = [KeyValue::new("status", status), KeyValue::new("service", self.service)]; + let labels = duration_labels(status, &self.metric_labels); self.duration_histogram.record(self.start_time.elapsed().as_secs_f64(), &labels); } } @@ -509,7 +504,10 @@ pub(super) async fn process_oneshot_pipeline_handler( tracing::info!("Processing multipart request"); let headers = req.headers().clone(); - let service = classify_service(headers.get(SERVICE_HEADER).and_then(|v| v.to_str().ok())); + let metric_labels = crate::metrics_labels::resolve_request_labels( + &app_state.config.server.metrics.request_labels, + &headers, + ); let (role_name, perms) = crate::role_extractor::get_role_and_permissions(&headers, &app_state); if !perms.create_sessions { return Err(AppError::Forbidden( @@ -670,7 +668,7 @@ pub(super) async fn process_oneshot_pipeline_handler( result }, Err(e) => { - let labels = [KeyValue::new("status", "error"), KeyValue::new("service", service)]; + let labels = duration_labels("error", &metric_labels); oneshot_duration_histogram.record(oneshot_start_time.elapsed().as_secs_f64(), &labels); cancel_token.cancel(); return Err(e.into()); @@ -680,7 +678,7 @@ pub(super) async fn process_oneshot_pipeline_handler( match parse_done_rx.await { Ok(Ok(())) => {}, Ok(Err(err)) => { - let labels = [KeyValue::new("status", "error"), KeyValue::new("service", service)]; + let labels = duration_labels("error", &metric_labels); oneshot_duration_histogram.record(oneshot_start_time.elapsed().as_secs_f64(), &labels); cancel_token.cancel(); return Err(err); @@ -696,7 +694,7 @@ pub(super) async fn process_oneshot_pipeline_handler( pipeline_result, oneshot_start_time, oneshot_duration_histogram, - service, + metric_labels, )) } @@ -790,25 +788,6 @@ mod tests { assert!(msg.to_lowercase().contains("multipart"), "msg should mention multipart: {msg}"); } - #[test] - fn classify_service_recognizes_allowlist() { - assert_eq!(classify_service(Some("tts")), "tts"); - assert_eq!(classify_service(Some("stt")), "stt"); - } - - #[test] - fn classify_service_normalizes_case_and_whitespace() { - assert_eq!(classify_service(Some(" TTS ")), "tts"); - assert_eq!(classify_service(Some("Stt")), "stt"); - } - - #[test] - fn classify_service_unknown_and_absent_fall_back_to_other() { - assert_eq!(classify_service(Some("kokoro")), "other"); - assert_eq!(classify_service(Some("")), "other"); - assert_eq!(classify_service(None), "other"); - } - #[test] fn boundary_extracted_from_simple_content_type() { let value = From 010ca67630904c0ff9f455adfc369f1a3f635cd1 Mon Sep 17 00:00:00 2001 From: streamkit-devin Date: Sun, 31 May 2026 09:52:52 +0000 Subject: [PATCH 3/5] fix(metrics): validate label names, pre-normalize allowlist, dedupe resolution Address review on request-metric labels: - Reject configured label names that collide with built-in metric keys (status, http.method, http.route, http.status_code) or duplicate each other, at config load (duplicate Prometheus label keys break scrape). - Pre-normalize allowlist entries once at config load so the per-request hot path only normalizes the incoming header. - Resolve labels once in metrics_middleware and stash them in request extensions; the oneshot handler reuses them (falls back to resolving when the layer is absent, e.g. unit tests). - Record oneshot_pipeline.duration on the routing-task-aborted error arm for consistent error-path coverage. Signed-off-by: streamkit-devin --- apps/skit/src/config.rs | 102 ++++++++++++++++++++++++++++++++ apps/skit/src/metrics_labels.rs | 10 +++- apps/skit/src/server/mod.rs | 5 +- apps/skit/src/server/oneshot.rs | 18 ++++-- 4 files changed, 128 insertions(+), 7 deletions(-) diff --git a/apps/skit/src/config.rs b/apps/skit/src/config.rs index fa56ca580..9ce729669 100644 --- a/apps/skit/src/config.rs +++ b/apps/skit/src/config.rs @@ -281,6 +281,46 @@ impl Default for MetricsConfig { } } +impl MetricsConfig { + /// Label keys emitted by built-in request instruments; configured labels + /// must not collide with these (a duplicate key makes Prometheus reject the + /// whole series on scrape). + const RESERVED_LABEL_NAMES: [&'static str; 4] = + ["status", "http.method", "http.route", "http.status_code"]; + + /// Lowercase and trim every allowlist entry so the per-request hot path only + /// has to normalize the incoming header value. + fn normalize(&mut self) { + for label in &mut self.request_labels { + for allowed in &mut label.allowed { + *allowed = allowed.trim().to_ascii_lowercase(); + } + } + } + + /// Reject label names that collide with built-in metric keys or each other. + /// + /// # Errors + /// + /// Returns an error if a configured label name is reserved by a built-in + /// metric or duplicates another configured label name. + pub fn validate(&self) -> Result<(), String> { + let mut seen = std::collections::HashSet::new(); + for label in &self.request_labels { + if Self::RESERVED_LABEL_NAMES.contains(&label.name.as_str()) { + return Err(format!( + "metrics request_label name '{}' is reserved by built-in metrics", + label.name + )); + } + if !seen.insert(label.name.as_str()) { + return Err(format!("duplicate metrics request_label name '{}'", label.name)); + } + } + Ok(()) + } +} + /// Telemetry and observability configuration (OpenTelemetry, tokio-console). #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)] pub struct TelemetryConfig { @@ -1085,10 +1125,14 @@ pub fn load(config_path: &str) -> Result> figment.merge(Env::prefixed("SK_").split("__")).extract().map_err(Box::new)?; normalize_permissions_config(&mut config); + config.server.metrics.normalize(); if let Err(e) = config.mcp.validate() { return Err(Box::new(figment::Error::from(e))); } + if let Err(e) = config.server.metrics.validate() { + return Err(Box::new(figment::Error::from(e))); + } Ok(ConfigLoadResult { config, file_missing }) } @@ -1499,4 +1543,62 @@ allowed_plugins = [] Ok(()) }); } + + fn request_label(name: &str) -> RequestLabelConfig { + RequestLabelConfig { + name: name.to_string(), + header: "X-Test".to_string(), + allowed: vec![], + fallback: "other".to_string(), + } + } + + #[test] + fn metrics_validate_rejects_reserved_label_name() { + let metrics = MetricsConfig { request_labels: vec![request_label("status")] }; + assert!(metrics.validate().is_err()); + } + + #[test] + fn metrics_validate_rejects_duplicate_label_name() { + let metrics = MetricsConfig { + request_labels: vec![request_label("service"), request_label("service")], + }; + assert!(metrics.validate().is_err()); + } + + #[test] + fn metrics_validate_accepts_default() { + assert!(MetricsConfig::default().validate().is_ok()); + } + + #[test] + fn metrics_normalize_lowercases_allowlist() { + let mut metrics = MetricsConfig { + request_labels: vec![RequestLabelConfig { + name: "service".to_string(), + header: "X-StreamKit-Service".to_string(), + allowed: vec![" TTS ".to_string(), "Stt".to_string()], + fallback: "other".to_string(), + }], + }; + metrics.normalize(); + assert_eq!(metrics.request_labels[0].allowed, vec!["tts".to_string(), "stt".to_string()]); + } + + #[test] + fn load_rejects_reserved_metrics_label_name() { + figment::Jail::expect_with(|jail| { + jail.create_file( + "skit.toml", + r#"[[server.metrics.request_labels]] +name = "http.route" +header = "X-StreamKit-Service" +allowed = ["tts"] +"#, + )?; + assert!(load("skit.toml").is_err(), "reserved label name must fail load"); + Ok(()) + }); + } } diff --git a/apps/skit/src/metrics_labels.rs b/apps/skit/src/metrics_labels.rs index b93021a97..dc9becea0 100644 --- a/apps/skit/src/metrics_labels.rs +++ b/apps/skit/src/metrics_labels.rs @@ -12,15 +12,21 @@ use opentelemetry::KeyValue; use crate::config::RequestLabelConfig; +/// Bounded request labels resolved once per request and stashed in request +/// extensions so downstream handlers can reuse them without re-parsing headers. +#[derive(Clone)] +pub struct ResolvedRequestLabels(pub Vec); + fn normalize(value: &str) -> String { value.trim().to_ascii_lowercase() } /// Constrain a header value to an allowlist, falling back when it is absent or -/// unrecognized. Matching is case-insensitive after trimming. +/// unrecognized. The incoming value is normalized (trim + lowercase); `allowed` +/// entries are expected to be pre-normalized at config load. fn classify(value: Option<&str>, allowed: &[String], fallback: &str) -> String { match value.map(normalize) { - Some(v) if allowed.iter().any(|a| normalize(a) == v) => v, + Some(v) if allowed.contains(&v) => v, _ => fallback.to_string(), } } diff --git a/apps/skit/src/server/mod.rs b/apps/skit/src/server/mod.rs index d3986efc5..720e3c3db 100644 --- a/apps/skit/src/server/mod.rs +++ b/apps/skit/src/server/mod.rs @@ -1564,7 +1564,7 @@ async fn static_handler( async fn metrics_middleware( State(app_state): State>, - req: axum::http::Request, + mut req: axum::http::Request, next: Next, ) -> Response { let start = Instant::now(); @@ -1577,6 +1577,9 @@ async fn metrics_middleware( &app_state.config.server.metrics.request_labels, req.headers(), ); + // Let downstream handlers reuse the resolved labels instead of re-parsing headers. + req.extensions_mut() + .insert(crate::metrics_labels::ResolvedRequestLabels(configured_labels.clone())); let response = next.run(req).await; diff --git a/apps/skit/src/server/oneshot.rs b/apps/skit/src/server/oneshot.rs index a1ad3453f..702efac5b 100644 --- a/apps/skit/src/server/oneshot.rs +++ b/apps/skit/src/server/oneshot.rs @@ -504,10 +504,18 @@ pub(super) async fn process_oneshot_pipeline_handler( tracing::info!("Processing multipart request"); let headers = req.headers().clone(); - let metric_labels = crate::metrics_labels::resolve_request_labels( - &app_state.config.server.metrics.request_labels, - &headers, - ); + // Reuse labels already resolved by metrics_middleware; resolve directly only + // if this handler is exercised without that layer (e.g. unit tests). + let metric_labels = + req.extensions().get::().map_or_else( + || { + crate::metrics_labels::resolve_request_labels( + &app_state.config.server.metrics.request_labels, + &headers, + ) + }, + |resolved| resolved.0.clone(), + ); let (role_name, perms) = crate::role_extractor::get_role_and_permissions(&headers, &app_state); if !perms.create_sessions { return Err(AppError::Forbidden( @@ -684,6 +692,8 @@ pub(super) async fn process_oneshot_pipeline_handler( return Err(err); }, Err(e) => { + let labels = duration_labels("error", &metric_labels); + oneshot_duration_histogram.record(oneshot_start_time.elapsed().as_secs_f64(), &labels); cancel_token.cancel(); return Err(AppError::BadRequest(format!("Multipart routing task aborted: {e}"))); }, From 854dba216399e5c512cefb8aad56c80796384838 Mon Sep 17 00:00:00 2001 From: streamkit-devin Date: Sun, 31 May 2026 14:36:56 +0000 Subject: [PATCH 4/5] fix(metrics): harden request-label validation and normalization - reject empty/invalid label names and empty allowlist entries - compare label names by Prometheus-sanitized key so http_method collides with the built-in http.method - normalize fallback like matched values; share one normalize() helper - centralize built-in reserved keys (single source for emit sites + validation) - normalize metrics config in create_app_state so non-load AppState paths (tests, MCP/embedded) match the allowlist - skip resolve + extension insert when request_labels is empty - correct default_request_labels doc: declaring labels replaces the list Signed-off-by: streamkit-devin --- apps/skit/src/config.rs | 113 +++++++++++++++++++++++++++----- apps/skit/src/metrics_labels.rs | 29 +++++++- apps/skit/src/server/mod.rs | 27 +++++--- apps/skit/src/server/oneshot.rs | 2 +- 4 files changed, 139 insertions(+), 32 deletions(-) diff --git a/apps/skit/src/config.rs b/apps/skit/src/config.rs index 9ce729669..c3d75f33c 100644 --- a/apps/skit/src/config.rs +++ b/apps/skit/src/config.rs @@ -236,8 +236,9 @@ fn default_label_fallback() -> String { } fn default_request_labels() -> Vec { - // Ships the `service` dimension oneshot dashboards build against; operators - // can extend or replace it without a recompile. + // Ships the `service` dimension oneshot dashboards build against. Declaring + // any `request_labels` in config REPLACES this list wholesale (figment does + // not merge sequences), so re-list `service` to keep it alongside new ones. vec![RequestLabelConfig { name: "service".to_string(), header: "X-StreamKit-Service".to_string(), @@ -281,41 +282,74 @@ impl Default for MetricsConfig { } } +/// Prometheus sanitizes any character outside `[a-zA-Z0-9_]` in a label key to +/// `_`, so `http.method` and `http_method` collapse to the same series key. We +/// compare sanitized keys to catch collisions that only appear after scrape. +fn sanitize_label_key(name: &str) -> String { + name.chars().map(|c| if c.is_ascii_alphanumeric() || c == '_' { c } else { '_' }).collect() +} + +/// A metric label name must be a non-empty identifier (dots allowed, per the +/// OpenTelemetry convention used by the built-in keys) so it survives export. +fn is_valid_label_name(name: &str) -> bool { + let mut chars = name.chars(); + match chars.next() { + Some(c) if c.is_ascii_alphabetic() || c == '_' => {}, + _ => return false, + } + chars.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == '.') +} + impl MetricsConfig { - /// Label keys emitted by built-in request instruments; configured labels - /// must not collide with these (a duplicate key makes Prometheus reject the - /// whole series on scrape). - const RESERVED_LABEL_NAMES: [&'static str; 4] = - ["status", "http.method", "http.route", "http.status_code"]; - - /// Lowercase and trim every allowlist entry so the per-request hot path only - /// has to normalize the incoming header value. - fn normalize(&mut self) { + /// Lowercase and trim every allowlist entry and each `fallback` so the + /// per-request hot path only has to normalize the incoming header value and + /// every emitted value shares one normalized space. + pub fn normalize(&mut self) { for label in &mut self.request_labels { for allowed in &mut label.allowed { - *allowed = allowed.trim().to_ascii_lowercase(); + *allowed = crate::metrics_labels::normalize(allowed); } + label.fallback = crate::metrics_labels::normalize(&label.fallback); } } - /// Reject label names that collide with built-in metric keys or each other. + /// Reject label configs that would silently corrupt metrics: invalid names, + /// names colliding (after Prometheus sanitization) with a built-in key or + /// each other, and empty allowlist entries. /// /// # Errors /// - /// Returns an error if a configured label name is reserved by a built-in - /// metric or duplicates another configured label name. + /// Returns an error describing the first offending label. pub fn validate(&self) -> Result<(), String> { + let reserved: std::collections::HashSet = + crate::metrics_labels::RESERVED_LABEL_KEYS + .iter() + .map(|k| sanitize_label_key(k)) + .collect(); let mut seen = std::collections::HashSet::new(); for label in &self.request_labels { - if Self::RESERVED_LABEL_NAMES.contains(&label.name.as_str()) { + if !is_valid_label_name(&label.name) { return Err(format!( - "metrics request_label name '{}' is reserved by built-in metrics", + "metrics request_label name '{}' is not a valid metric label name", label.name )); } - if !seen.insert(label.name.as_str()) { + let key = sanitize_label_key(&label.name); + if reserved.contains(&key) { + return Err(format!( + "metrics request_label name '{}' collides with a built-in metric key", + label.name + )); + } + if !seen.insert(key) { return Err(format!("duplicate metrics request_label name '{}'", label.name)); } + if label.allowed.iter().any(|v| v.trim().is_empty()) { + return Err(format!( + "metrics request_label '{}' has an empty allowed value", + label.name + )); + } } Ok(()) } @@ -1572,6 +1606,49 @@ allowed_plugins = [] assert!(MetricsConfig::default().validate().is_ok()); } + #[test] + fn metrics_validate_rejects_empty_or_invalid_label_name() { + assert!(MetricsConfig { request_labels: vec![request_label("")] }.validate().is_err()); + assert!(MetricsConfig { request_labels: vec![request_label(" ")] }.validate().is_err()); + assert!(MetricsConfig { request_labels: vec![request_label("1service")] } + .validate() + .is_err()); + } + + #[test] + fn metrics_validate_rejects_sanitized_reserved_collision() { + // `http_method` sanitizes to the same Prometheus key as `http.method`. + let metrics = MetricsConfig { request_labels: vec![request_label("http_method")] }; + assert!(metrics.validate().is_err()); + } + + #[test] + fn metrics_validate_rejects_empty_allowed_value() { + let metrics = MetricsConfig { + request_labels: vec![RequestLabelConfig { + name: "service".to_string(), + header: "X-Test".to_string(), + allowed: vec!["tts".to_string(), " ".to_string()], + fallback: "other".to_string(), + }], + }; + assert!(metrics.validate().is_err()); + } + + #[test] + fn metrics_normalize_lowercases_fallback() { + let mut metrics = MetricsConfig { + request_labels: vec![RequestLabelConfig { + name: "service".to_string(), + header: "X-Test".to_string(), + allowed: vec!["tts".to_string()], + fallback: " Other ".to_string(), + }], + }; + metrics.normalize(); + assert_eq!(metrics.request_labels[0].fallback, "other"); + } + #[test] fn metrics_normalize_lowercases_allowlist() { let mut metrics = MetricsConfig { diff --git a/apps/skit/src/metrics_labels.rs b/apps/skit/src/metrics_labels.rs index dc9becea0..30f7cca2e 100644 --- a/apps/skit/src/metrics_labels.rs +++ b/apps/skit/src/metrics_labels.rs @@ -12,21 +12,37 @@ use opentelemetry::KeyValue; use crate::config::RequestLabelConfig; +/// Label keys emitted by the built-in request instruments (HTTP middleware and +/// oneshot histogram). Configured request labels must not collide with these, +/// even after Prometheus sanitizes the key. +pub const STATUS_KEY: &str = "status"; +pub const HTTP_METHOD_KEY: &str = "http.method"; +pub const HTTP_ROUTE_KEY: &str = "http.route"; +pub const HTTP_STATUS_CODE_KEY: &str = "http.status_code"; + +/// Single source of truth for the built-in keys, referenced both at the emit +/// sites and by config validation so the reserved set cannot drift. +pub const RESERVED_LABEL_KEYS: [&str; 4] = + [STATUS_KEY, HTTP_METHOD_KEY, HTTP_ROUTE_KEY, HTTP_STATUS_CODE_KEY]; + /// Bounded request labels resolved once per request and stashed in request /// extensions so downstream handlers can reuse them without re-parsing headers. #[derive(Clone)] pub struct ResolvedRequestLabels(pub Vec); -fn normalize(value: &str) -> String { +/// Canonical normalization for label values and allowlist entries: trim and +/// ASCII-lowercase. Shared so the per-request path and config load stay in step. +pub fn normalize(value: &str) -> String { value.trim().to_ascii_lowercase() } /// Constrain a header value to an allowlist, falling back when it is absent or /// unrecognized. The incoming value is normalized (trim + lowercase); `allowed` -/// entries are expected to be pre-normalized at config load. +/// entries are expected to be pre-normalized at config load. An empty value +/// never matches, so a stray empty allowlist entry can't emit an empty label. fn classify(value: Option<&str>, allowed: &[String], fallback: &str) -> String { match value.map(normalize) { - Some(v) if allowed.contains(&v) => v, + Some(v) if !v.is_empty() && allowed.contains(&v) => v, _ => fallback.to_string(), } } @@ -82,6 +98,13 @@ mod tests { assert_eq!(classify(Some("tts"), &[], "other"), "other"); } + #[test] + fn classify_empty_allowlist_entry_never_emits_empty_value() { + let allowed = vec![String::new()]; + assert_eq!(classify(Some(""), &allowed, "other"), "other"); + assert_eq!(classify(Some(" "), &allowed, "other"), "other"); + } + #[test] fn resolve_emits_one_keyvalue_per_label() { let labels = vec![label("service", "X-StreamKit-Service", &["tts", "stt"])]; diff --git a/apps/skit/src/server/mod.rs b/apps/skit/src/server/mod.rs index 720e3c3db..c7c311f03 100644 --- a/apps/skit/src/server/mod.rs +++ b/apps/skit/src/server/mod.rs @@ -1573,13 +1573,16 @@ async fn metrics_middleware( || req.uri().path().to_owned(), |matched_path| matched_path.as_str().to_owned(), ); - let configured_labels = crate::metrics_labels::resolve_request_labels( - &app_state.config.server.metrics.request_labels, - req.headers(), - ); - // Let downstream handlers reuse the resolved labels instead of re-parsing headers. - req.extensions_mut() - .insert(crate::metrics_labels::ResolvedRequestLabels(configured_labels.clone())); + let configured_request_labels = &app_state.config.server.metrics.request_labels; + let configured_labels = if configured_request_labels.is_empty() { + Vec::new() + } else { + let resolved = + crate::metrics_labels::resolve_request_labels(configured_request_labels, req.headers()); + // Let downstream handlers reuse the resolved labels instead of re-parsing headers. + req.extensions_mut().insert(crate::metrics_labels::ResolvedRequestLabels(resolved.clone())); + resolved + }; let response = next.run(req).await; @@ -1602,9 +1605,9 @@ async fn metrics_middleware( .clone(); let mut labels = vec![ - KeyValue::new("http.method", method.to_string()), - KeyValue::new("http.route", path), - KeyValue::new("http.status_code", status), + KeyValue::new(crate::metrics_labels::HTTP_METHOD_KEY, method.to_string()), + KeyValue::new(crate::metrics_labels::HTTP_ROUTE_KEY, path), + KeyValue::new(crate::metrics_labels::HTTP_STATUS_CODE_KEY, status), ]; labels.extend(configured_labels); @@ -1627,6 +1630,10 @@ pub fn create_app_state( mut config: Config, auth: Option>, ) -> Arc { + // Normalize here (not only in config::load) so every AppState — tests, + // embedded/MCP callers — gets a normalized allowlist the resolver can match. + config.server.metrics.normalize(); + let (event_tx, _) = tokio::sync::broadcast::channel(128); let resource_policy = streamkit_core::ResourcePolicy { diff --git a/apps/skit/src/server/oneshot.rs b/apps/skit/src/server/oneshot.rs index 702efac5b..4344268a8 100644 --- a/apps/skit/src/server/oneshot.rs +++ b/apps/skit/src/server/oneshot.rs @@ -46,7 +46,7 @@ struct HttpInputBinding { /// Combine the per-request `status` with the resolved bounded labels. fn duration_labels(status: &'static str, extra: &[KeyValue]) -> Vec { let mut labels = Vec::with_capacity(extra.len() + 1); - labels.push(KeyValue::new("status", status)); + labels.push(KeyValue::new(crate::metrics_labels::STATUS_KEY, status)); labels.extend_from_slice(extra); labels } From c463ebb0bd9e976a405124cb91764fa8700cc6a9 Mon Sep 17 00:00:00 2001 From: streamkit-devin Date: Sun, 31 May 2026 14:47:21 +0000 Subject: [PATCH 5/5] fix(metrics): warn on invalid request_labels in create_app_state load() hard-rejects, but embedded/MCP/test callers build Config directly; validate there too and log a warning so a bad label can't slip in silently. Signed-off-by: streamkit-devin --- apps/skit/src/server/mod.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/skit/src/server/mod.rs b/apps/skit/src/server/mod.rs index c7c311f03..6ccdeab2e 100644 --- a/apps/skit/src/server/mod.rs +++ b/apps/skit/src/server/mod.rs @@ -1632,7 +1632,11 @@ pub fn create_app_state( ) -> Arc { // Normalize here (not only in config::load) so every AppState — tests, // embedded/MCP callers — gets a normalized allowlist the resolver can match. + // load() hard-rejects invalid configs; this infallible path can only warn. config.server.metrics.normalize(); + if let Err(e) = config.server.metrics.validate() { + tracing::warn!("ignoring invalid metrics request_labels configuration: {e}"); + } let (event_tx, _) = tokio::sync::broadcast::channel(128);