Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 64 additions & 83 deletions src/alerts/alert_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,93 +19,55 @@
use std::{collections::HashMap, time::Duration};

use chrono::{DateTime, Utc};
use serde::{Deserialize, Deserializer, Serialize};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tokio::sync::{RwLock, mpsc};
use ulid::Ulid;

use crate::{
alerts::{
AlertError, CURRENT_ALERTS_VERSION,
alert_enums::{
AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig,
LogicalOperator, NotificationState, Severity, WhereConfigOperator,
},
alert_traits::AlertTrait,
target::{NotificationConfig, TARGETS},
},
metastore::metastore_traits::MetastoreObject,
query::resolve_stream_names,
storage::object_storage::{alert_json_path, alert_state_json_path},
};

const RESERVED_FIELDS: &[&str] = &[
"id",
"version",
"id",
"severity",
"title",
"query",
"datasets",
"alertType",
"alert_type",
"anomalyConfig",
"anomaly_config",
"forecastConfig",
"forecast_config",
"thresholdConfig",
"notificationConfig",
"threshold_config",
"evalConfig",
"eval_config",
"targets",
"tags",
"state",
"notificationState",
"notification_state",
"notificationConfig",
"notification_config",
"created",
"tags",
"lastTriggeredAt",
"last_triggered_at",
];

use crate::{
alerts::{
AlertError, CURRENT_ALERTS_VERSION,
alert_enums::{
AlertOperator, AlertState, AlertTask, AlertType, AlertVersion, EvalConfig,
LogicalOperator, NotificationState, Severity, WhereConfigOperator,
},
alert_traits::AlertTrait,
target::{NotificationConfig, TARGETS},
},
metastore::metastore_traits::MetastoreObject,
query::resolve_stream_names,
storage::object_storage::{alert_json_path, alert_state_json_path},
};

/// Custom deserializer for DateTime<Utc> that handles legacy empty strings
///
/// This is a compatibility layer for migrating old alerts that stored empty strings
/// instead of valid timestamps. In production, this should log warnings to help
/// identify data quality issues.
///
/// # Migration Path
/// - Empty strings → Default to current time with a warning
/// - Missing fields → Default to current time
/// - Valid timestamps → Parse normally
pub fn deserialize_datetime_with_empty_string_fallback<'de, D>(
deserializer: D,
) -> Result<DateTime<Utc>, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum DateTimeOrString {
DateTime(DateTime<Utc>),
String(String),
}

match DateTimeOrString::deserialize(deserializer)? {
DateTimeOrString::DateTime(dt) => Ok(dt),
DateTimeOrString::String(s) => {
if s.is_empty() {
// Log warning about data quality issue
tracing::warn!(
"Alert has empty 'created' field - this indicates a data quality issue. \
Defaulting to current timestamp. Please investigate and fix the data source."
);
Ok(Utc::now())
} else {
s.parse::<DateTime<Utc>>().map_err(serde::de::Error::custom)
}
}
}
}

/// Default function for created timestamp - returns current time
/// This handles the case where created field is missing in deserialization
pub fn default_created_time() -> DateTime<Utc> {
Utc::now()
}

