8 changes: 5 additions & 3 deletions src/catalog/mod.rs
@@ -279,7 +279,9 @@ async fn create_manifest(
}
};
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
if let Err(err) =
STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap())
{
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
@@ -330,8 +332,8 @@ pub async fn remove_manifest_from_snapshot(
let manifests = &mut meta.snapshot.manifest_list;
// Filter out items whose manifest_path contains any of the dates_to_delete
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
STREAM_INFO.reset_first_event_at(stream_name)?;
meta.first_event_at = None;
STREAM_INFO.set_first_event_at(stream_name, None)?;
storage.put_snapshot(stream_name, meta.snapshot).await?;
}
match CONFIG.options.mode {
@@ -391,7 +393,7 @@ pub async fn get_first_event(
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
meta.first_event_at = Some(first_event_at.clone());
storage.put_stream_manifest(stream_name, &meta).await?;
STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?;
STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?;
}
}
}
4 changes: 2 additions & 2 deletions src/event/format/json.rs
@@ -94,8 +94,8 @@ impl EventFormat for Event {
};

if value_arr
.iter()
.any(|value| fields_mismatch(&schema, value, schema_version))
.iter()
.any(|value| fields_mismatch(&schema, value, schema_version))
{
return Err(anyhow!(
"Could not process this event due to mismatch in datatype"
7 changes: 2 additions & 5 deletions src/event/format/mod.rs
@@ -112,11 +112,8 @@ pub trait EventFormat: Sized {
time_partition: Option<&String>,
schema_version: SchemaVersion,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first) = self.to_data(
storage_schema,
time_partition,
schema_version,
)?;
let (data, mut schema, is_first) =
self.to_data(storage_schema, time_partition, schema_version)?;

if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
return Err(anyhow!(
33 changes: 15 additions & 18 deletions src/handlers/http/logstream.rs
@@ -21,11 +21,10 @@ use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, St
use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
use super::ingest::create_stream_if_not_exists;
use super::modal::utils::logstream_utils::{
create_stream_and_schema_from_storage, create_update_stream,
create_stream_and_schema_from_storage, create_update_stream, update_first_event_at,
};
use super::query::update_schema_when_distributed;
use crate::alerts::Alerts;
use crate::catalog::get_first_event;
use crate::event::format::{override_data_type, LogSource};
use crate::handlers::STREAM_TYPE_KEY;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
@@ -57,7 +56,7 @@ use std::fs;
use std::num::NonZeroU32;
use std::str::FromStr;
use std::sync::Arc;
use tracing::{error, warn};
use tracing::warn;

pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
@@ -550,19 +549,19 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
return Err(StreamError::StreamNotFound(stream_name));
}
}

let store = CONFIG.storage().get_object_store();
let dates: Vec<String> = Vec::new();
if let Ok(Some(first_event_at)) = get_first_event(store, &stream_name, dates).await {
if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
let storage = CONFIG.storage().get_object_store();
// if first_event_at is not found in the in-memory map, check whether it exists in storage;
// if it does, update first_event_at in the in-memory map
let stream_first_event_at =
if let Ok(Some(first_event_at)) = STREAM_INFO.get_first_event(&stream_name) {
Some(first_event_at)
} else if let Ok(Some(first_event_at)) =
storage.get_first_event_from_storage(&stream_name).await
{
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
stream_name
);
}
}
update_first_event_at(&stream_name, &first_event_at).await
} else {
None
};

let hash_map = STREAM_INFO.read().unwrap();
let stream_meta = &hash_map
@@ -572,7 +571,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
let stream_info: StreamInfo = StreamInfo {
stream_type: stream_meta.stream_type.clone(),
created_at: stream_meta.created_at.clone(),
first_event_at: stream_meta.first_event_at.clone(),
first_event_at: stream_first_event_at,
time_partition: stream_meta.time_partition.clone(),
time_partition_limit: stream_meta
.time_partition_limit
@@ -582,8 +581,6 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
log_source: stream_meta.log_source.clone(),
};

// get the other info from

Ok((web::Json(stream_info), StatusCode::OK))
}

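The new lookup order in `get_stream_info` (in-memory map first, then storage, then backfill the map) can be illustrated with a small self-contained sketch. The types and the function name below are stand-ins for illustration, not the project's `STREAM_INFO` or object store:

```rust
use std::collections::HashMap;

// Stand-ins for the in-memory stream map and the object store.
struct InMemory(HashMap<String, String>);
struct Storage(HashMap<String, String>);

fn first_event_at(mem: &mut InMemory, store: &Storage, stream: &str) -> Option<String> {
    if let Some(ts) = mem.0.get(stream) {
        return Some(ts.clone()); // fast path: value already cached in memory
    }
    let ts = store.0.get(stream)?.clone(); // fall back to storage
    mem.0.insert(stream.to_string(), ts.clone()); // backfill the in-memory map
    Some(ts)
}

fn main() {
    let mut mem = InMemory(HashMap::new());
    let store = Storage(HashMap::from([(
        "s1".to_string(),
        "2023-01-01T00:00:00Z".to_string(),
    )]));
    assert_eq!(
        first_event_at(&mut mem, &store, "s1").as_deref(),
        Some("2023-01-01T00:00:00Z")
    );
    assert!(mem.0.contains_key("s1")); // the in-memory map is now updated
}
```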
2 changes: 1 addition & 1 deletion src/handlers/http/modal/ingest_server.rs
@@ -249,7 +249,7 @@ impl IngestServer {
web::put()
.to(ingestor_logstream::put_stream)
.authorize_for_stream(Action::CreateStream),
)
),
)
.service(
// GET "/logstream/{logstream}/info" ==> Get info for given log stream
54 changes: 54 additions & 0 deletions src/handlers/http/modal/utils/logstream_utils.rs
@@ -36,6 +36,7 @@ use crate::{
storage::{LogStream, ObjectStoreFormat, StreamType},
validator,
};
use tracing::error;

pub async fn create_update_stream(
headers: &HeaderMap,
@@ -508,3 +509,56 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<

Ok(true)
}

/// Updates the first-event-at in storage and logstream metadata for the specified stream.
///
/// This function updates the `first-event-at` in both the object store and the stream info metadata.
/// If either update fails, an error is logged, but the function will still return the `first-event-at`.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream to update.
/// * `first_event_at` - The value of first-event-at.
///
/// # Returns
///
/// * `Option<String>` - Returns `Some(String)` with the provided timestamp if the update is successful,
/// or `None` if an error occurs.
///
/// # Errors
///
/// This function logs an error if:
/// * The `first-event-at` cannot be updated in the object store.
/// * The `first-event-at` cannot be updated in the stream info.
///
/// # Examples
/// ```ignore
/// use parseable::handlers::http::modal::utils::logstream_utils::update_first_event_at;
/// let result = update_first_event_at("my_stream", "2023-01-01T00:00:00Z").await;
/// match result {
/// Some(timestamp) => println!("first-event-at: {}", timestamp),
/// None => eprintln!("Failed to update first-event-at"),
/// }
/// ```
pub async fn update_first_event_at(stream_name: &str, first_event_at: &str) -> Option<String> {
Reviewer comment (Contributor):

Maybe we should pass DateTime/NaiveDateTime instead of using &str to represent this information?

let storage = CONFIG.storage().get_object_store();
if let Err(err) = storage
.update_first_event_in_stream(stream_name, first_event_at)
.await
{
error!(
"Failed to update first_event_at in storage for stream {:?}: {err:?}",
stream_name
);
}

if let Err(err) = metadata::STREAM_INFO.set_first_event_at(stream_name, first_event_at) {
error!(
"Failed to update first_event_at in stream info for stream {:?}: {err:?}",
stream_name
);
}

Some(first_event_at.to_string())
}
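The reviewer's suggestion above (passing a `DateTime`/`NaiveDateTime` rather than a `&str`) could look roughly like the sketch below. This is a hypothetical refactor, not part of the PR: the `parse_first_event_at` helper and its name are made up. The point is only that a typed `chrono` value rejects malformed timestamps at the boundary and is rendered to RFC 3339 once, just before persisting:

```rust
use chrono::{DateTime, Utc};

// Hypothetical helper: parse the incoming string once, at the edge.
fn parse_first_event_at(raw: &str) -> Option<DateTime<Utc>> {
    DateTime::parse_from_rfc3339(raw)
        .ok()
        .map(|dt| dt.with_timezone(&Utc))
}

fn main() {
    // A malformed value would be rejected here instead of being written through to stream.json.
    let ts = parse_first_event_at("2023-01-01T00:00:00Z").expect("valid RFC 3339");
    // Render to RFC 3339 only when handing the value to storage / STREAM_INFO.
    println!("{}", ts.to_rfc3339());
}
```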
37 changes: 35 additions & 2 deletions src/metadata.rs
@@ -212,13 +212,46 @@ impl StreamInfo {
pub fn set_first_event_at(
&self,
stream_name: &str,
first_event_at: Option<String>,
first_event_at: &str,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.first_event_at = first_event_at;
metadata.first_event_at = Some(first_event_at.to_owned());
})
}

/// Removes the `first_event_at` timestamp for the specified stream from the LogStreamMetadata.
///
/// This function is called during the retention task, when the parquet files and their manifest files are deleted from storage.
/// The manifest path is removed from the snapshot in stream.json,
/// and the first_event_at value in stream.json is cleared.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream for which the `first_event_at` timestamp is to be removed.
///
/// # Returns
///
/// * `Result<(), MetadataError>` - Returns `Ok(())` if the `first_event_at` timestamp is successfully removed,
/// or a `MetadataError` if the stream metadata is not found.
///
/// # Examples
/// ```ignore
/// let result = metadata.reset_first_event_at("my_stream");
/// match result {
/// Ok(()) => println!("first-event-at removed successfully"),
/// Err(e) => eprintln!("Error removing first-event-at from STREAM_INFO: {}", e),
/// }
/// ```
pub fn reset_first_event_at(&self, stream_name: &str) -> Result<(), MetadataError> {
Reviewer comment (Contributor):

I feel a good comment would explain where we are using this function, e.g. when retention threshold is met and we are removing first event time since that information is outdated.

let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.first_event_at.take();
})
}

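A stand-in sketch of the two `StreamInfo` operations added above: `set_first_event_at` now stores an owned copy of the `&str`, and `reset_first_event_at` clears the value with `Option::take`. A plain `RwLock<HashMap>` replaces the real `StreamInfo` here, so this is illustrative only:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Default)]
struct LogStreamMetadata {
    first_event_at: Option<String>,
}

fn main() {
    let map = RwLock::new(HashMap::from([(
        "s1".to_string(),
        LogStreamMetadata::default(),
    )]));

    // set_first_event_at(stream, &str): store an owned copy of the timestamp.
    map.write().unwrap().get_mut("s1").unwrap().first_event_at =
        Some("2023-01-01T00:00:00Z".to_owned());

    // reset_first_event_at(stream): drop the value, e.g. after retention removed the data.
    map.write().unwrap().get_mut("s1").unwrap().first_event_at.take();

    assert!(map.read().unwrap()["s1"].first_event_at.is_none());
}
```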
110 changes: 109 additions & 1 deletion src/storage/object_storage.rs
@@ -44,7 +44,7 @@ use actix_web_prometheus::PrometheusMetrics;
use arrow_schema::Schema;
use async_trait::async_trait;
use bytes::Bytes;
use chrono::Local;
use chrono::{DateTime, Local, Utc};
use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
use once_cell::sync::OnceCell;
use relative_path::RelativePath;
@@ -217,6 +217,42 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
Ok(())
}

/// Updates the first event timestamp in the object store for the specified stream.
///
/// This function retrieves the current object-store format for the given stream,
/// updates the `first_event_at` field with the provided timestamp, and then
/// stores the updated format back in the object store.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream to update.
/// * `first_event` - The timestamp of the first event to set.
///
/// # Returns
///
/// * `Result<(), ObjectStorageError>` - Returns `Ok(())` if the update is successful,
/// or an `ObjectStorageError` if an error occurs.
///
/// # Examples
/// ```ignore
/// let result = object_store.update_first_event_in_stream("my_stream", "2023-01-01T00:00:00Z").await;
/// assert!(result.is_ok());
/// ```
async fn update_first_event_in_stream(
&self,
stream_name: &str,
first_event: &str,
) -> Result<(), ObjectStorageError> {
let mut format = self.get_object_store_format(stream_name).await?;
format.first_event_at = Some(first_event.to_string());
let format_json = to_bytes(&format);
self.put_object(&stream_json_path(stream_name), format_json)
.await?;

Ok(())
}
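The default implementation above is a read–modify–write of the stream's stream.json: fetch the current `ObjectStoreFormat`, set `first_event_at`, and write the whole document back. Below is a minimal, self-contained sketch of that pattern, with `serde_json` standing in for the object-store round trip; the `StreamFormat` type here is hypothetical, not the project's `ObjectStoreFormat`:

```rust
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct StreamFormat {
    #[serde(skip_serializing_if = "Option::is_none")]
    first_event_at: Option<String>,
}

fn main() -> serde_json::Result<()> {
    // What get_object_store_format would have fetched from storage.
    let stored = r#"{}"#;
    let mut format: StreamFormat = serde_json::from_str(stored)?;

    // Update the single field we care about.
    format.first_event_at = Some("2023-01-01T00:00:00Z".to_string());

    // What put_object would write back to stream.json.
    let updated = serde_json::to_string(&format)?;
    println!("{updated}");
    Ok(())
}
```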

async fn put_alerts(
&self,
stream_name: &str,
@@ -623,6 +659,78 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
Ok(())
}

async fn get_stream_meta_from_storage(
&self,
stream_name: &str,
) -> Result<Vec<ObjectStoreFormat>, ObjectStorageError> {
let mut stream_metas = vec![];
let stream_meta_bytes = self
.get_objects(
Some(&RelativePathBuf::from_iter([
stream_name,
STREAM_ROOT_DIRECTORY,
])),
Box::new(|file_name| file_name.ends_with("stream.json")),
)
.await;
if let Ok(stream_meta_bytes) = stream_meta_bytes {
for stream_meta in stream_meta_bytes {
let stream_meta_ob = serde_json::from_slice::<ObjectStoreFormat>(&stream_meta)?;
stream_metas.push(stream_meta_ob);
}
}

Ok(stream_metas)
}

/// Retrieves the earliest first-event-at from the storage for the specified stream.
///
/// This function fetches the object-store format from every stream.json file for the given stream,
/// extracts the `first_event_at` timestamps, and returns the earliest one.
///
/// # Arguments
///
/// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved.
///
/// # Returns
///
/// * `Result<Option<String>, ObjectStorageError>` - Returns `Ok(Some(String))` with the earliest
/// first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError`
/// if an error occurs.
///
/// # Examples
/// ```ignore
/// let result = object_store.get_first_event_from_storage("my_stream").await;
/// match result {
/// Ok(Some(first_event)) => println!("first-event-at: {}", first_event),
/// Ok(None) => println!("first-event-at not found"),
/// Err(err) => println!("Error: {:?}", err),
/// }
/// ```
async fn get_first_event_from_storage(
&self,
stream_name: &str,
) -> Result<Option<String>, ObjectStorageError> {
let mut all_first_events = vec![];
let stream_metas = self.get_stream_meta_from_storage(stream_name).await;
if let Ok(stream_metas) = stream_metas {
for stream_meta in stream_metas.iter() {
if let Some(first_event) = &stream_meta.first_event_at {
let first_event = DateTime::parse_from_rfc3339(first_event).unwrap();
let first_event = first_event.with_timezone(&Utc);
all_first_events.push(first_event);
}
}
}

if all_first_events.is_empty() {
return Ok(None);
}
let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339();
Ok(Some(first_event_at))
}

// pick a better name
fn get_bucket_name(&self) -> String;
}
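The trait method above picks the minimum of the RFC 3339 `first_event_at` values collected from each ingestor's stream.json. The sketch below shows that selection in isolation; the input strings are made up, and unlike the trait method (which unwraps the parse) this version deliberately skips values that fail to parse:

```rust
use chrono::{DateTime, Utc};

fn earliest_first_event(values: &[&str]) -> Option<String> {
    values
        .iter()
        .filter_map(|v| DateTime::parse_from_rfc3339(v).ok()) // skip unparseable entries
        .map(|dt| dt.with_timezone(&Utc))
        .min()
        .map(|dt| dt.to_rfc3339())
}

fn main() {
    let per_ingestor = ["2023-02-01T00:00:00Z", "2023-01-01T00:00:00Z"];
    assert_eq!(
        earliest_first_event(&per_ingestor).as_deref(),
        Some("2023-01-01T00:00:00+00:00")
    );
}
```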
4 changes: 2 additions & 2 deletions src/storage/retention.rs
@@ -218,9 +218,9 @@ mod action {
return;
}
}
if let Ok(first_event_at) = res_remove_manifest {
if let Ok(Some(first_event_at)) = res_remove_manifest {
if let Err(err) =
metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at)
metadata::STREAM_INFO.set_first_event_at(&stream_name, &first_event_at)
{
error!(
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",