Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 74 additions & 1 deletion src/alerts/alert_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
},
metastore::metastore_traits::MetastoreObject,
query::resolve_stream_names,
storage::object_storage::alert_json_path,
storage::object_storage::{alert_json_path, alert_state_json_path},
};

/// Helper struct for basic alert fields during migration
Expand Down Expand Up @@ -520,6 +520,79 @@ pub struct NotificationStateRequest {
pub state: String,
}

/// Represents a single state transition
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct StateTransition {
/// The alert state
pub state: AlertState,
/// Timestamp when this state was set/updated
pub last_updated_at: DateTime<Utc>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct AlertStateEntry {
/// The unique identifier for the alert
pub alert_id: Ulid,
pub states: Vec<StateTransition>,
}

impl StateTransition {
/// Creates a new state transition with the current timestamp
pub fn new(state: AlertState) -> Self {
Self {
state,
last_updated_at: Utc::now(),
}
}
}

impl AlertStateEntry {
/// Creates a new alert state entry with an initial state
pub fn new(alert_id: Ulid, initial_state: AlertState) -> Self {
Self {
alert_id,
states: vec![StateTransition::new(initial_state)],
}
}

/// Updates the state (only adds new entry if state has changed)
/// Returns true if the state was changed, false if it remained the same
pub fn update_state(&mut self, new_state: AlertState) -> bool {
match self.states.last() {
Some(last_transition) => {
if last_transition.state != new_state {
// State changed - add new transition
self.states.push(StateTransition::new(new_state));
true
} else {
// If state hasn't changed, do nothing - preserve the original timestamp
false
}
}
None => {
// No previous states - add the first one
self.states.push(StateTransition::new(new_state));
true
}
}
}

/// Gets the current (latest) state
pub fn current_state(&self) -> Option<&StateTransition> {
self.states.last()
}
}

impl MetastoreObject for AlertStateEntry {
fn get_object_id(&self) -> String {
self.alert_id.to_string()
}

fn get_object_path(&self) -> String {
alert_state_json_path(self.alert_id).to_string()
}
}

impl MetastoreObject for AlertConfig {
fn get_object_id(&self) -> String {
self.id.to_string()
Expand Down
14 changes: 12 additions & 2 deletions src/alerts/alert_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
AlertConfig, AlertError, AlertState, AlertType, AlertVersion, EvalConfig, Severity,
ThresholdConfig,
alert_enums::NotificationState,
alert_structs::GroupResult,
alert_structs::{AlertStateEntry, GroupResult},
alert_traits::{AlertTrait, MessageCreation},
alerts_utils::{evaluate_condition, execute_alert_query, extract_time_range},
get_number_of_agg_exprs,
Expand Down Expand Up @@ -216,7 +216,11 @@ impl AlertTrait for ThresholdAlert {
.metastore
.put_alert(&self.to_alert_config())
.await?;
// The task should have already been removed from the list of running tasks
let state_entry = AlertStateEntry::new(self.id, self.state);
PARSEABLE
.metastore
.put_alert_state(&state_entry as &dyn MetastoreObject)
.await?;
return Ok(());
}

Expand Down Expand Up @@ -252,6 +256,12 @@ impl AlertTrait for ThresholdAlert {
.metastore
.put_alert(&self.to_alert_config())
.await?;
let state_entry = AlertStateEntry::new(self.id, self.state);

PARSEABLE
.metastore
.put_alert_state(&state_entry as &dyn MetastoreObject)
.await?;

if trigger_notif.is_some() && self.notification_state.eq(&NotificationState::Notify) {
trace!("trigger notif on-\n{}", self.state);
Expand Down
5 changes: 3 additions & 2 deletions src/alerts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ pub use crate::alerts::alert_enums::{
LogicalOperator, NotificationState, Severity, WhereConfigOperator,
};
pub use crate::alerts::alert_structs::{
AlertConfig, AlertInfo, AlertRequest, Alerts, AlertsInfo, AlertsInfoByState, AlertsSummary,
BasicAlertFields, Context, DeploymentInfo, RollingWindow, ThresholdConfig,
AlertConfig, AlertInfo, AlertRequest, AlertStateEntry, Alerts, AlertsInfo, AlertsInfoByState,
AlertsSummary, BasicAlertFields, Context, DeploymentInfo, RollingWindow, StateTransition,
ThresholdConfig,
};
use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait};
use crate::alerts::alert_types::ThresholdAlert;
Expand Down
17 changes: 16 additions & 1 deletion src/handlers/http/alerts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@ use crate::{
alerts::{
ALERTS, AlertError, AlertState, Severity,
alert_enums::{AlertType, NotificationState},
alert_structs::{AlertConfig, AlertRequest, NotificationStateRequest},
alert_structs::{AlertConfig, AlertRequest, AlertStateEntry, NotificationStateRequest},
alert_traits::AlertTrait,
alert_types::ThresholdAlert,
target::Retry,
},
metastore::metastore_traits::MetastoreObject,
parseable::PARSEABLE,
utils::{actix::extract_session_key_from_req, user_auth_for_query},
};
Expand Down Expand Up @@ -214,6 +215,13 @@ pub async fn post(
.put_alert(&alert.to_alert_config())
.await?;

// create initial alert state entry (default to NotTriggered)
let state_entry = AlertStateEntry::new(*alert.get_id(), AlertState::NotTriggered);
PARSEABLE
.metastore
.put_alert_state(&state_entry as &dyn MetastoreObject)
.await?;

// update in memory
alerts.update(alert).await;

Expand Down Expand Up @@ -262,6 +270,13 @@ pub async fn delete(req: HttpRequest, alert_id: Path<Ulid>) -> Result<impl Respo

PARSEABLE.metastore.delete_alert(&*alert).await?;

// delete the associated alert state
let state_to_delete = AlertStateEntry::new(alert_id, AlertState::NotTriggered); // state doesn't matter for deletion
PARSEABLE
.metastore
.delete_alert_state(&state_to_delete as &dyn MetastoreObject)
.await?;

// delete from memory
alerts.delete(alert_id).await?;

Expand Down
18 changes: 16 additions & 2 deletions src/metastore/metastore_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,15 @@ use chrono::{DateTime, Utc};
use dashmap::DashMap;
use erased_serde::Serialize as ErasedSerialize;
use tonic::async_trait;
use ulid::Ulid;

use crate::{
alerts::target::Target, catalog::manifest::Manifest, handlers::http::modal::NodeType,
metastore::MetastoreError, option::Mode, users::filters::Filter,
alerts::{alert_structs::AlertStateEntry, target::Target},
catalog::manifest::Manifest,
handlers::http::modal::NodeType,
metastore::MetastoreError,
option::Mode,
users::filters::Filter,
};

/// A metastore is a logically separated compartment to store metadata for Parseable.
Expand All @@ -44,6 +49,15 @@ pub trait Metastore: std::fmt::Debug + Send + Sync {
async fn put_alert(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
async fn delete_alert(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;

/// alerts state
async fn get_alert_states(&self) -> Result<Vec<AlertStateEntry>, MetastoreError>;
async fn get_alert_state_entry(
&self,
alert_id: &Ulid,
) -> Result<Option<AlertStateEntry>, MetastoreError>;
async fn put_alert_state(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
async fn delete_alert_state(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;

/// llmconfig
async fn get_llmconfigs(&self) -> Result<Vec<Bytes>, MetastoreError>;
async fn put_llmconfig(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>;
Expand Down
102 changes: 99 additions & 3 deletions src/metastore/metastores/object_store_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tracing::warn;
use ulid::Ulid;

use crate::{
alerts::target::Target,
alerts::{alert_structs::AlertStateEntry, target::Target},
catalog::{manifest::Manifest, partition_path},
handlers::http::{
modal::{Metadata, NodeMetadata, NodeType},
Expand All @@ -49,8 +49,8 @@ use crate::{
SETTINGS_ROOT_DIRECTORY, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY,
TARGETS_ROOT_DIRECTORY,
object_storage::{
alert_json_path, filter_path, manifest_path, parseable_json_path, schema_path,
stream_json_path, to_bytes,
alert_json_path, alert_state_json_path, filter_path, manifest_path,
parseable_json_path, schema_path, stream_json_path, to_bytes,
},
},
users::filters::{Filter, migrate_v1_v2},
Expand Down Expand Up @@ -115,6 +115,102 @@ impl Metastore for ObjectStoreMetastore {
.await?)
}

/// alerts state
async fn get_alert_states(&self) -> Result<Vec<AlertStateEntry>, MetastoreError> {
let base_path = RelativePathBuf::from_iter([ALERTS_ROOT_DIRECTORY]);
let alert_state_bytes = self
.storage
.get_objects(
Some(&base_path),
Box::new(|file_name| {
file_name.starts_with("alert_state_") && file_name.ends_with(".json")
}),
)
.await?;

let mut alert_states = Vec::new();
for bytes in alert_state_bytes {
if let Ok(entry) = serde_json::from_slice::<AlertStateEntry>(&bytes) {
alert_states.push(entry);
}
}
Ok(alert_states)
}

async fn get_alert_state_entry(
&self,
alert_id: &Ulid,
) -> Result<Option<AlertStateEntry>, MetastoreError> {
let path = alert_state_json_path(*alert_id);
match self.storage.get_object(&path).await {
Ok(bytes) => {
if let Ok(entry) = serde_json::from_slice::<AlertStateEntry>(&bytes) {
Ok(Some(entry))
} else {
Ok(None)
}
}
Err(ObjectStorageError::NoSuchKey(_)) => Ok(None),
Err(e) => Err(MetastoreError::ObjectStorageError(e)),
}
}

async fn put_alert_state(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> {
let id = Ulid::from_string(&obj.get_object_id()).map_err(|e| MetastoreError::Error {
status_code: StatusCode::BAD_REQUEST,
message: e.to_string(),
flow: "put_alert_state".into(),
})?;
let path = alert_state_json_path(id);

// Parse the new state entry from the MetastoreObject
let new_state_entry: AlertStateEntry = serde_json::from_slice(&to_bytes(obj))?;
let new_state = new_state_entry
.current_state()
.ok_or_else(|| MetastoreError::InvalidJsonStructure {
expected: "AlertStateEntry with at least one state".to_string(),
found: "AlertStateEntry with empty states".to_string(),
})?
.state;

// Try to read existing file
let mut alert_entry = match self.storage.get_object(&path).await {
Ok(existing_bytes) => {
if let Ok(entry) = serde_json::from_slice::<AlertStateEntry>(&existing_bytes) {
entry
} else {
// Create new entry if parsing fails or file doesn't exist
AlertStateEntry::new(id, new_state)
}
}
Err(_) => {
// File doesn't exist, create new entry
AlertStateEntry::new(id, new_state)
}
};

// Update the state and only save if it actually changed
let state_changed = alert_entry.update_state(new_state);

if state_changed {
let updated_bytes =
serde_json::to_vec(&alert_entry).map_err(MetastoreError::JsonParseError)?;

self.storage.put_object(&path, updated_bytes.into()).await?;
}

Ok(())
}

/// Delete an alert state file
async fn delete_alert_state(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> {
let path = obj.get_object_path();
Ok(self
.storage
.delete_object(&RelativePathBuf::from(path))
.await?)
}

/// This function fetches all the llmconfigs from the underlying object store
async fn get_llmconfigs(&self) -> Result<Vec<Bytes>, MetastoreError> {
let base_path = RelativePathBuf::from_iter([SETTINGS_ROOT_DIRECTORY, "llmconfigs"]);
Expand Down
32 changes: 25 additions & 7 deletions src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,31 @@ impl ObjectStorage for LocalFS {
),
)))
}
async fn head(&self, _path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
Err(ObjectStorageError::UnhandledError(Box::new(
std::io::Error::new(
std::io::ErrorKind::Unsupported,
"Head operation not implemented for LocalFS yet",
),
)))
async fn head(&self, path: &RelativePath) -> Result<ObjectMeta, ObjectStorageError> {
let file_path = self.path_in_root(path);

// Check if file exists and get metadata
match fs::metadata(&file_path).await {
Ok(metadata) => {
// Convert the relative path to object store path format
let location = object_store::path::Path::from(path.as_str());

// Create ObjectMeta with file information
let object_meta = ObjectMeta {
location,
last_modified: metadata
.modified()
.map_err(ObjectStorageError::IoError)?
.into(),
size: metadata.len() as usize,
e_tag: None,
version: None,
};

Ok(object_meta)
}
Err(e) => Err(ObjectStorageError::IoError(e)),
}
}
async fn get_object(&self, path: &RelativePath) -> Result<Bytes, ObjectStorageError> {
let file_path;
Expand Down
10 changes: 10 additions & 0 deletions src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1142,6 +1142,16 @@ pub fn target_json_path(target_id: &Ulid) -> RelativePathBuf {
])
}

/// Constructs the path for storing alert state JSON files
/// Format: ".parseable/alerts/alert_state_{alert_id}.json"
#[inline(always)]
pub fn alert_state_json_path(alert_id: Ulid) -> RelativePathBuf {
RelativePathBuf::from_iter([
ALERTS_ROOT_DIRECTORY,
&format!("alert_state_{alert_id}.json"),
])
}

#[inline(always)]
pub fn manifest_path(prefix: &str) -> RelativePathBuf {
let hostname = hostname::get()
Expand Down
Loading