Skip to content
Merged
2 changes: 1 addition & 1 deletion src/banner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ async fn storage_info(config: &Parseable) {
Staging Path: \"{}\"",
"Storage:".to_string().bold(),
config.get_storage_mode_string(),
config.staging_dir().to_string_lossy(),
config.options.staging_dir().to_string_lossy(),
);

if let Some(path) = &config.options.hot_tier_storage_path {
Expand Down
72 changes: 71 additions & 1 deletion src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/

use clap::Parser;
use std::path::PathBuf;
use std::{env, fs, path::PathBuf};

use url::Url;

Expand Down Expand Up @@ -385,4 +385,74 @@ impl Options {
pub fn is_default_creds(&self) -> bool {
self.username == DEFAULT_USERNAME && self.password == DEFAULT_PASSWORD
}

/// Path to staging directory, ensures that it exists or panics
pub fn staging_dir(&self) -> &PathBuf {
fs::create_dir_all(&self.local_staging_path)
.expect("Should be able to create dir if doesn't exist");

&self.local_staging_path
}

/// TODO: refactor and document
Copy link
Contributor Author

@de-sh de-sh Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In another PR we can move this code to using Url over a regular String and being optional, which will significantly simplify and make things more readable

pub fn get_url(&self) -> Url {
if self.ingestor_endpoint.is_empty() {
return format!(
"{}://{}",
self.get_scheme(),
self.address
)
.parse::<Url>() // if the value was improperly set, this will panic before hand
.unwrap_or_else(|err| {
panic!("{err}, failed to parse `{}` as Url. Please set the environment variable `P_ADDR` to `<ip address>:<port>` without the scheme (e.g., 192.168.1.1:8000). Please refer to the documentation: https://logg.ing/env for more details.", self.address)
});
}

let ingestor_endpoint = &self.ingestor_endpoint;

if ingestor_endpoint.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let addr_from_env = ingestor_endpoint.split(':').collect::<Vec<&str>>();

if addr_from_env.len() != 2 {
panic!("Invalid value `{}`, please set the environement variable `P_INGESTOR_ENDPOINT` to `<ip address / DNS>:<port>` without the scheme (e.g., 192.168.1.1:8000 or example.com:8000). Please refer to the documentation: https://logg.ing/env for more details.", ingestor_endpoint);
}

let mut hostname = addr_from_env[0].to_string();
let mut port = addr_from_env[1].to_string();

// if the env var value fits the pattern $VAR_NAME:$VAR_NAME
// fetch the value from the specified env vars
if hostname.starts_with('$') {
let var_hostname = hostname[1..].to_string();
hostname = env::var(&var_hostname).unwrap_or_default();

if hostname.is_empty() {
panic!("The environement variable `{}` is not set, please set as <ip address / DNS> without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", var_hostname);
}
if hostname.starts_with("http") {
panic!("Invalid value `{}`, please set the environement variable `{}` to `<ip address / DNS>` without the scheme (e.g., 192.168.1.1 or example.com). Please refer to the documentation: https://logg.ing/env for more details.", hostname, var_hostname);
} else {
hostname = format!("{}://{}", self.get_scheme(), hostname);
}
}

if port.starts_with('$') {
let var_port = port[1..].to_string();
port = env::var(&var_port).unwrap_or_default();

if port.is_empty() {
panic!(
"Port is not set in the environement variable `{}`. Please refer to the documentation: https://logg.ing/env for more details.",
var_port
);
}
}

format!("{}://{}:{}", self.get_scheme(), hostname, port)
.parse::<Url>()
.expect("Valid URL")
}
}
2 changes: 1 addition & 1 deletion src/handlers/http/about.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ pub async fn about() -> Json<Value> {
let staging = if PARSEABLE.options.mode == Mode::Query {
"".to_string()
} else {
PARSEABLE.staging_dir().display().to_string()
PARSEABLE.options.staging_dir().display().to_string()
};
let grpc_port = PARSEABLE.options.grpc_port;

Expand Down
120 changes: 30 additions & 90 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use crate::event::format::override_data_type;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
use crate::metadata::SchemaVersion;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
use crate::option::Mode;
use crate::parseable::{StreamNotFound, PARSEABLE};
use crate::rbac::role::Action;
use crate::rbac::Users;
Expand All @@ -47,7 +46,8 @@ use tracing::warn;

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
if !PARSEABLE.streams.contains(&stream_name) {
// Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name).into());
}

Expand Down Expand Up @@ -120,15 +120,11 @@ pub async fn detect_schema(Json(json): Json<Value>) -> Result<impl Responder, St
Ok((web::Json(schema), StatusCode::OK))
}

pub async fn schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
pub async fn get_schema(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

// Ensure parseable is aware of stream in distributed mode
if PARSEABLE.options.mode == Mode::Query
&& PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await?
{
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name.clone()).into());
}

