Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 105 additions & 1 deletion src/alerts/alert_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,32 @@
use std::{collections::HashMap, time::Duration};

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde::{Deserialize, Deserializer, Serialize};
use serde_json::Value;
use tokio::sync::{RwLock, mpsc};
use ulid::Ulid;

const RESERVED_FIELDS: &[&str] = &[
"id",
"version",
"severity",
"title",
"query",
"datasets",
"alertType",
"anomalyConfig",
"forecastConfig",
"thresholdConfig",
"notificationConfig",
"evalConfig",
"targets",
"tags",
"state",
"notificationState",
"created",
"lastTriggeredAt",
];

use crate::{
alerts::{
AlertError, CURRENT_ALERTS_VERSION,
Expand All @@ -38,6 +60,52 @@ use crate::{
storage::object_storage::{alert_json_path, alert_state_json_path},
};

/// Custom deserializer for DateTime<Utc> that handles legacy empty strings
///
/// This is a compatibility layer for migrating old alerts that stored empty strings
/// instead of valid timestamps. In production, this should log warnings to help
/// identify data quality issues.
///
/// # Migration Path
/// - Empty strings → Default to current time with a warning
/// - Missing fields → Default to current time
/// - Valid timestamps → Parse normally
pub fn deserialize_datetime_with_empty_string_fallback<'de, D>(
deserializer: D,
) -> Result<DateTime<Utc>, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(untagged)]
enum DateTimeOrString {
DateTime(DateTime<Utc>),
String(String),
}

match DateTimeOrString::deserialize(deserializer)? {
DateTimeOrString::DateTime(dt) => Ok(dt),
DateTimeOrString::String(s) => {
if s.is_empty() {
// Log warning about data quality issue
tracing::warn!(
"Alert has empty 'created' field - this indicates a data quality issue. \
Defaulting to current timestamp. Please investigate and fix the data source."
);
Ok(Utc::now())
} else {
s.parse::<DateTime<Utc>>().map_err(serde::de::Error::custom)
}
}
}
}

/// Default function for created timestamp - returns current time
/// This handles the case where created field is missing in deserialization
pub fn default_created_time() -> DateTime<Utc> {
Utc::now()
}

/// Helper struct for basic alert fields during migration
pub struct BasicAlertFields {
pub id: Ulid,
Expand Down Expand Up @@ -253,10 +321,32 @@ pub struct AlertRequest {
pub eval_config: EvalConfig,
pub targets: Vec<Ulid>,
pub tags: Option<Vec<String>>,
#[serde(flatten)]
pub other_fields: Option<serde_json::Map<String, Value>>,
}

impl AlertRequest {
pub async fn into(self) -> Result<AlertConfig, AlertError> {
// Validate that other_fields doesn't contain reserved field names
if let Some(ref other_fields) = self.other_fields {
// Limit other_fields to maximum 10 fields
if other_fields.len() > 10 {
return Err(AlertError::ValidationFailure(format!(
"other_fields can contain at most 10 fields, found {}",
other_fields.len()
)));
}

for key in other_fields.keys() {
if RESERVED_FIELDS.contains(&key.as_str()) {
return Err(AlertError::ValidationFailure(format!(
"Field '{}' cannot be in other_fields as it's a reserved field name",
key
)));
}
}
}

// Validate that all target IDs exist
for id in &self.targets {
TARGETS.get_target_by_id(id).await?;
Expand Down Expand Up @@ -309,6 +399,7 @@ impl AlertRequest {
created: Utc::now(),
tags: self.tags,
last_triggered_at: None,
other_fields: self.other_fields,
};
Ok(config)
}
Expand All @@ -333,9 +424,15 @@ pub struct AlertConfig {
pub state: AlertState,
pub notification_state: NotificationState,
pub notification_config: NotificationConfig,
#[serde(
default = "default_created_time",
deserialize_with = "deserialize_datetime_with_empty_string_fallback"
)]
pub created: DateTime<Utc>,
pub tags: Option<Vec<String>>,
pub last_triggered_at: Option<DateTime<Utc>>,
#[serde(flatten)]
pub other_fields: Option<serde_json::Map<String, Value>>,
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
Expand All @@ -359,9 +456,15 @@ pub struct AlertConfigResponse {
pub state: AlertState,
pub notification_state: NotificationState,
pub notification_config: NotificationConfig,
#[serde(
default = "default_created_time",
deserialize_with = "deserialize_datetime_with_empty_string_fallback"
)]
pub created: DateTime<Utc>,
pub tags: Option<Vec<String>>,
pub last_triggered_at: Option<DateTime<Utc>>,
#[serde(flatten)]
pub other_fields: Option<serde_json::Map<String, Value>>,
}

