diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index ae42bc3e0..604ab9c3a 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -397,7 +397,7 @@ pub async fn put_stream_hot_tier( validator::hot_tier(&hottier.size.to_string())?; - stream.set_hot_tier(true); + stream.set_hot_tier(Some(hottier.clone())); let Some(hot_tier_manager) = HotTierManager::global() else { return Err(StreamError::HotTierNotEnabled(stream_name)); }; @@ -418,6 +418,7 @@ pub async fn put_stream_hot_tier( .await?, )?; stream_metadata.hot_tier_enabled = true; + stream_metadata.hot_tier = Some(hottier.clone()); PARSEABLE .metastore @@ -480,6 +481,7 @@ pub async fn delete_stream_hot_tier( .await?, )?; stream_metadata.hot_tier_enabled = false; + stream_metadata.hot_tier = None; PARSEABLE .metastore diff --git a/src/handlers/http/modal/mod.rs b/src/handlers/http/modal/mod.rs index 0ecee0aa6..844975e5f 100644 --- a/src/handlers/http/modal/mod.rs +++ b/src/handlers/http/modal/mod.rs @@ -37,6 +37,7 @@ use crate::{ alerts::{ALERTS, get_alert_manager, target::TARGETS}, cli::Options, correlation::CORRELATIONS, + hottier::{HotTierManager, StreamHotTier}, metastore::metastore_traits::MetastoreObject, oidc::Claims, option::Mode, @@ -594,6 +595,55 @@ pub type IndexerMetadata = NodeMetadata; pub type QuerierMetadata = NodeMetadata; pub type PrismMetadata = NodeMetadata; +/// Initialize hot tier metadata files for streams that have hot tier configuration +/// in their stream metadata but don't have local hot tier metadata files yet. +/// This function is called once during query server startup. 
+pub async fn initialize_hot_tier_metadata_on_startup( + hot_tier_manager: &HotTierManager, +) -> anyhow::Result<()> { + // Collect hot tier configurations from streams before doing async operations + let hot_tier_configs: Vec<(String, StreamHotTier)> = { + let streams_guard = PARSEABLE.streams.read().unwrap(); + streams_guard + .iter() + .filter_map(|(stream_name, stream)| { + // Skip if hot tier metadata file already exists for this stream + if hot_tier_manager.check_stream_hot_tier_exists(stream_name) { + return None; + } + + // Get the hot tier configuration from the in-memory stream metadata + stream + .get_hot_tier() + .map(|config| (stream_name.clone(), config)) + }) + .collect() + }; + + for (stream_name, hot_tier_config) in hot_tier_configs { + // Create the hot tier metadata file with the configuration from stream metadata + let mut hot_tier_metadata = hot_tier_config; + hot_tier_metadata.used_size = 0; + hot_tier_metadata.available_size = hot_tier_metadata.size; + hot_tier_metadata.oldest_date_time_entry = None; + if hot_tier_metadata.version.is_none() { + hot_tier_metadata.version = Some(crate::hottier::CURRENT_HOT_TIER_VERSION.to_string()); + } + + if let Err(e) = hot_tier_manager + .put_hot_tier(&stream_name, &mut hot_tier_metadata) + .await + { + warn!( + "Failed to initialize hot tier metadata for stream {}: {}", + stream_name, e + ); + } + } + + Ok(()) +} + #[cfg(test)] mod test { use actix_web::body::MessageBody; diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index c551884d4..5afa0459a 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -22,6 +22,7 @@ use std::thread; use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; +use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup; use 
crate::handlers::http::{MAX_EVENT_PAYLOAD_SIZE, logstream}; use crate::handlers::http::{base_path, prism_base_path, resource_check}; use crate::handlers::http::{rbac, role}; @@ -141,6 +142,11 @@ impl ParseableServer for QueryServer { }); if let Some(hot_tier_manager) = HotTierManager::global() { hot_tier_manager.put_internal_stream_hot_tier().await?; + // Initialize hot tier metadata files for streams that have hot tier configuration + // but don't have local hot tier metadata files yet + if let Err(e) = initialize_hot_tier_metadata_on_startup(hot_tier_manager).await { + tracing::warn!("Failed to initialize hot tier metadata on startup: {}", e); + } hot_tier_manager.download_from_s3()?; }; diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index a522697aa..6e3ba9ea7 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -25,6 +25,7 @@ use crate::handlers::http::alerts; use crate::handlers::http::base_path; use crate::handlers::http::demo_data::get_demo_data; use crate::handlers::http::health_check; +use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup; use crate::handlers::http::prism_base_path; use crate::handlers::http::query; use crate::handlers::http::resource_check; @@ -143,6 +144,11 @@ impl ParseableServer for Server { }); if let Some(hot_tier_manager) = HotTierManager::global() { + // Initialize hot tier metadata files for streams that have hot tier configuration + // but don't have local hot tier metadata files yet + if let Err(e) = initialize_hot_tier_metadata_on_startup(hot_tier_manager).await { + tracing::warn!("Failed to initialize hot tier metadata on startup: {}", e); + } hot_tier_manager.download_from_s3()?; }; diff --git a/src/hottier.rs b/src/hottier.rs index a4e78bca6..872c62b57 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -51,7 +51,7 @@ const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1); pub const 
INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB pub const CURRENT_HOT_TIER_VERSION: &str = "v2"; -#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)] pub struct StreamHotTier { pub version: Option<String>, #[serde(with = "crate::utils::human_size")] diff --git a/src/metadata.rs b/src/metadata.rs index e49fc2119..e52f612aa 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -26,6 +26,7 @@ use std::sync::Arc; use crate::catalog::snapshot::ManifestItem; use crate::event::format::LogSourceEntry; use crate::handlers::TelemetryType; +use crate::hottier::StreamHotTier; use crate::metrics::{ EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE, @@ -87,6 +88,7 @@ pub struct LogStreamMetadata { pub custom_partition: Option<String>, pub static_schema_flag: bool, pub hot_tier_enabled: bool, + pub hot_tier: Option<StreamHotTier>, pub stream_type: StreamType, pub log_source: Vec<LogSourceEntry>, pub telemetry_type: TelemetryType, diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index b93984f05..10edf9438 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -110,10 +110,7 @@ impl Metastore for ObjectStoreMetastore { /// Delete an overview async fn delete_overview(&self, stream: &str) -> Result<(), MetastoreError> { let path = RelativePathBuf::from_iter([stream, "overview"]); - Ok(self - .storage - .delete_object(&path) - .await?) + Ok(self.storage.delete_object(&path).await?)
} /// This function fetches all the keystones from the underlying object store diff --git a/src/migration/mod.rs b/src/migration/mod.rs index e94f58913..1490c1869 100644 --- a/src/migration/mod.rs +++ b/src/migration/mod.rs @@ -367,6 +367,7 @@ async fn setup_logstream_metadata( custom_partition, static_schema_flag, hot_tier_enabled, + hot_tier, stream_type, log_source, telemetry_type, @@ -402,6 +403,7 @@ async fn setup_logstream_metadata( custom_partition, static_schema_flag, hot_tier_enabled, + hot_tier, stream_type, log_source, telemetry_type, diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 1a0e4374b..d8cba4aea 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -359,11 +359,13 @@ impl Parseable { .and_then(|limit| limit.parse().ok()); let custom_partition = stream_metadata.custom_partition; let static_schema_flag = stream_metadata.static_schema_flag; + let hot_tier_enabled = stream_metadata.hot_tier_enabled; + let hot_tier = stream_metadata.hot_tier.clone(); let stream_type = stream_metadata.stream_type; let schema_version = stream_metadata.schema_version; let log_source = stream_metadata.log_source; let telemetry_type = stream_metadata.telemetry_type; - let metadata = LogStreamMetadata::new( + let mut metadata = LogStreamMetadata::new( created_at, time_partition, time_partition_limit, @@ -375,18 +377,28 @@ impl Parseable { log_source, telemetry_type, ); + + // Set hot tier fields from the stored metadata + metadata.hot_tier_enabled = hot_tier_enabled; + metadata.hot_tier.clone_from(&hot_tier); + let ingestor_id = INGESTOR_META .get() .map(|ingestor_metadata| ingestor_metadata.get_node_id()); // Gets write privileges only for creating the stream when it doesn't already exist. 
- self.streams.get_or_create( + let stream = self.streams.get_or_create( self.options.clone(), stream_name.to_owned(), metadata, ingestor_id, ); + // Set hot tier configuration in memory based on stored metadata + if let Some(hot_tier_config) = hot_tier { + stream.set_hot_tier(Some(hot_tier_config)); + } + //commit schema in memory commit_schema(stream_name, schema).map_err(|e| StreamError::Anyhow(e.into()))?; diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index b84eebc00..1dab2542a 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -53,6 +53,7 @@ use crate::{ DEFAULT_TIMESTAMP_KEY, format::{LogSource, LogSourceEntry}, }, + hottier::StreamHotTier, metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, @@ -921,8 +922,18 @@ impl Stream { self.metadata.write().expect(LOCK_EXPECT).custom_partition = custom_partition.cloned(); } - pub fn set_hot_tier(&self, enable: bool) { - self.metadata.write().expect(LOCK_EXPECT).hot_tier_enabled = enable; + pub fn set_hot_tier(&self, hot_tier: Option<StreamHotTier>) { + let mut metadata = self.metadata.write().expect(LOCK_EXPECT); + metadata.hot_tier.clone_from(&hot_tier); + metadata.hot_tier_enabled = hot_tier.is_some(); + } + + pub fn get_hot_tier(&self) -> Option<StreamHotTier> { + self.metadata.read().expect(LOCK_EXPECT).hot_tier.clone() + } + + pub fn is_hot_tier_enabled(&self) -> bool { + self.metadata.read().expect(LOCK_EXPECT).hot_tier_enabled } pub fn get_stream_type(&self) -> StreamType { diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 2872b453e..99e22f30b 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -25,6 +25,7 @@ use crate::{ catalog::snapshot::Snapshot, event::format::LogSourceEntry, handlers::TelemetryType, + hottier::StreamHotTier, metadata::SchemaVersion, metastore::{MetastoreErrorDetail, metastore_traits::MetastoreObject}, option::StandaloneWithDistributed, @@ -122,6 +123,8 @@ pub struct ObjectStoreFormat { pub static_schema_flag: bool, #[serde(default)] pub
hot_tier_enabled: bool, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub hot_tier: Option<StreamHotTier>, #[serde(default)] pub stream_type: StreamType, #[serde(default)] @@ -242,6 +245,7 @@ impl Default for ObjectStoreFormat { custom_partition: None, static_schema_flag: false, hot_tier_enabled: false, + hot_tier: None, log_source: vec![LogSourceEntry::default()], telemetry_type: TelemetryType::Logs, }