Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ use crate::localcache::LocalCacheManager;
async fn main() -> anyhow::Result<()> {
env_logger::init();
let storage = CONFIG.storage().get_object_store();
CONFIG.validate().await?;
migration::run_metadata_migration(&CONFIG).await?;
let metadata = storage::resolve_parseable_metadata().await?;
CONFIG.validate_staging()?;
banner::print(&CONFIG, &metadata).await;
rbac::map::init(&metadata);
metadata.set_global();
Expand Down
2 changes: 2 additions & 0 deletions server/src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ use crate::{
storage::{ObjectStorage, ObjectStorageError},
};

/// Migrate the metadata from v1 or v2 to v3
/// This is a one time migration
pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
let object_store = config.storage().get_object_store();
let storage_metadata = get_storage_metadata(&*object_store).await?;
Expand Down
36 changes: 31 additions & 5 deletions server/src/option.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use std::sync::Arc;
use url::Url;

use crate::oidc::{self, OpenidConfig};
use crate::storage::{FSConfig, ObjectStorageProvider, S3Config};
use crate::utils::validate_path_is_writeable;
use crate::storage::{FSConfig, ObjectStorageError, ObjectStorageProvider, S3Config};

pub const MIN_CACHE_SIZE_BYTES: u64 = 1000u64.pow(3); // 1 GiB

Expand Down Expand Up @@ -99,9 +98,36 @@ impl Config {
}
}

