Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use serde_json::Value;
use crate::alerts::Alerts;
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::storage::retention::{self, Retention};
use crate::storage::retention::Retention;
use crate::storage::{LogStream, StorageDir};
use crate::{event, stats};
use crate::{metadata, validator};
Expand Down Expand Up @@ -219,8 +219,6 @@ pub async fn put_retention(
.put_retention(&stream_name, &retention)
.await?;

retention::init_scheduler(&stream_name, retention);

Ok((
format!("set retention configuration for log stream {stream_name}"),
StatusCode::OK,
Expand Down
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ async fn main() -> anyhow::Result<()> {
}

// track all parquet files already in the data directory
storage::retention::load_retention_from_global().await;
storage::retention::load_retention_from_global();
// load data from stats back to prometheus metrics
metrics::load_from_stats_from_storage().await;

Expand Down
67 changes: 37 additions & 30 deletions server/src/storage/retention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,40 +43,47 @@ fn async_runtime() -> tokio::runtime::Runtime {
.unwrap()
}

pub async fn load_retention_from_global() {
pub fn load_retention_from_global() {
log::info!("loading retention for all streams");
for stream in STREAM_INFO.list_streams() {
let res = CONFIG
.storage()
.get_object_store()
.get_retention(&stream)
.await;
match res {
Ok(config) => {
if config.tasks.is_empty() {
log::info!("skipping loading retention for {stream}");
continue;
}
init_scheduler(&stream, config)
}
Err(err) => log::warn!("failed to load retention config for {stream} due to {err:?}"),
}
}
init_scheduler();
}

pub fn init_scheduler(stream: &str, config: Retention) {
log::info!("Setting up schedular for {stream}");
pub fn init_scheduler() {
log::info!("Setting up schedular");
let mut scheduler = AsyncScheduler::new();
for Task { action, days, .. } in config.tasks.into_iter() {
let func = match action {
Action::Delete => {
let stream = stream.to_string();
move || action::delete(stream.clone(), u32::from(days))
}
};
let func = move || async {
for stream in STREAM_INFO.list_streams() {
let res = CONFIG
.storage()
.get_object_store()
.get_retention(&stream)
.await;

match res {
Ok(config) => {
for Task { action, days, .. } in config.tasks.into_iter() {
match action {
Action::Delete => {
let stream = stream.to_string();
thread::spawn(move || {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
// Run the asynchronous delete action
action::delete(stream.clone(), u32::from(days)).await;
});
});
}
};
}
}
Err(err) => {
log::warn!("failed to load retention config for {stream} due to {err:?}")
}
};
}
};

scheduler.every(1.day()).at("00:00").run(func);
}
scheduler.every(1.day()).at("00:00").run(func);

let handler = thread::spawn(|| {
let rt = async_runtime();
Expand Down Expand Up @@ -183,7 +190,7 @@ mod action {
use crate::option::CONFIG;

pub(super) async fn delete(stream_name: String, days: u32) {
log::info!("running retention task - delete");
log::info!("running retention task - delete for stream={stream_name}");
let retain_until = get_retain_until(Utc::now().date_naive(), days as u64);

let Ok(dates) = CONFIG
Expand Down