Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ pub async fn put_stream_hot_tier(

validator::hot_tier(&hottier.size.to_string())?;

stream.set_hot_tier(true);
stream.set_hot_tier(Some(hottier.clone()));
let Some(hot_tier_manager) = HotTierManager::global() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
};
Expand All @@ -418,6 +418,7 @@ pub async fn put_stream_hot_tier(
.await?,
)?;
stream_metadata.hot_tier_enabled = true;
stream_metadata.hot_tier = Some(hottier.clone());

PARSEABLE
.metastore
Expand Down Expand Up @@ -480,6 +481,7 @@ pub async fn delete_stream_hot_tier(
.await?,
)?;
stream_metadata.hot_tier_enabled = false;
stream_metadata.hot_tier = None;

PARSEABLE
.metastore
Expand Down
50 changes: 50 additions & 0 deletions src/handlers/http/modal/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::{
alerts::{ALERTS, get_alert_manager, target::TARGETS},
cli::Options,
correlation::CORRELATIONS,
hottier::{HotTierManager, StreamHotTier},
metastore::metastore_traits::MetastoreObject,
oidc::Claims,
option::Mode,
Expand Down Expand Up @@ -594,6 +595,55 @@ pub type IndexerMetadata = NodeMetadata;
pub type QuerierMetadata = NodeMetadata;
pub type PrismMetadata = NodeMetadata;

/// Initialize hot tier metadata files for streams that have hot tier configuration
/// in their stream metadata but don't have local hot tier metadata files yet.
/// This function is called once during query server startup.
pub async fn initialize_hot_tier_metadata_on_startup(
hot_tier_manager: &HotTierManager,
) -> anyhow::Result<()> {
// Collect hot tier configurations from streams before doing async operations
let hot_tier_configs: Vec<(String, StreamHotTier)> = {
let streams_guard = PARSEABLE.streams.read().unwrap();
streams_guard
.iter()
.filter_map(|(stream_name, stream)| {
// Skip if hot tier metadata file already exists for this stream
if hot_tier_manager.check_stream_hot_tier_exists(stream_name) {
return None;
}

// Get the hot tier configuration from the in-memory stream metadata
stream
.get_hot_tier()
.map(|config| (stream_name.clone(), config))
})
.collect()
};

for (stream_name, hot_tier_config) in hot_tier_configs {
// Create the hot tier metadata file with the configuration from stream metadata
let mut hot_tier_metadata = hot_tier_config;
hot_tier_metadata.used_size = 0;
hot_tier_metadata.available_size = hot_tier_metadata.size;
hot_tier_metadata.oldest_date_time_entry = None;
if hot_tier_metadata.version.is_none() {
hot_tier_metadata.version = Some(crate::hottier::CURRENT_HOT_TIER_VERSION.to_string());
}

if let Err(e) = hot_tier_manager
.put_hot_tier(&stream_name, &mut hot_tier_metadata)
.await
{
warn!(
"Failed to initialize hot tier metadata for stream {}: {}",
stream_name, e
);
}
}

Ok(())
}

#[cfg(test)]
mod test {
use actix_web::body::MessageBody;
Expand Down
6 changes: 6 additions & 0 deletions src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::thread;
use crate::handlers::airplane;
use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular};
use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt};
use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup;
use crate::handlers::http::{MAX_EVENT_PAYLOAD_SIZE, logstream};
use crate::handlers::http::{base_path, prism_base_path, resource_check};
use crate::handlers::http::{rbac, role};
Expand Down Expand Up @@ -141,6 +142,11 @@ impl ParseableServer for QueryServer {
});
if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.put_internal_stream_hot_tier().await?;
// Initialize hot tier metadata files for streams that have hot tier configuration
// but don't have local hot tier metadata files yet
if let Err(e) = initialize_hot_tier_metadata_on_startup(hot_tier_manager).await {
tracing::warn!("Failed to initialize hot tier metadata on startup: {}", e);
}
hot_tier_manager.download_from_s3()?;
};

Expand Down
6 changes: 6 additions & 0 deletions src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::handlers::http::alerts;
use crate::handlers::http::base_path;
use crate::handlers::http::demo_data::get_demo_data;
use crate::handlers::http::health_check;
use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup;
use crate::handlers::http::prism_base_path;
use crate::handlers::http::query;
use crate::handlers::http::resource_check;
Expand Down Expand Up @@ -143,6 +144,11 @@ impl ParseableServer for Server {
});

if let Some(hot_tier_manager) = HotTierManager::global() {
// Initialize hot tier metadata files for streams that have hot tier configuration
// but don't have local hot tier metadata files yet
if let Err(e) = initialize_hot_tier_metadata_on_startup(hot_tier_manager).await {
tracing::warn!("Failed to initialize hot tier metadata on startup: {}", e);
}
hot_tier_manager.download_from_s3()?;
};

Expand Down
2 changes: 1 addition & 1 deletion src/hottier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1);
pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB
pub const CURRENT_HOT_TIER_VERSION: &str = "v2";