Expand Down Expand Up @@ -164,14 +160,8 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name.clone()).into());
}

let retention = PARSEABLE
Expand All @@ -183,36 +173,24 @@ pub async fn get_retention(stream_name: Path<String>) -> Result<impl Responder,

pub async fn put_retention(
stream_name: Path<String>,
Json(json): Json<Value>,
Json(retention): Json<Retention>,
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name).into());
}
let stream = PARSEABLE.get_stream(&stream_name)?;

let retention: Retention = match serde_json::from_value(json) {
Ok(retention) => retention,
Err(err) => return Err(StreamError::InvalidRetentionConfig(err)),
};

PARSEABLE
.storage
.get_object_store()
.put_retention(&stream_name, &retention)
.await?;

stream.set_retention(retention);
PARSEABLE.get_stream(&stream_name)?.set_retention(retention);

Ok((
format!("set retention configuration for log stream {stream_name}"),
Expand Down Expand Up @@ -250,21 +228,11 @@ pub async fn get_stats(
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

if !PARSEABLE.streams.contains(&stream_name) {
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
} else {
return Err(StreamNotFound(stream_name).into());
}
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name.clone()).into());
}

let query_string = req.query_string();
Expand Down Expand Up @@ -356,19 +324,13 @@ pub async fn get_stats(

pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
if !PARSEABLE.streams.contains(&stream_name) {
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
} else {
return Err(StreamNotFound(stream_name).into());
}
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name.clone()).into());
}

let storage = PARSEABLE.storage.get_object_store();
// if first_event_at is not found in memory map, check if it exists in the storage
// if it exists in the storage, update the first_event_at in memory map
Expand Down Expand Up @@ -417,14 +379,8 @@ pub async fn put_stream_hot_tier(
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name).into());
}

let stream = PARSEABLE.get_stream(&stream_name)?;
Expand Down Expand Up @@ -467,21 +423,11 @@ pub async fn put_stream_hot_tier(
pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();

if !PARSEABLE.streams.contains(&stream_name) {
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
} else {
return Err(StreamNotFound(stream_name).into());
}
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name.clone()).into());
}

let Some(hot_tier_manager) = HotTierManager::global() else {
Expand All @@ -500,14 +446,8 @@ pub async fn delete_stream_hot_tier(
// For query mode, if the stream not found in memory map,
//check if it exists in the storage
//create stream and schema from storage
if PARSEABLE.options.mode == Mode::Query {
match PARSEABLE
.create_stream_and_schema_from_storage(&stream_name)
.await
{
Ok(true) => {}
Ok(false) | Err(_) => return Err(StreamNotFound(stream_name.clone()).into()),
}
if PARSEABLE.check_or_load_stream(&stream_name).await {
return Err(StreamNotFound(stream_name).into());
}

if PARSEABLE.get_stream(&stream_name)?.get_stream_type() == StreamType::Internal {
Expand Down
Loading
Loading