Skip to content

Commit 159cf7d

Browse files
Anant Vindalparmesant
authored and committed
Updates for parseable cluster scheduler
1 parent 350ab9c commit 159cf7d

File tree

2 files changed

+3
-89
lines changed

2 files changed

+3
-89
lines changed

src/analytics.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -332,12 +332,11 @@ async fn build_metrics() -> HashMap<String, Value> {
332332
cpu.cpu_usage().into(),
333333
);
334334
}
335-
336335
metrics
337336
}
338337

339338
pub fn init_analytics_scheduler() -> anyhow::Result<()> {
340-
info!("Setting up schedular for anonymous user analytics");
339+
info!("Setting up scheduler for anonymous user analytics");
341340

342341
let mut scheduler = AsyncScheduler::new();
343342
scheduler

src/handlers/http/cluster/mod.rs

Lines changed: 2 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,16 @@ use actix_web::http::header::{self, HeaderMap};
3030
use actix_web::web::Path;
3131
use bytes::Bytes;
3232
use chrono::Utc;
33-
use clokwerk::{AsyncScheduler, Interval};
3433
use http::{StatusCode, header as http_header};
3534
use itertools::Itertools;
3635
use serde::de::{DeserializeOwned, Error};
3736
use serde_json::error::Error as SerdeError;
3837
use serde_json::{Value as JsonValue, to_vec};
39-
use tracing::{error, info, warn};
38+
use tracing::{error, warn};
4039
use url::Url;
4140
use utils::{IngestionStats, QueriedStats, StorageStats, check_liveness, to_url_string};
4241

4342
use crate::INTRA_CLUSTER_CLIENT;
44-
use crate::handlers::http::ingest::ingest_internal_stream;
4543
use crate::handlers::http::query::{Query, QueryError, TIME_ELAPSED_HEADER};
4644
use crate::metrics::prom_utils::Metrics;
4745
use crate::option::Mode;
@@ -61,8 +59,6 @@ use super::role::RoleError;
6159
pub const PMETA_STREAM_NAME: &str = "pmeta";
6260
pub const BILLING_METRICS_STREAM_NAME: &str = "pbilling";
6361

64-
const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1);
65-
6662
lazy_static! {
6763
static ref QUERIER_MAP: Arc<RwLock<HashMap<String, QuerierStatus>>> =
6864
Arc::new(RwLock::new(HashMap::new()));
@@ -1486,7 +1482,7 @@ where
14861482
}
14871483

14881484
/// Main function to fetch billing metrics from all nodes
1489-
async fn fetch_cluster_billing_metrics() -> Result<Vec<BillingMetricEvent>, PostError> {
1485+
pub async fn fetch_cluster_billing_metrics() -> Result<Vec<BillingMetricEvent>, PostError> {
14901486
// Get all node types metadata concurrently
14911487
let (prism_result, querier_result, ingestor_result, indexer_result) = future::join4(
14921488
get_node_info(NodeType::Prism),
@@ -1553,87 +1549,6 @@ async fn fetch_cluster_billing_metrics() -> Result<Vec<BillingMetricEvent>, Post
15531549
Ok(all_billing_metrics)
15541550
}
15551551

1556-
pub fn init_cluster_metrics_schedular() -> Result<(), PostError> {
1557-
info!("Setting up schedular for cluster metrics ingestion");
1558-
let mut scheduler = AsyncScheduler::new();
1559-
scheduler
1560-
.every(CLUSTER_METRICS_INTERVAL_SECONDS)
1561-
.run(move || async {
1562-
let result: Result<(), PostError> = async {
1563-
// Fetch regular cluster metrics
1564-
let cluster_metrics = fetch_cluster_metrics().await;
1565-
if let Ok(metrics) = cluster_metrics
1566-
&& !metrics.is_empty()
1567-
{
1568-
info!("Cluster metrics fetched successfully from all nodes");
1569-
if let Ok(metrics_bytes) = serde_json::to_vec(&metrics) {
1570-
if matches!(
1571-
ingest_internal_stream(
1572-
PMETA_STREAM_NAME.to_string(),
1573-
bytes::Bytes::from(metrics_bytes),
1574-
)
1575-
.await,
1576-
Ok(())
1577-
) {
1578-
info!("Cluster metrics successfully ingested into internal stream");
1579-
} else {
1580-
error!("Failed to ingest cluster metrics into internal stream");
1581-
}
1582-
} else {
1583-
error!("Failed to serialize cluster metrics");
1584-
}
1585-
}
1586-
1587-
// Fetch billing metrics
1588-
match fetch_cluster_billing_metrics().await {
1589-
Ok(metrics) if !metrics.is_empty() => {
1590-
info!("Billing metrics fetched successfully from all nodes");
1591-
// Optionally add: trace!("Billing metrics: {:?}", metrics);
1592-
if let Ok(billing_metrics_bytes) = serde_json::to_vec(&metrics) {
1593-
if matches!(
1594-
ingest_internal_stream(
1595-
BILLING_METRICS_STREAM_NAME.to_string(),
1596-
bytes::Bytes::from(billing_metrics_bytes),
1597-
)
1598-
.await,
1599-
Ok(())
1600-
) {
1601-
info!("Billing metrics successfully ingested into billing-metrics stream");
1602-
} else {
1603-
error!("Failed to ingest billing metrics into billing-metrics stream");
1604-
}
1605-
} else {
1606-
error!("Failed to serialize billing metrics");
1607-
}
1608-
}
1609-
Ok(_) => {
1610-
// Empty metrics result
1611-
info!("No billing metrics to ingest (empty result)");
1612-
}
1613-
Err(err) => {
1614-
error!("Error fetching billing metrics: {:?}", err);
1615-
}
1616-
}
1617-
1618-
Ok(())
1619-
}
1620-
.await;
1621-
1622-
if let Err(err) = result {
1623-
error!("Error in cluster metrics scheduler: {:?}", err);
1624-
}
1625-
});
1626-
1627-
tokio::spawn(async move {
1628-
loop {
1629-
scheduler.run_pending().await;
1630-
tokio::time::sleep(Duration::from_secs(10)).await;
1631-
}
1632-
});
1633-
1634-
Ok(())
1635-
}
1636-
16371552
#[derive(Clone, Debug)]
16381553
struct QuerierStatus {
16391554
metadata: QuerierMetadata,

0 commit comments

Comments
 (0)