From ad5062e8082673536b9ccca9ed053e0018cb5727 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 2 Apr 2024 15:52:21 +0530 Subject: [PATCH 1/2] fix: stats response --- server/src/handlers/http/cluster/mod.rs | 75 +++++++++-------------- server/src/handlers/http/cluster/utils.rs | 8 ++- 2 files changed, 35 insertions(+), 48 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 8759a39e1..91c2cbc67 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -18,7 +18,9 @@ pub mod utils; -use crate::handlers::http::cluster::utils::{check_liveness, to_url_string}; +use crate::handlers::http::cluster::utils::{ + check_liveness, to_url_string, IngestionStats, QueriedStats, +}; use crate::handlers::http::ingest::PostError; use crate::handlers::http::logstream::error::StreamError; use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY}; @@ -26,11 +28,12 @@ use crate::option::CONFIG; use crate::metrics::prom_utils::Metrics; use crate::storage::object_storage::ingester_metadata_path; -use crate::storage::ObjectStorageError; -use crate::storage::PARSEABLE_ROOT_DIRECTORY; +use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY}; +use crate::storage::{ObjectStoreFormat, PARSEABLE_ROOT_DIRECTORY}; use actix_web::http::header; use actix_web::{HttpRequest, Responder}; use bytes::Bytes; +use chrono::Utc; use http::StatusCode; use itertools::Itertools; use relative_path::RelativePathBuf; @@ -39,6 +42,8 @@ use url::Url; type IngesterMetadataArr = Vec; +use self::utils::StorageStats; + use super::base_path_without_preceding_slash; use super::modal::IngesterMetadata; @@ -108,51 +113,31 @@ pub async fn sync_streams_with_ingesters( pub async fn fetch_stats_from_ingesters( stream_name: &str, ) -> Result, StreamError> { - let mut stats = Vec::new(); - - let ingester_infos = get_ingester_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingester info: {:?}", err); - StreamError::Anyhow(err) - })?; - - for ingester in ingester_infos { - let url = format!( - "{}{}/logstream/{}/stats", - ingester.domain_name, - base_path_without_preceding_slash(), - stream_name - ); - - match utils::send_stats_request(&url, ingester.clone()).await { - Ok(Some(res)) => { - match serde_json::from_str::(&res.text().await.unwrap()) { - Ok(stat) => stats.push(stat), - Err(err) => { - log::error!( - "Could not parse stats from ingester: {}\n Error: {:?}", - ingester.domain_name, - err - ); - continue; - } - } - } - Ok(None) => { - log::error!("Ingester at {} is not reachable", &ingester.domain_name); - continue; - } - Err(err) => { - log::error!( - "Fatal: failed to fetch stats from ingester: {}\n Error: {:?}", - ingester.domain_name, - err - ); - return Err(err); - } + let path = RelativePathBuf::from_iter([stream_name, STREAM_ROOT_DIRECTORY]); + let obs = CONFIG + .storage() + .get_object_store() + .get_objects(Some(&path)) + .await?; + let mut ingestion_size = 0u64; + let mut storage_size = 0u64; + let mut count = 0u64; + for ob in obs { + if let Ok(stat) = serde_json::from_slice::(&ob) { + count += stat.stats.events; + ingestion_size += stat.stats.ingestion; + storage_size += stat.stats.storage; } } - Ok(stats) + let qs = QueriedStats::new( + "", + Utc::now(), + IngestionStats::new(count, format!("{} Bytes", ingestion_size), "json"), + StorageStats::new(format!("{} Bytes", storage_size), "parquet"), + ); + + Ok(vec![qs]) } async fn send_stream_sync_request( diff --git a/server/src/handlers/http/cluster/utils.rs b/server/src/handlers/http/cluster/utils.rs index fd1e30ea8..761177d45 100644 --- a/server/src/handlers/http/cluster/utils.rs +++ b/server/src/handlers/http/cluster/utils.rs @@ -98,8 +98,8 @@ impl IngestionStats { #[derive(Debug, Default, Serialize, Deserialize)] pub struct StorageStats { - size: String, - format: String, + pub size: String, + pub format: String, } impl StorageStats { @@ -120,7 +120,7 @@ pub fn merge_quried_stats(stats: Vec) -> QueriedStats { // .unwrap(); // should never be None // get the stream name - let stream_name = stats[0].stream.clone(); + let stream_name = stats[1].stream.clone(); // get the first event at // let min_first_event_at = stats @@ -198,6 +198,8 @@ pub async fn check_liveness(domain_name: &str) -> bool { } /// send a request to the ingester to fetch its stats +/// dead for now +#[allow(dead_code)] pub async fn send_stats_request( url: &str, ingester: IngesterMetadata, From 27fa57047becb7da63c98b9fc458108d8e074841 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 2 Apr 2024 19:55:00 +0530 Subject: [PATCH 2/2] fix: s3 get objects Refactor object storage to filter objects by starts_with_pattern --- server/src/handlers/http/cluster/mod.rs | 4 ++-- server/src/handlers/http/modal/ingest_server.rs | 2 +- server/src/storage/localfs.rs | 2 ++ server/src/storage/object_storage.rs | 1 + server/src/storage/s3.rs | 7 ++++++- 5 files changed, 12 insertions(+), 4 deletions(-) diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 91c2cbc67..f049a9c9f 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -117,7 +117,7 @@ pub async fn fetch_stats_from_ingesters( let obs = CONFIG .storage() .get_object_store() - .get_objects(Some(&path)) + .get_objects(Some(&path), ".ingester") .await?; let mut ingestion_size = 0u64; let mut storage_size = 0u64; @@ -346,7 +346,7 @@ pub async fn get_ingester_info() -> anyhow::Result { let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY); let arr = store - .get_objects(Some(&root_path)) + .get_objects(Some(&root_path), "ingester") .await? .iter() // this unwrap will most definateley shoot me in the foot later diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 63665e11b..08665160c 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -251,7 +251,7 @@ impl IngestServer { let store = CONFIG.storage().get_object_store(); let base_path = RelativePathBuf::from(""); let ingester_metadata = store - .get_objects(Some(&base_path)) + .get_objects(Some(&base_path), "ingester") .await? .iter() // this unwrap will most definateley shoot me in the foot later diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index 73f75d5b6..265e94069 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -189,9 +189,11 @@ impl ObjectStorage for LocalFS { Ok(path_arr) } + /// currently it is not using the starts_with_pattern async fn get_objects( &self, base_path: Option<&RelativePath>, + _starts_with_pattern: &str, ) -> Result, ObjectStorageError> { let time = Instant::now(); diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index efc10c544..1c87d0106 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -69,6 +69,7 @@ pub trait ObjectStorage: Sync + 'static { async fn get_objects( &self, base_path: Option<&RelativePath>, + starts_with_pattern: &str, ) -> Result, ObjectStorageError>; async fn put_object( &self, diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 27ec949fc..6ee1f9905 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -416,6 +416,7 @@ impl ObjectStorage for S3 { async fn get_objects( &self, base_path: Option<&RelativePath>, + starts_with_pattern: &str, ) -> Result, ObjectStorageError> { let instant = Instant::now(); @@ -430,7 +431,11 @@ impl ObjectStorage for S3 { let mut res = vec![]; while let Some(meta) = list_stream.next().await.transpose()? { - let ingester_file = meta.location.filename().unwrap().starts_with("ingester"); + let ingester_file = meta + .location + .filename() + .unwrap() + .starts_with(starts_with_pattern); if !ingester_file { continue;