From 159cf7dec29aa7f37a2b8254ae24ad8332c2826e Mon Sep 17 00:00:00 2001
From: Anant Vindal
Date: Tue, 25 Nov 2025 13:13:22 -0800
Subject: [PATCH] Updates for parseable cluster scheduler

---
 src/analytics.rs                 |  3 +-
 src/handlers/http/cluster/mod.rs | 89 +-------------------------------
 2 files changed, 3 insertions(+), 89 deletions(-)

diff --git a/src/analytics.rs b/src/analytics.rs
index 6353a3f57..3241e9201 100644
--- a/src/analytics.rs
+++ b/src/analytics.rs
@@ -332,12 +332,11 @@ async fn build_metrics() -> HashMap {
             cpu.cpu_usage().into(),
         );
     }
-
     metrics
 }
 
 pub fn init_analytics_scheduler() -> anyhow::Result<()> {
-    info!("Setting up schedular for anonymous user analytics");
+    info!("Setting up scheduler for anonymous user analytics");
 
     let mut scheduler = AsyncScheduler::new();
     scheduler
diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index baf0894aa..518f82e07 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -30,18 +30,16 @@ use actix_web::http::header::{self, HeaderMap};
 use actix_web::web::Path;
 use bytes::Bytes;
 use chrono::Utc;
-use clokwerk::{AsyncScheduler, Interval};
 use http::{StatusCode, header as http_header};
 use itertools::Itertools;
 use serde::de::{DeserializeOwned, Error};
 use serde_json::error::Error as SerdeError;
 use serde_json::{Value as JsonValue, to_vec};
-use tracing::{error, info, warn};
+use tracing::{error, warn};
 use url::Url;
 use utils::{IngestionStats, QueriedStats, StorageStats, check_liveness, to_url_string};
 
 use crate::INTRA_CLUSTER_CLIENT;
-use crate::handlers::http::ingest::ingest_internal_stream;
 use crate::handlers::http::query::{Query, QueryError, TIME_ELAPSED_HEADER};
 use crate::metrics::prom_utils::Metrics;
 use crate::option::Mode;
@@ -61,8 +59,6 @@ use super::role::RoleError;
 
 pub const PMETA_STREAM_NAME: &str = "pmeta";
 pub const BILLING_METRICS_STREAM_NAME: &str = "pbilling";
 
-const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
-
 lazy_static! {
     static ref QUERIER_MAP: Arc>> = Arc::new(RwLock::new(HashMap::new()));
@@ -1486,7 +1482,7 @@ where
 }
 
 /// Main function to fetch billing metrics from all nodes
-async fn fetch_cluster_billing_metrics() -> Result, PostError> {
+pub async fn fetch_cluster_billing_metrics() -> Result, PostError> {
     // Get all node types metadata concurrently
     let (prism_result, querier_result, ingestor_result, indexer_result) = future::join4(
         get_node_info(NodeType::Prism),
@@ -1553,87 +1549,6 @@ async fn fetch_cluster_billing_metrics() -> Result, PostError> {
     Ok(all_billing_metrics)
 }
 
-pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
-    info!("Setting up schedular for cluster metrics ingestion");
-    let mut scheduler = AsyncScheduler::new();
-    scheduler
-        .every(CLUSTER_METRICS_INTERVAL_SECONDS)
-        .run(move || async {
-            let result: Result<(), PostError> = async {
-                // Fetch regular cluster metrics
-                let cluster_metrics = fetch_cluster_metrics().await;
-                if let Ok(metrics) = cluster_metrics
-                    && !metrics.is_empty()
-                {
-                    info!("Cluster metrics fetched successfully from all nodes");
-                    if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
-                        if matches!(
-                            ingest_internal_stream(
-                                PMETA_STREAM_NAME.to_string(),
-                                bytes::Bytes::from(metrics_bytes),
-                            )
-                            .await,
-                            Ok(())
-                        ) {
-                            info!("Cluster metrics successfully ingested into internal stream");
-                        } else {
-                            error!("Failed to ingest cluster metrics into internal stream");
-                        }
-                    } else {
-                        error!("Failed to serialize cluster metrics");
-                    }
-                }
-
-                // Fetch billing metrics
-                match fetch_cluster_billing_metrics().await {
-                    Ok(metrics) if !metrics.is_empty() => {
-                        info!("Billing metrics fetched successfully from all nodes");
-                        // Optionally add: trace!("Billing metrics: {:?}", metrics);
-                        if let Ok(billing_metrics_bytes) = serde_json::to_vec(&metrics) {
-                            if matches!(
-                                ingest_internal_stream(
-                                    BILLING_METRICS_STREAM_NAME.to_string(),
-                                    bytes::Bytes::from(billing_metrics_bytes),
-                                )
-                                .await,
-                                Ok(())
-                            ) {
-                                info!("Billing metrics successfully ingested into billing-metrics stream");
-                            } else {
-                                error!("Failed to ingest billing metrics into billing-metrics stream");
-                            }
-                        } else {
-                            error!("Failed to serialize billing metrics");
-                        }
-                    }
-                    Ok(_) => {
-                        // Empty metrics result
-                        info!("No billing metrics to ingest (empty result)");
-                    }
-                    Err(err) => {
-                        error!("Error fetching billing metrics: {:?}", err);
-                    }
-                }
-
-                Ok(())
-            }
-            .await;
-
-            if let Err(err) = result {
-                error!("Error in cluster metrics scheduler: {:?}", err);
-            }
-        });
-
-    tokio::spawn(async move {
-        loop {
-            scheduler.run_pending().await;
-            tokio::time::sleep(Duration::from_secs(10)).await;
-        }
-    });
-
-    Ok(())
-}
-
 #[derive(Clone, Debug)]
 struct QuerierStatus {
     metadata: QuerierMetadata,
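
Note, not part of the patch itself: with init_cluster_metrics_schedular() removed from this module and fetch_cluster_billing_metrics() made pub, periodic ingestion presumably moves to a caller elsewhere in the codebase. The sketch below is a hypothetical example of such a caller, reusing the clokwerk AsyncScheduler pattern and the ingest_internal_stream import that the deleted code used; the function name init_billing_metrics_scheduler, its placement, and the one-minute interval are assumptions for illustration only, not something this diff introduces.

// Hypothetical caller (not introduced by this patch): drives the now-public
// fetch_cluster_billing_metrics() on a fixed interval, mirroring the pattern
// of the deleted init_cluster_metrics_schedular(). Import paths come from the
// items visible in this file and the import line the patch drops; placement
// and interval are assumptions.
use std::time::Duration;

use clokwerk::AsyncScheduler;
use tracing::{error, info};

use crate::handlers::http::cluster::{BILLING_METRICS_STREAM_NAME, fetch_cluster_billing_metrics};
use crate::handlers::http::ingest::ingest_internal_stream;

pub fn init_billing_metrics_scheduler() -> anyhow::Result<()> {
    info!("Setting up scheduler for billing metrics ingestion");
    let mut scheduler = AsyncScheduler::new();
    scheduler
        .every(clokwerk::Interval::Minutes(1))
        .run(|| async {
            match fetch_cluster_billing_metrics().await {
                Ok(metrics) if !metrics.is_empty() => {
                    // Serialize the collected metrics and push them into the
                    // internal billing stream ("pbilling").
                    match serde_json::to_vec(&metrics) {
                        Ok(bytes) => {
                            if ingest_internal_stream(
                                BILLING_METRICS_STREAM_NAME.to_string(),
                                bytes::Bytes::from(bytes),
                            )
                            .await
                            .is_err()
                            {
                                error!("Failed to ingest billing metrics into billing-metrics stream");
                            }
                        }
                        Err(err) => error!("Failed to serialize billing metrics: {err:?}"),
                    }
                }
                Ok(_) => info!("No billing metrics to ingest (empty result)"),
                Err(err) => error!("Error fetching billing metrics: {err:?}"),
            }
        });

    // Poll the scheduler from a background task, as the removed code did.
    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    });

    Ok(())
}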