/// Helper struct for basic alert fields during migration
pub struct BasicAlertFields {
pub id: Ulid,
Expand Down Expand Up @@ -328,7 +290,7 @@ pub struct AlertRequest {
impl AlertRequest {
pub async fn into(self) -> Result<AlertConfig, AlertError> {
// Validate that other_fields doesn't contain reserved field names
if let Some(ref other_fields) = self.other_fields {
let other_fields = if let Some(mut other_fields) = self.other_fields {
// Limit other_fields to maximum 10 fields
if other_fields.len() > 10 {
return Err(AlertError::ValidationFailure(format!(
Expand All @@ -337,15 +299,20 @@ impl AlertRequest {
)));
}

for key in other_fields.keys() {
if RESERVED_FIELDS.contains(&key.as_str()) {
return Err(AlertError::ValidationFailure(format!(
"Field '{}' cannot be in other_fields as it's a reserved field name",
key
)));
for reserved in RESERVED_FIELDS {
if other_fields.remove(*reserved).is_some() {
tracing::warn!("Removed reserved field '{}' from other_fields", reserved);
}
}
}

if other_fields.is_empty() {
None
} else {
Some(other_fields)
}
} else {
None
};

// Validate that all target IDs exist
for id in &self.targets {
Expand All @@ -359,6 +326,8 @@ impl AlertRequest {
)));
}

let created_timestamp = Utc::now();

let config = AlertConfig {
version: AlertVersion::from(CURRENT_ALERTS_VERSION),
id: Ulid::new(),
Expand Down Expand Up @@ -396,11 +365,12 @@ impl AlertRequest {
state: AlertState::default(),
notification_state: NotificationState::Notify,
notification_config: self.notification_config,
created: Utc::now(),
created: created_timestamp,
tags: self.tags,
last_triggered_at: None,
other_fields: self.other_fields,
other_fields,
};

Ok(config)
}
}
Expand All @@ -424,10 +394,6 @@ pub struct AlertConfig {
pub state: AlertState,
pub notification_state: NotificationState,
pub notification_config: NotificationConfig,
#[serde(
default = "default_created_time",
deserialize_with = "deserialize_datetime_with_empty_string_fallback"
)]
pub created: DateTime<Utc>,
pub tags: Option<Vec<String>>,
pub last_triggered_at: Option<DateTime<Utc>>,
Expand Down Expand Up @@ -456,10 +422,6 @@ pub struct AlertConfigResponse {
pub state: AlertState,
pub notification_state: NotificationState,
pub notification_config: NotificationConfig,
#[serde(
default = "default_created_time",
deserialize_with = "deserialize_datetime_with_empty_string_fallback"
)]
pub created: DateTime<Utc>,
pub tags: Option<Vec<String>>,
pub last_triggered_at: Option<DateTime<Utc>>,
Expand All @@ -468,6 +430,25 @@ pub struct AlertConfigResponse {
}

impl AlertConfig {
/// Filters out reserved field names from other_fields
/// This prevents conflicts when flattening other_fields during serialization
pub fn sanitize_other_fields(&mut self) {
if let Some(ref mut other_fields) = self.other_fields {
for reserved in RESERVED_FIELDS {
if other_fields.remove(*reserved).is_some() {
tracing::warn!(
"Removed reserved field '{}' from other_fields during sanitization",
reserved
);
}
}

if other_fields.is_empty() {
self.other_fields = None;
}
}
}

pub fn to_response(self) -> AlertConfigResponse {
AlertConfigResponse {
version: self.version,
Expand Down
9 changes: 1 addition & 8 deletions src/alerts/alert_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ use crate::{
AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity,
ThresholdConfig,
alert_enums::NotificationState,
alert_structs::{
AlertStateEntry, GroupResult, default_created_time,
deserialize_datetime_with_empty_string_fallback,
},
alert_structs::{AlertStateEntry, GroupResult},
alert_traits::{AlertTrait, MessageCreation},
alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range},
get_number_of_agg_exprs,
Expand Down Expand Up @@ -65,10 +62,6 @@ pub struct ThresholdAlert {
pub state: AlertState,
pub notification_state: NotificationState,
pub notification_config: NotificationConfig,
#[serde(
default = "default_created_time",
deserialize_with = "deserialize_datetime_with_empty_string_fallback"
)]
pub created: DateTime<Utc>,
pub tags: Option<Vec<String>>,
pub datasets: Vec<String>,
Expand Down
2 changes: 1 addition & 1 deletion src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub use crate::alerts::alert_enums::{
pub use crate::alerts::alert_structs::{
AlertConfig, AlertInfo, AlertRequest, AlertStateEntry, Alerts, AlertsInfo, AlertsInfoByState,
AlertsSummary, BasicAlertFields, Context, DeploymentInfo, RollingWindow, StateTransition,
ThresholdConfig, default_created_time, deserialize_datetime_with_empty_string_fallback,
ThresholdConfig,
};
use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait};
use crate::alerts::alert_types::ThresholdAlert;
Expand Down
Loading