Skip to content
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ tempfile = "3.20.0"
lazy_static = "1.4.0"
prost = "0.13.1"
dashmap = "6.1.0"
parking_lot = "0.12.5"
indexmap = { version = "2.13.0", features = ["serde"] }

[build-dependencies]
Expand Down
21 changes: 14 additions & 7 deletions src/alerts/alert_structs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,11 @@ pub struct BasicAlertFields {
pub severity: Severity,
}

/// Alerts keyed by their `Ulid`, each stored as a boxed `AlertTrait` object.
pub type AlertMap = HashMap<Ulid, Box<dyn AlertTrait>>;

/// Holds the alert collection behind an `RwLock`, together with the sender half of an
/// `mpsc` channel that carries `AlertTask` values.
#[derive(Debug)]
pub struct Alerts {
pub alerts: RwLock<HashMap<Ulid, Box<dyn AlertTrait>>>,
// The outer `String` key is presumably the tenant identifier — TODO confirm against the
// code that populates this map.
pub alerts: RwLock<HashMap<String, AlertMap>>,
pub sender: mpsc::Sender<AlertTask>,
}

Expand Down Expand Up @@ -291,7 +293,7 @@ pub struct AlertRequest {
}

impl AlertRequest {
pub async fn into(self) -> Result<AlertConfig, AlertError> {
pub async fn into(self, tenant_id: Option<String>) -> Result<AlertConfig, AlertError> {
// Validate that other_fields doesn't contain reserved field names
let other_fields = if let Some(mut other_fields) = self.other_fields {
// Limit other_fields to maximum 10 fields
Expand Down Expand Up @@ -319,7 +321,7 @@ impl AlertRequest {

// Validate that all target IDs exist
for id in &self.targets {
TARGETS.get_target_by_id(id).await?;
TARGETS.get_target_by_id(id, &tenant_id).await?;
}
let datasets = resolve_stream_names(&self.query)?;

Expand Down Expand Up @@ -372,6 +374,7 @@ impl AlertRequest {
tags: self.tags,
last_triggered_at: None,
other_fields,
tenant_id,
};

Ok(config)
Expand Down Expand Up @@ -402,6 +405,7 @@ pub struct AlertConfig {
pub last_triggered_at: Option<DateTime<Utc>>,
#[serde(flatten)]
pub other_fields: Option<serde_json::Map<String, Value>>,
pub tenant_id: Option<String>,
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Clone)]
Expand Down Expand Up @@ -714,6 +718,7 @@ pub struct DailyMTTRStats {
pub struct MTTRHistory {
/// Array of daily MTTR statistics
pub daily_stats: Vec<DailyMTTRStats>,
/// Optional tenant identifier; passed to `mttr_json_path` when computing this object's path.
pub tenant_id: Option<String>,
}

/// Query parameters for MTTR API endpoint
Expand Down Expand Up @@ -767,6 +772,7 @@ pub struct AlertStateEntry {
/// The unique identifier for the alert
pub alert_id: Ulid,
pub states: Vec<StateTransition>,
pub tenant_id: Option<String>,
}

impl StateTransition {
Expand All @@ -781,10 +787,11 @@ impl StateTransition {

impl AlertStateEntry {
/// Creates a new alert state entry with an initial state
///
/// `states` starts with a single `StateTransition` built from `initial_state`. In the
/// tenant-aware signature, `tenant_id` is stored on the entry unchanged.
pub fn new(alert_id: Ulid, initial_state: AlertState) -> Self {
pub fn new(alert_id: Ulid, initial_state: AlertState, tenant_id: Option<String>) -> Self {
Self {
alert_id,
states: vec![StateTransition::new(initial_state)],
tenant_id,
}
}

Expand Down Expand Up @@ -876,7 +883,7 @@ impl MetastoreObject for AlertStateEntry {
}

/// Returns this state entry's object path as a `String`, built by `alert_state_json_path`
/// from `alert_id` (and, in the tenant-aware form, from `tenant_id` as well).
fn get_object_path(&self) -> String {
alert_state_json_path(self.alert_id).to_string()
alert_state_json_path(self.alert_id, &self.tenant_id).to_string()
}
}

Expand All @@ -886,7 +893,7 @@ impl MetastoreObject for AlertConfig {
}

/// Returns this alert config's object path as a `String`, built by `alert_json_path`
/// from `id` (and, in the tenant-aware form, from `tenant_id` as well).
fn get_object_path(&self) -> String {
alert_json_path(self.id).to_string()
alert_json_path(self.id, &self.tenant_id).to_string()
}
}

Expand All @@ -896,6 +903,6 @@ impl MetastoreObject for MTTRHistory {
}

/// Returns the MTTR history object path as a `String`, built by `mttr_json_path`
/// (which, in the tenant-aware form, receives this history's `tenant_id`).
fn get_object_path(&self) -> String {
mttr_json_path().to_string()
mttr_json_path(&self.tenant_id).to_string()
}
}
24 changes: 19 additions & 5 deletions src/alerts/alert_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub trait AlertTrait: Debug + Send + Sync + MetastoreObject {
fn get_datasets(&self) -> &[String];
fn to_alert_config(&self) -> AlertConfig;
fn clone_box(&self) -> Box<dyn AlertTrait>;
/// Returns a reference to the optional tenant identifier held by this alert.
fn get_tenant_id(&self) -> &Option<String>;
}

#[async_trait]
Expand All @@ -86,25 +87,38 @@ pub trait AlertManagerTrait: Send + Sync {
session: SessionKey,
tags: Vec<String>,
) -> Result<Vec<AlertConfig>, AlertError>;
async fn get_alert_by_id(&self, id: Ulid) -> Result<Box<dyn AlertTrait>, AlertError>;
async fn get_alert_by_id(
&self,
id: Ulid,
tenant_id: &Option<String>,
) -> Result<Box<dyn AlertTrait>, AlertError>;
async fn update(&self, alert: &dyn AlertTrait);
async fn update_state(
&self,
alert_id: Ulid,
new_state: AlertState,
trigger_notif: Option<String>,
tenant_id: &Option<String>,
) -> Result<(), AlertError>;
async fn update_notification_state(
&self,
alert_id: Ulid,
new_notification_state: NotificationState,
tenant_id: &Option<String>,
) -> Result<(), AlertError>;
async fn delete(&self, alert_id: Ulid) -> Result<(), AlertError>;
async fn get_state(&self, alert_id: Ulid) -> Result<AlertState, AlertError>;
async fn delete(&self, alert_id: Ulid, tenant_id: &Option<String>) -> Result<(), AlertError>;
async fn get_state(
&self,
alert_id: Ulid,
tenant_id: &Option<String>,
) -> Result<AlertState, AlertError>;
async fn start_task(&self, alert: Box<dyn AlertTrait>) -> Result<(), AlertError>;
async fn delete_task(&self, alert_id: Ulid) -> Result<(), AlertError>;
async fn list_tags(&self) -> Vec<String>;
async fn get_all_alerts(&self) -> HashMap<Ulid, Box<dyn AlertTrait>>;
async fn list_tags(&self, tenant_id: &Option<String>) -> Vec<String>;
async fn get_all_alerts(
&self,
tenant_id: &Option<String>,
) -> HashMap<Ulid, Box<dyn AlertTrait>>;
}

#[async_trait]
Expand Down
44 changes: 33 additions & 11 deletions src/alerts/alert_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

use std::{str::FromStr, time::Duration};

use actix_web::http::header::{HeaderMap, HeaderName, HeaderValue};
use chrono::{DateTime, Utc};
use serde_json::Value;
use tonic::async_trait;
Expand All @@ -41,6 +42,7 @@ use crate::{
query::resolve_stream_names,
rbac::map::SessionKey,
storage::object_storage::alert_json_path,
tenants::TENANT_METADATA,
utils::user_auth_for_query,
};

Expand Down Expand Up @@ -68,11 +70,12 @@ pub struct ThresholdAlert {
pub last_triggered_at: Option<DateTime<Utc>>,
#[serde(flatten)]
pub other_fields: Option<serde_json::Map<String, Value>>,
pub tenant_id: Option<String>,
}

impl MetastoreObject for ThresholdAlert {
/// Returns this threshold alert's object path as a `String`, built by `alert_json_path`
/// from `id` (and, in the tenant-aware form, from `tenant_id` as well).
fn get_object_path(&self) -> String {
alert_json_path(self.id).to_string()
alert_json_path(self.id, &self.tenant_id).to_string()
}

fn get_object_id(&self) -> String {
Expand All @@ -84,7 +87,20 @@ impl MetastoreObject for ThresholdAlert {
impl AlertTrait for ThresholdAlert {
async fn eval_alert(&self) -> Result<Option<String>, AlertError> {
let time_range = extract_time_range(&self.eval_config)?;
let query_result = execute_alert_query(self.get_query(), &time_range).await?;
// Build an `authorization` header map only when this alert has a tenant AND
// `TENANT_METADATA.get_global_query_auth` yields a value for it; otherwise `auth` is `None`.
let auth = if let Some(tenant) = self.tenant_id.as_ref()
&& let Some(header) = TENANT_METADATA.get_global_query_auth(tenant)
{
let mut map = HeaderMap::new();
map.insert(
HeaderName::from_static("authorization"),
// NOTE(review): `HeaderValue::from_str` is fallible (it rejects bytes that are not
// permitted in a header value), so this `unwrap()` panics during evaluation if the
// stored value is malformed. Consider mapping the error into an `AlertError` and
// propagating it with `?` — confirm which variant fits.
HeaderValue::from_str(&header).unwrap(),
);
Some(map)
} else {
None
};
let query_result =
execute_alert_query(auth, self.get_query(), &time_range, &self.tenant_id).await?;

if query_result.is_simple_query {
// Handle simple queries
Expand Down Expand Up @@ -164,15 +180,15 @@ impl AlertTrait for ThresholdAlert {
"No tables found in query".into(),
));
}
create_streams_for_distributed(tables)
create_streams_for_distributed(tables, &self.tenant_id)
.await
.map_err(|_| AlertError::InvalidAlertQuery("Invalid tables".into()))?;

// validate that the user has access to the tables mentioned in the query
user_auth_for_query(session_key, &self.query).await?;

// validate that the alert query is valid and can be evaluated
let num_aggrs = get_number_of_agg_exprs(&self.query).await?;
let num_aggrs = get_number_of_agg_exprs(&self.query, &self.tenant_id).await?;
if num_aggrs != 1 {
return Err(AlertError::InvalidAlertQuery(format!(
"Found {num_aggrs} aggregate expressions, only 1 allowed"
Expand All @@ -191,7 +207,7 @@ impl AlertTrait for ThresholdAlert {
// update on disk
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.put_alert(&self.to_alert_config(), &self.tenant_id)
.await?;
Ok(())
}
Expand All @@ -217,12 +233,12 @@ impl AlertTrait for ThresholdAlert {
// update on disk
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.put_alert(&self.to_alert_config(), &self.tenant_id)
.await?;
let state_entry = AlertStateEntry::new(self.id, self.state);
let state_entry = AlertStateEntry::new(self.id, self.state, self.tenant_id.clone());
PARSEABLE
.metastore
.put_alert_state(&state_entry as &dyn MetastoreObject)
.put_alert_state(&state_entry as &dyn MetastoreObject, &self.tenant_id)
.await?;
return Ok(());
}
Expand Down Expand Up @@ -257,13 +273,13 @@ impl AlertTrait for ThresholdAlert {
// update on disk
PARSEABLE
.metastore
.put_alert(&self.to_alert_config())
.put_alert(&self.to_alert_config(), &self.tenant_id)
.await?;
let state_entry = AlertStateEntry::new(self.id, self.state);
let state_entry = AlertStateEntry::new(self.id, self.state, self.tenant_id.clone());

PARSEABLE
.metastore
.put_alert_state(&state_entry as &dyn MetastoreObject)
.put_alert_state(&state_entry as &dyn MetastoreObject, &self.tenant_id)
.await?;

if let Some(trigger_notif) = trigger_notif
Expand Down Expand Up @@ -337,6 +353,10 @@ impl AlertTrait for ThresholdAlert {
&self.datasets
}

/// Returns a reference to this alert's `tenant_id` field.
fn get_tenant_id(&self) -> &Option<String> {
&self.tenant_id
}

fn to_alert_config(&self) -> AlertConfig {
let clone = self.clone();
clone.into()
Expand Down Expand Up @@ -414,6 +434,7 @@ impl From<AlertConfig> for ThresholdAlert {
datasets: value.datasets,
last_triggered_at: value.last_triggered_at,
other_fields: value.other_fields,
tenant_id: value.tenant_id,
}
}
}
Expand All @@ -438,6 +459,7 @@ impl From<ThresholdAlert> for AlertConfig {
datasets: val.datasets,
last_triggered_at: val.last_triggered_at,
other_fields: val.other_fields,
tenant_id: val.tenant_id,
}
}
}
Expand Down
Loading