From 98d7a43d9e2c6b93804e4c26c089eadeb5756b2e Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 16 Jan 2024 14:57:02 +0530 Subject: [PATCH 01/15] Add Trait to get the list of directories --- server/src/storage/object_storage.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index be4b1c1c6..cd0051345 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -75,6 +75,7 @@ pub trait ObjectStorage: Sync + 'static { async fn check(&self) -> Result<(), ObjectStorageError>; async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>; async fn list_streams(&self) -> Result, ObjectStorageError>; + async fn list_dirs_in_storage(&self) -> Result, ObjectStorageError>; async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError>; async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>; From a5306f6bc9474ec04668e92b57ae0618729651bf Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 16 Jan 2024 14:57:58 +0530 Subject: [PATCH 02/15] Impl list_dirs_in_storage trait for localfs --- server/src/storage/localfs.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index df88499a9..ff9d9c98d 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -169,6 +169,23 @@ impl ObjectStorage for LocalFS { Ok(logstreams) } + async fn list_dirs_in_storage(&self) -> Result, ObjectStorageError> { + let dirs = ReadDirStream::new(fs::read_dir(&self.root).await?) + .try_collect::>() + .await? + .into_iter() + .map(dir_name); + + let dirs = FuturesUnordered::from_iter(dirs) + .try_collect::>() + .await? + .into_iter() + .flatten() + .collect::>(); + + Ok(dirs) + } + async fn list_dates(&self, stream_name: &str) -> Result, ObjectStorageError> { let path = self.root.join(stream_name); let directories = ReadDirStream::new(fs::read_dir(&path).await?); From e5581b0199cec11f9e2da38ae742b34cece3c81a Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 16 Jan 2024 14:58:17 +0530 Subject: [PATCH 03/15] Impl list_dirs_in_storage trait for s3 --- server/src/storage/s3.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index af171bfae..dbd7dc915 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -470,6 +470,18 @@ impl ObjectStorage for S3 { fn store_url(&self) -> url::Url { url::Url::parse(&format!("s3://{}", self.bucket)).unwrap() } + + async fn list_dirs_in_storage(&self) -> Result, ObjectStorageError> { + let pre = object_store::path::Path::from("/"); + let resp = self.client.list_with_delimiter(Some(&pre)).await?; + + Ok(resp + .common_prefixes + .iter() + .flat_map(|path| path.parts()) + .map(|name| name.as_ref().to_string()) + .collect::>()) + } } impl From for ObjectStorageError { From 691e93284cb57d2ffc2073b830b6795f215c67a6 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 16 Jan 2024 14:58:56 +0530 Subject: [PATCH 04/15] Add a storage validation function --- server/src/option.rs | 33 ++++++++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/server/src/option.rs b/server/src/option.rs index 8b3983170..ccd02724b 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use url::Url; use crate::oidc::{self, OpenidConfig}; -use crate::storage::{FSConfig, ObjectStorageProvider, S3Config}; +use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config}; use crate::utils::validate_path_is_writeable; pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB @@ -104,6 +104,37 @@ impl Config { validate_path_is_writeable(staging_path) } + pub async fn validate_storage(&self) -> Result<(), ObjectStorageError> { + let obj_store = self.storage.get_object_store(); + let rel_path = relative_path::RelativePathBuf::from(".parseable.json"); + + let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok(); + + let has_dirs = match obj_store.list_dirs_in_storage().await { + Ok(dirs) => dirs.is_empty(), + Err(_) => false, + }; + + let has_streams = obj_store.list_streams().await.is_ok(); + + if has_dirs || has_parseable_json && has_streams { + Ok(()) + } else if has_parseable_json && !has_streams { + Err(ObjectStorageError::Custom( + "Parseable config found, but the Storage contains some stale data.".to_owned(), + )) + } else if !has_parseable_json && !has_streams && !has_dirs { + Err(ObjectStorageError::Custom( + "Storage contains some stale data, Please provide an Empty Storage".to_owned(), + )) + } else { + Err(ObjectStorageError::Custom( + "Parseable config is missing, but streams are present in Storage.\nJoin us on Parseable Slack" + .to_owned() + )) + } + } + pub fn storage(&self) -> Arc { self.storage.clone() } From 3be5d05efaf499e1da07d1bd23a58fabecedded8 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 16 Jan 2024 14:59:39 +0530 Subject: [PATCH 05/15] Add Storage Validation Step --- server/src/main.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/main.rs b/server/src/main.rs index 954ed6ddd..74e5a07d7 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -58,6 +58,7 @@ async fn main() -> anyhow::Result<()> { migration::run_metadata_migration(&CONFIG).await?; let metadata = storage::resolve_parseable_metadata().await?; CONFIG.validate_staging()?; + CONFIG.validate_storage().await?; banner::print(&CONFIG, &metadata).await; rbac::map::init(&metadata); metadata.set_global(); From 90ae9bab193de9f545cad7972ba05d20afabb8fd Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Jan 2024 17:05:56 +0530 Subject: [PATCH 06/15] Changed the Error Message when using a storage from a previous deployment --- server/src/option.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/server/src/option.rs b/server/src/option.rs index ccd02724b..7f15472f2 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -121,7 +121,8 @@ impl Config { Ok(()) } else if has_parseable_json && !has_streams { Err(ObjectStorageError::Custom( - "Parseable config found, but the Storage contains some stale data.".to_owned(), + "Could not start the server because storage contains stale data from previous deployment, please choose an empty storage and restart the server. Join us on Parseable Slack to report this incident : launchpass.com/parseable" + .to_owned(), )) } else if !has_parseable_json && !has_streams && !has_dirs { Err(ObjectStorageError::Custom( From 421f89de2479cf06a585d95b169e8217058e996c Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Wed, 17 Jan 2024 17:08:51 +0530 Subject: [PATCH 07/15] Change var name to indicate proper behavior --- server/src/option.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index 7f15472f2..9cb4499e6 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -111,20 +111,20 @@ impl Config { let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok(); let has_dirs = match obj_store.list_dirs_in_storage().await { - Ok(dirs) => dirs.is_empty(), + Ok(dirs) => !dirs.is_empty(), Err(_) => false, }; let has_streams = obj_store.list_streams().await.is_ok(); - if has_dirs || has_parseable_json && has_streams { + if !has_dirs || has_parseable_json && has_streams { Ok(()) } else if has_parseable_json && !has_streams { Err(ObjectStorageError::Custom( "Could not start the server because storage contains stale data from previous deployment, please choose an empty storage and restart the server. Join us on Parseable Slack to report this incident : launchpass.com/parseable" .to_owned(), )) - } else if !has_parseable_json && !has_streams && !has_dirs { + } else if !has_parseable_json && !has_streams && has_dirs { Err(ObjectStorageError::Custom( "Storage contains some stale data, Please provide an Empty Storage".to_owned(), )) From fb50327f6642f60d70b4e4886bf5d2ab017adda9 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 22 Jan 2024 12:15:31 +0530 Subject: [PATCH 08/15] update error messages --- server/src/option.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index 9cb4499e6..9c976513b 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -126,11 +126,11 @@ impl Config { )) } else if !has_parseable_json && !has_streams && has_dirs { Err(ObjectStorageError::Custom( - "Storage contains some stale data, Please provide an Empty Storage".to_owned(), + "Could not start the server because storage contains some stale data, Please provide an Empty Storage".to_owned(), )) } else { Err(ObjectStorageError::Custom( - "Parseable config is missing, but streams are present in Storage.\nJoin us on Parseable Slack" + "Could not start the server because storage contains stale data from previous deployment.\nJoin us on Parseable Slack" .to_owned() )) } From 22a6acd8628258393d03b32ab9dffe1424ccf6ec Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 22 Jan 2024 12:16:07 +0530 Subject: [PATCH 09/15] fix: validate storage should happen before metadata migration --- server/src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main.rs b/server/src/main.rs index 74e5a07d7..51f625a0f 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -55,10 +55,10 @@ use crate::localcache::LocalCacheManager; async fn main() -> anyhow::Result<()> { env_logger::init(); let storage = CONFIG.storage().get_object_store(); + CONFIG.validate_storage().await?; migration::run_metadata_migration(&CONFIG).await?; let metadata = storage::resolve_parseable_metadata().await?; CONFIG.validate_staging()?; - CONFIG.validate_storage().await?; banner::print(&CONFIG, &metadata).await; rbac::map::init(&metadata); metadata.set_global(); From ae44f8d6a0bc94a4dec64482b3def3660eb1e6ef Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 22 Jan 2024 12:20:29 +0530 Subject: [PATCH 10/15] Made Error Messages Consistant --- server/src/option.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index 9c976513b..4324123b7 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -121,16 +121,16 @@ impl Config { Ok(()) } else if has_parseable_json && !has_streams { Err(ObjectStorageError::Custom( - "Could not start the server because storage contains stale data from previous deployment, please choose an empty storage and restart the server. Join us on Parseable Slack to report this incident : launchpass.com/parseable" + "Could not start the server because storage contains stale data from previous deployment, please choose an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable" .to_owned(), )) } else if !has_parseable_json && !has_streams && has_dirs { Err(ObjectStorageError::Custom( - "Could not start the server because storage contains some stale data, Please provide an Empty Storage".to_owned(), + "Could not start the server because storage contains some stale data, Please provide an Empty Storage.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable".to_owned(), )) } else { Err(ObjectStorageError::Custom( - "Could not start the server because storage contains stale data from previous deployment.\nJoin us on Parseable Slack" + "Could not start the server because storage contains stale data from previous deployment.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable" .to_owned() )) } From e2fe24dc7b3d6896671e8f29aedc7ccebf5a3b10 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 22 Jan 2024 12:53:39 +0530 Subject: [PATCH 11/15] update error message --- server/src/option.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/option.rs b/server/src/option.rs index 4324123b7..579550b0c 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -126,7 +126,7 @@ impl Config { )) } else if !has_parseable_json && !has_streams && has_dirs { Err(ObjectStorageError::Custom( - "Could not start the server because storage contains some stale data, Please provide an Empty Storage.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable".to_owned(), + "Could not start the server because storage contains some stale data, please provide an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable".to_owned(), )) } else { Err(ObjectStorageError::Custom( From 5f1fc55e2760395f9d04d563a910a3ade61e5c4a Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 22 Jan 2024 13:25:08 +0530 Subject: [PATCH 12/15] update code documentation --- server/src/migration.rs | 2 ++ server/src/storage/store_metadata.rs | 6 +++--- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/server/src/migration.rs b/server/src/migration.rs index 899d86eae..5484e84c3 100644 --- a/server/src/migration.rs +++ b/server/src/migration.rs @@ -32,6 +32,8 @@ use crate::{ storage::{ObjectStorage, ObjectStorageError}, }; +/// Migrate the metdata from v1 or v2 to v3 +/// This is a one time migration pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> { let object_store = config.storage().get_object_store(); let storage_metadata = get_storage_metadata(&*object_store).await?; diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs index 0e9ad955f..137cf1e24 100644 --- a/server/src/storage/store_metadata.rs +++ b/server/src/storage/store_metadata.rs @@ -92,8 +92,8 @@ impl StorageMetadata { } } -// always returns remote metadata as it is source of truth -// overwrites staging metadata while updating storage info +/// always returns remote metadata as it is source of truth +/// overwrites staging metadata while updating storage info pub async fn resolve_parseable_metadata() -> Result { let staging_metadata = get_staging_metadata()?; let storage = CONFIG.storage().get_object_store(); @@ -168,7 +168,7 @@ pub async fn resolve_parseable_metadata() -> Result Date: Mon, 22 Jan 2024 13:25:27 +0530 Subject: [PATCH 13/15] remove redundent function --- server/src/option.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index 579550b0c..ff9d21375 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -99,12 +99,10 @@ impl Config { } } - pub fn validate_staging(&self) -> anyhow::Result<()> { - let staging_path = self.staging_dir(); - validate_path_is_writeable(staging_path) - } - pub async fn validate_storage(&self) -> Result<(), ObjectStorageError> { + + + pub async fn validate(&self) -> Result<(), ObjectStorageError> { let obj_store = self.storage.get_object_store(); let rel_path = relative_path::RelativePathBuf::from(".parseable.json"); From f0370e83d21109ac7691b00394e5260982655ec6 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 22 Jan 2024 13:25:49 +0530 Subject: [PATCH 14/15] removal of redundent function from main --- server/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/server/src/main.rs b/server/src/main.rs index 51f625a0f..f0a1d087c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -55,10 +55,9 @@ use crate::localcache::LocalCacheManager; async fn main() -> anyhow::Result<()> { env_logger::init(); let storage = CONFIG.storage().get_object_store(); - CONFIG.validate_storage().await?; + CONFIG.validate().await?; migration::run_metadata_migration(&CONFIG).await?; let metadata = storage::resolve_parseable_metadata().await?; - CONFIG.validate_staging()?; banner::print(&CONFIG, &metadata).await; rbac::map::init(&metadata); metadata.set_global(); From d43728c4b46ce632e0206b3a64318e3544e24033 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Mon, 22 Jan 2024 13:35:49 +0530 Subject: [PATCH 15/15] Removed redundent function --- server/src/option.rs | 4 ---- server/src/storage/localfs.rs | 6 +++--- server/src/utils.rs | 25 ++++++------------------- 3 files changed, 9 insertions(+), 26 deletions(-) diff --git a/server/src/option.rs b/server/src/option.rs index ff9d21375..50dab8878 100644 --- a/server/src/option.rs +++ b/server/src/option.rs @@ -28,7 +28,6 @@ use url::Url; use crate::oidc::{self, OpenidConfig}; use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config}; -use crate::utils::validate_path_is_writeable; pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB @@ -99,9 +98,6 @@ impl Config { } } - - - pub async fn validate(&self) -> Result<(), ObjectStorageError> { let obj_store = self.storage.get_object_store(); let rel_path = relative_path::RelativePathBuf::from(".parseable.json"); diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs index ff9d9c98d..fad4538ea 100644 --- a/server/src/storage/localfs.rs +++ b/server/src/storage/localfs.rs @@ -32,7 +32,7 @@ use tokio::fs::{self, DirEntry}; use tokio_stream::wrappers::ReadDirStream; use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics}; -use crate::{option::validation, utils::validate_path_is_writeable}; +use crate::option::validation; use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider}; @@ -139,8 +139,8 @@ impl ObjectStorage for LocalFS { } async fn check(&self) -> Result<(), ObjectStorageError> { - fs::create_dir_all(&self.root).await?; - validate_path_is_writeable(&self.root) + fs::create_dir_all(&self.root) + .await .map_err(|e| ObjectStorageError::UnhandledError(e.into())) } diff --git a/server/src/utils.rs b/server/src/utils.rs index 58f0c3eee..83af01cc6 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -23,8 +23,6 @@ pub mod json; pub mod uid; pub mod update; -use std::path::Path; - use chrono::{DateTime, NaiveDate, Timelike, Utc}; #[allow(dead_code)] @@ -43,17 +41,6 @@ pub fn capitalize_ascii(s: &str) -> String { s[0..1].to_uppercase() + &s[1..] } -pub fn validate_path_is_writeable(path: &Path) -> anyhow::Result<()> { - let Ok(md) = std::fs::metadata(path) else { - anyhow::bail!("Could not read metadata for staging dir") - }; - let permissions = md.permissions(); - if permissions.readonly() { - anyhow::bail!("Staging directory {} is not writable", path.display()) - } - Ok(()) -} - /// Convert minutes to a slot range /// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19" pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option { @@ -263,7 +250,7 @@ mod tests { ] )] #[case::same_hour_with_00_to_59_minute_block( - "2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00", + "2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00", &["date=2022-06-11/hour=16/"] )] #[case::same_date_different_hours_coherent_minute( @@ -274,14 +261,14 @@ mod tests { ] )] #[case::same_date_different_hours_incoherent_minutes( - "2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00", + "2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00", &[ "date=2022-06-11/hour=15/minute=59/", "date=2022-06-11/hour=16/minute=00/" ] )] #[case::same_date_different_hours_whole_hours_between_incoherent_minutes( - "2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00", + "2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00", &[ "date=2022-06-11/hour=15/minute=59/", "date=2022-06-11/hour=16/", @@ -289,14 +276,14 @@ mod tests { ] )] #[case::different_date_coherent_hours_and_minutes( - "2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00", + "2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00", &[ "date=2022-06-11/", "date=2022-06-12/" ] )] #[case::different_date_incoherent_hours_coherent_minutes( - "2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00", + "2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00", &[ "date=2022-06-11/hour=23/", "date=2022-06-12/hour=00/", @@ -304,7 +291,7 @@ mod tests { ] )] #[case::different_date_incoherent_hours_incoherent_minutes( - "2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00", + "2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00", &[ "date=2022-06-11/hour=23/minute=59/", "date=2022-06-12/hour=00/minute=00/"