Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 8 additions & 11 deletions rust/crates/sift_rs/src/wrappers/ingestion_configs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,9 @@ pub trait IngestionConfigServiceWrapper:

/// Create [FlowConfig]s for a given ingestion config. If this function does not return an
/// error, then it is safe to assume that all [FlowConfig]s in `configs` were created.
async fn try_create_flows(
&mut self,
ingestion_config_id: &str,
configs: &[FlowConfig],
) -> Result<()>;
async fn try_create_flows<I>(&mut self, ingestion_config_id: &str, configs: I) -> Result<()>
where
I: Into<Vec<FlowConfig>> + Send;

/// Retrieve all flows that satisfy the provided filter.
async fn try_filter_flows(
Expand Down Expand Up @@ -121,15 +119,14 @@ impl IngestionConfigServiceWrapper for IngestionConfigServiceImpl {

/// Create [FlowConfig]s for a given ingestion config. If this function does not return an
/// error, then it is safe to assume that all [FlowConfig]s in `configs` were created.
async fn try_create_flows(
&mut self,
ingestion_config_id: &str,
configs: &[FlowConfig],
) -> Result<()> {
async fn try_create_flows<I>(&mut self, ingestion_config_id: &str, configs: I) -> Result<()>
where
I: Into<Vec<FlowConfig>> + Send,
{
let _ = self
.create_ingestion_config_flows(CreateIngestionConfigFlowsRequest {
ingestion_config_id: ingestion_config_id.to_string(),
flows: configs.to_vec(),
flows: configs.into(),
})
.await
.map_err(|e| Error::new(ErrorKind::CreateFlowError, e))?;
Expand Down
15 changes: 14 additions & 1 deletion rust/crates/sift_stream/src/stream/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,16 @@ impl SiftStreamBuilder<IngestionConfigMode> {
.try_filter_flows(&ingestion_config.ingestion_config_id, &filter)
.await?;

// If no flows are provided, use the existing flows in Sift to populate the local flow cache.
if flows.is_empty() {
#[cfg(feature = "tracing")]
tracing::info!(
ingestion_config_id = ingestion_config.ingestion_config_id,
"no flows provided, using existing flows in Sift to populate the local flow cache"
);
return Ok((ingestion_config, existing_flows, asset));
}

let mut flows_to_create: Vec<FlowConfig> = Vec::new();

for flow in &flows {
Expand All @@ -565,7 +575,10 @@ impl SiftStreamBuilder<IngestionConfigMode> {

if !flows_to_create.is_empty() {
let _ = ingestion_config_service
.try_create_flows(&ingestion_config.ingestion_config_id, &flows_to_create)
.try_create_flows(
&ingestion_config.ingestion_config_id,
flows_to_create.as_slice(),
)
.await;

#[cfg(feature = "tracing")]
Expand Down
31 changes: 28 additions & 3 deletions rust/crates/sift_stream/src/stream/mode/ingestion_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,17 +298,42 @@ impl SiftStream<IngestionConfigMode> {
/// Modify the existing ingestion config by adding new flows that weren't accounted for during
/// initialization.
pub async fn add_new_flows(&mut self, flow_configs: &[FlowConfig]) -> Result<()> {
// Filter out flows that already exist.
let filtered = flow_configs
.iter()
.filter(|f| !self.mode.flows_by_name.contains_key(&f.name))
.collect::<Vec<_>>();

// If no new flows are provided, return early.
if filtered.is_empty() {
return Ok(());
}

#[cfg(feature = "tracing")]
tracing::info!(
ingestion_config_id = self.mode.ingestion_config.ingestion_config_id,
new_flows = filtered
.iter()
.map(|f| f.name.as_str())
.collect::<Vec<&str>>()
.join(","),
"adding new flows to ingestion config"
);

new_ingestion_config_service(self.grpc_channel.clone())
.try_create_flows(
&self.mode.ingestion_config.ingestion_config_id,
flow_configs,
filtered
.iter()
.map(|f| (*f).clone())
.collect::<Vec<FlowConfig>>(),
)
.await
.context("SiftStream::add_new_flows")?;

self.metrics.loaded_flows.add(flow_configs.len() as u64);
self.metrics.loaded_flows.add(filtered.len() as u64);

for flow_config in flow_configs {
for flow_config in filtered {
self.mode
.flows_by_name
.entry(flow_config.name.clone())
Expand Down
171 changes: 170 additions & 1 deletion rust/crates/sift_stream/src/stream/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use crate::backup::DiskBackupPolicy;
use crate::{
ChannelValue, Flow, IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder,
};
use sift_rs::common::r#type::v1::ChannelDataType;
use sift_rs::ingestion_configs::v2::{ChannelConfig, FlowConfig};
use tempdir::TempDir;
use tracing_test::traced_test;

Expand Down Expand Up @@ -44,6 +46,7 @@ async fn test_sift_stream_builder_backup_manager_directory_naming_with_run() {
retry_policy,
disk_backup_policy,
})
.metrics_streaming_interval(None)
.build()
.await
.expect("failed to build sift stream");
Expand Down Expand Up @@ -72,7 +75,7 @@ async fn test_sift_stream_builder_backup_manager_directory_naming_with_run() {
let test_dir = fs::read_dir(tmp_dir_path)
.expect("failed to read backups directory")
.collect::<Vec<_>>();
assert_eq!(test_dir.len(), 1);
assert_eq!(test_dir.len(), 1, "{:?}", test_dir);

// The first subdirectory should be the asset name.
let asset_dir = test_dir[0].as_ref().expect("failed to get file");
Expand Down Expand Up @@ -122,6 +125,7 @@ async fn test_sift_stream_builder_backup_manager_directory_naming_no_run() {
retry_policy,
disk_backup_policy,
})
.metrics_streaming_interval(None)
.build()
.await
.expect("failed to build sift stream");
Expand Down Expand Up @@ -230,3 +234,168 @@ async fn test_sift_stream_drop_without_finish() {
.await
.expect("timeout waiting for tasks to shutdown");
}

/// Builds a stream from an ingestion config form that lists no flows, checks that exactly one
/// flow is reported afterwards, and then verifies that passing a flow the test treats as already
/// existing to `add_new_flows` leaves the flow count unchanged.
#[tokio::test]
async fn test_sift_stream_builder_load_ingestion_config_with_no_flows() {
    // A random prefix keeps this test's backup directory separate from any other test's.
    let dir_prefix = uuid::Uuid::new_v4().to_string();
    let scratch = TempDir::new(&dir_prefix).expect("failed to creat tempdir");

    // The mock service handle is bound (not discarded) so that it lives until the test ends.
    let (channel, _mock) = crate::test::create_mock_grpc_channel_with_service().await;

    let mut stream = SiftStreamBuilder::from_channel(channel)
        .ingestion_config(IngestionConfigForm {
            asset_name: String::from("already_exists_asset"),
            client_key: String::from("already_exists_client_key"),
            // Deliberately empty: no flows are supplied by the caller.
            flows: Vec::new(),
        })
        .recovery_strategy(RecoveryStrategy::RetryWithBackups {
            retry_policy: crate::RetryPolicy::default(),
            disk_backup_policy: DiskBackupPolicy {
                backups_dir: Some(scratch.path().to_path_buf()),
                retain_backups: true,
                ..Default::default()
            },
        })
        .build()
        .await
        .expect("failed to build sift stream");

    // Even though the form carried no flows, one flow is expected to be present after the build
    // (the test expects it to come from the mock service).
    assert_eq!(stream.get_flows().len(), 1);

    let duplicate = FlowConfig {
        name: String::from("already_exists_flow"),
        channels: vec![ChannelConfig {
            name: String::from("channel1"),
            data_type: ChannelDataType::Double.into(),
            ..Default::default()
        }],
    };

    // Submitting that flow must succeed and must not grow the set of flows.
    let outcome = stream.add_new_flows(&[duplicate]).await;
    assert!(outcome.is_ok());
    assert_eq!(stream.get_flows().len(), 1);

    stream
        .finish()
        .await
        .expect("failed to finish sift stream");
}

/// Builds a stream whose ingestion config form already names the flow `already_exists_flow`,
/// checks that one flow is reported, and verifies that handing the same flow to `add_new_flows`
/// succeeds without changing the flow count.
#[tokio::test]
async fn test_sift_stream_builder_load_ingestion_config_with_flows() {
    // A random prefix keeps this test's backup directory separate from any other test's.
    let dir_prefix = uuid::Uuid::new_v4().to_string();
    let scratch = TempDir::new(&dir_prefix).expect("failed to creat tempdir");

    let known_flow = FlowConfig {
        name: String::from("already_exists_flow"),
        channels: vec![ChannelConfig {
            name: String::from("channel1"),
            data_type: ChannelDataType::Double.into(),
            ..Default::default()
        }],
    };

    // The mock service handle is bound (not discarded) so that it lives until the test ends.
    let (channel, _mock) = crate::test::create_mock_grpc_channel_with_service().await;

    let mut stream = SiftStreamBuilder::from_channel(channel)
        .ingestion_config(IngestionConfigForm {
            asset_name: String::from("test_asset"),
            client_key: String::from("test_client_key"),
            // A copy goes into the form; the original is reused further down.
            flows: vec![known_flow.clone()],
        })
        .recovery_strategy(RecoveryStrategy::RetryWithBackups {
            retry_policy: crate::RetryPolicy::default(),
            disk_backup_policy: DiskBackupPolicy {
                backups_dir: Some(scratch.path().to_path_buf()),
                retain_backups: true,
                ..Default::default()
            },
        })
        .build()
        .await
        .expect("failed to build sift stream");

    // Exactly one flow is expected to be loaded once the stream is built.
    assert_eq!(stream.get_flows().len(), 1);

    // Re-submitting the same flow must succeed and must not add a second entry.
    let outcome = stream.add_new_flows(&[known_flow]).await;
    assert!(outcome.is_ok());
    assert_eq!(stream.get_flows().len(), 1);
}

/// Builds a stream whose ingestion config form names the flow `new_flow`, checks that one flow is
/// reported, verifies that re-submitting `new_flow` through `add_new_flows` does not change the
/// count, and finally verifies that submitting a differently named flow raises the count to two.
#[tokio::test]
async fn test_sift_stream_builder_load_ingestion_config_with_new_flows() {
    // A random prefix keeps this test's backup directory separate from any other test's.
    let dir_prefix = uuid::Uuid::new_v4().to_string();
    let scratch = TempDir::new(&dir_prefix).expect("failed to creat tempdir");

    let first_flow = FlowConfig {
        name: String::from("new_flow"),
        channels: vec![ChannelConfig {
            name: String::from("channel-new"),
            data_type: ChannelDataType::Uint32.into(),
            ..Default::default()
        }],
    };

    // The mock service handle is bound (not discarded) so that it lives until the test ends.
    let (channel, _mock) = crate::test::create_mock_grpc_channel_with_service().await;

    let mut stream = SiftStreamBuilder::from_channel(channel)
        .ingestion_config(IngestionConfigForm {
            asset_name: String::from("test_asset"),
            client_key: String::from("test_client_key"),
            // A copy goes into the form; the original is reused further down.
            flows: vec![first_flow.clone()],
        })
        .recovery_strategy(RecoveryStrategy::RetryWithBackups {
            retry_policy: crate::RetryPolicy::default(),
            disk_backup_policy: DiskBackupPolicy {
                backups_dir: Some(scratch.path().to_path_buf()),
                retain_backups: true,
                ..Default::default()
            },
        })
        .build()
        .await
        .expect("failed to build sift stream");

    // Exactly one flow is expected to be loaded once the stream is built.
    assert_eq!(stream.get_flows().len(), 1);

    // Submitting the flow that the stream was built with must be accepted without adding a
    // second entry.
    let repeat_outcome = stream.add_new_flows(&[first_flow]).await;
    assert!(repeat_outcome.is_ok());
    assert_eq!(stream.get_flows().len(), 1);

    // A flow under a name not used so far must be accepted and show up as a second entry.
    let second_flow = FlowConfig {
        name: String::from("new_flow2"),
        channels: vec![ChannelConfig {
            name: String::from("channel-new2"),
            data_type: ChannelDataType::Uint32.into(),
            ..Default::default()
        }],
    };
    let fresh_outcome = stream.add_new_flows(&[second_flow]).await;
    assert!(fresh_outcome.is_ok());
    assert_eq!(stream.get_flows().len(), 2);
}
Loading
Loading