Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 5 additions & 15 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ pub async fn sync_streams_with_ingestors(
stream_name
);

// roll back the stream creation
send_stream_rollback_request(&url, ingestor.clone()).await?;
// delete the stream
send_stream_delete_request(&url, ingestor.clone()).await?;
}

// this might be a bit too much
Expand Down Expand Up @@ -188,15 +188,13 @@ async fn send_stream_sync_request(
}

/// send a rollback request to all ingestors
#[allow(dead_code)]
async fn send_stream_rollback_request(
pub async fn send_stream_delete_request(
url: &str,
ingestor: IngestorMetadata,
) -> Result<(), StreamError> {
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(());
}

let client = reqwest::Client::new();
let resp = client
.delete(url)
Expand All @@ -207,7 +205,7 @@ async fn send_stream_rollback_request(
.map_err(|err| {
// log the error and return a custom error
log::error!(
"Fatal: failed to rollback stream creation: {}\n Error: {:?}",
"Fatal: failed to delete stream: {}\n Error: {:?}",
ingestor.domain_name,
err
);
Expand All @@ -218,18 +216,10 @@ async fn send_stream_rollback_request(
// this could be a bit too much, but we need to be sure it covers all cases
if !resp.status().is_success() {
log::error!(
"failed to rollback stream creation: {}\nResponse Returned: {:?}",
"failed to delete stream: {}\nResponse Returned: {:?}",
ingestor.domain_name,
resp
);
return Err(StreamError::Custom {
msg: format!(
"failed to rollback stream creation: {}\nResponse Returned: {:?}",
ingestor.domain_name,
resp.text().await.unwrap_or_default()
),
status: StatusCode::INTERNAL_SERVER_ERROR,
});
}

Ok(())
Expand Down
46 changes: 34 additions & 12 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
use crate::{catalog, event, stats};
use crate::{metadata, validator};

use super::base_path_without_preceding_slash;
use super::cluster::fetch_stats_from_ingestors;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use actix_web::http::StatusCode;
Expand All @@ -40,28 +41,49 @@ use std::sync::Arc;

pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}
match CONFIG.parseable.mode {
Mode::Query | Mode::All => {
let objectstore = CONFIG.storage().get_object_store();

objectstore.delete_stream(&stream_name).await?;
let stream_dir = StorageDir::new(&stream_name);
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
log::warn!(
"failed to delete local data for stream {}. Clean {} manually",
stream_name,
stream_dir.data_path.to_string_lossy()
)
}

let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
StreamError::from(err)
})?;

for ingestor in ingestor_metadata {
let url = format!(
"{}{}/logstream/{}",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
);

// delete the stream
super::cluster::send_stream_delete_request(&url, ingestor.clone()).await?;
}
}
_ => {}
}

let objectstore = CONFIG.storage().get_object_store();
objectstore.delete_stream(&stream_name).await?;
metadata::STREAM_INFO.delete_stream(&stream_name);
event::STREAM_WRITERS.delete_stream(&stream_name);
stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| {
log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e)
});

let stream_dir = StorageDir::new(&stream_name);
if fs::remove_dir_all(&stream_dir.data_path).is_err() {
log::warn!(
"failed to delete local data for stream {}. Clean {} manually",
stream_name,
stream_dir.data_path.to_string_lossy()
)
}

Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
}

Expand Down
7 changes: 7 additions & 0 deletions server/src/handlers/http/modal/ingest_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,13 @@ impl IngestServer {
fn logstream_api() -> Scope {
web::scope("/logstream").service(
web::scope("/{logstream}")
.service(
web::resource("").route(
web::delete()
.to(logstream::delete)
.authorize_for_stream(Action::DeleteStream),
),
)
.service(
// GET "/logstream/{logstream}/stats" ==> Get stats for given log stream
web::resource("/stats").route(
Expand Down