docs: schema evolution #1911

Merged 4 commits on Jul 30, 2024
263 changes: 221 additions & 42 deletions docs/read_and_write.rst

@@ -41,48 +41,6 @@ also supports writing a dataset in iterator of :py:class:`pyarrow.RecordBatch` e
:py:meth:`lance.write_dataset` supports writing :py:class:`pyarrow.Table`, :py:class:`pandas.DataFrame`,
:py:class:`pyarrow.Dataset`, and ``Iterator[pyarrow.RecordBatch]``. Check its doc for more details.
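
For instance, here is a minimal sketch of writing from a ``RecordBatch``
iterator; the batch generator and the explicit ``schema=`` argument here are
illustrative assumptions, not requirements spelled out above:

.. code-block:: python

    import lance
    import pyarrow as pa

    def batch_generator():
        # Yield the data one small batch at a time, so the whole
        # table never has to be materialized in memory.
        for start in (0, 2, 4):
            yield pa.RecordBatch.from_pylist(
                [{"id": i} for i in range(start, start + 2)]
            )

    # When writing from an iterator, pass the schema explicitly.
    schema = pa.schema([pa.field("id", pa.int64())])
    dataset = lance.write_dataset(batch_generator(), "ids_stream", schema=schema)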

Adding new columns
~~~~~~~~~~~~~~~~~~

New columns can be merged into an existing dataset using :py:meth:`lance.Dataset.merge`.
This allows filling in additional columns without having to rewrite the whole dataset.

To use the ``merge`` method, provide a new table that includes the columns you
want to add, and a column name to use for joining the new data to the existing
dataset.

For example, imagine we have a dataset of embeddings and ids:

.. testcode::

import lance
import pyarrow as pa
import numpy as np
table = pa.table({
"id": pa.array([1, 2, 3]),
"embedding": pa.array([np.array([1, 2, 3]), np.array([4, 5, 6]),
np.array([7, 8, 9])])
})
dataset = lance.write_dataset(table, "embeddings")

Now if we want to add a column of labels we have generated, we can do so by merging a new table:

.. testcode::

new_data = pa.table({
"id": pa.array([1, 2, 3]),
"label": pa.array(["horse", "rabbit", "cat"])
})
dataset.merge(new_data, "id")
print(dataset.to_table().to_pandas())

.. testoutput::

   id  embedding   label
0   1  [1, 2, 3]   horse
1   2  [4, 5, 6]  rabbit
2   3  [7, 8, 9]     cat

Deleting rows
~~~~~~~~~~~~~

@@ -259,6 +217,227 @@ example:
.execute()


Evolving the schema
-------------------

Lance supports schema evolution: adding, removing, and altering columns in a
dataset. Most of these operations can be performed *without* rewriting the
data files in the dataset, making them very efficient operations.

In general, schema changes will conflict with most other concurrent write
operations. For example, if you change the schema of the dataset while someone
else is appending data to it, either your schema change or the append will fail,
depending on the order of the operations. Thus, it's recommended to perform
schema changes when no other writes are happening.
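
As a sketch of what that can look like in practice, the retry loop below
assumes a failed schema change surfaces as an exception; the exact exception
type is not specified here, so the handler is deliberately broad, and the
``ids`` dataset and ``status`` column are only illustrative:

.. code-block:: python

    import time

    import lance

    for attempt in range(3):
        try:
            # Re-open to pick up any commits that landed in the meantime,
            # then attempt the schema change.
            dataset = lance.dataset("ids")
            dataset.add_columns({"status": "'active'"})
            break
        except Exception:
            if attempt == 2:
                raise
            time.sleep(2 ** attempt)  # back off before retrying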

Renaming columns
~~~~~~~~~~~~~~~~

Columns can be renamed using the :py:meth:`lance.LanceDataset.alter_columns`
method.

.. testcode::

import lance
import pyarrow as pa
table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "ids")
dataset.alter_columns({"path": "id", "name": "new_id"})
print(dataset.to_table().to_pandas())

.. testoutput::

   new_id
0       1
1       2
2       3

This works for nested columns as well. To address a nested column, use a dot
(``.``) to separate the levels of nesting. For example:

.. testcode::

data = pa.Table.from_pylist([
    {"meta": {"id": 1, "name": "Alice"}},
    {"meta": {"id": 2, "name": "Bob"}},
])
dataset = lance.write_dataset(data, "nested_rename")
dataset.alter_columns({"path": "meta.id", "name": "new_id"})
print(dataset.to_table().to_pandas())

.. testoutput::

                             meta
0  {'new_id': 1, 'name': 'Alice'}
1    {'new_id': 2, 'name': 'Bob'}


Casting column data types
~~~~~~~~~~~~~~~~~~~~~~~~~

In addition to changing column names, you can also change the data type of a
column using the :py:meth:`lance.LanceDataset.alter_columns` method. This
requires rewriting that column to new data files, but does not require rewriting
the other columns.

.. note::

If the column has an index, the index will be dropped if the column type is
changed.

This method can be used to change the vector type of a column. For example, we
can change a float32 embedding column into a float16 column to save disk space
at the cost of lower precision:

.. testcode::

import lance
import pyarrow as pa
import numpy as np
table = pa.table({
"id": pa.array([1, 2, 3]),
"embedding": pa.FixedShapeTensorArray.from_numpy_ndarray(
np.random.rand(3, 128).astype("float32"))
})
dataset = lance.write_dataset(table, "embeddings")
dataset.alter_columns({"path": "embedding",
                       "type": pa.list_(pa.float16(), 128)})
print(dataset.schema)

.. testoutput::

id: int64
embedding: fixed_size_list<item: halffloat>[128]


Adding new columns
~~~~~~~~~~~~~~~~~~~

New columns can be added and populated within a single operation using the
:py:meth:`lance.LanceDataset.add_columns` method. There are two ways to populate
the new columns: by providing a SQL expression for each new column, or by
providing a function that generates the new column data.

SQL expressions can either be independent expressions or reference existing
columns. SQL literal values can be used to set a single value for all
existing rows.

.. testcode::

import lance
import pyarrow as pa
table = pa.table({"name": pa.array(["Alice", "Bob", "Carla"])})
dataset = lance.write_dataset(table, "names")
dataset.add_columns({
"hash": "sha256(name)",
"status": "'active'",
})
print(dataset.to_table().to_pandas())

.. testoutput::

    name             hash  status
0  Alice  3bc51062973c...  active
1    Bob  cd9fb1e148cc...  active
2  Carla  ad8d83ffd82b...  active

You can also provide a Python function to generate the new column data. This can
be used, for example, to compute a new embedding column. This function should
take a PyArrow RecordBatch and return either a PyArrow RecordBatch or a Pandas
DataFrame. The function will be called once for each batch in the dataset.

If the function is expensive to compute and can fail, it is recommended to set
a checkpoint file in the UDF. This checkpoint file saves the state of the UDF
after each invocation, so that if the UDF fails, it can be restarted from the
last checkpoint. Note that this file can get quite large, since it needs to store
unsaved results for up to an entire data file.

.. code-block:: python

import lance
import pyarrow as pa
import numpy as np
import pandas as pd

table = pa.table({"id": pa.array([1, 2, 3])})
dataset = lance.write_dataset(table, "ids")

@lance.batch_udf(checkpoint_file="embedding_checkpoint.sqlite")
# Review comment (Contributor): ".sqlite" might be an interesting choice
# of extension to an outside observer. If we want these files to be
# opaque, maybe just ".bin"?

def add_random_vector(batch):
    embeddings = np.random.rand(batch.num_rows, 128).astype("float32")
    # Wrap the rows in a list so each cell holds one 128-float vector;
    # a 2D ndarray cannot be passed directly as a DataFrame column.
    return pd.DataFrame({"embedding": list(embeddings)})
dataset.add_columns(add_random_vector)
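
The UDF can just as well return a :py:class:`pyarrow.RecordBatch`. Here is a
minimal sketch of that variant; the fixed-size-list construction and the
function name are illustrative:

.. code-block:: python

    import numpy as np
    import pyarrow as pa

    @lance.batch_udf()
    def add_random_vector_arrow(batch):
        embeddings = np.random.rand(batch.num_rows, 128).astype("float32")
        # Pack the flat values into a fixed-size-list column,
        # one 128-float vector per input row.
        vectors = pa.FixedSizeListArray.from_arrays(
            pa.array(embeddings.ravel()), 128
        )
        return pa.RecordBatch.from_arrays([vectors], ["embedding"])

    dataset.add_columns(add_random_vector_arrow)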


Adding new columns using merge
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

If you have pre-computed one or more new columns, you can add them to an existing
dataset using the :py:meth:`lance.LanceDataset.merge` method. This allows filling in
additional columns without having to rewrite the whole dataset.


To use the ``merge`` method, provide a new dataset that includes the columns you
want to add, and a column name to use for joining the new data to the existing
dataset.

For example, imagine we have a dataset of embeddings and ids:

.. testcode::

import lance
import pyarrow as pa
import numpy as np
table = pa.table({
"id": pa.array([1, 2, 3]),
"embedding": pa.array([np.array([1, 2, 3]), np.array([4, 5, 6]),
np.array([7, 8, 9])])
})
dataset = lance.write_dataset(table, "embeddings")

Now if we want to add a column of labels we have generated, we can do so by merging a new table:

.. testcode::

new_data = pa.table({
"id": pa.array([1, 2, 3]),
"label": pa.array(["horse", "rabbit", "cat"])
})
dataset.merge(new_data, "id")
print(dataset.to_table().to_pandas())

.. testoutput::

   id  embedding   label
0   1  [1, 2, 3]   horse
1   2  [4, 5, 6]  rabbit
2   3  [7, 8, 9]     cat


Dropping columns
~~~~~~~~~~~~~~~~

Finally, you can drop columns from a dataset using the :py:meth:`lance.LanceDataset.drop_columns`
method. This is a metadata-only operation and does not delete the data on disk. This makes
it very quick.

.. testcode::

import lance
import pyarrow as pa
table = pa.table({"id": pa.array([1, 2, 3]),
"name": pa.array(["Alice", "Bob", "Carla"])})
dataset = lance.write_dataset(table, "names")
dataset.drop_columns(["name"])
print(dataset.schema)

.. testoutput::

id: int64

To actually remove the data from disk, the files must be rewritten to remove the
columns and then the old files must be deleted. This can be done using
:py:meth:`lance.dataset.DatasetOptimizer.compact_files()` followed by
:py:meth:`lance.LanceDataset.cleanup_old_versions()`.
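
A minimal sketch of that two-step cleanup; the ``older_than`` window shown is
illustrative, so pick a retention period that no reader still depends on:

.. code-block:: python

    from datetime import timedelta

    # Rewrite the data files so they no longer contain the dropped columns...
    dataset.optimize.compact_files()
    # ...then delete the old files, which still hold the dropped column data.
    dataset.cleanup_old_versions(older_than=timedelta(days=14))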


Reading Lance Dataset
---------------------
14 changes: 9 additions & 5 deletions python/python/lance/dataset.py

@@ -716,10 +716,14 @@ def alter_columns(self, *alterations: Iterable[Dict[str, Any]]):
list columns can be cast between their size variants. For example,
string to large string, binary to large binary, and list to large list.

Columns that are renamed can keep any indices that are on them. However, if
the column is cast to a different type, its indices will be dropped.

Parameters
----------
alterations : Iterable[Dict[str, Any]]
A sequence of dictionaries, each with the following keys:

- "path": str
The column path to alter. For a top-level column, this is the name.
For a nested column, this is the dot-separated path, e.g. "a.b.c".
@@ -903,17 +907,17 @@ def drop_columns(self, columns: List[str]):
def drop_columns(self, columns: List[str]):
"""Drop one or more columns from the dataset

This is a metadata-only operation and does not remove the data from the
underlying storage. In order to remove the data, you must subsequently
call ``compact_files`` to rewrite the data without the removed columns and
then call ``cleanup_old_versions`` to remove the old files.

Parameters
----------
columns : list of str
The names of the columns to drop. These can be nested column references
(e.g. "a.b.c") or top-level column names (e.g. "a").

This is a metadata-only operation and does not remove the data from the
underlying storage. In order to remove the data, you must subsequently
call ``compact_files`` to rewrite the data without the removed columns and
then call ``cleanup_old_versions`` to remove the old files.

Examples
--------
>>> import lance