Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions rust/crates/sift_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ indicatif = { workspace = true }
parquet = { workspace = true }
pbjson-types = { workspace = true }
reqwest = { workspace = true }
serde_json = { workspace = true }
sift_pbfs = { workspace = true }
tdms = { workspace = true }
hdf5 = { workspace = true }
Expand Down
35 changes: 6 additions & 29 deletions rust/crates/sift_cli/src/cmd/import/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use anyhow::{Context as AnyhowContext, Result, anyhow};
use chrono::DateTime;
use crossterm::style::Stylize;
use pbjson_types::Timestamp;
use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};
use sift_rs::{
common::r#type::v1::{ChannelConfig, ChannelDataType},
data_imports::v2::{
Expand All @@ -25,15 +24,12 @@ use crate::{
Context,
import::utils::{try_parse_bit_field_config, try_parse_enum_config},
},
util::{
api::{create_grpc_channel, create_rest_client},
tty::Output,
},
util::{api::create_grpc_channel, tty::Output},
};

use super::{
preview_import_config,
utils::{gzip_file, validate_time_format},
utils::{upload_gzipped_file, validate_time_format},
wait_for_job_completion,
};

Expand Down Expand Up @@ -75,29 +71,10 @@ pub async fn run(ctx: Context, args: ImportCsvArgs) -> Result<ExitCode> {
.into_inner();

csv_file.rewind()?;
let compressed_data = gzip_file(csv_file)?;

let rest_client = create_rest_client(&ctx)?;
let res = rest_client
.post(upload_url)
.header(CONTENT_ENCODING, "gzip")
.header(CONTENT_TYPE, "text/csv")
.body(compressed_data)
.send()
let job_id = upload_gzipped_file(&ctx, &upload_url, csv_file, "text/csv")
.await
.context("failed to upload CSV file")?;

if !res.status().is_success() {
let status = res.status();
let text = res
.text()
.await
.unwrap_or_else(|_| "<failed to read body>".into());
return Err(anyhow!(
"failed to upload CSV with http status {status}: {text}"
));
}

let location = args.run.as_ref().map_or_else(
|| format!("asset '{}'", args.asset.cyan()),
|r| format!("run '{}'", r.clone().cyan()),
Expand All @@ -113,7 +90,7 @@ pub async fn run(ctx: Context, args: ImportCsvArgs) -> Result<ExitCode> {

return Ok(ExitCode::SUCCESS);
}
wait_for_job_completion(grpc_channel, location).await
wait_for_job_completion(grpc_channel, job_id, location).await
}

fn create_data_import_request<R: io::Read>(
Expand Down Expand Up @@ -300,14 +277,14 @@ fn create_data_import_request<R: io::Read>(
let mut enum_configs = Vec::new();
let mut bit_field_configs = Vec::new();

if data_type == ChannelDataType::Enum.into() {
if data_type == i32::from(ChannelDataType::Enum) {
let Some(configs) = enum_configs_iter.next() else {
return Err(anyhow!(
"'{name}' was declared as type enum but --enum-config was not specified"
));
};
enum_configs = configs;
} else if data_type == ChannelDataType::BitField.into() {
} else if data_type == i32::from(ChannelDataType::BitField) {
let Some(configs) = bit_field_configs_iter.next() else {
return Err(anyhow!(
"'{name}' was declared as type bit-field but --bit-field-config was not specified"
Expand Down
4 changes: 2 additions & 2 deletions rust/crates/sift_cli/src/cmd/import/hdf5/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub async fn run(ctx: Context, args: ImportHdf5Args) -> Result<ExitCode> {
.context("error creating data import for hdf5")?
.into_inner();

upload_gzipped_file(&ctx, &upload_url, file, "application/x-hdf5")
let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/x-hdf5")
.await
.context("failed to upload hdf5 file")?;

Expand All @@ -99,7 +99,7 @@ pub async fn run(ctx: Context, args: ImportHdf5Args) -> Result<ExitCode> {
return Ok(ExitCode::SUCCESS);
}

wait_for_job_completion(grpc_channel, location).await
wait_for_job_completion(grpc_channel, job_id, location).await
}

pub fn build_hdf5_config(args: &ImportHdf5Args) -> Result<Hdf5Config> {
Expand Down
15 changes: 4 additions & 11 deletions rust/crates/sift_cli/src/cmd/import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,9 @@ use crossterm::style::Stylize;
use std::{process::ExitCode, time::Duration};
use tokio::time::sleep;

use sift_rs::{
SiftChannel,
common::r#type::v1::ChannelConfig,
jobs::v1::{JobStatus, JobType},
};
use sift_rs::{SiftChannel, common::r#type::v1::ChannelConfig, jobs::v1::JobStatus};

use crate::util::{job::JobServiceWrapper, progress::Spinner, tty::Output, user::get_user_id};
use crate::util::{job::JobServiceWrapper, progress::Spinner, tty::Output};

pub mod backup;
pub mod csv;
Expand All @@ -25,18 +21,15 @@ const INDENT_4: &str = " ";

pub async fn wait_for_job_completion(
grpc_channel: SiftChannel,
job_id: String,
import_output_location: String,
) -> Result<ExitCode> {
let spinner = Spinner::new();
spinner.set_message(format!("{} file for processing", "Uploaded".green()));

let user_id = get_user_id(grpc_channel.clone()).await?;
let mut job_service = JobServiceWrapper::new(grpc_channel.clone());

let Some(mut job) = job_service
.get_latest_job_for_user(&user_id, JobType::DataImport)
.await?
else {
let Some(mut job) = job_service.get_job(&job_id).await? else {
spinner.finish_and_clear();

Output::new()
Expand Down
31 changes: 4 additions & 27 deletions rust/crates/sift_cli/src/cmd/import/parquet/flat_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::{collections::HashMap, fs::File, io::Seek, process::ExitCode};

use anyhow::{Context as AnyhowContext, Result, anyhow};
use crossterm::style::Stylize;
use reqwest::header::{CONTENT_ENCODING, CONTENT_TYPE};
use sift_rs::{
common::r#type::v1::{ChannelConfig, ChannelDataType},
data_imports::v2::{
Expand All @@ -20,14 +19,11 @@ use crate::{
import::{
parquet::FooterMetadata,
preview_import_config,
utils::{gzip_file, try_parse_bit_field_config, try_parse_enum_config},
utils::{try_parse_bit_field_config, try_parse_enum_config, upload_gzipped_file},
wait_for_job_completion,
},
},
util::{
api::{create_grpc_channel, create_rest_client},
tty::Output,
},
util::{api::create_grpc_channel, tty::Output},
};

pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result<ExitCode> {
Expand Down Expand Up @@ -78,29 +74,10 @@ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result<ExitCode> {
.into_inner();

file.rewind()?;
let compressed_data = gzip_file(file)?;

let rest_client = create_rest_client(&ctx)?;
let res = rest_client
.post(upload_url)
.header(CONTENT_ENCODING, "gzip")
.header(CONTENT_TYPE, "application/vnd.apache.parquet")
.body(compressed_data)
.send()
let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/vnd.apache.parquet")
.await
.context("failed to upload Parquet file")?;

if !res.status().is_success() {
let status = res.status();
let text = res
.text()
.await
.unwrap_or_else(|_| "<failed to read body>".into());
return Err(anyhow!(
"failed to upload Parquet with http status {status}: {text}"
));
}

let location = args.run.as_ref().map_or_else(
|| format!("asset '{}'", args.asset.cyan()),
|r| format!("run '{}'", r.clone().cyan()),
Expand All @@ -116,7 +93,7 @@ pub async fn run(ctx: Context, args: FlatDatasetArgs) -> Result<ExitCode> {

return Ok(ExitCode::SUCCESS);
}
wait_for_job_completion(grpc_channel, location).await
wait_for_job_completion(grpc_channel, job_id, location).await
}

fn update_config_with_overrides(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub async fn run(ctx: Context, args: ImportTdmsArgs) -> Result<ExitCode> {
.context("error creating data import for tdms")?
.into_inner();

upload_gzipped_file(&ctx, &upload_url, file, "application/octet-stream")
let job_id = upload_gzipped_file(&ctx, &upload_url, file, "application/octet-stream")
.await
.context("failed to upload tdms file")?;

Expand All @@ -89,7 +89,7 @@ pub async fn run(ctx: Context, args: ImportTdmsArgs) -> Result<ExitCode> {
return Ok(ExitCode::SUCCESS);
}

wait_for_job_completion(grpc_channel, location).await
wait_for_job_completion(grpc_channel, job_id, location).await
}

pub fn build_tdms_config(args: &ImportTdmsArgs) -> Result<TdmsConfig> {
Expand Down
22 changes: 19 additions & 3 deletions rust/crates/sift_cli/src/cmd/import/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,14 @@ use sift_rs::common::r#type::v1::{ChannelBitFieldElement, ChannelEnumType};
use crate::{cli::time::TimeFormat, cmd::Context as CmdContext, util::api::create_rest_client};

/// Gzip and upload a file to a pre-signed upload URL with the given content type.
/// Reads from the file's current cursor position.
/// Reads from the file's current cursor position. Returns the job ID from the
/// upload response.
pub async fn upload_gzipped_file(
ctx: &CmdContext,
upload_url: &str,
file: File,
content_type: &str,
) -> Result<()> {
) -> Result<String> {
let compressed_data = gzip_file(file)?;
let rest_client = create_rest_client(ctx).context("failed to create rest client")?;

Expand All @@ -38,7 +39,22 @@ pub async fn upload_gzipped_file(
.unwrap_or_else(|_| "<failed to read body>".into());
return Err(anyhow!("upload failed with http status {status}: {text}"));
}
Ok(())
extract_job_id(res).await
}

/// Parses the `jobId` from a successful upload response.
async fn extract_job_id(res: reqwest::Response) -> Result<String> {
let body_text = res
.text()
.await
.context("failed to read upload response body")?;
let body: serde_json::Value =
serde_json::from_str(&body_text).context("failed to parse upload response as JSON")?;
body.get("jobId")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
.map(String::from)
.ok_or_else(|| anyhow!("upload response did not include jobId: {body_text}"))
}

/// Be sure that the file's cursor is rewinded to the start before hand.
Expand Down
23 changes: 1 addition & 22 deletions rust/crates/sift_cli/src/util/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::ops::{Deref, DerefMut};
use anyhow::{Context, Result};
use sift_rs::{
SiftChannel,
jobs::v1::{Job, JobType, ListJobsRequest, job_service_client::JobServiceClient},
jobs::v1::{Job, ListJobsRequest, job_service_client::JobServiceClient},
};

pub struct JobServiceWrapper(JobServiceClient<SiftChannel>);
Expand All @@ -28,27 +28,6 @@ impl JobServiceWrapper {
JobServiceWrapper(job_service)
}

pub async fn get_latest_job_for_user(
&mut self,
user_id: &str,
job_type: JobType,
) -> Result<Option<Job>> {
let jt = job_type.as_str_name();

let res = self
.list_jobs(ListJobsRequest {
page_size: 1,
filter: format!("job_type == '{jt}' && created_by_user_id == '{user_id}'"),
order_by: "created_date desc".into(),
..Default::default()
})
.await
.context("failed to retrieve latest user job")?
.into_inner();

Ok(res.jobs.first().cloned())
}

pub async fn get_job(&mut self, job_id: &str) -> Result<Option<Job>> {
let res = self
.list_jobs(ListJobsRequest {
Expand Down
1 change: 0 additions & 1 deletion rust/crates/sift_cli/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,3 @@ pub mod channel;
pub mod job;
pub mod progress;
pub mod tty;
pub mod user;
15 changes: 0 additions & 15 deletions rust/crates/sift_cli/src/util/user.rs

This file was deleted.

Loading