Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: upgrade to lance 0.9.11 and expose merge_insert #906

Merged
merged 5 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/python/python.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ pip install lancedb

::: lancedb.schema.vector

::: lancedb.merge.LanceMergeInsertBuilder

## Integrations

### Pydantic
Expand Down
86 changes: 86 additions & 0 deletions python/lancedb/merge.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Copyright 2023 LanceDB Developers
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Iterable, Optional

if TYPE_CHECKING:
from .common import DATA


class LanceMergeInsertBuilder(object):
"""Builder for a LanceDB merge insert operation

See [`merge_insert`][lancedb.table.Table.merge_insert] for
more context
"""

def __init__(self, table: "Table", on: Iterable[str]): # noqa: F821
# Do not put a docstring here. This method should be hidden
# from API docs. Users should use merge_insert to create
# this object.
self._table = table
self._on = on
self._when_matched_update_all = False
self._when_not_matched_insert_all = False
self._when_not_matched_by_source_delete = False
self._when_not_matched_by_source_condition = None

def when_matched_update_all(self) -> LanceMergeInsertBuilder:
"""
Rows that exist in both the source table (new data) and
the target table (old data) will be updated, replacing
the old row with the corresponding matching row.

If there are multiple matches then the behavior is undefined.
Currently this causes multiple copies of the row to be created
but that behavior is subject to change.
"""
self._when_matched_update_all = True
return self

def when_not_matched_insert_all(self) -> LanceMergeInsertBuilder:
"""
Rows that exist only in the source table (new data) should
be inserted into the target table.
"""
self._when_not_matched_insert_all = True
return self

def when_not_matched_by_source_delete(
self, condition: Optional[str] = None
) -> LanceMergeInsertBuilder:
"""
Rows that exist only in the target table (old data) will be
deleted. An optional condition can be provided to limit what
data is deleted.

Parameters
----------
condition: Optional[str], default None
If None then all such rows will be deleted. Otherwise the
condition will be used as an SQL filter to limit what rows
are deleted.
"""
self._when_not_matched_by_source_delete = True
if condition is not None:
self._when_not_matched_by_source_condition = condition
return self

def execute(self, new_data: DATA):
"""
Executes the merge insert operation

Nothing is returned but the [`Table`][lancedb.table.Table] is updated
"""
self._table._do_merge(self, new_data)
4 changes: 4 additions & 0 deletions python/lancedb/remote/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ def _execute_query(self, query: Query) -> pa.Table:
result = self._conn._client.query(self._name, query)
return result.to_arrow()

def _do_merge(self, *_args):
"""_do_merge() is not supported on the LanceDB cloud yet"""
return NotImplementedError("_do_merge() is not supported on the LanceDB cloud")

def delete(self, predicate: str):
"""Delete rows from the table.

Expand Down
81 changes: 81 additions & 0 deletions python/lancedb/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

from .common import DATA, VEC, VECTOR_COLUMN_NAME
from .embeddings import EmbeddingFunctionConfig, EmbeddingFunctionRegistry
from .merge import LanceMergeInsertBuilder
from .pydantic import LanceModel, model_to_dict
from .query import LanceQueryBuilder, Query
from .util import (
Expand Down Expand Up @@ -334,6 +335,64 @@ def add(
"""
raise NotImplementedError

