From 363fe999d3894e7ea8d94cbcf68eedb3479315ca Mon Sep 17 00:00:00 2001 From: anant Date: Thu, 23 Oct 2025 07:52:03 +0530 Subject: [PATCH] Updates for Keystone - Added methods for Overview, Keystone, and Conversation in Metastore - Introduced a JSON error struct --- src/metastore/metastore_traits.rs | 21 +++- .../metastores/object_store_metastore.rs | 98 ++++++++++++++++++- src/prism/logstream/mod.rs | 2 +- src/utils/error.rs | 36 +++++++ src/utils/mod.rs | 1 + 5 files changed, 155 insertions(+), 3 deletions(-) create mode 100644 src/utils/error.rs diff --git a/src/metastore/metastore_traits.rs b/src/metastore/metastore_traits.rs index 9a76815ff..27e56e547 100644 --- a/src/metastore/metastore_traits.rs +++ b/src/metastore/metastore_traits.rs @@ -16,7 +16,7 @@ * */ -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, HashMap, HashSet}; use arrow_schema::Schema; use bytes::Bytes; @@ -44,6 +44,25 @@ pub trait Metastore: std::fmt::Debug + Send + Sync { async fn initiate_connection(&self) -> Result<(), MetastoreError>; async fn get_objects(&self, parent_path: &str) -> Result, MetastoreError>; + /// overview + async fn get_overviews(&self) -> Result>, MetastoreError>; + async fn put_overview( + &self, + obj: &dyn MetastoreObject, + stream: &str, + ) -> Result<(), MetastoreError>; + async fn delete_overview(&self, stream: &str) -> Result<(), MetastoreError>; + + /// keystone + async fn get_keystones(&self) -> Result, MetastoreError>; + async fn put_keystone(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; + async fn delete_keystone(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; + + /// conversations + async fn get_conversations(&self) -> Result, MetastoreError>; + async fn put_conversation(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; + async fn delete_conversation(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; + /// alerts async fn get_alerts(&self) -> Result, MetastoreError>; async fn put_alert(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError>; diff --git a/src/metastore/metastores/object_store_metastore.rs b/src/metastore/metastores/object_store_metastore.rs index fee493160..b93984f05 100644 --- a/src/metastore/metastores/object_store_metastore.rs +++ b/src/metastore/metastores/object_store_metastore.rs @@ -17,7 +17,7 @@ */ use std::{ - collections::{BTreeMap, HashSet}, + collections::{BTreeMap, HashMap, HashSet}, sync::Arc, }; @@ -80,6 +80,102 @@ impl Metastore for ObjectStoreMetastore { .await?) } + /// This function fetches all the overviews from the underlying object store + async fn get_overviews(&self) -> Result>, MetastoreError> { + let streams = self.list_streams().await?; + + let mut all_overviews = HashMap::new(); + for stream in streams { + let overview_path = RelativePathBuf::from_iter([&stream, "overview"]); + + // if the file doesn't exist, load an empty overview + let overview = (self.storage.get_object(&overview_path).await).ok(); + + all_overviews.insert(stream, overview); + } + + Ok(all_overviews) + } + + /// This function puts an overview in the object store at the given path + async fn put_overview( + &self, + obj: &dyn MetastoreObject, + stream: &str, + ) -> Result<(), MetastoreError> { + let path = RelativePathBuf::from_iter([stream, "overview"]); + Ok(self.storage.put_object(&path, to_bytes(obj)).await?) + } + + /// Delete an overview + async fn delete_overview(&self, stream: &str) -> Result<(), MetastoreError> { + let path = RelativePathBuf::from_iter([stream, "overview"]); + Ok(self + .storage + .delete_object(&path) + .await?) + } + + /// This function fetches all the keystones from the underlying object store + async fn get_keystones(&self) -> Result, MetastoreError> { + let keystone_path = RelativePathBuf::from_iter([".keystone"]); + let keystones = self + .storage + .get_objects( + Some(&keystone_path), + Box::new(|file_name| { + file_name.ends_with(".json") && !file_name.starts_with("conv_") + }), + ) + .await?; + + Ok(keystones) + } + + /// This function puts a keystone in the object store at the given path + async fn put_keystone(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> { + let id = obj.get_object_id(); + let path = RelativePathBuf::from_iter([".keystone", &format!("{id}.json")]); + Ok(self.storage.put_object(&path, to_bytes(obj)).await?) + } + + /// Delete a keystone + async fn delete_keystone(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> { + let id = obj.get_object_id(); + let path = RelativePathBuf::from_iter([".keystone", &format!("{id}.json")]); + Ok(self.storage.delete_object(&path).await?) + } + + /// This function fetches all the conversations from the underlying object store + async fn get_conversations(&self) -> Result, MetastoreError> { + let keystone_path = RelativePathBuf::from_iter([".keystone"]); + let conversations = self + .storage + .get_objects( + Some(&keystone_path), + Box::new(|file_name| { + file_name.ends_with(".json") && file_name.starts_with("conv_") + }), + ) + .await?; + + Ok(conversations) + } + + /// This function puts a conversation in the object store at the given path + async fn put_conversation(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> { + let id = obj.get_object_id(); + let path = RelativePathBuf::from_iter([".keystone", &format!("conv_{id}.json")]); + Ok(self.storage.put_object(&path, to_bytes(obj)).await?) + } + + /// Delete a conversation + async fn delete_conversation(&self, obj: &dyn MetastoreObject) -> Result<(), MetastoreError> { + let id = obj.get_object_id(); + let path = RelativePathBuf::from_iter([".keystone", &format!("conv_{id}.json")]); + Ok(self.storage.delete_object(&path).await?) + } + /// This function fetches all the alerts from the underlying object store async fn get_alerts(&self) -> Result, MetastoreError> { let alerts_path = RelativePathBuf::from(ALERTS_ROOT_DIRECTORY); diff --git a/src/prism/logstream/mod.rs b/src/prism/logstream/mod.rs index 70bd9f7a4..8effd291d 100644 --- a/src/prism/logstream/mod.rs +++ b/src/prism/logstream/mod.rs @@ -144,7 +144,7 @@ async fn get_stats(stream_name: &str) -> Result Result { +pub async fn get_stream_info_helper(stream_name: &str) -> Result { // For query mode, if the stream not found in memory map, //check if it exists in the storage //create stream and schema from storage diff --git a/src/utils/error.rs b/src/utils/error.rs new file mode 100644 index 000000000..1a392f80a --- /dev/null +++ b/src/utils/error.rs @@ -0,0 +1,36 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use http::StatusCode; +use serde::Serialize; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize)] +pub struct DetailedError { + pub operation: String, + pub message: String, + pub stream_name: Option, + pub timestamp: chrono::DateTime, + pub metadata: Option>, + pub status_code: u16, +} + +impl DetailedError { + pub fn status_code(&self) -> StatusCode { + StatusCode::from_u16(self.status_code).unwrap() + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index b0a49d201..1d84558ab 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -18,6 +18,7 @@ pub mod actix; pub mod arrow; +pub mod error; pub mod header_parsing; pub mod human_size; pub mod json;