#[derive(Debug, serde::Deserialize, serde::Serialize)]
#[derive(Debug, Clone, PartialEq, Eq, serde::Deserialize, serde::Serialize)]
pub struct StreamHotTier {
pub version: Option<String>,
#[serde(with = "crate::utils::human_size")]
Expand Down
2 changes: 2 additions & 0 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use std::sync::Arc;
use crate::catalog::snapshot::ManifestItem;
use crate::event::format::LogSourceEntry;
use crate::handlers::TelemetryType;
use crate::hottier::StreamHotTier;
use crate::metrics::{
EVENTS_INGESTED, EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE, EVENTS_INGESTED_SIZE_DATE,
EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_INGESTED, LIFETIME_EVENTS_INGESTED_SIZE,
Expand Down Expand Up @@ -87,6 +88,7 @@ pub struct LogStreamMetadata {
pub custom_partition: Option<String>,
pub static_schema_flag: bool,
pub hot_tier_enabled: bool,
pub hot_tier: Option<StreamHotTier>,
pub stream_type: StreamType,
pub log_source: Vec<LogSourceEntry>,
pub telemetry_type: TelemetryType,
Expand Down
5 changes: 1 addition & 4 deletions src/metastore/metastores/object_store_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,7 @@ impl Metastore for ObjectStoreMetastore {
/// Delete an overview
async fn delete_overview(&self, stream: &str) -> Result<(), MetastoreError> {
let path = RelativePathBuf::from_iter([stream, "overview"]);
Ok(self
.storage
.delete_object(&path)
.await?)
Ok(self.storage.delete_object(&path).await?)
}

/// This function fetches all the keystones from the underlying object store
Expand Down
2 changes: 2 additions & 0 deletions src/migration/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ async fn setup_logstream_metadata(
custom_partition,
static_schema_flag,
hot_tier_enabled,
hot_tier,
stream_type,
log_source,
telemetry_type,
Expand Down Expand Up @@ -402,6 +403,7 @@ async fn setup_logstream_metadata(
custom_partition,
static_schema_flag,
hot_tier_enabled,
hot_tier,
stream_type,
log_source,
telemetry_type,
Expand Down
16 changes: 14 additions & 2 deletions src/parseable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,11 +359,13 @@ impl Parseable {
.and_then(|limit| limit.parse().ok());
let custom_partition = stream_metadata.custom_partition;
let static_schema_flag = stream_metadata.static_schema_flag;
let hot_tier_enabled = stream_metadata.hot_tier_enabled;
let hot_tier = stream_metadata.hot_tier.clone();
let stream_type = stream_metadata.stream_type;
let schema_version = stream_metadata.schema_version;
let log_source = stream_metadata.log_source;
let telemetry_type = stream_metadata.telemetry_type;
let metadata = LogStreamMetadata::new(
let mut metadata = LogStreamMetadata::new(
created_at,
time_partition,
time_partition_limit,
Expand All @@ -375,18 +377,28 @@ impl Parseable {
log_source,
telemetry_type,
);

// Set hot tier fields from the stored metadata
metadata.hot_tier_enabled = hot_tier_enabled;
metadata.hot_tier.clone_from(&hot_tier);

let ingestor_id = INGESTOR_META
.get()
.map(|ingestor_metadata| ingestor_metadata.get_node_id());

// Gets write privileges only for creating the stream when it doesn't already exist.
self.streams.get_or_create(
let stream = self.streams.get_or_create(
self.options.clone(),
stream_name.to_owned(),
metadata,
ingestor_id,
);

// Set hot tier configuration in memory based on stored metadata
if let Some(hot_tier_config) = hot_tier {
stream.set_hot_tier(Some(hot_tier_config));
}

//commit schema in memory
commit_schema(stream_name, schema).map_err(|e| StreamError::Anyhow(e.into()))?;

Expand Down
15 changes: 13 additions & 2 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use crate::{
DEFAULT_TIMESTAMP_KEY,
format::{LogSource, LogSourceEntry},
},
hottier::StreamHotTier,
metadata::{LogStreamMetadata, SchemaVersion},
metrics,
option::Mode,
Expand Down Expand Up @@ -921,8 +922,18 @@ impl Stream {
self.metadata.write().expect(LOCK_EXPECT).custom_partition = custom_partition.cloned();
}

pub fn set_hot_tier(&self, enable: bool) {
self.metadata.write().expect(LOCK_EXPECT).hot_tier_enabled = enable;
pub fn set_hot_tier(&self, hot_tier: Option<StreamHotTier>) {
let mut metadata = self.metadata.write().expect(LOCK_EXPECT);
metadata.hot_tier.clone_from(&hot_tier);
metadata.hot_tier_enabled = hot_tier.is_some();
}

pub fn get_hot_tier(&self) -> Option<StreamHotTier> {
self.metadata.read().expect(LOCK_EXPECT).hot_tier.clone()
}

pub fn is_hot_tier_enabled(&self) -> bool {
self.metadata.read().expect(LOCK_EXPECT).hot_tier_enabled
}

pub fn get_stream_type(&self) -> StreamType {
Expand Down
4 changes: 4 additions & 0 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
catalog::snapshot::Snapshot,
event::format::LogSourceEntry,
handlers::TelemetryType,
hottier::StreamHotTier,
metadata::SchemaVersion,
metastore::{MetastoreErrorDetail, metastore_traits::MetastoreObject},
option::StandaloneWithDistributed,
Expand Down Expand Up @@ -122,6 +123,8 @@ pub struct ObjectStoreFormat {
pub static_schema_flag: bool,
#[serde(default)]
pub hot_tier_enabled: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub hot_tier: Option<StreamHotTier>,
#[serde(default)]
pub stream_type: StreamType,
#[serde(default)]
Expand Down Expand Up @@ -242,6 +245,7 @@ impl Default for ObjectStoreFormat {
custom_partition: None,
static_schema_flag: false,
hot_tier_enabled: false,
hot_tier: None,
log_source: vec![LogSourceEntry::default()],
telemetry_type: TelemetryType::Logs,
}
Expand Down
Loading