Allow creating fragments on a non-existent dataset. #825
Conversation
    """
    ds = self._ds.create_version_from_fragments(new_schema, fragments)
    return LanceDataset(self.uri)
    if isinstance(base_uri, Path):
I forgot: do we need to do any relative-to-absolute conversion or $HOME expansion here, or is that all done at the Rust level?
I am not sure which side does the normalization now.
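If the Python side ends up owning it, the normalization might look like the sketch below. This is a hypothetical helper, not lance's actual code, and the Rust layer may already handle all of this:

```python
import os
from pathlib import Path
from typing import Union

def normalize_uri(base_uri: Union[str, Path]) -> str:
    """Expand ~ and make local paths absolute; leave remote URIs untouched."""
    uri = str(base_uri)
    # URIs with a scheme (s3://, gs://, ...) are passed through unchanged.
    if "://" in uri:
        return uri
    # Local filesystem path: expand $HOME shorthand, then resolve relative parts.
    return os.path.abspath(os.path.expanduser(uri))
```

Doing this once at the Python boundary would keep the Rust side free of platform-specific path handling, but either side works as long as exactly one of them does it.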
    def _create_version_from_fragments(
        self,
    @staticmethod
    def _commit(
OK, so basically the intended usage here is:
- each executor node creates a new fragment
- each fragment gets written to the GS bucket under the lance directory's /data subdir
- calling this _commit (a) creates a new manifest file and (b) updates the _latest.manifest file?
And these fragments can either be created from scratch or by appending a column to an existing fragment?
Yes, that's right. The logic here separates the fragment-preparation step from the final commit step (which makes the new version of the dataset visible).
This control flow can later be reused for appending new data (fragments), deleting fragments, and garbage collection as well.
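The prepare/commit split described above can be illustrated with a toy, filesystem-only sketch. This is not lance's implementation: the `_latest.manifest` and `data/` names are borrowed from the discussion, everything else (JSON fragments, the helper names) is invented purely for illustration:

```python
import json
import os
import tempfile

def write_fragment(root: str, fragment_id: int, rows: list) -> str:
    """Prepare phase: each worker writes its own fragment under data/ independently."""
    os.makedirs(os.path.join(root, "data"), exist_ok=True)
    path = os.path.join(root, "data", f"fragment-{fragment_id}.json")
    with open(path, "w") as f:
        json.dump(rows, f)
    return path

def commit(root: str, fragment_paths: list) -> None:
    """Commit phase: one manifest write makes all prepared fragments visible at once."""
    manifest = {"fragments": fragment_paths}
    with open(os.path.join(root, "_latest.manifest"), "w") as f:
        json.dump(manifest, f)

root = tempfile.mkdtemp()
# Workers (here a loop) prepare fragments in parallel, then a single commit follows.
paths = [write_fragment(root, i, [i, i + 1]) for i in range(3)]
commit(root, paths)
with open(os.path.join(root, "_latest.manifest")) as f:
    visible = json.load(f)["fragments"]
```

The key property is that readers only ever see the manifest, so half-written fragments stay invisible until the single commit step lands.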
    elif isinstance(data, pa.Table):
        reader = data.to_reader()
    elif isinstance(data, pa.dataset.Dataset):
        reader = pa.dataset.Scanner.from_dataset(data).to_reader()
If the user passes in a LanceDataset, it will fall into this case and then fail.
Instead of using the static method, use the Dataset.scanner() (or maybe to_scanner()) API (see the other lance methods for this).
The reason is that pa.dataset.Scanner.from_dataset(...) ends up referring to Dataset private internals specific to the C++ pyarrow implementation (e.g., CDataset or something).
Oh, I was re-using the same code as write_dataset. I can make it a to_scanner(), I guess.
    def test_create_from_fragments(tmp_path: Path):
        table = pa.Table.from_pydict({"a": range(100), "b": range(100)})
        base_dir = tmp_path / "test"
        fragment = lance.fragment.LanceFragment.create(base_dir, 1, table)
Would the input data to create ever be a LanceDataset? I'm wondering if this has the same tokio runtime issue that the read/write APIs have. If so, you may need to convert the input LanceDataset into a pyarrow Table first until we figure out how to deal with that.
At the fragment level? It seems not. Consider this the first step of creating a fragment: it usually just writes in-memory data to disk.
    .ds
    .create_version_from_fragments(&new_schema_with_id, &fragment_metadata)
    .await
    LanceDataset::commit(dataset_uri, &schema, &fragment_metadata).await
Is commit the right terminology here?
So in DB terms, writing all the fragments is the prepare phase of a transaction, and this last step "commits" the change to the dataset (making the fragments visible)?
    let rt = tokio::runtime::Runtime::new()?;
    let metadata = rt.block_on(async {
    let mut batches: Box<dyn RecordBatchReader> = if reader.is_instance_of::<Scanner>()? {
    let scanner: Scanner = reader.extract()?;
Have you tested this case in the if/else here?
    let indices = self.load_indices().await?;
    write_manifest_file(&self.object_store, &mut manifest, Some(indices)).await?;
    let base = self.object_store.base_path().clone();
    write_manifest_file(&object_store, &mut manifest, Some(indices)).await?;
Preserving the indices only makes sense if we're appending rows; here there's no guarantee as to what the input fragments actually represent?
That's fair. Preserving indices is an available option for adding columns as well, though.
Add a check for the same WriteMode here as well.
Addressed comments.
    @staticmethod
    def create(
        dataset_uri: Union[str, Path],
        fragment_id: int,
Does the fragment_id here need to increase sequentially from 1?
It allows users to distributedly create Fragments first, and then commit a Dataset later.