From 5af8a36df33e8d40b98bf2f70bc7829204d91328 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 3 Jun 2024 13:27:25 -0700 Subject: [PATCH 1/2] Rename use_experimental_writer to use_legacy_format (and invert the logic) --- python/python/lance/dataset.py | 10 +- python/python/lance/fragment.py | 17 +-- python/python/lance/ray/sink.py | 25 ++--- python/python/tests/test_dataset.py | 2 +- python/python/tests/test_fragment.py | 2 +- python/src/dataset.rs | 6 +- rust/lance/src/dataset.rs | 124 ++++++++++----------- rust/lance/src/dataset/fragment.rs | 30 ++--- rust/lance/src/dataset/scanner.rs | 118 +++++++++----------- rust/lance/src/dataset/schema_evolution.rs | 30 ++--- rust/lance/src/dataset/take.rs | 18 ++- rust/lance/src/dataset/updater.rs | 2 +- rust/lance/src/dataset/write.rs | 54 +++++---- rust/lance/src/session/index_extension.rs | 8 +- rust/lance/src/utils/test.rs | 22 ++-- 15 files changed, 217 insertions(+), 251 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 63ceb147f3..c8e8bd9a99 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -2385,7 +2385,7 @@ def write_dataset( commit_lock: Optional[CommitLock] = None, progress: Optional[FragmentWriteProgress] = None, storage_options: Optional[Dict[str, str]] = None, - use_experimental_writer: bool = False, + use_legacy_format: bool = True, ) -> LanceDataset: """Write a given data_obj to the given uri @@ -2425,9 +2425,9 @@ def write_dataset( storage_options : optional, dict Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc. - use_experimental_writer : optional, bool - Use the Lance v2 writer to write Lance v2 files. This is not recommended - at this time as there are several known limitations in the v2 writer. + use_legacy_format : optional, bool, default True + Use the Lance v1 writer to write Lance v1 files. The default is currently + True but will change as we roll out the v2 format. """ if _check_for_hugging_face(data_obj): # Huggingface datasets @@ -2449,7 +2449,7 @@ def write_dataset( "max_bytes_per_file": max_bytes_per_file, "progress": progress, "storage_options": storage_options, - "use_experimental_writer": use_experimental_writer, + "use_legacy_format": use_legacy_format, } if commit_lock: diff --git a/python/python/lance/fragment.py b/python/python/lance/fragment.py index 631e5f3d75..fb78cd4d8a 100644 --- a/python/python/lance/fragment.py +++ b/python/python/lance/fragment.py @@ -146,7 +146,7 @@ def create( progress: Optional[FragmentWriteProgress] = None, mode: str = "append", *, - use_experimental_writer=False, + use_legacy_format=True, ) -> FragmentMetadata: """Create a :class:`FragmentMetadata` from the given data. @@ -177,6 +177,9 @@ def create( The write mode. If "append" is specified, the data will be checked against the existing dataset's schema. Otherwise, pass "create" or "overwrite" to assign new field ids to the schema. + use_legacy_format: bool, default True + Use the legacy format to write Lance files. The default is True + while the v2 format is still in beta. 
See Also -------- @@ -215,7 +218,7 @@ def create( max_rows_per_group=max_rows_per_group, progress=progress, mode=mode, - use_experimental_writer=use_experimental_writer, + use_legacy_format=use_legacy_format, ) return FragmentMetadata(inner_meta.json()) @@ -504,7 +507,7 @@ def write_fragments( max_rows_per_group: int = 1024, max_bytes_per_file: int = DEFAULT_MAX_BYTES_PER_FILE, progress: Optional[FragmentWriteProgress] = None, - use_experimental_writer: bool = False, + use_legacy_format: bool = True, storage_options: Optional[Dict[str, str]] = None, ) -> List[FragmentMetadata]: """ @@ -542,9 +545,9 @@ def write_fragments( *Experimental API*. Progress tracking for writing the fragment. Pass a custom class that defines hooks to be called when each fragment is starting to write and finishing writing. - use_experimental_writer : optional, bool - Use the Lance v2 writer to write Lance v2 files. This is not recommended - at this time as there are several known limitations in the v2 writer. + use_legacy_format : optional, bool, default True + Use the Lance v1 writer to write Lance v1 files. The default is currently + True while the v2 format is in beta. storage_options : Optional[Dict[str, str]] Extra options that make sense for a particular storage connection. This is used to store connection parameters like credentials, endpoint, etc. @@ -578,7 +581,7 @@ def write_fragments( max_rows_per_group=max_rows_per_group, max_bytes_per_file=max_bytes_per_file, progress=progress, - use_experimental_writer=use_experimental_writer, + use_legacy_format=use_legacy_format, storage_options=storage_options, ) return [FragmentMetadata.from_metadata(frag) for frag in fragments] diff --git a/python/python/lance/ray/sink.py b/python/python/lance/ray/sink.py index ce1ab86335..ea6d7eab33 100644 --- a/python/python/lance/ray/sink.py +++ b/python/python/lance/ray/sink.py @@ -54,7 +54,7 @@ def _write_fragment( max_rows_per_file: int = 1024 * 1024, max_bytes_per_file: Optional[int] = None, max_rows_per_group: int = 1024, # Only useful for v1 writer. - use_experimental_writer: bool = False, + use_legacy_format: bool = True, storage_options: Optional[Dict[str, Any]] = None, ) -> Tuple[FragmentMetadata, pa.Schema]: from ..dependencies import _PANDAS_AVAILABLE @@ -88,7 +88,7 @@ def record_batch_converter(): max_rows_per_file=max_rows_per_file, max_rows_per_group=max_rows_per_group, max_bytes_per_file=max_bytes_per_file, - use_experimental_writer=use_experimental_writer, + use_legacy_format=use_legacy_format, storage_options=storage_options, ) return [(fragment, schema) for fragment in fragments] @@ -161,9 +161,8 @@ class LanceDatasink(_BaseLanceDatasink): Choices are 'append', 'create', 'overwrite'. max_rows_per_file : int, optional The maximum number of rows per file. Default is 1024 * 1024. - use_experimental_writer : bool, optional - Set true to use v2 writer. Default is False now. Will be removed once - v2 writer become the default. + use_legacy_format : bool, optional + Set True to use the legacy v1 format. 
Default is False """ NAME = "Lance" @@ -174,14 +173,14 @@ def __init__( schema: Optional[pa.Schema] = None, mode: Literal["create", "append", "overwrite"] = "create", max_rows_per_file: int = 1024 * 1024, - use_experimental_writer: bool = True, + use_legacy_format: bool = False, *args, **kwargs, ): super().__init__(uri, schema=schema, mode=mode, *args, **kwargs) self.max_rows_per_file = max_rows_per_file - self.use_experimental_writer = use_experimental_writer + self.use_legacy_format = use_legacy_format # if mode is append, read_version is read from existing dataset. self.read_version: int | None = None @@ -206,7 +205,7 @@ def write( self.uri, schema=self.schema, max_rows_per_file=self.max_rows_per_file, - use_experimental_writer=self.use_experimental_writer, + use_legacy_format=self.use_legacy_format, ) return [ (pickle.dumps(fragment), pickle.dumps(schema)) @@ -235,8 +234,8 @@ class LanceFragmentWriter: max_rows_per_group : int, optional The maximum number of rows per group. Default is 1024. Only useful for v1 writer. - use_experimental_writer : bool, optional - Set true to use v2 writer. Default is True. + use_legacy_format : bool, optional + Set True to use the legacy v1 writer. Default is False storage_options : Dict[str, Any], optional The storage options for the writer. Default is None. @@ -251,7 +250,7 @@ def __init__( max_rows_per_file: int = 1024 * 1024, max_bytes_per_file: Optional[int] = None, max_rows_per_group: Optional[int] = None, # Only useful for v1 writer. - use_experimental_writer: bool = True, + use_legacy_format: bool = False, storage_options: Optional[Dict[str, Any]] = None, ): self.uri = uri @@ -261,7 +260,7 @@ def __init__( self.max_rows_per_group = max_rows_per_group self.max_rows_per_file = max_rows_per_file self.max_bytes_per_file = max_bytes_per_file - self.use_experimental_writer = use_experimental_writer + self.use_legacy_format = use_legacy_format self.storage_options = storage_options def __call__(self, batch: Union[pa.Table, "pd.DataFrame"]) -> Dict[str, Any]: @@ -277,7 +276,7 @@ def __call__(self, batch: Union[pa.Table, "pd.DataFrame"]) -> Dict[str, Any]: schema=self.schema, max_rows_per_file=self.max_rows_per_file, max_rows_per_group=self.max_rows_per_group, - use_experimental_writer=self.use_experimental_writer, + use_legacy_format=self.use_legacy_format, storage_options=self.storage_options, ) return pa.Table.from_pydict( diff --git a/python/python/tests/test_dataset.py b/python/python/tests/test_dataset.py index 9b3254a86d..322a4848d1 100644 --- a/python/python/tests/test_dataset.py +++ b/python/python/tests/test_dataset.py @@ -1728,7 +1728,7 @@ def test_migrate_manifest(tmp_path: Path): def test_v2_dataset(tmp_path: Path): table = pa.table({"a": range(100), "b": range(100)}) - dataset = lance.write_dataset(table, tmp_path, use_experimental_writer=True) + dataset = lance.write_dataset(table, tmp_path, use_legacy_format=False) batches = list(dataset.to_batches()) assert len(batches) == 1 assert pa.Table.from_batches(batches) == table diff --git a/python/python/tests/test_fragment.py b/python/python/tests/test_fragment.py index 59a0f44a54..ac8f492cf3 100644 --- a/python/python/tests/test_fragment.py +++ b/python/python/tests/test_fragment.py @@ -261,7 +261,7 @@ def test_fragment_v2(tmp_path): fragments = write_fragments( tab, tmp_path, - use_experimental_writer=True, + use_legacy_format=False, ) assert len(fragments) == 1 ds = lance.dataset(dataset_uri) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index e8e5a6a2ee..b7e75eec04 100644 --- 
a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1171,10 +1171,10 @@ pub fn get_write_params(options: &PyDict) -> PyResult> { if let Some(maybe_nbytes) = get_dict_opt::(options, "max_bytes_per_file")? { p.max_bytes_per_file = maybe_nbytes; } - if let Some(use_experimental_writer) = - get_dict_opt::(options, "use_experimental_writer")? + if let Some(use_legacy_format) = + get_dict_opt::(options, "use_legacy_format")? { - p.use_experimental_writer = use_experimental_writer; + p.use_legacy_format = use_legacy_format; } if let Some(progress) = get_dict_opt::(options, "progress")? { p.progress = Arc::new(PyWriteProgress::new(progress.to_object(options.py()))); diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 3ffc9eb6e3..3a274743e6 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -1324,10 +1324,10 @@ mod tests { t } - async fn create_file(path: &std::path::Path, mode: WriteMode, use_experimental_writer: bool) { + async fn create_file(path: &std::path::Path, mode: WriteMode, use_legacy_format: bool) { let mut fields = vec![ArrowField::new("i", DataType::Int32, false)]; // TODO (GH-2347): currently the v2 writer does not support dictionary columns. - if !use_experimental_writer { + if use_legacy_format { fields.push(ArrowField::new( "dict", DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)), @@ -1340,7 +1340,7 @@ mod tests { .map(|i| { let mut arrays = vec![Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)) as ArrayRef]; - if !use_experimental_writer { + if use_legacy_format { arrays.push(Arc::new( DictionaryArray::try_new( UInt16Array::from_iter_values((0_u16..20_u16).map(|v| v % 5)), @@ -1359,7 +1359,7 @@ mod tests { max_rows_per_file: 40, max_rows_per_group: 10, mode, - use_experimental_writer, + use_legacy_format, ..WriteParams::default() }; let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -1387,7 +1387,7 @@ mod tests { // The batch size batches the group size. // (the v2 writer has no concept of group size) - if !use_experimental_writer { + if use_legacy_format { for batch in &actual_batches { assert_eq!(batch.num_rows(), 10); } @@ -1417,19 +1417,17 @@ mod tests { #[rstest] #[lance_test_macros::test(tokio::test)] - async fn test_create_dataset(#[values(false, true)] use_experimental_writer: bool) { + async fn test_create_dataset(#[values(false, true)] use_legacy_format: bool) { // Appending / Overwriting a dataset that does not exist is treated as Create for mode in [WriteMode::Create, WriteMode::Append, Overwrite] { let test_dir = tempdir().unwrap(); - create_file(test_dir.path(), mode, use_experimental_writer).await + create_file(test_dir.path(), mode, use_legacy_format).await } } #[rstest] #[lance_test_macros::test(tokio::test)] - async fn test_create_and_fill_empty_dataset( - #[values(false, true)] use_experimental_writer: bool, - ) { + async fn test_create_and_fill_empty_dataset(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( @@ -1453,7 +1451,7 @@ mod tests { let mut write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; // We should be able to append even if the metadata doesn't exactly match. 
@@ -1510,7 +1508,7 @@ mod tests { #[rstest] #[lance_test_macros::test(tokio::test)] - async fn test_create_with_empty_iter(#[values(false, true)] use_experimental_writer: bool) { + async fn test_create_with_empty_iter(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( @@ -1522,7 +1520,7 @@ mod tests { // check schema of reader and original is same assert_eq!(schema.as_ref(), reader.schema().as_ref()); let write_params = Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }); let result = Dataset::write(reader, test_uri, write_params) @@ -1586,7 +1584,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_write_params(#[values(false, true)] use_experimental_writer: bool) { + async fn test_write_params(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -1607,7 +1605,7 @@ mod tests { let write_params = WriteParams { max_rows_per_file: 100, max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let dataset = Dataset::write(batches, test_uri, Some(write_params)) @@ -1623,7 +1621,7 @@ mod tests { assert_eq!(fragment.count_rows().await.unwrap(), 100); let reader = fragment.open(dataset.schema(), false).await.unwrap(); // No group / batch concept in v2 - if !use_experimental_writer { + if use_legacy_format { assert_eq!(reader.legacy_num_batches(), 10); for i in 0..reader.legacy_num_batches() as u32 { assert_eq!(reader.legacy_num_rows_in_batch(i).unwrap(), 10); @@ -1634,7 +1632,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_write_manifest(#[values(false, true)] use_experimental_writer: bool) { + async fn test_write_manifest(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -1654,7 +1652,7 @@ mod tests { batches, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ); @@ -1734,7 +1732,7 @@ mod tests { test_uri, Some(WriteParams { mode: WriteMode::Append, - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -1745,7 +1743,7 @@ mod tests { #[rstest] #[tokio::test] - async fn append_dataset(#[values(false, true)] use_experimental_writer: bool) { + async fn append_dataset(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( @@ -1763,7 +1761,7 @@ mod tests { let mut write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -1824,7 +1822,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_self_dataset_append(#[values(false, true)] use_experimental_writer: bool) { + async fn test_self_dataset_append(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( @@ -1842,7 +1840,7 @@ mod tests { let mut write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -1908,7 +1906,7 @@ mod tests { #[rstest] 
#[tokio::test] async fn test_self_dataset_append_schema_different( - #[values(false, true)] use_experimental_writer: bool, + #[values(false, true)] use_legacy_format: bool, ) { let test_dir = tempdir().unwrap(); @@ -1938,7 +1936,7 @@ mod tests { let mut write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2023,7 +2021,7 @@ mod tests { #[rstest] #[tokio::test] - async fn overwrite_dataset(#[values(false, true)] use_experimental_writer: bool) { + async fn overwrite_dataset(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( @@ -2041,7 +2039,7 @@ mod tests { let mut write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2113,7 +2111,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_fast_count_rows(#[values(false, true)] use_experimental_writer: bool) { + async fn test_fast_count_rows(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( @@ -2136,7 +2134,7 @@ mod tests { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2159,7 +2157,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_create_index(#[values(false, true)] use_experimental_writer: bool) { + async fn test_create_index(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let dimension = 16; @@ -2189,7 +2187,7 @@ mod tests { reader, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2217,7 +2215,7 @@ mod tests { // Append should inherit index let write_params = WriteParams { mode: WriteMode::Append, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = vec![RecordBatch::try_new(schema.clone(), vec![vectors.clone()]).unwrap()]; @@ -2253,7 +2251,7 @@ mod tests { // Overwrite should invalidate index let write_params = WriteParams { mode: WriteMode::Overwrite, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = vec![RecordBatch::try_new(schema.clone(), vec![vectors]).unwrap()]; @@ -2272,7 +2270,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_create_scalar_index(#[values(false, true)] use_experimental_writer: bool) { + async fn test_create_scalar_index(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2282,7 +2280,7 @@ mod tests { data.into_reader_rows(RowCount::from(16 * 1024), BatchCount::from(4)), test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2312,7 +2310,7 @@ mod tests { dataset.index_statistics(&index_name).await.unwrap(); } - async fn create_bad_file(use_experimental_writer: bool) -> Result { + async fn create_bad_file(use_legacy_format: bool) -> Result { let test_dir = tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( @@ -2336,7 +2334,7 @@ mod tests 
{ reader, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2345,9 +2343,9 @@ mod tests { #[rstest] #[tokio::test] - async fn test_bad_field_name(#[values(false, true)] use_experimental_writer: bool) { + async fn test_bad_field_name(#[values(false, true)] use_legacy_format: bool) { // don't allow `.` in the field name - assert!(create_bad_file(use_experimental_writer).await.is_err()); + assert!(create_bad_file(use_legacy_format).await.is_err()); } #[tokio::test] @@ -2358,7 +2356,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_merge(#[values(false, true)] use_experimental_writer: bool) { + async fn test_merge(#[values(false, true)] use_legacy_format: bool) { let schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", DataType::Int32, false), ArrowField::new("x", DataType::Float32, false), @@ -2385,7 +2383,7 @@ mod tests { let write_params = WriteParams { mode: WriteMode::Append, - use_experimental_writer, + use_legacy_format, ..Default::default() }; @@ -2475,7 +2473,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_large_merge(#[values(false, true)] use_experimental_writer: bool) { + async fn test_large_merge(#[values(false, true)] use_legacy_format: bool) { // Tests a merge that spans multiple batches within files // This test also tests "null filling" when merging (e.g. when keys do not match @@ -2491,7 +2489,7 @@ mod tests { let write_params = WriteParams { mode: WriteMode::Append, - use_experimental_writer, + use_legacy_format, max_rows_per_file: 1024, max_rows_per_group: 150, ..Default::default() @@ -2515,7 +2513,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_delete(#[values(false, true)] use_experimental_writer: bool) { + async fn test_delete(#[values(false, true)] use_legacy_format: bool) { use std::collections::HashSet; fn sequence_data(range: Range) -> RecordBatch { @@ -2543,7 +2541,7 @@ mod tests { let data = sequence_data(0..100); // Split over two files. 
let batches = vec![data.slice(0, 50), data.slice(50, 50)]; - let mut dataset = TestDatasetGenerator::new(batches, use_experimental_writer) + let mut dataset = TestDatasetGenerator::new(batches, use_legacy_format) .make_hostile(test_uri) .await; @@ -2679,7 +2677,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_restore(#[values(false, true)] use_experimental_writer: bool) { + async fn test_restore(#[values(false, true)] use_legacy_format: bool) { // Create a table let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( "i", @@ -2699,7 +2697,7 @@ mod tests { reader, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2738,7 +2736,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_search_empty(#[values(false, true)] use_experimental_writer: bool) { + async fn test_search_empty(#[values(false, true)] use_legacy_format: bool) { // Create a table let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( "vec", @@ -2766,7 +2764,7 @@ mod tests { reader, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2808,7 +2806,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_search_empty_after_delete(#[values(false, true)] use_experimental_writer: bool) { + async fn test_search_empty_after_delete(#[values(false, true)] use_legacy_format: bool) { // Create a table let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( "vec", @@ -2836,7 +2834,7 @@ mod tests { reader, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2914,7 +2912,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_num_small_files(#[values(false, true)] use_experimental_writer: bool) { + async fn test_num_small_files(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let dimensions = 16; let column_name = "vec"; @@ -2943,7 +2941,7 @@ mod tests { reader, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -3054,7 +3052,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_v0_7_5_migration(#[values(false, true)] use_experimental_writer: bool) { + async fn test_v0_7_5_migration(#[values(false, true)] use_legacy_format: bool) { // We migrate to add Fragment.physical_rows and DeletionFile.num_deletions // after this version. @@ -3083,7 +3081,7 @@ mod tests { let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); let write_params = WriteParams { mode: WriteMode::Append, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let dataset = Dataset::write(batches, test_uri, Some(write_params)) @@ -3124,9 +3122,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_fix_v0_8_0_broken_migration( - #[values(false, true)] use_experimental_writer: bool, - ) { + async fn test_fix_v0_8_0_broken_migration(#[values(false, true)] use_legacy_format: bool) { // The migration from v0.7.5 was broken in 0.8.0. This validates we can // automatically fix tables that have this problem. 
@@ -3156,7 +3152,7 @@ mod tests { let batches = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); let write_params = WriteParams { mode: WriteMode::Append, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let dataset = Dataset::write(batches, test_uri, Some(write_params)) @@ -3206,7 +3202,7 @@ mod tests { #[rstest] #[tokio::test] async fn test_v0_8_14_invalid_index_fragment_bitmap( - #[values(false, true)] use_experimental_writer: bool, + #[values(false, true)] use_legacy_format: bool, ) { // Old versions of lance could create an index whose fragment bitmap was // invalid because it did not include fragments that were part of the index @@ -3254,7 +3250,7 @@ mod tests { .append( data, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -3339,9 +3335,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_bfloat16_roundtrip( - #[values(false, true)] use_experimental_writer: bool, - ) -> Result<()> { + async fn test_bfloat16_roundtrip(#[values(false, true)] use_legacy_format: bool) -> Result<()> { let inner_field = Arc::new( ArrowField::new("item", DataType::FixedSizeBinary(2), true).with_metadata( [ @@ -3371,7 +3365,7 @@ mod tests { RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone()), test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) diff --git a/rust/lance/src/dataset/fragment.rs b/rust/lance/src/dataset/fragment.rs index e80d86e1c1..333faf311b 100644 --- a/rust/lance/src/dataset/fragment.rs +++ b/rust/lance/src/dataset/fragment.rs @@ -1617,7 +1617,7 @@ mod tests { use super::*; use crate::dataset::transaction::Operation; - async fn create_dataset(test_uri: &str, use_experimental_writer: bool) -> Dataset { + async fn create_dataset(test_uri: &str, use_legacy_format: bool) -> Dataset { let schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("i", DataType::Int32, true), ArrowField::new("s", DataType::Utf8, true), @@ -1641,7 +1641,7 @@ mod tests { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -1672,7 +1672,7 @@ mod tests { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer: true, + use_legacy_format: false, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -1687,7 +1687,7 @@ mod tests { async fn test_fragment_scan() { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); - let dataset = create_dataset(test_uri, false).await; + let dataset = create_dataset(test_uri, true).await; let fragment = &dataset.get_fragments()[2]; let mut scanner = fragment.scan(); let batches = scanner @@ -1768,7 +1768,7 @@ mod tests { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); // Creates 400 rows in 10 fragments - let mut dataset = create_dataset(test_uri, false).await; + let mut dataset = create_dataset(test_uri, true).await; // Delete last 20 rows in first fragment dataset.delete("i >= 20").await.unwrap(); // Last fragment has 20 rows but 40 addressible rows @@ -1796,7 +1796,7 @@ mod tests { async fn test_fragment_scan_deletions() { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); - let mut dataset = create_dataset(test_uri, false).await; + let mut 
dataset = create_dataset(test_uri, true).await; dataset.delete("i >= 0 and i < 15").await.unwrap(); let fragment = &dataset.get_fragments()[0]; @@ -1827,7 +1827,7 @@ mod tests { async fn test_fragment_take_indices() { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); - let mut dataset = create_dataset(test_uri, false).await; + let mut dataset = create_dataset(test_uri, true).await; let fragment = dataset .get_fragments() .into_iter() @@ -1875,7 +1875,7 @@ mod tests { async fn test_fragment_take_rows() { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); - let mut dataset = create_dataset(test_uri, false).await; + let mut dataset = create_dataset(test_uri, true).await; let fragment = dataset .get_fragments() .into_iter() @@ -1940,7 +1940,7 @@ mod tests { async fn test_recommit_from_file() { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); - let dataset = create_dataset(test_uri, false).await; + let dataset = create_dataset(test_uri, true).await; let schema = dataset.schema(); let dataset_rows = dataset.count_rows(None).await.unwrap(); @@ -1984,10 +1984,10 @@ mod tests { #[rstest] #[tokio::test] - async fn test_fragment_count(#[values(false, true)] use_experimental_writer: bool) { + async fn test_fragment_count(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); - let dataset = create_dataset(test_uri, use_experimental_writer).await; + let dataset = create_dataset(test_uri, use_legacy_format).await; let fragment = dataset.get_fragments().pop().unwrap(); assert_eq!(fragment.count_rows().await.unwrap(), 40); @@ -2013,11 +2013,11 @@ mod tests { #[rstest] #[tokio::test] - async fn test_append_new_columns(#[values(false, true)] use_experimental_writer: bool) { + async fn test_append_new_columns(#[values(false, true)] use_legacy_format: bool) { for with_delete in [true, false] { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); - let mut dataset = create_dataset(test_uri, use_experimental_writer).await; + let mut dataset = create_dataset(test_uri, use_legacy_format).await; dataset.validate().await.unwrap(); assert_eq!(dataset.count_rows(None).await.unwrap(), 200); @@ -2102,10 +2102,10 @@ mod tests { #[rstest] #[tokio::test] - async fn test_merge_fragment(#[values(false, true)] use_experimental_writer: bool) { + async fn test_merge_fragment(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); - let mut dataset = create_dataset(test_uri, use_experimental_writer).await; + let mut dataset = create_dataset(test_uri, use_legacy_format).await; dataset.validate().await.unwrap(); assert_eq!(dataset.count_rows(None).await.unwrap(), 200); diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index b93ed4ed9b..462ed96dbe 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -1530,7 +1530,7 @@ pub mod test_dataset { } impl TestVectorDataset { - pub async fn new(use_experimental_writer: bool) -> Result { + pub async fn new(use_legacy_format: bool) -> Result { let tmp_dir = tempdir()?; let path = tmp_dir.path().to_str().unwrap(); @@ -1576,7 +1576,7 @@ pub mod test_dataset { let params = WriteParams { max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let reader = 
RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -1776,7 +1776,7 @@ mod test { #[tokio::test] async fn test_filter_parsing() -> Result<()> { - let test_ds = TestVectorDataset::new(false).await?; + let test_ds = TestVectorDataset::new(true).await?; let dataset = &test_ds.dataset; let mut scan = dataset.scan(); @@ -1809,8 +1809,8 @@ mod test { #[rstest] #[tokio::test] - async fn test_limit(#[values(false, true)] use_experimental_writer: bool) -> Result<()> { - let test_ds = TestVectorDataset::new(use_experimental_writer).await?; + async fn test_limit(#[values(false, true)] use_legacy_format: bool) -> Result<()> { + let test_ds = TestVectorDataset::new(use_legacy_format).await?; let dataset = &test_ds.dataset; let full_data = dataset.scan().try_into_batch().await?.slice(19, 2); @@ -1827,11 +1827,9 @@ mod test { #[rstest] #[tokio::test] - async fn test_knn_nodes(#[values(false, true)] use_experimental_writer: bool) { + async fn test_knn_nodes(#[values(false, true)] use_legacy_format: bool) { for build_index in &[true, false] { - let mut test_ds = TestVectorDataset::new(use_experimental_writer) - .await - .unwrap(); + let mut test_ds = TestVectorDataset::new(use_legacy_format).await.unwrap(); if *build_index { test_ds.make_vector_index().await.unwrap(); } @@ -1884,10 +1882,8 @@ mod test { #[rstest] #[tokio::test] - async fn test_knn_with_new_data(#[values(false, true)] use_experimental_writer: bool) { - let mut test_ds = TestVectorDataset::new(use_experimental_writer) - .await - .unwrap(); + async fn test_knn_with_new_data(#[values(false, true)] use_legacy_format: bool) { + let mut test_ds = TestVectorDataset::new(use_legacy_format).await.unwrap(); test_ds.make_vector_index().await.unwrap(); test_ds.append_new_data().await.unwrap(); let dataset = &test_ds.dataset; @@ -1966,10 +1962,8 @@ mod test { #[rstest] #[tokio::test] - async fn test_knn_with_prefilter(#[values(false, true)] use_experimental_writer: bool) { - let mut test_ds = TestVectorDataset::new(use_experimental_writer) - .await - .unwrap(); + async fn test_knn_with_prefilter(#[values(false, true)] use_legacy_format: bool) { + let mut test_ds = TestVectorDataset::new(use_legacy_format).await.unwrap(); test_ds.make_vector_index().await.unwrap(); let dataset = &test_ds.dataset; @@ -2025,13 +2019,11 @@ mod test { #[rstest] #[tokio::test] - async fn test_knn_filter_new_data(#[values(false, true)] use_experimental_writer: bool) { + async fn test_knn_filter_new_data(#[values(false, true)] use_legacy_format: bool) { // This test verifies that a filter (prefilter or postfilter) gets applied to the flat KNN results // in a combined KNN scan (a scan that combines results from an indexed ANN with an unindexed flat // search of new data) - let mut test_ds = TestVectorDataset::new(use_experimental_writer) - .await - .unwrap(); + let mut test_ds = TestVectorDataset::new(use_legacy_format).await.unwrap(); test_ds.make_vector_index().await.unwrap(); test_ds.append_new_data().await.unwrap(); let dataset = &test_ds.dataset; @@ -2091,10 +2083,8 @@ mod test { #[rstest] #[tokio::test] - async fn test_knn_with_filter(#[values(false, true)] use_experimental_writer: bool) { - let test_ds = TestVectorDataset::new(use_experimental_writer) - .await - .unwrap(); + async fn test_knn_with_filter(#[values(false, true)] use_legacy_format: bool) { + let test_ds = TestVectorDataset::new(use_legacy_format).await.unwrap(); let dataset = &test_ds.dataset; let mut scan = dataset.scan(); @@ -2144,10 +2134,8 @@ mod test { #[rstest] #[tokio::test] - 
async fn test_refine_factor(#[values(false, true)] use_experimental_writer: bool) { - let test_ds = TestVectorDataset::new(use_experimental_writer) - .await - .unwrap(); + async fn test_refine_factor(#[values(false, true)] use_legacy_format: bool) { + let test_ds = TestVectorDataset::new(use_legacy_format).await.unwrap(); let dataset = &test_ds.dataset; let mut scan = dataset.scan(); @@ -2197,7 +2185,7 @@ mod test { #[tokio::test] async fn test_scan_unordered_with_row_id() { // This test doesn't make sense for v2 files, there is no way to get an out-of-order scan - let test_ds = TestVectorDataset::new(/*use_experimental_writer=*/ false) + let test_ds = TestVectorDataset::new(/*use_legacy_format=*/ true) .await .unwrap(); let dataset = &test_ds.dataset; @@ -2247,7 +2235,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_scan_order(#[values(false, true)] use_experimental_writer: bool) { + async fn test_scan_order(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2271,7 +2259,7 @@ mod test { let params = WriteParams { mode: WriteMode::Append, - use_experimental_writer, + use_legacy_format, ..Default::default() }; @@ -2318,7 +2306,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_scan_sort(#[values(false, true)] use_experimental_writer: bool) { + async fn test_scan_sort(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2351,7 +2339,7 @@ mod test { data.into_reader_rows(RowCount::from(5), BatchCount::from(1)), test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2405,7 +2393,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_sort_multi_columns(#[values(false, true)] use_experimental_writer: bool) { + async fn test_sort_multi_columns(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2430,7 +2418,7 @@ mod test { data.into_reader_rows(RowCount::from(5), BatchCount::from(1)), test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2458,7 +2446,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_ann_prefilter(#[values(false, true)] use_experimental_writer: bool) { + async fn test_ann_prefilter(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2479,7 +2467,7 @@ mod test { .unwrap()]; let write_params = WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2530,7 +2518,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_filter_on_large_utf8(#[values(false, true)] use_experimental_writer: bool) { + async fn test_filter_on_large_utf8(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2549,7 +2537,7 @@ mod test { .unwrap()]; let write_params = WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2583,7 +2571,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_filter_with_regex(#[values(false, true)] use_experimental_writer: bool) { + async fn test_filter_with_regex(#[values(false, true)] 
use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -2602,7 +2590,7 @@ mod test { .unwrap()]; let write_params = WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -2671,7 +2659,7 @@ mod test { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer: false, + use_legacy_format: true, ..Default::default() }; Dataset::write(batches, test_uri, Some(write_params)) @@ -2724,7 +2712,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_ann_with_deletion(#[values(false, true)] use_experimental_writer: bool) { + async fn test_ann_with_deletion(#[values(false, true)] use_legacy_format: bool) { let vec_params = vec![ // TODO: re-enable diskann test when we can tune to get reproducible results. // VectorIndexParams::with_diskann_params(MetricType::L2, DiskANNParams::new(10, 1.5, 10)), @@ -2766,7 +2754,7 @@ mod test { reader, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2866,7 +2854,7 @@ mod test { test_uri, Some(WriteParams { mode: WriteMode::Append, - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2915,7 +2903,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_count_rows_with_filter(#[values(false, true)] use_experimental_writer: bool) { + async fn test_count_rows_with_filter(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let mut data_gen = BatchGenerator::new().col(Box::new( @@ -2925,7 +2913,7 @@ mod test { data_gen.batch(32), test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2948,7 +2936,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_dynamic_projection(#[values(false, true)] use_experimental_writer: bool) { + async fn test_dynamic_projection(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let mut data_gen = @@ -2957,7 +2945,7 @@ mod test { data_gen.batch(32), test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -2993,7 +2981,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_column_casting_function(#[values(false, true)] use_experimental_writer: bool) { + async fn test_column_casting_function(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); let mut data_gen = @@ -3002,7 +2990,7 @@ mod test { data_gen.batch(32), test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -3084,7 +3072,7 @@ mod test { } impl ScalarIndexTestFixture { - async fn new(use_experimental_writer: bool) -> Self { + async fn new(use_legacy_format: bool) -> Self { let test_dir = tempdir().unwrap(); let test_uri = test_dir.path().to_str().unwrap(); @@ -3109,7 +3097,7 @@ mod test { test_uri, Some(WriteParams { max_rows_per_file: 500, - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -3160,7 +3148,7 @@ mod test { .append( RecordBatchIterator::new(vec![Ok(append_data)], data.schema()), Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -3482,8 +3470,8 @@ mod test { // different configurations to 
ensure that we get consistent results #[rstest] #[tokio::test] - async fn test_secondary_index_scans(#[values(false, true)] use_experimental_writer: bool) { - let fixture = ScalarIndexTestFixture::new(use_experimental_writer).await; + async fn test_secondary_index_scans(#[values(false, true)] use_legacy_format: bool) { + let fixture = ScalarIndexTestFixture::new(use_legacy_format).await; for use_index in [false, true] { for use_projection in [false, true] { @@ -3568,7 +3556,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_late_materialization(#[values(false, true)] use_experimental_writer: bool) { + async fn test_late_materialization(#[values(false, true)] use_legacy_format: bool) { // Create a large dataset with a scalar indexed column and a sorted but not scalar // indexed column let data = gen() @@ -3589,7 +3577,7 @@ mod test { object_store_wrapper: Some(io_stats_wrapper), ..Default::default() }), - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -3670,9 +3658,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_project_nested( - #[values(false, true)] use_experimental_writer: bool, - ) -> Result<()> { + async fn test_project_nested(#[values(false, true)] use_legacy_format: bool) -> Result<()> { let struct_i_field = ArrowField::new("i", DataType::Int32, true); let struct_o_field = ArrowField::new("o", DataType::Utf8, true); let schema = Arc::new(ArrowSchema::new(vec![ @@ -3713,7 +3699,7 @@ mod test { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; Dataset::write(batches, test_uri, Some(write_params)) @@ -3739,14 +3725,14 @@ mod test { #[rstest] #[tokio::test] - async fn test_plans(#[values(false, true)] use_experimental_writer: bool) -> Result<()> { + async fn test_plans(#[values(false, true)] use_legacy_format: bool) -> Result<()> { // Create a vector dataset - let mut dataset = TestVectorDataset::new(use_experimental_writer).await?; + let mut dataset = TestVectorDataset::new(use_legacy_format).await?; // Scans // --------------------------------------------------------------------- // Experimental writer does not use LancePushdownScan - if !use_experimental_writer { + if use_legacy_format { assert_plan_equals( &dataset.dataset, |scan| scan.project(&["s"])?.filter("i > 10 and i < 20"), diff --git a/rust/lance/src/dataset/schema_evolution.rs b/rust/lance/src/dataset/schema_evolution.rs index 3ea9a536d8..eb228cc1c2 100644 --- a/rust/lance/src/dataset/schema_evolution.rs +++ b/rust/lance/src/dataset/schema_evolution.rs @@ -561,7 +561,7 @@ mod test { reader, test_uri, Some(WriteParams { - use_experimental_writer: false, + use_legacy_format: true, ..Default::default() }), ) @@ -622,9 +622,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_append_columns_udf( - #[values(false, true)] use_experimental_writer: bool, - ) -> Result<()> { + async fn test_append_columns_udf(#[values(false, true)] use_legacy_format: bool) -> Result<()> { use arrow_array::Float64Array; let num_rows = 5; @@ -646,7 +644,7 @@ mod test { reader, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -752,7 +750,7 @@ mod test { Some(WriteParams { max_rows_per_file: 50, max_rows_per_group: 25, - use_experimental_writer: false, + use_legacy_format: true, ..Default::default() }), ) @@ -897,9 +895,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_rename_columns( - #[values(false, true)] use_experimental_writer: bool, - 
) -> Result<()> { + async fn test_rename_columns(#[values(false, true)] use_legacy_format: bool) -> Result<()> { use std::collections::HashMap; use arrow_array::{ArrayRef, StructArray}; @@ -941,7 +937,7 @@ mod test { batches, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -1013,7 +1009,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_cast_column(#[values(false, true)] use_experimental_writer: bool) -> Result<()> { + async fn test_cast_column(#[values(false, true)] use_legacy_format: bool) -> Result<()> { // Create a table with 2 scalar columns, 1 vector column use arrow::datatypes::{Int32Type, Int64Type}; @@ -1065,7 +1061,7 @@ mod test { RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone()), test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -1216,7 +1212,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_drop_columns(#[values(false, true)] use_experimental_writer: bool) -> Result<()> { + async fn test_drop_columns(#[values(false, true)] use_legacy_format: bool) -> Result<()> { use std::collections::HashMap; use arrow_array::{ArrayRef, Float32Array, StructArray}; @@ -1265,7 +1261,7 @@ mod test { batches, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) @@ -1310,9 +1306,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_drop_add_columns( - #[values(false, true)] use_experimental_writer: bool, - ) -> Result<()> { + async fn test_drop_add_columns(#[values(false, true)] use_legacy_format: bool) -> Result<()> { let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( "i", DataType::Int32, @@ -1329,7 +1323,7 @@ mod test { batches, test_uri, Some(WriteParams { - use_experimental_writer, + use_legacy_format, ..Default::default() }), ) diff --git a/rust/lance/src/dataset/take.rs b/rust/lance/src/dataset/take.rs index 1b67c6d06f..257a2b5b0f 100644 --- a/rust/lance/src/dataset/take.rs +++ b/rust/lance/src/dataset/take.rs @@ -373,7 +373,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_take(#[values(false, true)] use_experimental_writer: bool) { + async fn test_take(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempfile::tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ @@ -398,7 +398,7 @@ mod test { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -445,11 +445,9 @@ mod test { #[rstest] #[tokio::test] - async fn test_take_rows_out_of_bound(#[values(false, true)] use_experimental_writer: bool) { + async fn test_take_rows_out_of_bound(#[values(false, true)] use_legacy_format: bool) { // a dataset with 1 fragment and 400 rows - let test_ds = TestVectorDataset::new(use_experimental_writer) - .await - .unwrap(); + let test_ds = TestVectorDataset::new(use_legacy_format).await.unwrap(); let ds = test_ds.dataset; // take the last row of first fragment @@ -486,7 +484,7 @@ mod test { #[rstest] #[tokio::test] - async fn test_take_rows(#[values(false, true)] use_experimental_writer: bool) { + async fn test_take_rows(#[values(false, true)] use_legacy_format: bool) { let test_dir = tempfile::tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ @@ -511,7 +509,7 @@ mod test { let write_params = WriteParams { max_rows_per_file: 40, max_rows_per_group: 10, - 
use_experimental_writer, + use_legacy_format, ..Default::default() }; let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone()); @@ -568,7 +566,7 @@ mod test { #[rstest] #[tokio::test] - async fn take_scan_dataset(#[values(false, true)] use_experimental_writer: bool) { + async fn take_scan_dataset(#[values(false, true)] use_legacy_format: bool) { use arrow::datatypes::Int32Type; use arrow_array::Float32Array; @@ -590,7 +588,7 @@ mod test { let write_params = WriteParams { max_rows_per_group: 2, - use_experimental_writer, + use_legacy_format, ..Default::default() }; diff --git a/rust/lance/src/dataset/updater.rs b/rust/lance/src/dataset/updater.rs index 93a8859126..b03df8c53e 100644 --- a/rust/lance/src/dataset/updater.rs +++ b/rust/lance/src/dataset/updater.rs @@ -132,7 +132,7 @@ impl Updater { &self.fragment.dataset().object_store, &schema, &self.fragment.dataset().base, - !is_legacy, + is_legacy, ) .await } diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index ddb3371af2..c7c4fb08de 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -96,11 +96,11 @@ pub struct WriteParams { /// must also be provided. pub commit_handler: Option>, - /// If set to true then the Lance v2 writer will be used instead of the Lance v1 writer + /// If set to true then the Lance v1 writer will be used instead of the Lance v2 writer /// - /// Unless you are intentionally testing the v2 writer, you should leave this as false + /// Unless you are intentionally testing the v2 writer, you should leave this as true /// as the v2 writer is still experimental and not fully implemented. - pub use_experimental_writer: bool, + pub use_legacy_format: bool, /// Experimental: if set to true, the writer will use move-stable row ids. /// These row ids are stable after compaction operations, but not after updates. @@ -121,7 +121,7 @@ impl Default for WriteParams { store_params: None, progress: Arc::new(NoopFragmentWriteProgress::new()), commit_handler: None, - use_experimental_writer: false, + use_legacy_format: true, enable_move_stable_row_ids: false, } } @@ -215,20 +215,16 @@ pub async fn write_fragments_internal( schema }; - let mut buffered_reader = if params.use_experimental_writer { + let mut buffered_reader = if params.use_legacy_format { + chunk_stream(data, params.max_rows_per_group) + } else { // In v2 we don't care about group size but we do want to chunk // by max_rows_per_file chunk_stream(data, params.max_rows_per_file) - } else { - chunk_stream(data, params.max_rows_per_group) }; - let writer_generator = WriterGenerator::new( - object_store, - base_dir, - schema, - params.use_experimental_writer, - ); + let writer_generator = + WriterGenerator::new(object_store, base_dir, schema, params.use_legacy_format); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; let mut fragments = Vec::new(); @@ -351,21 +347,13 @@ pub async fn open_writer( object_store: &ObjectStore, schema: &Schema, base_dir: &Path, - use_v2: bool, + use_legacy_format: bool, ) -> Result> { let filename = format!("{}.lance", Uuid::new_v4()); let full_path = base_dir.child(DATA_DIR).child(filename.as_str()); - let writer = if use_v2 { - let writer = object_store.create(&full_path).await?; - Box::new(v2::writer::FileWriter::try_new( - writer, - filename, - schema.clone(), - FileWriterOptions::default(), - )?) 
as Box - } else { + let writer = if use_legacy_format { Box::new(( FileWriter::::try_new( object_store, @@ -376,6 +364,14 @@ pub async fn open_writer( .await?, filename, )) + } else { + let writer = object_store.create(&full_path).await?; + Box::new(v2::writer::FileWriter::try_new( + writer, + filename, + schema.clone(), + FileWriterOptions::default(), + )?) as Box }; Ok(writer) } @@ -385,7 +381,7 @@ struct WriterGenerator { object_store: Arc, base_dir: Path, schema: Schema, - use_v2: bool, + use_legacy_format: bool, } impl WriterGenerator { @@ -393,13 +389,13 @@ impl WriterGenerator { object_store: Arc, base_dir: &Path, schema: &Schema, - use_v2: bool, + use_legacy_format: bool, ) -> Self { Self { object_store, base_dir: base_dir.clone(), schema: schema.clone(), - use_v2, + use_legacy_format, } } @@ -411,7 +407,7 @@ impl WriterGenerator { &self.object_store, &self.schema, &self.base_dir, - self.use_v2, + self.use_legacy_format, ) .await?; @@ -572,7 +568,7 @@ mod tests { .unwrap(); let write_params = WriteParams { - use_experimental_writer: true, + use_legacy_format: false, // This parameter should be ignored max_rows_per_group: 1, ..Default::default() @@ -639,7 +635,7 @@ mod tests { .unwrap(); let write_params = WriteParams { - use_experimental_writer: false, + use_legacy_format: true, ..Default::default() }; let data_stream = Box::pin(RecordBatchStreamAdapter::new( diff --git a/rust/lance/src/session/index_extension.rs b/rust/lance/src/session/index_extension.rs index 8b9a8dff9e..09190efc53 100644 --- a/rust/lance/src/session/index_extension.rs +++ b/rust/lance/src/session/index_extension.rs @@ -267,13 +267,9 @@ mod test { #[rstest] #[tokio::test] - async fn test_vector_index_extension_roundtrip( - #[values(false, true)] use_experimental_writer: bool, - ) { + async fn test_vector_index_extension_roundtrip(#[values(false, true)] use_legacy_format: bool) { // make dataset and index that is not supported natively - let test_ds = TestVectorDataset::new(use_experimental_writer) - .await - .unwrap(); + let test_ds = TestVectorDataset::new(use_legacy_format).await.unwrap(); let idx = test_ds.dataset.load_indices().await.unwrap(); assert_eq!(idx.len(), 0); diff --git a/rust/lance/src/utils/test.rs b/rust/lance/src/utils/test.rs index e60c45e11f..1fa32968a9 100644 --- a/rust/lance/src/utils/test.rs +++ b/rust/lance/src/utils/test.rs @@ -35,19 +35,19 @@ use crate::Dataset; pub struct TestDatasetGenerator { seed: Option, data: Vec, - use_experimental_writer: bool, + use_legacy_format: bool, } impl TestDatasetGenerator { /// Create a new dataset generator with the given data. /// /// Each batch will become a separate fragment in the dataset. 
- pub fn new(data: Vec, use_experimental_writer: bool) -> Self { + pub fn new(data: Vec, use_legacy_format: bool) -> Self { assert!(!data.is_empty()); Self { data, seed: None, - use_experimental_writer, + use_legacy_format, } } @@ -195,7 +195,7 @@ impl TestDatasetGenerator { let sub_frag = FragmentCreateBuilder::new(uri) .schema(&file_schema) .write_params(&WriteParams { - use_experimental_writer: self.use_experimental_writer, + use_legacy_format: self.use_legacy_format, ..Default::default() }) .write(reader, None) @@ -403,7 +403,7 @@ mod tests { #[rstest] #[test] - fn test_make_schema(#[values(false, true)] use_experimental_writer: bool) { + fn test_make_schema(#[values(false, true)] use_legacy_format: bool) { let arrow_schema = Arc::new(ArrowSchema::new(vec![ ArrowField::new("a", DataType::Int32, false), ArrowField::new( @@ -421,7 +421,7 @@ mod tests { ])); let data = vec![RecordBatch::new_empty(arrow_schema.clone())]; - let generator = TestDatasetGenerator::new(data, use_experimental_writer); + let generator = TestDatasetGenerator::new(data, use_legacy_format); let schema = generator.make_schema(&mut rand::thread_rng()); let roundtripped_schema = ArrowSchema::from(&schema); @@ -445,7 +445,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_make_fragment(#[values(false, true)] use_experimental_writer: bool) { + async fn test_make_fragment(#[values(false, true)] use_legacy_format: bool) { let tmp_dir = tempfile::tempdir().unwrap(); let struct_fields: ArrowFields = vec![ @@ -475,7 +475,7 @@ mod tests { ) .unwrap(); - let generator = TestDatasetGenerator::new(vec![data.clone()], use_experimental_writer); + let generator = TestDatasetGenerator::new(vec![data.clone()], use_legacy_format); let mut rng = rand::thread_rng(); for _ in 1..50 { let schema = generator.make_schema(&mut rng); @@ -507,7 +507,7 @@ mod tests { #[rstest] #[tokio::test] - async fn test_make_hostile(#[values(false, true)] use_experimental_writer: bool) { + async fn test_make_hostile(#[values(false, true)] use_legacy_format: bool) { let tmp_dir = tempfile::tempdir().unwrap(); let schema = Arc::new(ArrowSchema::new(vec![ @@ -537,7 +537,7 @@ mod tests { ]; let seed = 42; - let generator = TestDatasetGenerator::new(data.clone(), use_experimental_writer).seed(seed); + let generator = TestDatasetGenerator::new(data.clone(), use_legacy_format).seed(seed); let path = tmp_dir.path().join("ds1"); let dataset = generator.make_hostile(path.to_str().unwrap()).await; @@ -559,7 +559,7 @@ mod tests { .map(|rb| rb.project(&projection).unwrap()) .collect::>(); - let generator = TestDatasetGenerator::new(data.clone(), use_experimental_writer); + let generator = TestDatasetGenerator::new(data.clone(), use_legacy_format); // Sample a few for i in 1..20 { let path = tmp_dir.path().join(format!("test_ds_{}_{}", num_cols, i)); From 7e93e58b88ff4089c30f0564a26e896eb5eef510 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Mon, 3 Jun 2024 14:40:30 -0700 Subject: [PATCH 2/2] rustfmt on python --- python/src/dataset.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index b7e75eec04..f075685bbb 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -1171,9 +1171,7 @@ pub fn get_write_params(options: &PyDict) -> PyResult> { if let Some(maybe_nbytes) = get_dict_opt::(options, "max_bytes_per_file")? { p.max_bytes_per_file = maybe_nbytes; } - if let Some(use_legacy_format) = - get_dict_opt::(options, "use_legacy_format")? 
- { + if let Some(use_legacy_format) = get_dict_opt::<bool>(options, "use_legacy_format")? { p.use_legacy_format = use_legacy_format; } if let Some(progress) = get_dict_opt::(options, "progress")? {
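
A minimal usage sketch of the renamed flag from the Python side, assuming only the public entry points that appear in this diff (lance.write_dataset and lance.fragment.write_fragments); the table contents and /tmp paths are illustrative:

    import lance
    import pyarrow as pa
    from lance.fragment import write_fragments

    table = pa.table({"a": range(100), "b": range(100)})

    # The default (use_legacy_format=True) still writes Lance v1 files.
    ds_v1 = lance.write_dataset(table, "/tmp/demo_v1")

    # Opting into the v2 format now means passing use_legacy_format=False
    # (before this patch: use_experimental_writer=True).
    ds_v2 = lance.write_dataset(table, "/tmp/demo_v2", use_legacy_format=False)
    assert ds_v2.to_table() == table

    # write_fragments takes the same renamed flag.
    fragments = write_fragments(table, "/tmp/demo_frags", use_legacy_format=False)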
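The Ray sink classes touched above take the same flag; a hedged sketch, assuming the module layout in this patch (python/python/lance/ray/sink.py, i.e. lance.ray.sink) and that LanceFragmentWriter can be invoked directly outside a Ray pipeline via the __call__ shown in the diff:

    import pyarrow as pa
    from lance.ray.sink import LanceFragmentWriter

    # The default is now use_legacy_format=False, i.e. the v2 writer; pass
    # True to keep writing v1 files.
    writer = LanceFragmentWriter("/tmp/demo_ray", use_legacy_format=False)

    # Per the diff, __call__ writes the batch out as fragments and returns
    # the serialized fragment metadata (via pa.Table.from_pydict).
    meta = writer(pa.table({"a": range(10)}))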