Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 28 additions & 6 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use crate::handlers::http::modal::ingest::SyncRole;
use crate::handlers::http::query::{Query, QueryError, TIME_ELAPSED_HEADER};
use crate::metrics::prom_utils::Metrics;
use crate::option::Mode;
use crate::parseable::PARSEABLE;
use crate::parseable::{DEFAULT_TENANT, PARSEABLE};
use crate::rbac::role::model::Role;
use crate::rbac::user::User;
use crate::stats::Stats;
Expand Down Expand Up @@ -85,13 +85,16 @@ pub struct BillingMetricEvent {
pub provider: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub model: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tenant_id: Option<String>,
pub event_type: String,
pub event_time: chrono::NaiveDateTime,
}

// Internal structure for collecting metrics from prometheus
#[derive(Debug, Default)]
struct BillingMetricsCollector {
pub tenant_id: Option<String>,
pub node_address: String,
pub node_type: String,
pub total_events_ingested_by_date: HashMap<String, u64>,
Expand All @@ -116,8 +119,9 @@ struct BillingMetricsCollector {
}

impl BillingMetricsCollector {
pub fn new(node_address: String, node_type: String) -> Self {
pub fn new(node_address: String, node_type: String, tenant_id: Option<String>) -> Self {
Self {
tenant_id,
node_address,
node_type,
event_time: Utc::now().naive_utc(),
Expand Down Expand Up @@ -153,6 +157,7 @@ impl BillingMetricsCollector {
method: None,
provider: None,
model: None,
tenant_id: self.tenant_id.clone(),
event_type: "billing-metrics".to_string(),
event_time: self.event_time,
});
Expand Down Expand Up @@ -282,6 +287,7 @@ impl BillingMetricsCollector {
method: Some(method.clone()),
provider: None,
model: None,
tenant_id: self.tenant_id.clone(),
event_type: "billing-metrics".to_string(),
event_time: self.event_time,
});
Expand Down Expand Up @@ -320,6 +326,7 @@ impl BillingMetricsCollector {
method: None,
provider: Some(provider.clone()),
model: Some(model.clone()),
tenant_id: self.tenant_id.clone(),
event_type: "billing-metrics".to_string(),
event_time: self.event_time,
});
Expand Down Expand Up @@ -1382,16 +1389,31 @@ fn extract_billing_metrics_from_samples(
node_address: String,
node_type: String,
) -> Vec<BillingMetricEvent> {
let mut collector = BillingMetricsCollector::new(node_address, node_type);
// Group samples by tenant_id so each tenant gets its own collector
let mut collectors: HashMap<Option<String>, BillingMetricsCollector> = HashMap::new();

for sample in samples {
if let prometheus_parse::Value::Counter(val) = sample.value {
process_sample(&mut collector, &sample, val);
// Extract tenant_id from labels; treat "DEFAULT_TENANT" or absent as None (single-tenant compat)
let tenant_id = sample
.labels
.get("tenant_id")
.filter(|t| *t != DEFAULT_TENANT)
.map(|t| t.to_string());

let collector = collectors.entry(tenant_id.clone()).or_insert_with(|| {
BillingMetricsCollector::new(node_address.clone(), node_type.clone(), tenant_id)
});

process_sample(collector, &sample, val);
}
}

// Convert to flattened events, excluding empty collections
collector.into_events()
// Convert all collectors to flattened events
collectors
.into_values()
.flat_map(|c| c.into_events())
.collect()
}

/// Process a single prometheus sample and update the collector
Expand Down
12 changes: 12 additions & 0 deletions src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ pub struct StorageMetadata {
pub default_role: Option<String>,
pub suspended_services: Option<HashSet<Service>>,
pub global_query_auth: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub customer_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub start_date: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub end_date: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub plan: Option<String>,
}

impl Default for StorageMetadata {
Expand All @@ -88,6 +96,10 @@ impl Default for StorageMetadata {
default_role: None,
suspended_services: None,
global_query_auth: None,
customer_name: None,
start_date: None,
end_date: None,
plan: None,
}
}
}
Expand Down
23 changes: 23 additions & 0 deletions src/tenants/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,29 @@ impl TenantMetadata {
);
}

pub fn get_tenant_meta(&self, tenant_id: &str) -> Option<StorageMetadata> {
self.tenants.get(tenant_id).map(|t| t.meta.clone())
}

pub fn update_tenant_meta(
&self,
tenant_id: &str,
customer_name: Option<String>,
start_date: Option<String>,
end_date: Option<String>,
plan: Option<String>,
) -> bool {
if let Some(mut tenant) = self.tenants.get_mut(tenant_id) {
tenant.meta.customer_name = customer_name;
tenant.meta.start_date = start_date;
tenant.meta.end_date = end_date;
tenant.meta.plan = plan;
true
} else {
false
}
}

pub fn get_global_query_auth(&self, tenant_id: &str) -> Option<String> {
if let Some(tenant) = self.tenants.get(tenant_id) {
tenant.meta.global_query_auth.clone()
Expand Down
Loading