diff --git a/src/alerts/alert_structs.rs b/src/alerts/alert_structs.rs index 5b60a6150..36b894da6 100644 --- a/src/alerts/alert_structs.rs +++ b/src/alerts/alert_structs.rs @@ -35,7 +35,7 @@ use crate::{ }, metastore::metastore_traits::MetastoreObject, query::resolve_stream_names, - storage::object_storage::alert_json_path, + storage::object_storage::{alert_json_path, alert_state_json_path}, }; /// Helper struct for basic alert fields during migration @@ -520,6 +520,79 @@ pub struct NotificationStateRequest { pub state: String, } +/// Represents a single state transition +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct StateTransition { + /// The alert state + pub state: AlertState, + /// Timestamp when this state was set/updated + pub last_updated_at: DateTime, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct AlertStateEntry { + /// The unique identifier for the alert + pub alert_id: Ulid, + pub states: Vec, +} + +impl StateTransition { + /// Creates a new state transition with the current timestamp + pub fn new(state: AlertState) -> Self { + Self { + state, + last_updated_at: Utc::now(), + } + } +} + +impl AlertStateEntry { + /// Creates a new alert state entry with an initial state + pub fn new(alert_id: Ulid, initial_state: AlertState) -> Self { + Self { + alert_id, + states: vec![StateTransition::new(initial_state)], + } + } + + /// Updates the state (only adds new entry if state has changed) + /// Returns true if the state was changed, false if it remained the same + pub fn update_state(&mut self, new_state: AlertState) -> bool { + match self.states.last() { + Some(last_transition) => { + if last_transition.state != new_state { + // State changed - add new transition + self.states.push(StateTransition::new(new_state)); + true + } else { + // If state hasn't changed, do nothing - preserve the original timestamp + false + } + } + None => { + // No previous states - add the first one + self.states.push(StateTransition::new(new_state)); + true + } + } + } + + /// Gets the current (latest) state + pub fn current_state(&self) -> Option<&StateTransition> { + self.states.last() + } +} + +impl MetastoreObject for AlertStateEntry { + fn get_object_id(&self) -> String { + self.alert_id.to_string() + } + + fn get_object_path(&self) -> String { + alert_state_json_path(self.alert_id).to_string() + } +} + impl MetastoreObject for AlertConfig { fn get_object_id(&self) -> String { self.id.to_string() diff --git a/src/alerts/alert_types.rs b/src/alerts/alert_types.rs index 00d96424b..a2755e90a 100644 --- a/src/alerts/alert_types.rs +++ b/src/alerts/alert_types.rs @@ -28,7 +28,7 @@ use crate::{ AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity, ThresholdConfig, alert_enums::NotificationState, - alert_structs::GroupResult, + alert_structs::{AlertStateEntry, GroupResult}, alert_traits::{AlertTrait, MessageCreation}, alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range}, get_number_of_agg_exprs, @@ -216,7 +216,11 @@ impl AlertTrait for ThresholdAlert { .metastore .put_alert(&self.to_alert_config()) .await?; - // The task should have already been removed from the list of running tasks + let state_entry = AlertStateEntry::new(self.id, self.state); + PARSEABLE + .metastore + .put_alert_state(&state_entry as &dyn MetastoreObject) + .await?; return Ok(()); } @@ -252,6 +256,12 @@ impl AlertTrait for ThresholdAlert { .metastore .put_alert(&self.to_alert_config()) .await?; + let state_entry = AlertStateEntry::new(self.id, self.state); + + PARSEABLE + .metastore + .put_alert_state(&state_entry as &dyn MetastoreObject) + .await?; if trigger_notif.is_some() && self.notification_state.eq(&NotificationState::Notify) { trace!("trigger notif on-\n{}", self.state); diff --git a/src/alerts/mod.rs b/src/alerts/mod.rs index 7faefdcb9..541ad6e35 100644 --- a/src/alerts/mod.rs +++ b/src/alerts/mod.rs @@ -49,8 +49,9 @@ pub use crate::alerts::alert_enums::{ LogicalOperator, NotificationState, Severity, WhereConfigOperator, }; pub use crate::alerts::alert_structs::{ - AlertConfig, AlertInfo, AlertRequest, Alerts, AlertsInfo, AlertsInfoByState, AlertsSummary, - BasicAlertFields, Context, DeploymentInfo, RollingWindow, ThresholdConfig, + AlertConfig, AlertInfo, AlertRequest, AlertStateEntry, Alerts, AlertsInfo, AlertsInfoByState, + AlertsSummary, BasicAlertFields, Context, DeploymentInfo, RollingWindow, StateTransition, + ThresholdConfig, }; use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait}; use crate::alerts::alert_types::ThresholdAlert; diff --git a/src/handlers/http/alerts.rs b/src/handlers/http/alerts.rs index 9ddff929e..be70fb05a 100644 --- a/src/handlers/http/alerts.rs +++ b/src/handlers/http/alerts.rs @@ -22,11 +22,12 @@ use crate::{ alerts::{ ALERTS, AlertError, AlertState, Severity, alert_enums::{AlertType, NotificationState}, - alert_structs::{AlertConfig, AlertRequest, NotificationStateRequest}, + alert_structs::{AlertConfig, AlertRequest, AlertStateEntry, NotificationStateRequest}, alert_traits::AlertTrait, alert_types::ThresholdAlert, target::Retry, }, + metastore::metastore_traits::MetastoreObject, parseable::PARSEABLE, utils::{actix::extract_session_key_from_req, user_auth_for_query}, }; @@ -214,6 +215,13 @@ pub async fn post( .put_alert(&alert.to_alert_config()) .await?; + // create initial alert state entry (default to NotTriggered) + let state_entry = AlertStateEntry::new(*alert.get_id(), AlertState::NotTriggered); + PARSEABLE + .metastore + .put_alert_state(&state_entry as &dyn MetastoreObject) + .await?; + // update in memory alerts.update(alert).await; @@ -262,6 +270,13 @@ pub async fn delete(req: HttpRequest, alert_id: Path) -> Result Result<(), MetastoreError>; async fn delete_alert(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; + /// alerts state + async fn get_alert_states(&self) -> Result, MetastoreError>; + async fn get_alert_state_entry( + &self, + alert_id: &Ulid, + ) -> Result, MetastoreError>; + async fn put_alert_state(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; + async fn delete_alert_state(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; + /// llmconfig async fn get_llmconfigs(&self) -> Result, MetastoreError>; async fn put_llmconfig(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index 1d6db06c1..0f2dfbdde 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -32,7 +32,7 @@ use tracing::warn; use ulid::Ulid; use crate::{ - alerts::target::Target, + alerts::{alert_structs::AlertStateEntry, target::Target}, catalog::{manifest::Manifest, partition_path}, handlers::http::{ modal::{Metadata, NodeMetadata, NodeType}, @@ -49,8 +49,8 @@ use crate::{ SETTINGS_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, TARGETS_ROOT_DIRECTORY, object_storage::{ - alert_json_path, filter_path, manifest_path, parseable_json_path, schema_path, - stream_json_path, to_bytes, + alert_json_path, alert_state_json_path, filter_path, manifest_path, + parseable_json_path, schema_path, stream_json_path, to_bytes, }, }, users::filters::{Filter, migrate_v1_v2}, @@ -115,6 +115,102 @@ impl Metastore for ObjectStoreMetastore { .await?) } + /// alerts state + async fn get_alert_states(&self) -> Result, MetastoreError> { + let base_path = RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY]); + let alert_state_bytes = self + .storage + .get_objects( + Some(&base_path), + Box::new(|file_name| { + file_name.starts_with("alert_state_") && file_name.ends_with(".json") + }), + ) + .await?; + + let mut alert_states = Vec::new(); + for bytes in alert_state_bytes { + if let Ok(entry) = serde_json::from_slice::(&bytes) { + alert_states.push(entry); + } + } + Ok(alert_states) + } + + async fn get_alert_state_entry( + &self, + alert_id: &Ulid, + ) -> Result, MetastoreError> { + let path = alert_state_json_path(*alert_id); + match self.storage.get_object(&path).await { + Ok(bytes) => { + if let Ok(entry) = serde_json::from_slice::(&bytes) { + Ok(Some(entry)) + } else { + Ok(None) + } + } + Err(ObjectStorageError::NoSuchKey(_)) => Ok(None), + Err(e) => Err(MetastoreError::ObjectStorageError(e)), + } + } + + async fn put_alert_state(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> { + let id = Ulid::from_string(&obj.get_object_id()).map_err(|e| MetastoreError::Error { + status_code: StatusCode::BAD_REQUEST, + message: e.to_string(), + flow: "put_alert_state".into(), + })?; + let path = alert_state_json_path(id); + + // Parse the new state entry from the MetastoreObject + let new_state_entry: AlertStateEntry = serde_json::from_slice(&to_bytes(obj))?; + let new_state = new_state_entry + .current_state() + .ok_or_else(|| MetastoreError::InvalidJsonStructure { + expected: "AlertStateEntry with at least one state".to_string(), + found: "AlertStateEntry with empty states".to_string(), + })? + .state; + + // Try to read existing file + let mut alert_entry = match self.storage.get_object(&path).await { + Ok(existing_bytes) => { + if let Ok(entry) = serde_json::from_slice::(&existing_bytes) { + entry + } else { + // Create new entry if parsing fails or file doesn't exist + AlertStateEntry::new(id, new_state) + } + } + Err(_) => { + // File doesn't exist, create new entry + AlertStateEntry::new(id, new_state) + } + }; + + // Update the state and only save if it actually changed + let state_changed = alert_entry.update_state(new_state); + + if state_changed { + let updated_bytes = + serde_json::to_vec(&alert_entry).map_err(MetastoreError::JsonParseError)?; + + self.storage.put_object(&path, updated_bytes.into()).await?; + } + + Ok(()) + } + + /// Delete an alert state file + async fn delete_alert_state(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> { + let path = obj.get_object_path(); + Ok(self + .storage + .delete_object(&RelativePathBuf::from(path)) + .await?) + } + /// This function fetches all the llmconfigs from the underlying object store async fn get_llmconfigs(&self) -> Result, MetastoreError> { let base_path = RelativePathBuf::from_iter([SETTINGS_ROOT_DIRECTORY, "llmconfigs"]); diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index 92b74434b..33b2abeb6 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -127,13 +127,31 @@ impl ObjectStorage for LocalFS { ), ))) } - async fn head(&self, _path: &RelativePath) -> Result { - Err(ObjectStorageError::UnhandledError(Box::new( - std::io::Error::new( - std::io::ErrorKind::Unsupported, - "Head operation not implemented for LocalFS yet", - ), - ))) + async fn head(&self, path: &RelativePath) -> Result { + let file_path = self.path_in_root(path); + + // Check if file exists and get metadata + match fs::metadata(&file_path).await { + Ok(metadata) => { + // Convert the relative path to object store path format + let location = object_store::path::Path::from(path.as_str()); + + // Create ObjectMeta with file information + let object_meta = ObjectMeta { + location, + last_modified: metadata + .modified() + .map_err(ObjectStorageError::IoError)? + .into(), + size: metadata.len() as usize, + e_tag: None, + version: None, + }; + + Ok(object_meta) + } + Err(e) => Err(ObjectStorageError::IoError(e)), + } } async fn get_object(&self, path: &RelativePath) -> Result { let file_path; diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 225244bb9..b1abc5e52 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -1142,6 +1142,16 @@ pub fn target_json_path(target_id: &Ulid) -> RelativePathBuf { ]) } +/// Constructs the path for storing alert state JSON files +/// Format: ".parseable/alerts/alert_state_{alert_id}.json" +#[inline(always)] +pub fn alert_state_json_path(alert_id: Ulid) -> RelativePathBuf { + RelativePathBuf::from_iter([ + ALERTS_ROOT_DIRECTORY, + &format!("alert_state_{alert_id}.json"), + ]) +} + #[inline(always)] pub fn manifest_path(prefix: &str) -> RelativePathBuf { let hostname = hostname::get()