def merge_insert(self, on: Union[str, Iterable[str]]) -> LanceMergeInsertBuilder:
"""
Returns a [`LanceMergeInsertBuilder`][lancedb.merge.LanceMergeInsertBuilder]
that can be used to create a "merge insert" operation

This operation can add rows, update rows, and remove rows all in a single
transaction. It is a very generic tool that can be used to create
behaviors like "insert if not exists", "update or insert (i.e. upsert)",
or even replace a portion of existing data with new data (e.g. replace
all data where month="january")

The merge insert operation works by combining new data from a
**source table** with existing data in a **target table** by using a
join. There are three categories of records.

"Matched" records are records that exist in both the source table and
the target table. "Not matched" records exist only in the source table
(e.g. these are new data) "Not matched by source" records exist only
in the target table (this is old data)

The builder returned by this method can be used to customize what
should happen for each category of data.

Please note that the data may appear to be reordered as part of this
operation. This is because updated rows will be deleted from the
dataset and then reinserted at the end with the new values.

Parameters
----------

on: Union[str, Iterable[str]]
A column (or columns) to join on. This is how records from the
source table and target table are matched. Typically this is some
kind of key or id column.

Examples
--------
>>> import lancedb
>>> data = pa.table({"a": [2, 1, 3], "b": ["a", "b", "c"]})
>>> db = lancedb.connect("./.lancedb")
>>> table = db.create_table("my_table", data)
>>> new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
>>> # Perform a "upsert" operation
>>> table.merge_insert("a") \\
... .when_matched_update_all() \\
... .when_not_matched_insert_all() \\
... .execute(new_data)
>>> # The order of new rows is non-deterministic since we use
>>> # a hash-join as part of this operation and so we sort here
>>> table.to_arrow().sort_by("a").to_pandas()
a b
0 1 b
1 2 x
2 3 y
3 4 z
"""
return LanceMergeInsertBuilder(self, on)

@abstractmethod
def search(
self,
Expand Down Expand Up @@ -414,6 +473,16 @@ def search(
def _execute_query(self, query: Query) -> pa.Table:
pass

@abstractmethod
def _do_merge(
self,
merge: LanceMergeInsertBuilder,
new_data: DATA,
*,
schema: Optional[pa.Schema] = None,
):
pass

@abstractmethod
def delete(self, where: str):
"""Delete rows from the table.
Expand Down Expand Up @@ -1196,6 +1265,18 @@ def _execute_query(self, query: Query) -> pa.Table:
with_row_id=query.with_row_id,
)

def _do_merge(self, merge: LanceMergeInsertBuilder, new_data: DATA, *, schema=None):
ds = self.to_lance()
builder = ds.merge_insert(merge._on)
if merge._when_matched_update_all:
builder.when_matched_update_all()
if merge._when_not_matched_insert_all:
builder.when_not_matched_insert_all()
if merge._when_not_matched_by_source_delete:
cond = merge._when_not_matched_by_source_condition
builder.when_not_matched_by_source_delete(cond)
builder.execute(new_data, schema=schema)

def cleanup_old_versions(
self,
older_than: Optional[timedelta] = None,
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ name = "lancedb"
version = "0.5.1"
dependencies = [
"deprecation",
"pylance==0.9.10",
"pylance==0.9.11",
"ratelimiter~=1.0",
"retry>=0.9.2",
"tqdm>=4.27.0",
Expand Down
56 changes: 56 additions & 0 deletions python/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,62 @@ def test_update_types(db):
assert actual == expected


def test_merge_insert(db):
table = LanceTable.create(
db,
"my_table",
data=pa.table({"a": [1, 2, 3], "b": ["a", "b", "c"]}),
)
assert len(table) == 3
version = table.version

new_data = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})

# upsert
table.merge_insert(
"a"
).when_matched_update_all().when_not_matched_insert_all().execute(new_data)

expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "x", "y", "z"]})
# These `sort_by` calls can be removed once lance#1892
# is merged (it fixes the ordering)
assert table.to_arrow().sort_by("a") == expected

table.restore(version)

# insert-if-not-exists
table.merge_insert("a").when_not_matched_insert_all().execute(new_data)

expected = pa.table({"a": [1, 2, 3, 4], "b": ["a", "b", "c", "z"]})
assert table.to_arrow().sort_by("a") == expected

table.restore(version)

new_data = pa.table({"a": [2, 4], "b": ["x", "z"]})

# replace-range
table.merge_insert(
"a"
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete(
"a > 2"
).execute(new_data)

expected = pa.table({"a": [1, 2, 4], "b": ["a", "x", "z"]})
assert table.to_arrow().sort_by("a") == expected

table.restore(version)

# replace-range no condition
table.merge_insert(
"a"
).when_matched_update_all().when_not_matched_insert_all().when_not_matched_by_source_delete().execute(
new_data
)

expected = pa.table({"a": [2, 4], "b": ["x", "z"]})
assert table.to_arrow().sort_by("a") == expected


def test_create_with_embedding_function(db):
class MyTable(LanceModel):
text: str
Expand Down