From d8aa081196e74564cd78e650d096385ffa9ad62a Mon Sep 17 00:00:00 2001 From: gsilvestrin Date: Fri, 21 Apr 2023 10:28:08 -0700 Subject: [PATCH 1/3] bugfix for dataset overwrite method Closes #792 --- rust/src/dataset.rs | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/rust/src/dataset.rs b/rust/src/dataset.rs index 7b209449c2e..0251a58b94a 100644 --- a/rust/src/dataset.rs +++ b/rust/src/dataset.rs @@ -194,7 +194,7 @@ impl Dataset { } // append + dataset doesn't already exists = warn + switch to create mode - if !flag_dataset_exists && matches!(params.mode, WriteMode::Append) { + if !flag_dataset_exists && matches!(params.mode, WriteMode::Append) || matches!(params.mode, WriteMode::Overwrite) { eprintln!("Warning: No existing dataset at {uri}, it will be created"); params = WriteParams { mode: WriteMode::Create, @@ -713,10 +713,7 @@ mod tests { use futures::stream::TryStreamExt; use tempfile::tempdir; - #[tokio::test] - async fn create_dataset() { - let test_dir = tempdir().unwrap(); - + async fn create_file(path: &std::path::Path, mode: WriteMode) { let schema = Arc::new(ArrowSchema::new(vec![ Field::new("i", DataType::Int32, false), Field::new( @@ -738,21 +735,22 @@ mod tests { &UInt16Array::from_iter_values((0_u16..20_u16).map(|v| v % 5)), &dict_values, ) - .unwrap(), + .unwrap(), ), ], ) - .unwrap() + .unwrap() }) .collect(), ); let expected_batches = batches.batches.clone(); - let test_uri = test_dir.path().to_str().unwrap(); - + let test_uri = path.to_str().unwrap(); let mut write_params = WriteParams::default(); write_params.max_rows_per_file = 40; write_params.max_rows_per_group = 10; + write_params.mode = mode; + println!("{:?}", write_params); let mut reader: Box = Box::new(batches); Dataset::write(&mut reader, test_uri, Some(write_params)) .await @@ -793,6 +791,15 @@ mod tests { ) } + #[tokio::test] + async fn test_create_dataset() { + // Appending / Overwriting a dataset that does not exist is treated as Create + for mode in [WriteMode::Create, WriteMode::Append, Overwrite] { + let test_dir = tempdir().unwrap(); + create_file(test_dir.path(), mode).await + } + } + #[tokio::test] async fn append_dataset() { let test_dir = tempdir().unwrap(); @@ -812,6 +819,7 @@ mod tests { let mut write_params = WriteParams::default(); write_params.max_rows_per_file = 40; write_params.max_rows_per_group = 10; + write_params.mode = WriteMode::Append; let mut batches: Box = Box::new(batches); Dataset::write(&mut batches, test_uri, Some(write_params)) .await From e56257369cfa81f68354211601a789ec3bc83b62 Mon Sep 17 00:00:00 2001 From: gsilvestrin Date: Fri, 21 Apr 2023 10:28:46 -0700 Subject: [PATCH 2/3] cargo fmt --- rust/src/dataset.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/rust/src/dataset.rs b/rust/src/dataset.rs index 0251a58b94a..5c4a4dcecde 100644 --- a/rust/src/dataset.rs +++ b/rust/src/dataset.rs @@ -194,7 +194,9 @@ impl Dataset { } // append + dataset doesn't already exists = warn + switch to create mode - if !flag_dataset_exists && matches!(params.mode, WriteMode::Append) || matches!(params.mode, WriteMode::Overwrite) { + if !flag_dataset_exists && matches!(params.mode, WriteMode::Append) + || matches!(params.mode, WriteMode::Overwrite) + { eprintln!("Warning: No existing dataset at {uri}, it will be created"); params = WriteParams { mode: WriteMode::Create, @@ -735,11 +737,11 @@ mod tests { &UInt16Array::from_iter_values((0_u16..20_u16).map(|v| v % 5)), &dict_values, ) - .unwrap(), + .unwrap(), ), ], ) - .unwrap() + .unwrap() }) .collect(), ); From 3b779be592090d2022b54e8adec0d12cc38ca561 Mon Sep 17 00:00:00 2001 From: gsilvestrin Date: Fri, 21 Apr 2023 10:38:05 -0700 Subject: [PATCH 3/3] cleanup --- rust/src/dataset.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/rust/src/dataset.rs b/rust/src/dataset.rs index 5c4a4dcecde..6274d0ba1a9 100644 --- a/rust/src/dataset.rs +++ b/rust/src/dataset.rs @@ -194,8 +194,9 @@ impl Dataset { } // append + dataset doesn't already exists = warn + switch to create mode - if !flag_dataset_exists && matches!(params.mode, WriteMode::Append) - || matches!(params.mode, WriteMode::Overwrite) + if !flag_dataset_exists + && (matches!(params.mode, WriteMode::Append) + || matches!(params.mode, WriteMode::Overwrite)) { eprintln!("Warning: No existing dataset at {uri}, it will be created"); params = WriteParams { @@ -752,7 +753,6 @@ mod tests { write_params.max_rows_per_file = 40; write_params.max_rows_per_group = 10; write_params.mode = mode; - println!("{:?}", write_params); let mut reader: Box = Box::new(batches); Dataset::write(&mut reader, test_uri, Some(write_params)) .await @@ -821,7 +821,6 @@ mod tests { let mut write_params = WriteParams::default(); write_params.max_rows_per_file = 40; write_params.max_rows_per_group = 10; - write_params.mode = WriteMode::Append; let mut batches: Box = Box::new(batches); Dataset::write(&mut batches, test_uri, Some(write_params)) .await