impl AlertConfig {
Expand Down Expand Up @@ -401,6 +504,7 @@ impl AlertConfig {
created: self.created,
tags: self.tags,
last_triggered_at: self.last_triggered_at,
other_fields: self.other_fields,
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion src/alerts/alert_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use std::{str::FromStr, time::Duration};

use chrono::{DateTime, Utc};
use serde_json::Value;
use tonic::async_trait;
use tracing::{info, trace, warn};
use ulid::Ulid;
Expand All @@ -28,7 +29,10 @@ use crate::{
AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity,
ThresholdConfig,
alert_enums::NotificationState,
alert_structs::{AlertStateEntry, GroupResult},
alert_structs::{
AlertStateEntry, GroupResult, default_created_time,
deserialize_datetime_with_empty_string_fallback,
},
alert_traits::{AlertTrait, MessageCreation},
alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range},
get_number_of_agg_exprs,
Expand Down Expand Up @@ -61,10 +65,16 @@ pub struct ThresholdAlert {
pub state: AlertState,
pub notification_state: NotificationState,
pub notification_config: NotificationConfig,
#[serde(
default = "default_created_time",
deserialize_with = "deserialize_datetime_with_empty_string_fallback"
)]
pub created: DateTime<Utc>,
pub tags: Option<Vec<String>>,
pub datasets: Vec<String>,
pub last_triggered_at: Option<DateTime<Utc>>,
#[serde(flatten)]
pub other_fields: Option<serde_json::Map<String, Value>>,
}

impl MetastoreObject for ThresholdAlert {
Expand Down Expand Up @@ -408,6 +418,7 @@ impl From<AlertConfig> for ThresholdAlert {
tags: value.tags,
datasets: value.datasets,
last_triggered_at: value.last_triggered_at,
other_fields: value.other_fields,
}
}
}
Expand All @@ -431,6 +442,7 @@ impl From<ThresholdAlert> for AlertConfig {
tags: val.tags,
datasets: val.datasets,
last_triggered_at: val.last_triggered_at,
other_fields: val.other_fields,
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ pub use crate::alerts::alert_enums::{
pub use crate::alerts::alert_structs::{
AlertConfig, AlertInfo, AlertRequest, AlertStateEntry, Alerts, AlertsInfo, AlertsInfoByState,
AlertsSummary, BasicAlertFields, Context, DeploymentInfo, RollingWindow, StateTransition,
ThresholdConfig,
ThresholdConfig, default_created_time, deserialize_datetime_with_empty_string_fallback,
};
use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait};
use crate::alerts::alert_types::ThresholdAlert;
Expand Down Expand Up @@ -134,6 +134,7 @@ impl AlertConfig {
created: Utc::now(),
tags: None,
last_triggered_at: None,
other_fields: None,
};

// Save the migrated alert back to storage
Expand Down Expand Up @@ -682,6 +683,12 @@ impl AlertConfig {
);
}

if let Some(other_fields) = &self.other_fields {
for (key, value) in other_fields {
map.insert(key.clone(), value.clone());
}
}

map
}
}
Expand Down
Loading
Loading