diff --git a/server/src/main.rs b/server/src/main.rs
index 954ed6ddd..f0a1d087c 100644
--- a/server/src/main.rs
+++ b/server/src/main.rs
@@ -55,9 +55,9 @@ use crate::localcache::LocalCacheManager;
 async fn main() -> anyhow::Result<()> {
     env_logger::init();
     let storage = CONFIG.storage().get_object_store();
+    CONFIG.validate().await?;
     migration::run_metadata_migration(&CONFIG).await?;
     let metadata = storage::resolve_parseable_metadata().await?;
-    CONFIG.validate_staging()?;
     banner::print(&CONFIG, &metadata).await;
     rbac::map::init(&metadata);
     metadata.set_global();
diff --git a/server/src/migration.rs b/server/src/migration.rs
index 899d86eae..5484e84c3 100644
--- a/server/src/migration.rs
+++ b/server/src/migration.rs
@@ -32,6 +32,8 @@ use crate::{
     storage::{ObjectStorage, ObjectStorageError},
 };
 
+/// Migrate the metadata from v1 or v2 to v3
+/// This is a one-time migration
 pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
     let object_store = config.storage().get_object_store();
     let storage_metadata = get_storage_metadata(&*object_store).await?;
diff --git a/server/src/option.rs b/server/src/option.rs
index 8b3983170..50dab8878 100644
--- a/server/src/option.rs
+++ b/server/src/option.rs
@@ -27,8 +27,7 @@
 use std::sync::Arc;
 use url::Url;
 
 use crate::oidc::{self, OpenidConfig};
-use crate::storage::{FSConfig, ObjectStorageProvider, S3Config};
-use crate::utils::validate_path_is_writeable;
+use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};
 
 pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GB
@@ -99,9 +98,36 @@ impl Config {
         }
     }
 
-    pub fn validate_staging(&self) -> anyhow::Result<()> {
-        let staging_path = self.staging_dir();
-        validate_path_is_writeable(staging_path)
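+    /// Check that the configured object store is safe to start against: it
+    /// must either be empty, or contain a `.parseable.json` manifest along
+    /// with listable streams. Anything else is treated as stale data left
+    /// over from a previous deployment, and startup is refused.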
+    pub async fn validate(&self) -> Result<(), ObjectStorageError> {
+        let obj_store = self.storage.get_object_store();
+        let rel_path = relative_path::RelativePathBuf::from(".parseable.json");
+
+        let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok();
+
+        let has_dirs = match obj_store.list_dirs_in_storage().await {
+            Ok(dirs) => !dirs.is_empty(),
+            Err(_) => false,
+        };
+
+        let has_streams = obj_store.list_streams().await.is_ok();
+
+        if !has_dirs || (has_parseable_json && has_streams) {
+            Ok(())
+        } else if has_parseable_json && !has_streams {
+            Err(ObjectStorageError::Custom(
+                "Could not start the server because storage contains stale data from a previous deployment, please choose an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident: launchpass.com/parseable"
+                    .to_owned(),
+            ))
+        } else if !has_parseable_json && !has_streams && has_dirs {
+            Err(ObjectStorageError::Custom(
+                "Could not start the server because storage contains some stale data, please provide an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident: launchpass.com/parseable".to_owned(),
+            ))
+        } else {
+            Err(ObjectStorageError::Custom(
+                "Could not start the server because storage contains stale data from a previous deployment.\nJoin us on Parseable Slack to report this incident: launchpass.com/parseable"
+                    .to_owned(),
+            ))
+        }
     }
 
     pub fn storage(&self) -> Arc<dyn ObjectStorageProvider + Send + Sync> {
diff --git a/server/src/storage/localfs.rs b/server/src/storage/localfs.rs
index df88499a9..fad4538ea 100644
--- a/server/src/storage/localfs.rs
+++ b/server/src/storage/localfs.rs
@@ -32,7 +32,7 @@
 use tokio::fs::{self, DirEntry};
 use tokio_stream::wrappers::ReadDirStream;
 
 use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
-use crate::{option::validation, utils::validate_path_is_writeable};
+use crate::option::validation;
 
 use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};
@@ -139,8 +139,8 @@ impl ObjectStorage for LocalFS {
     }
 
     async fn check(&self) -> Result<(), ObjectStorageError> {
-        fs::create_dir_all(&self.root).await?;
-        validate_path_is_writeable(&self.root)
+        fs::create_dir_all(&self.root)
+            .await
             .map_err(|e| ObjectStorageError::UnhandledError(e.into()))
     }
 
@@ -169,6 +169,23 @@
         Ok(logstreams)
     }
 
+    async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError> {
+        let dirs = ReadDirStream::new(fs::read_dir(&self.root).await?)
+            .try_collect::<Vec<DirEntry>>()
+            .await?
+            .into_iter()
+            .map(dir_name);
+
+        let dirs = FuturesUnordered::from_iter(dirs)
+            .try_collect::<Vec<_>>()
+            .await?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<_>>();
+
+        Ok(dirs)
+    }
+
     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
         let path = self.root.join(stream_name);
         let directories = ReadDirStream::new(fs::read_dir(&path).await?);
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index be4b1c1c6..cd0051345 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -75,6 +75,7 @@
 pub trait ObjectStorage: Sync + 'static {
     async fn check(&self) -> Result<(), ObjectStorageError>;
     async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
     async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
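+    /// List the names of the top-level directories at the storage root;
+    /// used by `Config::validate` to detect leftovers from an older deployment.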
+    async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError>;
     async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
     async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index af171bfae..dbd7dc915 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -470,6 +470,18 @@ impl ObjectStorage for S3 {
     fn store_url(&self) -> url::Url {
         url::Url::parse(&format!("s3://{}", self.bucket)).unwrap()
     }
+
+    async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError> {
+        let pre = object_store::path::Path::from("/");
+        let resp = self.client.list_with_delimiter(Some(&pre)).await?;
+
+        Ok(resp
+            .common_prefixes
+            .iter()
+            .flat_map(|path| path.parts())
+            .map(|name| name.as_ref().to_string())
+            .collect::<Vec<_>>())
+    }
 }
 
 impl From<object_store::Error> for ObjectStorageError {
diff --git a/server/src/storage/store_metadata.rs b/server/src/storage/store_metadata.rs
index 0e9ad955f..137cf1e24 100644
--- a/server/src/storage/store_metadata.rs
+++ b/server/src/storage/store_metadata.rs
@@ -92,8 +92,8 @@ impl StorageMetadata {
     }
 }
 
-// always returns remote metadata as it is source of truth
-// overwrites staging metadata while updating storage info
+/// Always returns remote metadata, as it is the source of truth
+/// Overwrites staging metadata while updating storage info
 pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStorageError> {
     let staging_metadata = get_staging_metadata()?;
     let storage = CONFIG.storage().get_object_store();
@@ -168,7 +168,7 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStorageError> {
diff --git a/server/src/utils.rs b/server/src/utils.rs
--- a/server/src/utils.rs
+++ b/server/src/utils.rs
     s[0..1].to_uppercase() + &s[1..]
 }
 
-pub fn validate_path_is_writeable(path: &Path) -> anyhow::Result<()> {
-    let Ok(md) = std::fs::metadata(path) else {
-        anyhow::bail!("Could not read metadata for staging dir")
-    };
-    let permissions = md.permissions();
-    if permissions.readonly() {
-        anyhow::bail!("Staging directory {} is not writable", path.display())
-    }
-    Ok(())
-}
-
 /// Convert minutes to a slot range
 /// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
 pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
@@ -263,7 +250,7 @@ mod tests {
         ]
     )]
     #[case::same_hour_with_00_to_59_minute_block(
-        "2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00", 
+        "2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00",
         &["date=2022-06-11/hour=16/"]
     )]
     #[case::same_date_different_hours_coherent_minute(
         "2022-06-11T15:00:00+00:00", "2022-06-11T17:00:00+00:00",
         &[
             "date=2022-06-11/hour=15/",
             "date=2022-06-11/hour=16/"
         ]
     )]
     #[case::same_date_different_hours_incoherent_minutes(
-        "2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00", 
+        "2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00",
         &[
             "date=2022-06-11/hour=15/minute=59/",
             "date=2022-06-11/hour=16/minute=00/"
         ]
     )]
     #[case::same_date_different_hours_whole_hours_between_incoherent_minutes(
-        "2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00", 
+        "2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00",
         &[
             "date=2022-06-11/hour=15/minute=59/",
             "date=2022-06-11/hour=16/",
             "date=2022-06-11/hour=17/minute=00/"
         ]
     )]
     #[case::different_date_coherent_hours_and_minutes(
-        "2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00", 
+        "2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00",
         &[
             "date=2022-06-11/",
             "date=2022-06-12/"
         ]
     )]
     #[case::different_date_incoherent_hours_coherent_minutes(
-        "2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00", 
+        "2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00",
         &[
             "date=2022-06-11/hour=23/",
             "date=2022-06-12/hour=00/",
             "date=2022-06-12/hour=01/"
         ]
     )]
     #[case::different_date_incoherent_hours_incoherent_minutes(
-        "2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00", 
+        "2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00",
         &[
             "date=2022-06-11/hour=23/minute=59/",
             "date=2022-06-12/hour=00/minute=00/"