42 changes: 41 additions & 1 deletion server/src/handlers/http/cluster/mod.rs
@@ -48,6 +48,46 @@ use super::base_path_without_preceding_slash;

use super::modal::IngestorMetadata;

pub async fn sync_cache_with_ingestors(
url: &str,
ingestor: IngestorMetadata,
body: bool,
) -> Result<(), StreamError> {
if !utils::check_liveness(&ingestor.domain_name).await {
return Ok(());
}
let request_body: Bytes = Bytes::from(body.to_string());
let client = reqwest::Client::new();
let resp = client
.put(url)
.header(header::CONTENT_TYPE, "application/json")
.header(header::AUTHORIZATION, ingestor.token)
.body(request_body)
.send()
.await
.map_err(|err| {
// log the error and return a custom error
log::error!(
"Fatal: failed to set cache: {}\n Error: {:?}",
ingestor.domain_name,
err
);
StreamError::Network(err)
})?;

// if the response is not successful, log the error but still return Ok:
// the sync is best-effort, so a failing ingestor should not fail the request
if !resp.status().is_success() {
log::error!(
"failed to set cache: {}\nResponse Returned: {:?}",
ingestor.domain_name,
resp.text().await
);
}

Ok(())
}
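
For context, `body.to_string()` turns the flag into the literal `true`/`false`, which is itself valid JSON, so the `application/json` content type holds. Below is a minimal sketch of how a query node might fan this call out; `get_ingestor_info` and the `"app_logs"` stream name are assumptions for illustration, and the real wiring appears in `put_enable_cache` further down in this diff:

```rust
// Sketch only: fan the cache flag out to every ingestor. The stream name
// and the get_ingestor_info lookup are placeholders; the actual call sites
// are in server/src/handlers/http/logstream.rs below.
async fn propagate_cache_flag(enable: bool) -> Result<(), StreamError> {
    for ingestor in get_ingestor_info().await.map_err(StreamError::from)? {
        let url = format!(
            "{}{}/logstream/{}/cache",
            ingestor.domain_name,
            base_path_without_preceding_slash(),
            "app_logs"
        );
        // liveness is checked inside sync_cache_with_ingestors, so dead
        // ingestors are skipped rather than failing the whole loop
        sync_cache_with_ingestors(&url, ingestor.clone(), enable).await?;
    }
    Ok(())
}
```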

// forward the request to all ingestors to keep them in sync
#[allow(dead_code)]
pub async fn sync_streams_with_ingestors(
@@ -218,7 +258,7 @@ pub async fn send_stream_delete_request(
log::error!(
"failed to delete stream: {}\nResponse Returned: {:?}",
ingestor.domain_name,
resp
resp.text().await
);
}

85 changes: 59 additions & 26 deletions server/src/handlers/http/logstream.rs
@@ -17,6 +17,9 @@
*/

use self::error::{CreateStreamError, StreamError};
use super::base_path_without_preceding_slash;
use super::cluster::fetch_stats_from_ingestors;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use crate::alerts::Alerts;
use crate::handlers::{STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY};
use crate::metadata::STREAM_INFO;
@@ -25,15 +28,12 @@ use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema};
use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo};
use crate::{catalog, event, stats};
use crate::{metadata, validator};

use super::base_path_without_preceding_slash;
use super::cluster::fetch_stats_from_ingestors;
use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats};
use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, Responder};
use arrow_schema::{Field, Schema};
use bytes::Bytes;
use chrono::Utc;
use itertools::Itertools;
use serde_json::Value;
use std::collections::HashMap;
use std::fs;
@@ -301,33 +301,66 @@ pub async fn put_enable_cache(
req: HttpRequest,
body: web::Json<bool>,
) -> Result<impl Responder, StreamError> {
let enable_cache = body.into_inner();
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let storage = CONFIG.storage().get_object_store();

if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
if CONFIG.parseable.mode == Mode::Ingest {
// here the ingest server has not found the stream
// so it should check if the stream exists in storage
let streams = storage.list_streams().await?;
if !streams.contains(&LogStream {
name: stream_name.clone().to_owned(),
}) {
log::error!("Stream {} not found", stream_name.clone());
return Err(StreamError::StreamNotFound(stream_name.clone()));
match CONFIG.parseable.mode {
Mode::Query => {
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}
let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| {
log::error!("Fatal: failed to get ingestor info: {:?}", err);
StreamError::from(err)
})?;
for ingestor in ingestor_metadata {
let url = format!(
"{}{}/logstream/{}/cache",
ingestor.domain_name,
base_path_without_preceding_slash(),
stream_name
);

super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?;
}
}
Mode::Ingest => {
if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
// here the ingest server has not found the stream
// so it should check if the stream exists in storage
let check = storage
.list_streams()
.await?
.iter()
.map(|stream| stream.name.clone())
.contains(&stream_name);

if !check {
log::error!("Stream {} not found", stream_name.clone());
return Err(StreamError::StreamNotFound(stream_name.clone()));
}
metadata::STREAM_INFO
.upsert_stream_info(
&*storage,
LogStream {
name: stream_name.clone().to_owned(),
},
)
.await
.map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
}
Mode::All => {
if !metadata::STREAM_INFO.stream_exists(&stream_name) {
return Err(StreamError::StreamNotFound(stream_name));
}
if CONFIG.parseable.local_cache_path.is_none() {
return Err(StreamError::CacheNotEnabled(stream_name));
}
}
metadata::STREAM_INFO
.upsert_stream_info(
&*storage,
LogStream {
name: stream_name.clone().to_owned(),
},
)
.await
.map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?;
}
let enable_cache = body.into_inner();
let mut stream_metadata = storage.get_stream_metadata(&stream_name).await?;
stream_metadata.cache_enabled = enable_cache;
storage
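
To illustrate the round trip, here is a hedged client-side sketch of toggling the cache on a stream; the host, port, credentials, and stream name are placeholders, not values from this PR:

```rust
use reqwest::header;

// Hypothetical call against the new endpoint; "app_logs" and the base64
// token are illustrative only.
async fn enable_cache_example() -> Result<(), reqwest::Error> {
    let resp = reqwest::Client::new()
        .put("http://localhost:8000/api/v1/logstream/app_logs/cache")
        .header(header::CONTENT_TYPE, "application/json")
        .header(header::AUTHORIZATION, "Basic YWRtaW46YWRtaW4=")
        // actix's web::Json<bool> deserializes a bare JSON boolean
        .body("true")
        .send()
        .await?;
    assert!(resp.status().is_success());
    Ok(())
}
```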
6 changes: 6 additions & 0 deletions server/src/handlers/http/modal/ingest_server.rs
@@ -179,6 +179,12 @@ impl IngestServer {
web::put()
.to(logstream::put_enable_cache)
.authorize_for_stream(Action::PutCacheEnabled),
)
// GET "/logstream/{logstream}/cache" ==> Get retention for given logstream
.route(
web::get()
.to(logstream::get_cache_enabled)
.authorize_for_stream(Action::GetCacheEnabled),
),
),
)
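
The `get_cache_enabled` handler itself is outside this diff; a plausible shape, assuming it reads the same `cache_enabled` field that `put_enable_cache` writes (the names and return shape are guesses, not the PR's code):

```rust
// Sketch of the paired GET handler; assumes the imports already present in
// logstream.rs (actix_web, CONFIG, StreamError). The real implementation in
// the PR may differ.
pub async fn get_cache_enabled(req: HttpRequest) -> Result<impl Responder, StreamError> {
    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
    let stream_metadata = CONFIG
        .storage()
        .get_object_store()
        .get_stream_metadata(&stream_name)
        .await?;
    Ok((web::Json(stream_metadata.cache_enabled), StatusCode::OK))
}
```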