diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs index 3c9f0b218..894fd1bc4 100644 --- a/src/alerts/alert_structs.rs +++ b/src/alerts/alert_structs.rs @@ -19,10 +19,32 @@ use std::{collections::HashMap, time::Duration}; use chrono::{DateTime, Utc}; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize}; +use serde_json::Value; use tokio::sync::{RwLock, mpsc}; use ulid::Ulid; +const RESERVED_FIELDS: &[&str] = &[ + "id", + "version", + "severity", + "title", + "query", + "datasets", + "alertType", + "anomalyConfig", + "forecastConfig", + "thresholdConfig", + "notificationConfig", + "evalConfig", + "targets", + "tags", + "state", + "notificationState", + "created", + "lastTriggeredAt", +]; + use crate::{ alerts::{ AlertError, CURRENT_ALERTS_VERSION, @@ -38,6 +60,52 @@ use crate::{ storage::object_storage::{alert_json_path, alert_state_json_path}, }; +/// Custom deserializer for DateTime that handles legacy empty strings +/// +/// This is a compatibility layer for migrating old alerts that stored empty strings +/// instead of valid timestamps. In production, this should log warnings to help +/// identify data quality issues. +/// +/// # Migration Path +/// - Empty strings → Default to current time with a warning +/// - Missing fields → Default to current time +/// - Valid timestamps → Parse normally +pub fn deserialize_datetime_with_empty_string_fallback<'de, D>( + deserializer: D, +) -> Result, D::Error> +where + D: Deserializer<'de>, +{ + #[derive(Deserialize)] + #[serde(untagged)] + enum DateTimeOrString { + DateTime(DateTime), + String(String), + } + + match DateTimeOrString::deserialize(deserializer)? { + DateTimeOrString::DateTime(dt) => Ok(dt), + DateTimeOrString::String(s) => { + if s.is_empty() { + // Log warning about data quality issue + tracing::warn!( + "Alert has empty 'created' field - this indicates a data quality issue. \ + Defaulting to current timestamp. Please investigate and fix the data source." + ); + Ok(Utc::now()) + } else { + s.parse::>().map_err(serde::de::Error::custom) + } + } + } +} + +/// Default function for created timestamp - returns current time +/// This handles the case where created field is missing in deserialization +pub fn default_created_time() -> DateTime { + Utc::now() +} + /// Helper struct for basic alert fields during migration pub struct BasicAlertFields { pub id: Ulid, @@ -253,10 +321,32 @@ pub struct AlertRequest { pub eval_config: EvalConfig, pub targets: Vec, pub tags: Option>, + #[serde(flatten)] + pub other_fields: Option>, } impl AlertRequest { pub async fn into(self) -> Result { + // Validate that other_fields doesn't contain reserved field names + if let Some(ref other_fields) = self.other_fields { + // Limit other_fields to maximum 10 fields + if other_fields.len() > 10 { + return Err(AlertError::ValidationFailure(format!( + "other_fields can contain at most 10 fields, found {}", + other_fields.len() + ))); + } + + for key in other_fields.keys() { + if RESERVED_FIELDS.contains(&key.as_str()) { + return Err(AlertError::ValidationFailure(format!( + "Field '{}' cannot be in other_fields as it's a reserved field name", + key + ))); + } + } + } + // Validate that all target IDs exist for id in &self.targets { TARGETS.get_target_by_id(id).await?; @@ -309,6 +399,7 @@ impl AlertRequest { created: Utc::now(), tags: self.tags, last_triggered_at: None, + other_fields: self.other_fields, }; Ok(config) } @@ -333,9 +424,15 @@ pub struct AlertConfig { pub state: AlertState, pub notification_state: NotificationState, pub notification_config: NotificationConfig, + #[serde( + default = "default_created_time", + deserialize_with = "deserialize_datetime_with_empty_string_fallback" + )] pub created: DateTime, pub tags: Option>, pub last_triggered_at: Option>, + #[serde(flatten)] + pub other_fields: Option>, } #[derive(Debug, serde::Serialize, serde::Deserialize, Clone)] @@ -359,9 +456,15 @@ pub struct AlertConfigResponse { pub state: AlertState, pub notification_state: NotificationState, pub notification_config: NotificationConfig, + #[serde( + default = "default_created_time", + deserialize_with = "deserialize_datetime_with_empty_string_fallback" + )] pub created: DateTime, pub tags: Option>, pub last_triggered_at: Option>, + #[serde(flatten)] + pub other_fields: Option>, } impl AlertConfig { @@ -401,6 +504,7 @@ impl AlertConfig { created: self.created, tags: self.tags, last_triggered_at: self.last_triggered_at, + other_fields: self.other_fields, } } } diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index a2755e90a..1bef0e3a0 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -19,6 +19,7 @@ use std::{str::FromStr, time::Duration}; use chrono::{DateTime, Utc}; +use serde_json::Value; use tonic::async_trait; use tracing::{info, trace, warn}; use ulid::Ulid; @@ -28,7 +29,10 @@ use crate::{ AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity, ThresholdConfig, alert_enums::NotificationState, - alert_structs::{AlertStateEntry, GroupResult}, + alert_structs::{ + AlertStateEntry, GroupResult, default_created_time, + deserialize_datetime_with_empty_string_fallback, + }, alert_traits::{AlertTrait, MessageCreation}, alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range}, get_number_of_agg_exprs, @@ -61,10 +65,16 @@ pub struct ThresholdAlert { pub state: AlertState, pub notification_state: NotificationState, pub notification_config: NotificationConfig, + #[serde( + default = "default_created_time", + deserialize_with = "deserialize_datetime_with_empty_string_fallback" + )] pub created: DateTime, pub tags: Option>, pub datasets: Vec, pub last_triggered_at: Option>, + #[serde(flatten)] + pub other_fields: Option>, } impl MetastoreObject for ThresholdAlert { @@ -408,6 +418,7 @@ impl From for ThresholdAlert { tags: value.tags, datasets: value.datasets, last_triggered_at: value.last_triggered_at, + other_fields: value.other_fields, } } } @@ -431,6 +442,7 @@ impl From for AlertConfig { tags: val.tags, datasets: val.datasets, last_triggered_at: val.last_triggered_at, + other_fields: val.other_fields, } } } diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 541ad6e35..4df7bfd78 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -51,7 +51,7 @@ pub use crate::alerts::alert_enums::{ pub use crate::alerts::alert_structs::{ AlertConfig, AlertInfo, AlertRequest, AlertStateEntry, Alerts, AlertsInfo, AlertsInfoByState, AlertsSummary, BasicAlertFields, Context, DeploymentInfo, RollingWindow, StateTransition, - ThresholdConfig, + ThresholdConfig, default_created_time, deserialize_datetime_with_empty_string_fallback, }; use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait}; use crate::alerts::alert_types::ThresholdAlert; @@ -134,6 +134,7 @@ impl AlertConfig { created: Utc::now(), tags: None, last_triggered_at: None, + other_fields: None, }; // Save the migrated alert back to storage @@ -682,6 +683,12 @@ impl AlertConfig { ); } + if let Some(other_fields) = &self.other_fields { + for (key, value) in other_fields { + map.insert(key.clone(), value.clone()); + } + } + map } } diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index be70fb05a..a5e5c1938 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -38,69 +38,121 @@ use actix_web::{ use chrono::{DateTime, Utc}; use ulid::Ulid; -// GET /alerts -/// User needs at least a read access to the stream(s) that is being referenced in an alert -/// Read all alerts then return alerts which satisfy the condition -pub async fn list(req: HttpRequest) -> Result { - let session_key = extract_session_key_from_req(&req)?; - let query_map = web::Query::>::from_query(req.query_string()) - .map_err(|_| AlertError::InvalidQueryParameter("malformed query parameters".to_string()))?; +// Reserved query parameter names that are not treated as other_fields filters +const RESERVED_PARAMS: [&str; 3] = ["tags", "offset", "limit"]; +const MAX_LIMIT: usize = 1000; +const DEFAULT_LIMIT: usize = 100; + +/// Query parameters for listing alerts +struct ListQueryParams { + tags_list: Vec, + offset: usize, + limit: usize, + other_fields_filters: HashMap, +} +/// Parse and validate query parameters for listing alerts +fn parse_list_query_params( + query_map: &HashMap, +) -> Result { let mut tags_list = Vec::new(); let mut offset = 0usize; - let mut limit = 100usize; // Default limit - const MAX_LIMIT: usize = 1000; // Maximum allowed limit - - // Parse query parameters - if !query_map.is_empty() { - // Parse tags parameter - if let Some(tags) = query_map.get("tags") { - tags_list = tags - .split(',') - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) - .collect(); - if tags_list.is_empty() { - return Err(AlertError::InvalidQueryParameter( - "empty tags not allowed with query param tags".to_string(), - )); - } + let mut limit = DEFAULT_LIMIT; + let mut other_fields_filters: HashMap = HashMap::new(); + + if query_map.is_empty() { + return Ok(ListQueryParams { + tags_list, + offset, + limit, + other_fields_filters, + }); + } + + // Parse tags parameter + if let Some(tags) = query_map.get("tags") { + tags_list = tags + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + if tags_list.is_empty() { + return Err(AlertError::InvalidQueryParameter( + "empty tags not allowed with query param tags".to_string(), + )); } + } - // Parse offset parameter - if let Some(offset_str) = query_map.get("offset") { - offset = offset_str.parse().map_err(|_| { - AlertError::InvalidQueryParameter("offset is not a valid number".to_string()) - })?; + // Parse offset parameter + if let Some(offset_str) = query_map.get("offset") { + offset = offset_str.parse().map_err(|_| { + AlertError::InvalidQueryParameter("offset is not a valid number".to_string()) + })?; + } + + // Parse limit parameter + if let Some(limit_str) = query_map.get("limit") { + limit = limit_str.parse().map_err(|_| { + AlertError::InvalidQueryParameter("limit is not a valid number".to_string()) + })?; + + // Validate limit bounds + if limit == 0 || limit > MAX_LIMIT { + return Err(AlertError::InvalidQueryParameter( + "limit should be between 1 and 1000".to_string(), + )); } + } - // Parse limit parameter - if let Some(limit_str) = query_map.get("limit") { - limit = limit_str.parse().map_err(|_| { - AlertError::InvalidQueryParameter("limit is not a valid number".to_string()) - })?; - - // Validate limit bounds - if limit == 0 || limit > MAX_LIMIT { - return Err(AlertError::InvalidQueryParameter( - "limit should be between 1 and 1000".to_string(), - )); - } + // Collect all other query parameters as potential other_fields filters + for (key, value) in query_map.iter() { + if !RESERVED_PARAMS.contains(&key.as_str()) { + other_fields_filters.insert(key.clone(), value.clone()); } } - let guard = ALERTS.read().await; - let alerts = if let Some(alerts) = guard.as_ref() { - alerts - } else { - return Err(AlertError::CustomError("No AlertManager set".into())); - }; - let alerts = alerts.list_alerts_for_user(session_key, tags_list).await?; - let mut alerts_summary = alerts - .iter() - .map(|alert| alert.to_summary()) - .collect::>(); - // Sort by state priority (Triggered > NotTriggered) then by severity (Critical > High > Medium > Low) + Ok(ListQueryParams { + tags_list, + offset, + limit, + other_fields_filters, + }) +} + +/// Filter alerts by other_fields +fn filter_by_other_fields( + mut alerts_summary: Vec>, + filters: &HashMap, +) -> Vec> { + if filters.is_empty() { + return alerts_summary; + } + + alerts_summary.retain(|alert_summary| { + // Check if all specified other_fields filters match + filters.iter().all(|(filter_key, filter_value)| { + alert_summary + .get(filter_key) + .map(|v| { + // Convert JSON value to string for comparison + let value_as_string = if v.is_string() { + // For strings, use the raw string value without quotes + v.as_str().unwrap_or("").to_string() + } else { + // For numbers, booleans, arrays, objects, use JSON representation + v.to_string() + }; + value_as_string == *filter_value + }) + .unwrap_or(false) + }) + }); + + alerts_summary +} + +/// Sort alerts by state, severity, and title +fn sort_alerts(alerts_summary: &mut [serde_json::Map]) { alerts_summary.sort_by(|a, b| { // Parse state and severity from JSON values back to enums let state_a = a @@ -128,22 +180,66 @@ pub async fn list(req: HttpRequest) -> Result { .unwrap_or(Severity::Low); let title_a = a.get("title").and_then(|v| v.as_str()).unwrap_or(""); - let title_b = b.get("title").and_then(|v| v.as_str()).unwrap_or(""); - // First sort by state, then by severity + // First sort by state, then by severity, then by title state_a .cmp(&state_b) .then_with(|| severity_a.cmp(&severity_b)) .then_with(|| title_a.cmp(title_b)) }); +} - let paginated_alerts = alerts_summary +/// Paginate alerts +fn paginate_alerts( + alerts_summary: Vec>, + offset: usize, + limit: usize, +) -> Vec> { + alerts_summary .into_iter() .skip(offset) .take(limit) + .collect() +} + +// GET /alerts +/// User needs at least a read access to the stream(s) that is being referenced in an alert +/// Read all alerts then return alerts which satisfy the condition +pub async fn list(req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req)?; + let query_map = web::Query::>::from_query(req.query_string()) + .map_err(|_| AlertError::InvalidQueryParameter("malformed query parameters".to_string()))?; + + // Parse and validate query parameters + let params = parse_list_query_params(&query_map)?; + + // Get alerts from the manager + let guard = ALERTS.read().await; + let alerts = if let Some(alerts) = guard.as_ref() { + alerts + } else { + return Err(AlertError::CustomError("No AlertManager set".into())); + }; + + // Fetch alerts for the user + let alerts = alerts + .list_alerts_for_user(session_key, params.tags_list) + .await?; + let mut alerts_summary = alerts + .iter() + .map(|alert| alert.to_summary()) .collect::>(); + // Filter by other_fields + alerts_summary = filter_by_other_fields(alerts_summary, ¶ms.other_fields_filters); + + // Sort alerts + sort_alerts(&mut alerts_summary); + + // Paginate results + let paginated_alerts = paginate_alerts(alerts_summary, params.offset, params.limit); + Ok(web::Json(paginated_alerts)) } diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index 10edf9438..446e7daf8 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -180,7 +180,9 @@ impl Metastore for ObjectStoreMetastore { .storage .get_objects( Some(&alerts_path), - Box::new(|file_name| file_name.ends_with(".json")), + Box::new(|file_name| { + !file_name.starts_with("alert_state_") && file_name.ends_with(".json") + }), ) .await?;