Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,12 +332,11 @@ async fn build_metrics() -> HashMap<String, Value> {
cpu.cpu_usage().into(),
);
}

metrics
}

pub fn init_analytics_scheduler() -> anyhow::Result<()> {
info!("Setting up schedular for anonymous user analytics");
info!("Setting up scheduler for anonymous user analytics");

let mut scheduler = AsyncScheduler::new();
scheduler
Expand Down
89 changes: 2 additions & 87 deletions src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,16 @@ use actix_web::http::header::{self, HeaderMap};
use actix_web::web::Path;
use bytes::Bytes;
use chrono::Utc;
use clokwerk::{AsyncScheduler, Interval};
use http::{StatusCode, header as http_header};
use itertools::Itertools;
use serde::de::{DeserializeOwned, Error};
use serde_json::error::Error as SerdeError;
use serde_json::{Value as JsonValue, to_vec};
use tracing::{error, info, warn};
use tracing::{error, warn};
use url::Url;
use utils::{IngestionStats, QueriedStats, StorageStats, check_liveness, to_url_string};

use crate::INTRA_CLUSTER_CLIENT;
use crate::handlers::http::ingest::ingest_internal_stream;
use crate::handlers::http::query::{Query, QueryError, TIME_ELAPSED_HEADER};
use crate::metrics::prom_utils::Metrics;
use crate::option::Mode;
Expand All @@ -61,8 +59,6 @@ use super::role::RoleError;
pub const PMETA_STREAM_NAME: &str = "pmeta";
pub const BILLING_METRICS_STREAM_NAME: &str = "pbilling";

const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);

lazy_static! {
static ref QUERIER_MAP: Arc<RwLock<HashMap<String, QuerierStatus>>> =
Arc::new(RwLock::new(HashMap::new()));
Expand Down Expand Up @@ -1486,7 +1482,7 @@ where
}

/// Main function to fetch billing metrics from all nodes
async fn fetch_cluster_billing_metrics() -> Result<Vec<BillingMetricEvent>, PostError> {
pub async fn fetch_cluster_billing_metrics() -> Result<Vec<BillingMetricEvent>, PostError> {
// Get all node types metadata concurrently
let (prism_result, querier_result, ingestor_result, indexer_result) = future::join4(
get_node_info(NodeType::Prism),
Expand Down Expand Up @@ -1553,87 +1549,6 @@ async fn fetch_cluster_billing_metrics() -> Result<Vec<BillingMetricEvent>, Post
Ok(all_billing_metrics)
}

pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
info!("Setting up schedular for cluster metrics ingestion");
let mut scheduler = AsyncScheduler::new();
scheduler
.every(CLUSTER_METRICS_INTERVAL_SECONDS)
.run(move || async {
let result: Result<(), PostError> = async {
// Fetch regular cluster metrics
let cluster_metrics = fetch_cluster_metrics().await;
if let Ok(metrics) = cluster_metrics
&& !metrics.is_empty()
{
info!("Cluster metrics fetched successfully from all nodes");
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
if matches!(
ingest_internal_stream(
PMETA_STREAM_NAME.to_string(),
bytes::Bytes::from(metrics_bytes),
)
.await,
Ok(())
) {
info!("Cluster metrics successfully ingested into internal stream");
} else {
error!("Failed to ingest cluster metrics into internal stream");
}
} else {
error!("Failed to serialize cluster metrics");
}
}

// Fetch billing metrics
match fetch_cluster_billing_metrics().await {
Ok(metrics) if !metrics.is_empty() => {
info!("Billing metrics fetched successfully from all nodes");
// Optionally add: trace!("Billing metrics: {:?}", metrics);
if let Ok(billing_metrics_bytes) = serde_json::to_vec(&metrics) {
if matches!(
ingest_internal_stream(
BILLING_METRICS_STREAM_NAME.to_string(),
bytes::Bytes::from(billing_metrics_bytes),
)
.await,
Ok(())
) {
info!("Billing metrics successfully ingested into billing-metrics stream");
} else {
error!("Failed to ingest billing metrics into billing-metrics stream");
}
} else {
error!("Failed to serialize billing metrics");
}
}
Ok(_) => {
// Empty metrics result
info!("No billing metrics to ingest (empty result)");
}
Err(err) => {
error!("Error fetching billing metrics: {:?}", err);
}
}

Ok(())
}
.await;

if let Err(err) = result {
error!("Error in cluster metrics scheduler: {:?}", err);
}
});

tokio::spawn(async move {
loop {
scheduler.run_pending().await;
tokio::time::sleep(Duration::from_secs(10)).await;
}
});

Ok(())
}

#[derive(Clone, Debug)]
struct QuerierStatus {
metadata: QuerierMetadata,
Expand Down
Loading