diff --git a/rust/crates/sift_rs/src/wrappers/ingestion_configs.rs b/rust/crates/sift_rs/src/wrappers/ingestion_configs.rs index 434bf297c..272439275 100644 --- a/rust/crates/sift_rs/src/wrappers/ingestion_configs.rs +++ b/rust/crates/sift_rs/src/wrappers/ingestion_configs.rs @@ -42,11 +42,9 @@ pub trait IngestionConfigServiceWrapper: /// Create [FlowConfig]s for a given ingestion config. If this function does not return an /// error, then it is safe to assume that all [FlowConfig]s in `configs` was created. - async fn try_create_flows( - &mut self, - ingestion_config_id: &str, - configs: &[FlowConfig], - ) -> Result<()>; + async fn try_create_flows(&mut self, ingestion_config_id: &str, configs: I) -> Result<()> + where + I: Into> + Send; /// Retrieve all flows that satisfy the provided filter. async fn try_filter_flows( @@ -121,15 +119,14 @@ impl IngestionConfigServiceWrapper for IngestionConfigServiceImpl { /// Create [FlowConfig]s for a given ingestion config. If this function does not return an /// error, then it is safe to assume that all [FlowConfig]s in `configs` was created. - async fn try_create_flows( - &mut self, - ingestion_config_id: &str, - configs: &[FlowConfig], - ) -> Result<()> { + async fn try_create_flows(&mut self, ingestion_config_id: &str, configs: I) -> Result<()> + where + I: Into> + Send, + { let _ = self .create_ingestion_config_flows(CreateIngestionConfigFlowsRequest { ingestion_config_id: ingestion_config_id.to_string(), - flows: configs.to_vec(), + flows: configs.into(), }) .await .map_err(|e| Error::new(ErrorKind::CreateFlowError, e))?; diff --git a/rust/crates/sift_stream/src/stream/builder.rs b/rust/crates/sift_stream/src/stream/builder.rs index eff47b943..bc0eb00f8 100644 --- a/rust/crates/sift_stream/src/stream/builder.rs +++ b/rust/crates/sift_stream/src/stream/builder.rs @@ -541,6 +541,16 @@ impl SiftStreamBuilder { .try_filter_flows(&ingestion_config.ingestion_config_id, &filter) .await?; + // If no flows are provided, use the existing flows in Sift to populate the local flow cache. + if flows.is_empty() { + #[cfg(feature = "tracing")] + tracing::info!( + ingestion_config_id = ingestion_config.ingestion_config_id, + "no flows provided, using existing flows in Sift to populate the local flow cache" + ); + return Ok((ingestion_config, existing_flows, asset)); + } + let mut flows_to_create: Vec = Vec::new(); for flow in &flows { @@ -565,7 +575,10 @@ impl SiftStreamBuilder { if !flows_to_create.is_empty() { let _ = ingestion_config_service - .try_create_flows(&ingestion_config.ingestion_config_id, &flows_to_create) + .try_create_flows( + &ingestion_config.ingestion_config_id, + flows_to_create.as_slice(), + ) .await; #[cfg(feature = "tracing")] diff --git a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs index 608ff2b26..f8f06019f 100644 --- a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs +++ b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs @@ -298,17 +298,42 @@ impl SiftStream { /// Modify the existing ingestion config by adding new flows that weren't accounted for during /// initialization. pub async fn add_new_flows(&mut self, flow_configs: &[FlowConfig]) -> Result<()> { + // Filter out flows that already exist. + let filtered = flow_configs + .iter() + .filter(|f| !self.mode.flows_by_name.contains_key(&f.name)) + .collect::>(); + + // If no new flows are provided, return early. + if filtered.is_empty() { + return Ok(()); + } + + #[cfg(feature = "tracing")] + tracing::info!( + ingestion_config_id = self.mode.ingestion_config.ingestion_config_id, + new_flows = filtered + .iter() + .map(|f| f.name.as_str()) + .collect::>() + .join(","), + "adding new flows to ingestion config" + ); + new_ingestion_config_service(self.grpc_channel.clone()) .try_create_flows( &self.mode.ingestion_config.ingestion_config_id, - flow_configs, + filtered + .iter() + .map(|f| (*f).clone()) + .collect::>(), ) .await .context("SiftStream::add_new_flows")?; - self.metrics.loaded_flows.add(flow_configs.len() as u64); + self.metrics.loaded_flows.add(filtered.len() as u64); - for flow_config in flow_configs { + for flow_config in filtered { self.mode .flows_by_name .entry(flow_config.name.clone()) diff --git a/rust/crates/sift_stream/src/stream/test.rs b/rust/crates/sift_stream/src/stream/test.rs index 9615e7270..cc01daed2 100644 --- a/rust/crates/sift_stream/src/stream/test.rs +++ b/rust/crates/sift_stream/src/stream/test.rs @@ -6,6 +6,8 @@ use crate::backup::DiskBackupPolicy; use crate::{ ChannelValue, Flow, IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder, }; +use sift_rs::common::r#type::v1::ChannelDataType; +use sift_rs::ingestion_configs::v2::{ChannelConfig, FlowConfig}; use tempdir::TempDir; use tracing_test::traced_test; @@ -44,6 +46,7 @@ async fn test_sift_stream_builder_backup_manager_directory_naming_with_run() { retry_policy, disk_backup_policy, }) + .metrics_streaming_interval(None) .build() .await .expect("failed to build sift stream"); @@ -72,7 +75,7 @@ async fn test_sift_stream_builder_backup_manager_directory_naming_with_run() { let test_dir = fs::read_dir(tmp_dir_path) .expect("failed to read backups directory") .collect::>(); - assert_eq!(test_dir.len(), 1); + assert_eq!(test_dir.len(), 1, "{:?}", test_dir); // The first subdirectory should be the asset name. let asset_dir = test_dir[0].as_ref().expect("failed to get file"); @@ -122,6 +125,7 @@ async fn test_sift_stream_builder_backup_manager_directory_naming_no_run() { retry_policy, disk_backup_policy, }) + .metrics_streaming_interval(None) .build() .await .expect("failed to build sift stream"); @@ -230,3 +234,168 @@ async fn test_sift_stream_drop_without_finish() { .await .expect("timeout waiting for tasks to shutdown"); } + +#[tokio::test] +async fn test_sift_stream_builder_load_ingestion_config_with_no_flows() { + let backups_dir = uuid::Uuid::new_v4().to_string(); + + let tmp_dir = TempDir::new(&backups_dir).expect("failed to creat tempdir"); + let tmp_dir_path = tmp_dir.path(); + + let ingestion_config = IngestionConfigForm { + asset_name: "already_exists_asset".to_string(), + client_key: "already_exists_client_key".to_string(), + flows: vec![], + }; + let disk_backup_policy = DiskBackupPolicy { + backups_dir: Some(tmp_dir_path.to_path_buf()), + retain_backups: true, + ..Default::default() + }; + let retry_policy = crate::RetryPolicy::default(); + let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await; + + let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel) + .ingestion_config(ingestion_config) + .recovery_strategy(RecoveryStrategy::RetryWithBackups { + retry_policy, + disk_backup_policy, + }) + .build() + .await + .expect("failed to build sift stream"); + + // The mock sift server should have returned 1 flow. + let flows = sift_stream.get_flows(); + assert_eq!(flows.len(), 1); + + let existing_flow = FlowConfig { + name: "already_exists_flow".to_string(), + channels: vec![ChannelConfig { + name: "channel1".to_string(), + data_type: ChannelDataType::Double.into(), + ..Default::default() + }], + }; + + // Add the existing flow again to ensure it is not added again. + assert!(sift_stream.add_new_flows(&[existing_flow]).await.is_ok()); + let flows = sift_stream.get_flows(); + assert_eq!(flows.len(), 1); + + sift_stream + .finish() + .await + .expect("failed to finish sift stream"); +} + +#[tokio::test] +async fn test_sift_stream_builder_load_ingestion_config_with_flows() { + let backups_dir = uuid::Uuid::new_v4().to_string(); + + let tmp_dir = TempDir::new(&backups_dir).expect("failed to creat tempdir"); + let tmp_dir_path = tmp_dir.path(); + + let existing_flow = FlowConfig { + name: "already_exists_flow".to_string(), + channels: vec![ChannelConfig { + name: "channel1".to_string(), + data_type: ChannelDataType::Double.into(), + ..Default::default() + }], + }; + + let ingestion_config = IngestionConfigForm { + asset_name: "test_asset".to_string(), + client_key: "test_client_key".to_string(), + flows: vec![existing_flow.clone()], + }; + let disk_backup_policy = DiskBackupPolicy { + backups_dir: Some(tmp_dir_path.to_path_buf()), + retain_backups: true, + ..Default::default() + }; + let retry_policy = crate::RetryPolicy::default(); + let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await; + + let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel) + .ingestion_config(ingestion_config) + .recovery_strategy(RecoveryStrategy::RetryWithBackups { + retry_policy, + disk_backup_policy, + }) + .build() + .await + .expect("failed to build sift stream"); + + // The mock sift server should have returned 1 flow. + let flows = sift_stream.get_flows(); + assert_eq!(flows.len(), 1); + + // Add the existing flow again to ensure it is not added again. + assert!(sift_stream.add_new_flows(&[existing_flow]).await.is_ok()); + let flows = sift_stream.get_flows(); + assert_eq!(flows.len(), 1); +} + +#[tokio::test] +async fn test_sift_stream_builder_load_ingestion_config_with_new_flows() { + let backups_dir = uuid::Uuid::new_v4().to_string(); + + let tmp_dir = TempDir::new(&backups_dir).expect("failed to creat tempdir"); + let tmp_dir_path = tmp_dir.path(); + + let new_flow = FlowConfig { + name: "new_flow".to_string(), + channels: vec![ChannelConfig { + name: "channel-new".to_string(), + data_type: ChannelDataType::Uint32.into(), + ..Default::default() + }], + }; + + let ingestion_config = IngestionConfigForm { + asset_name: "test_asset".to_string(), + client_key: "test_client_key".to_string(), + flows: vec![new_flow.clone()], + }; + let disk_backup_policy = DiskBackupPolicy { + backups_dir: Some(tmp_dir_path.to_path_buf()), + retain_backups: true, + ..Default::default() + }; + let retry_policy = crate::RetryPolicy::default(); + let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await; + + let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel) + .ingestion_config(ingestion_config) + .recovery_strategy(RecoveryStrategy::RetryWithBackups { + retry_policy, + disk_backup_policy, + }) + .build() + .await + .expect("failed to build sift stream"); + + // The mock sift server should have returned 1 flow. + let flows = sift_stream.get_flows(); + assert_eq!(flows.len(), 1); + + // Add the existing flow again to ensure it is not added again. + assert!(sift_stream.add_new_flows(&[new_flow]).await.is_ok()); + let flows = sift_stream.get_flows(); + assert_eq!(flows.len(), 1); + + // Add another new flow to ensure it is added. + let new_flow2 = FlowConfig { + name: "new_flow2".to_string(), + channels: vec![ChannelConfig { + name: "channel-new2".to_string(), + data_type: ChannelDataType::Uint32.into(), + ..Default::default() + }], + }; + assert!(sift_stream.add_new_flows(&[new_flow2]).await.is_ok()); + let flows = sift_stream.get_flows(); + assert_eq!(flows.len(), 2); +} diff --git a/rust/crates/sift_stream/src/test.rs b/rust/crates/sift_stream/src/test.rs index 4135df2bd..32f26fb01 100644 --- a/rust/crates/sift_stream/src/test.rs +++ b/rust/crates/sift_stream/src/test.rs @@ -8,6 +8,7 @@ use sift_rs::assets::v1::{ GetAssetRequest, GetAssetResponse, ListAssetsRequest, ListAssetsResponse, UpdateAssetRequest, UpdateAssetResponse, }; +use sift_rs::common::r#type::v1::ChannelDataType; use sift_rs::ingest::v1::IngestWithConfigDataStreamRequest; use sift_rs::ingest::v1::{ IngestWithConfigDataStreamResponse, @@ -17,10 +18,11 @@ use sift_rs::ingestion_configs::v2::ingestion_config_service_server::{ IngestionConfigService, IngestionConfigServiceServer, }; use sift_rs::ingestion_configs::v2::{ - CreateIngestionConfigFlowsRequest, CreateIngestionConfigFlowsResponse, - CreateIngestionConfigRequest, CreateIngestionConfigResponse, GetIngestionConfigRequest, - GetIngestionConfigResponse, IngestionConfig, ListIngestionConfigFlowsRequest, - ListIngestionConfigFlowsResponse, ListIngestionConfigsRequest, ListIngestionConfigsResponse, + ChannelConfig, CreateIngestionConfigFlowsRequest, CreateIngestionConfigFlowsResponse, + CreateIngestionConfigRequest, CreateIngestionConfigResponse, FlowConfig, + GetIngestionConfigRequest, GetIngestionConfigResponse, IngestionConfig, + ListIngestionConfigFlowsRequest, ListIngestionConfigFlowsResponse, ListIngestionConfigsRequest, + ListIngestionConfigsResponse, }; use sift_rs::ping::v1::ping_service_server::{PingService, PingServiceServer}; use sift_rs::ping::v1::{PingRequest, PingResponse}; @@ -36,6 +38,7 @@ use std::sync::{Arc, Mutex}; use tonic::transport::{Endpoint, Server, Uri}; use tonic::{Request, Response, Status}; use tower::{ServiceBuilder, service_fn}; +use uuid::Uuid; pub(crate) struct MockPingService; @@ -48,59 +51,129 @@ impl PingService for MockPingService { } } -pub(crate) struct MockIngestionConfigService; +pub(crate) struct MockIngestionConfigService { + existing_flows: Arc>>, + existing_ingestion_configs: Arc>>, +} + +impl Default for MockIngestionConfigService { + fn default() -> Self { + let existing_flow = FlowConfig { + name: "already_exists_flow".to_string(), + channels: vec![ChannelConfig { + name: "channel1".to_string(), + data_type: ChannelDataType::Double.into(), + ..Default::default() + }], + }; + let existing_ingestion_config = IngestionConfig { + ingestion_config_id: Uuid::new_v4().to_string(), + asset_id: "already_exists_asset".to_string(), + client_key: "already_exists_client_key".to_string(), + }; + Self { + existing_flows: Arc::new(Mutex::new(vec![existing_flow])), + existing_ingestion_configs: Arc::new(Mutex::new(vec![existing_ingestion_config])), + } + } +} #[tonic::async_trait] impl IngestionConfigService for MockIngestionConfigService { async fn get_ingestion_config( &self, - _: Request, + request: Request, ) -> Result, Status> { - Ok(Response::new(GetIngestionConfigResponse { - ingestion_config: Some(IngestionConfig { - ingestion_config_id: "123".to_string(), - asset_id: "123".to_string(), - client_key: "test_client_key".to_string(), - }), - })) + let get_ingestion_config = request.into_inner(); + let existing_ingestion_configs = self.existing_ingestion_configs.lock().unwrap(); + let ingestion_config = existing_ingestion_configs + .iter() + .find(|ic| ic.ingestion_config_id == get_ingestion_config.ingestion_config_id); + if let Some(ingestion_config) = ingestion_config { + return Ok(Response::new(GetIngestionConfigResponse { + ingestion_config: Some(ingestion_config.clone()), + })); + } + + Err(Status::not_found("ingestion config not found")) } async fn create_ingestion_config( &self, - _: Request, + request: Request, ) -> Result, Status> { + let create_ingestion_config = request.into_inner(); + + let mut existing_ingestion_configs = self.existing_ingestion_configs.lock().unwrap(); + for config in existing_ingestion_configs.iter() { + if config.client_key == create_ingestion_config.client_key { + return Err(Status::already_exists("ingestion config already exists")); + } + } + + let new_config = IngestionConfig { + ingestion_config_id: Uuid::new_v4().to_string(), + asset_id: create_ingestion_config.asset_name.clone(), + client_key: create_ingestion_config.client_key.clone(), + }; + + existing_ingestion_configs.push(new_config.clone()); + Ok(Response::new(CreateIngestionConfigResponse { - ingestion_config: Some(IngestionConfig { - ingestion_config_id: "123".to_string(), - asset_id: "123".to_string(), - client_key: "test_client_key".to_string(), - }), + ingestion_config: Some(new_config), })) } async fn list_ingestion_configs( &self, - _: Request, + request: Request, ) -> Result, Status> { + let list_configs: ListIngestionConfigsRequest = request.into_inner(); + + let existing_ingestion_configs = self.existing_ingestion_configs.lock().unwrap(); + + let mut ingestion_configs = Vec::new(); + for config in existing_ingestion_configs.iter() { + if list_configs.filter.is_empty() || list_configs.filter.contains(&config.client_key) { + ingestion_configs.push(config.clone()); + } + } + Ok(Response::new(ListIngestionConfigsResponse { - ingestion_configs: vec![IngestionConfig { - ingestion_config_id: "123".to_string(), - asset_id: "123".to_string(), - client_key: "test_client_key".to_string(), - }], + ingestion_configs: ingestion_configs, next_page_token: "".to_string(), })) } async fn create_ingestion_config_flows( &self, - _: Request, + request: Request, ) -> Result, Status> { + let create_flows = request.into_inner(); + + let existing_flows = self.existing_flows.lock().unwrap(); + for flow in create_flows.flows.iter() { + if existing_flows.iter().any(|f| f.name == flow.name) { + return Err(Status::already_exists("flow already exists")); + } + } + Ok(Response::new(CreateIngestionConfigFlowsResponse {})) } async fn list_ingestion_config_flows( &self, - _: Request, + request: Request, ) -> Result, Status> { + let list_flows: ListIngestionConfigFlowsRequest = request.into_inner(); + let existing_flows = self.existing_flows.lock().unwrap(); + + // If the filter contains "already_exists_flow" or is empty, return the existing flow. + let mut flows = Vec::new(); + for flow in existing_flows.iter() { + if list_flows.filter.is_empty() || list_flows.filter.contains(&flow.name) { + flows.push(flow.clone()); + } + } + Ok(Response::new(ListIngestionConfigFlowsResponse { - flows: vec![], + flows, next_page_token: "".to_string(), })) } @@ -112,27 +185,17 @@ pub(crate) struct MockAssetService; impl AssetService for MockAssetService { async fn get_asset( &self, - _: Request, + request: Request, ) -> Result, Status> { + let get_asset = request.into_inner(); + let asset_id = get_asset.asset_id; + Ok(Response::new(GetAssetResponse { asset: Some(Asset { - asset_id: "123".to_string(), - name: "test_asset".to_string(), + asset_id: asset_id.clone(), + name: asset_id.clone(), organization_id: "test".to_string(), - created_by_user_id: "test".to_string(), - modified_by_user_id: "test".to_string(), - created_date: Some(Timestamp { - seconds: 1, - nanos: 0, - }), - modified_date: Some(Timestamp { - seconds: 1, - nanos: 0, - }), - tags: vec!["test".to_string()], - metadata: vec![], - archived_date: None, - is_archived: false, + ..Default::default() }), })) } @@ -321,7 +384,7 @@ pub(crate) async fn create_mock_grpc_channel_with_service() -> (SiftChannel, Moc .add_service(IngestServiceServer::new(mock_ingest_service_clone)) .add_service(PingServiceServer::new(MockPingService)) .add_service(IngestionConfigServiceServer::new( - MockIngestionConfigService, + MockIngestionConfigService::default(), )) .add_service(AssetServiceServer::new(MockAssetService)) .add_service(RunServiceServer::new(MockRunService))