Skip to content

Commit 666ad02

Browse files
authored
rust(feature): support calculated channels in cli (#501)
1 parent 400b828 commit 666ad02

File tree

5 files changed

+350
-66
lines changed

5 files changed

+350
-66
lines changed

rust/crates/sift_cli/src/cli/mod.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,18 @@ pub struct ExportArgs {
108108
#[arg(long)]
109109
pub channel_id: Vec<String>,
110110

111+
/// Regular expression used to filter calculated channels to include in the export
112+
#[arg(long)]
113+
pub calculated_channel_regex: Option<String>,
114+
115+
/// Name of calculated channel to include in the export; can be specified multiple times
116+
#[arg(long)]
117+
pub calculated_channel: Vec<String>,
118+
119+
/// ID of calculated channel to include in the export; can be specified multiple times
120+
#[arg(long)]
121+
pub calculated_channel_id: Vec<String>,
122+
111123
/// Start time in RFC 3339 format (required for asset exports)
112124
#[arg(long)]
113125
pub start: Option<String>,

rust/crates/sift_cli/src/cmd/export.rs

Lines changed: 48 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,12 @@ use zip::ZipArchive;
3030
use crate::{
3131
cli::{ExportAssetArgs, ExportRunArgs},
3232
util::{
33-
api::create_grpc_channel, channel::filter_channels, job::JobServiceWrapper,
34-
progress::Spinner, tty::Output,
33+
api::create_grpc_channel,
34+
calculated_channel::{ResolveScope, resolve_calculated_channels},
35+
channel::resolve_channel_ids,
36+
job::JobServiceWrapper,
37+
progress::Spinner,
38+
tty::Output,
3539
},
3640
};
3741

@@ -85,40 +89,25 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result<ExitCode> {
8589
run.name.clone().yellow()
8690
));
8791
}
88-
let asset_ids_cel = run
89-
.asset_ids
90-
.iter()
91-
.map(|a| format!("'{a}'"))
92-
.collect::<Vec<String>>()
93-
.join(",");
94-
95-
let mut channel_ids = args.common.channel_id;
96-
97-
if !args.common.channel.is_empty() {
98-
let channel_names_cel = args
99-
.common
100-
.channel
101-
.iter()
102-
.map(|c| format!("'{c}'"))
103-
.collect::<Vec<String>>()
104-
.join(",");
105-
106-
let filter = format!("asset_id in [{asset_ids_cel}] && name in [{channel_names_cel}]");
107-
let query_res = filter_channels(grpc_channel.clone(), &filter).await?;
108-
109-
for channel in query_res {
110-
channel_ids.push(channel.channel_id);
111-
}
112-
}
113-
114-
if let Some(re) = args.common.channel_regex {
115-
let filter = format!("asset_id in [{asset_ids_cel}] && name.matches(\"{re}\")");
116-
let query_res = filter_channels(grpc_channel.clone(), &filter).await?;
117-
118-
for channel in query_res {
119-
channel_ids.push(channel.channel_id);
120-
}
121-
}
92+
let channel_ids = resolve_channel_ids(
93+
grpc_channel.clone(),
94+
&args.common.channel,
95+
args.common.channel_regex.as_deref(),
96+
args.common.channel_id,
97+
&run.asset_ids,
98+
)
99+
.await?;
100+
101+
let scope = ResolveScope::Run(&run.run_id);
102+
let calculated_channel_configs = resolve_calculated_channels(
103+
grpc_channel.clone(),
104+
&args.common.calculated_channel,
105+
args.common.calculated_channel_regex.as_deref(),
106+
&args.common.calculated_channel_id,
107+
&run.asset_ids,
108+
&scope,
109+
)
110+
.await?;
122111

123112
let start_time = args
124113
.common
@@ -144,6 +133,7 @@ pub async fn run(ctx: Context, args: ExportRunArgs) -> Result<ExitCode> {
144133

145134
let export_req = ExportDataRequest {
146135
channel_ids,
136+
calculated_channel_configs,
147137
output_format: ExportOutputFormat::from(args.common.format).into(),
148138
time_selection: Some(TimeSelection::RunsAndTimeRange(RunsAndTimeRange {
149139
start_time,
@@ -194,44 +184,37 @@ pub async fn asset(ctx: Context, args: ExportAssetArgs) -> Result<ExitCode> {
194184
.into_inner();
195185

196186
if assets.is_empty() {
197-
return Err(anyhow!("no run found"));
187+
return Err(anyhow!("no asset found"));
198188
}
199189
let asset = assets.first().unwrap();
200190
let asset_id = &asset.asset_id;
201191

202-
let mut channel_ids = args.common.channel_id;
203-
204-
if !args.common.channel.is_empty() {
205-
let channel_names_cel = args
206-
.common
207-
.channel
208-
.iter()
209-
.map(|c| format!("'{c}'"))
210-
.collect::<Vec<String>>()
211-
.join(",");
212-
213-
let filter = format!("asset_id == '{asset_id}' && name in [{channel_names_cel}]");
214-
let query_res = filter_channels(grpc_channel.clone(), &filter).await?;
215-
216-
for channel in query_res {
217-
channel_ids.push(channel.channel_id);
218-
}
219-
}
220-
221-
if let Some(re) = args.common.channel_regex {
222-
let filter = format!("asset_id == '{asset_id}' && name.matches(\"{re}\")");
223-
let query_res = filter_channels(grpc_channel.clone(), &filter).await?;
224-
225-
for channel in query_res {
226-
channel_ids.push(channel.channel_id);
227-
}
228-
}
192+
let asset_ids = vec![asset_id.to_string()];
193+
let channel_ids = resolve_channel_ids(
194+
grpc_channel.clone(),
195+
&args.common.channel,
196+
args.common.channel_regex.as_deref(),
197+
args.common.channel_id,
198+
&asset_ids,
199+
)
200+
.await?;
201+
let scope = ResolveScope::Assets(&asset_ids);
202+
let calculated_channel_configs = resolve_calculated_channels(
203+
grpc_channel.clone(),
204+
&args.common.calculated_channel,
205+
args.common.calculated_channel_regex.as_deref(),
206+
&args.common.calculated_channel_id,
207+
&asset_ids,
208+
&scope,
209+
)
210+
.await?;
229211

230212
let export_req = ExportDataRequest {
231213
channel_ids,
214+
calculated_channel_configs,
232215
output_format: ExportOutputFormat::from(args.common.format).into(),
233216
time_selection: Some(TimeSelection::AssetsAndTimeRange(AssetsAndTimeRange {
234-
asset_ids: vec![asset_id.to_string()],
217+
asset_ids,
235218
start_time: Some(start_time),
236219
stop_time: Some(stop_time),
237220
})),

0 commit comments

Comments
 (0)