pub fn validate_staging(&self) -> anyhow::Result<()> {
let staging_path = self.staging_dir();
validate_path_is_writeable(staging_path)
pub async fn validate(&self) -> Result<(), ObjectStorageError> {
let obj_store = self.storage.get_object_store();
let rel_path = relative_path::RelativePathBuf::from(".parseable.json");

let has_parseable_json = obj_store.get_object(&rel_path).await.is_ok();

let has_dirs = match obj_store.list_dirs_in_storage().await {
Ok(dirs) => !dirs.is_empty(),
Err(_) => false,
};

let has_streams = obj_store.list_streams().await.is_ok();

if !has_dirs || has_parseable_json && has_streams {
Ok(())
} else if has_parseable_json && !has_streams {
Err(ObjectStorageError::Custom(
"Could not start the server because storage contains stale data from previous deployment, please choose an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Eshanatnight please update to below error message

"Could not start the server because storage indicates stale data from previous deployment, please choose an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable"

.to_owned(),
))
} else if !has_parseable_json && !has_streams && has_dirs {
Err(ObjectStorageError::Custom(
"Could not start the server because storage contains some stale data, please provide an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable".to_owned(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Eshanatnight please update to below error

"Could not start the server because storage indicates stale data not related to parseable, please choose an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable"

))
} else {
Err(ObjectStorageError::Custom(
"Could not start the server because storage contains stale data from previous deployment.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Eshanatnight please update to below error

"Could not start the server because storage indicates stale data from previous deployment, please choose an empty storage and restart the server.\nJoin us on Parseable Slack to report this incident : launchpass.com/parseable"

.to_owned()
))
}
}

pub fn storage(&self) -> Arc<dyn ObjectStorageProvider + Send + Sync> {
Expand Down
23 changes: 20 additions & 3 deletions server/src/storage/localfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use tokio::fs::{self, DirEntry};
use tokio_stream::wrappers::ReadDirStream;

use crate::metrics::storage::{localfs::REQUEST_RESPONSE_TIME, StorageMetrics};
use crate::{option::validation, utils::validate_path_is_writeable};
use crate::option::validation;

use super::{object_storage, LogStream, ObjectStorage, ObjectStorageError, ObjectStorageProvider};

Expand Down Expand Up @@ -139,8 +139,8 @@ impl ObjectStorage for LocalFS {
}

async fn check(&self) -> Result<(), ObjectStorageError> {
fs::create_dir_all(&self.root).await?;
validate_path_is_writeable(&self.root)
fs::create_dir_all(&self.root)
.await
.map_err(|e| ObjectStorageError::UnhandledError(e.into()))
}

Expand Down Expand Up @@ -169,6 +169,23 @@ impl ObjectStorage for LocalFS {
Ok(logstreams)
}

async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError> {
let dirs = ReadDirStream::new(fs::read_dir(&self.root).await?)
.try_collect::<Vec<DirEntry>>()
.await?
.into_iter()
.map(dir_name);

let dirs = FuturesUnordered::from_iter(dirs)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flatten()
.collect::<Vec<_>>();

Ok(dirs)
}

async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError> {
let path = self.root.join(stream_name);
let directories = ReadDirStream::new(fs::read_dir(&path).await?);
Expand Down
1 change: 1 addition & 0 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ pub trait ObjectStorage: Sync + 'static {
async fn check(&self) -> Result<(), ObjectStorageError>;
async fn delete_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError>;
async fn list_streams(&self) -> Result<Vec<LogStream>, ObjectStorageError>;
async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError>;
async fn list_dates(&self, stream_name: &str) -> Result<Vec<String>, ObjectStorageError>;
async fn upload_file(&self, key: &str, path: &Path) -> Result<(), ObjectStorageError>;

Expand Down
12 changes: 12 additions & 0 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,18 @@ impl ObjectStorage for S3 {
fn store_url(&self) -> url::Url {
url::Url::parse(&format!("s3://{}", self.bucket)).unwrap()
}

async fn list_dirs_in_storage(&self) -> Result<Vec<String>, ObjectStorageError> {
let pre = object_store::path::Path::from("/");
let resp = self.client.list_with_delimiter(Some(&pre)).await?;

Ok(resp
.common_prefixes
.iter()
.flat_map(|path| path.parts())
.map(|name| name.as_ref().to_string())
.collect::<Vec<_>>())
}
}

impl From<object_store::Error> for ObjectStorageError {
Expand Down
6 changes: 3 additions & 3 deletions server/src/storage/store_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@ impl StorageMetadata {
}
}

// always returns remote metadata as it is source of truth
// overwrites staging metadata while updating storage info
/// always returns remote metadata as it is source of truth
/// overwrites staging metadata while updating storage info
pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStorageError> {
let staging_metadata = get_staging_metadata()?;
let storage = CONFIG.storage().get_object_store();
Expand Down Expand Up @@ -168,7 +168,7 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
// variant contain remote metadata
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EnvChange {
/// No change in env i.e both staging and remote have same id
    /// No change in env i.e. both staging and remote have the same id
/// or deployment id of staging is not matching with that of remote
None(StorageMetadata),
/// Metadata not found in storage. Treated as possible misconfiguration on user side.
Expand Down
25 changes: 6 additions & 19 deletions server/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ pub mod json;
pub mod uid;
pub mod update;

use std::path::Path;

use chrono::{DateTime, NaiveDate, Timelike, Utc};

#[allow(dead_code)]
Expand All @@ -43,17 +41,6 @@ pub fn capitalize_ascii(s: &str) -> String {
s[0..1].to_uppercase() + &s[1..]
}

pub fn validate_path_is_writeable(path: &Path) -> anyhow::Result<()> {
let Ok(md) = std::fs::metadata(path) else {
anyhow::bail!("Could not read metadata for staging dir")
};
let permissions = md.permissions();
if permissions.readonly() {
anyhow::bail!("Staging directory {} is not writable", path.display())
}
Ok(())
}

/// Convert minutes to a slot range
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
Expand Down Expand Up @@ -263,7 +250,7 @@ mod tests {
]
)]
#[case::same_hour_with_00_to_59_minute_block(
"2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00",
"2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00",
&["date=2022-06-11/hour=16/"]
)]
#[case::same_date_different_hours_coherent_minute(
Expand All @@ -274,37 +261,37 @@ mod tests {
]
)]
#[case::same_date_different_hours_incoherent_minutes(
"2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00",
"2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00",
&[
"date=2022-06-11/hour=15/minute=59/",
"date=2022-06-11/hour=16/minute=00/"
]
)]
#[case::same_date_different_hours_whole_hours_between_incoherent_minutes(
"2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00",
"2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00",
&[
"date=2022-06-11/hour=15/minute=59/",
"date=2022-06-11/hour=16/",
"date=2022-06-11/hour=17/minute=00/"
]
)]
#[case::different_date_coherent_hours_and_minutes(
"2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00",
"2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00",
&[
"date=2022-06-11/",
"date=2022-06-12/"
]
)]
#[case::different_date_incoherent_hours_coherent_minutes(
"2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00",
"2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00",
&[
"date=2022-06-11/hour=23/",
"date=2022-06-12/hour=00/",
"date=2022-06-12/hour=01/"
]
)]
#[case::different_date_incoherent_hours_incoherent_minutes(
"2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00",
"2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00",
&[
"date=2022-06-11/hour=23/minute=59/",
"date=2022-06-12/hour=00/minute=00/"
Expand Down