[Feature Store] Fix remote ingest parquet filters (#5631)
tomerm-iguazio committed May 30, 2024
1 parent 11e54a8 commit e452539
Showing 8 changed files with 192 additions and 35 deletions.
38 changes: 16 additions & 22 deletions mlrun/datastore/sources.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import json
-import math
 import operator
 import os
 import warnings
@@ -31,6 +30,7 @@
 import mlrun
 from mlrun.config import config
 from mlrun.datastore.snowflake_utils import get_snowflake_spark_options
+from mlrun.datastore.utils import transform_list_filters_to_tuple
 from mlrun.secrets import SecretsStore

 from ..model import DataSource
@@ -313,12 +313,13 @@ def __init__(
         schedule: str = None,
         start_time: Optional[Union[datetime, str]] = None,
         end_time: Optional[Union[datetime, str]] = None,
-        additional_filters: Optional[list[tuple]] = None,
+        additional_filters: Optional[list[Union[tuple, list]]] = None,
     ):
         if additional_filters:
             attributes = copy(attributes) or {}
+            additional_filters = transform_list_filters_to_tuple(additional_filters)
             attributes["additional_filters"] = additional_filters
-            self.validate_additional_filters(additional_filters)

         super().__init__(
             name,
             path,
@@ -359,25 +360,6 @@ def _convert_to_datetime(time):
         else:
             return time

-    @staticmethod
-    def validate_additional_filters(additional_filters):
-        if not additional_filters:
-            return
-        for filter_tuple in additional_filters:
-            if not filter_tuple:
-                continue
-            col_name, op, value = filter_tuple
-            if isinstance(value, float) and math.isnan(value):
-                raise mlrun.errors.MLRunInvalidArgumentError(
-                    "using NaN in additional_filters is not supported"
-                )
-            elif isinstance(value, (list, tuple, set)):
-                for sub_value in value:
-                    if isinstance(sub_value, float) and math.isnan(sub_value):
-                        raise mlrun.errors.MLRunInvalidArgumentError(
-                            "using NaN in additional_filters is not supported"
-                        )
-
     def to_step(
         self,
         key_field=None,
@@ -393,6 +375,7 @@
         attributes.pop("additional_filters", None)
         if context:
             attributes["context"] = context
+        additional_filters = transform_list_filters_to_tuple(additional_filters)
         data_item = mlrun.store_manager.object(self.path)
        store, path, url = mlrun.store_manager.get_or_create_store(self.path)
         return storey.ParquetSource(
@@ -406,6 +389,16 @@
             **attributes,
         )

+    @classmethod
+    def from_dict(cls, struct=None, fields=None, deprecated_fields: dict = None):
+        new_obj = super().from_dict(
+            struct=struct, fields=fields, deprecated_fields=deprecated_fields
+        )
+        new_obj.attributes["additional_filters"] = transform_list_filters_to_tuple(
+            new_obj.additional_filters
+        )
+        return new_obj
+
     def get_spark_options(self):
         store, path, _ = mlrun.store_manager.get_or_create_store(self.path)
         spark_options = store.get_spark_options()
@@ -428,6 +421,7 @@ def to_dataframe(
         additional_filters=None,
     ):
         reader_args = self.attributes.get("reader_args", {})
+        additional_filters = transform_list_filters_to_tuple(additional_filters)
         return mlrun.store_manager.object(url=self.path).as_df(
             columns=columns,
             df_module=df_module,
3 changes: 2 additions & 1 deletion mlrun/datastore/targets.py
@@ -30,6 +30,7 @@
 import mlrun.utils.helpers
 from mlrun.config import config
 from mlrun.datastore.snowflake_utils import get_snowflake_spark_options
+from mlrun.datastore.utils import transform_list_filters_to_tuple
 from mlrun.model import DataSource, DataTarget, DataTargetBase, TargetPathObject
 from mlrun.utils import logger, now_date
 from mlrun.utils.helpers import to_parquet
@@ -999,7 +1000,7 @@ def as_df(
             start_time=start_time,
             end_time=end_time,
             time_column=time_column,
-            additional_filters=additional_filters,
+            additional_filters=transform_list_filters_to_tuple(additional_filters),
             **kwargs,
         )
         if not columns:
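
A hypothetical usage sketch for the target side (the path is invented): after this change, as_df normalizes list-form filters before handing them to the underlying reader, so both spellings below behave the same:

    from mlrun.datastore.targets import ParquetTarget

    target = ParquetTarget("mytarget", path="v3io:///projects/demo/data.parquet")

    # Tuple and list forms are now equivalent; both reach the reader as tuples
    df_a = target.as_df(additional_filters=[("room", "=", 1)])
    df_b = target.as_df(additional_filters=[["room", "=", 1]])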
42 changes: 42 additions & 0 deletions mlrun/datastore/utils.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+import math
 import tarfile
 import tempfile
 import typing
@@ -180,3 +181,44 @@ def get_kafka_brokers_from_dict(options: dict, pop=False) -> typing.Optional[str]:
             FutureWarning,
         )
     return kafka_bootstrap_servers
+
+
+def transform_list_filters_to_tuple(additional_filters):
+    tuple_filters = []
+    if not additional_filters:
+        return tuple_filters
+    validate_additional_filters(additional_filters)
+    for additional_filter in additional_filters:
+        tuple_filters.append(tuple(additional_filter))
+    return tuple_filters
+
+
+def validate_additional_filters(additional_filters):
+    nan_error_message = "using NaN in additional_filters is not supported"
+    if additional_filters in [None, [], ()]:
+        return
+    for filter_tuple in additional_filters:
+        if filter_tuple == () or filter_tuple == []:
+            continue
+        if not isinstance(filter_tuple, (list, tuple)):
+            raise mlrun.errors.MLRunInvalidArgumentError(
+                f"mlrun supports additional_filters only as a list of tuples."
+                f" Current additional_filters: {additional_filters}"
+            )
+        if isinstance(filter_tuple[0], (list, tuple)):
+            raise mlrun.errors.MLRunInvalidArgumentError(
+                f"additional_filters does not support nested list inside filter tuples except in -in- logic."
+                f" Current filter_tuple: {filter_tuple}."
+            )
+        if len(filter_tuple) != 3:
+            raise mlrun.errors.MLRunInvalidArgumentError(
+                f"illegal filter tuple length, {filter_tuple} in additional filters:"
+                f" {additional_filters}"
+            )
+        col_name, op, value = filter_tuple
+        if isinstance(value, float) and math.isnan(value):
+            raise mlrun.errors.MLRunInvalidArgumentError(nan_error_message)
+        elif isinstance(value, (list, tuple)):
+            for sub_value in value:
+                if isinstance(sub_value, float) and math.isnan(sub_value):
+                    raise mlrun.errors.MLRunInvalidArgumentError(nan_error_message)
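
A behavior sketch for the two new helpers, derived directly from the code above:

    from mlrun.datastore.utils import transform_list_filters_to_tuple

    transform_list_filters_to_tuple([("x", "=", 3)])        # -> [("x", "=", 3)]
    transform_list_filters_to_tuple([["x", "in", [3, 4]]])  # -> [("x", "in", [3, 4])]
    transform_list_filters_to_tuple(None)                   # -> []

    # Each of the following raises mlrun.errors.MLRunInvalidArgumentError:
    # transform_list_filters_to_tuple(("x", "=", 3))         # bare tuple, not a list of tuples
    # transform_list_filters_to_tuple([[("x", "=", 3)]])     # nested list outside "in" logic
    # transform_list_filters_to_tuple([("x", "=", "=", 3)])  # illegal filter tuple length
    # transform_list_filters_to_tuple([("x", "=", float("nan"))])  # NaN is unsupported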
39 changes: 32 additions & 7 deletions tests/datastore/test_sources.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import json
 import pathlib
 import re

@@ -107,14 +108,38 @@ def test_timestamp_format_inference(rundb_mock):


 @pytest.mark.parametrize(
-    "additional_filters",
-    [[("age", "=", float("nan"))], [("age", "in", [10, float("nan")])]],
+    "additional_filters, message",
+    [
+        ([("x", "=", 3)], ""),
+        (
+            [[("x", "=", 3), ("x", "=", 4), ("x", "=", 5)]],
+            "additional_filters does not support nested list inside filter tuples except in -in- logic.",
+        ),
+    ],
 )
-def test_nan_additional_filters(additional_filters):
-    with pytest.raises(
-        mlrun.errors.MLRunInvalidArgumentError,
-        match="using NaN in additional_filters is not supported",
-    ):
+def test_transform_list_filters_to_tuple(additional_filters, message):
+    back_from_json_serialization = json.loads(json.dumps(additional_filters))
+
+    if message:
+        with pytest.raises(mlrun.errors.MLRunInvalidArgumentError, match=message):
+            ParquetSource(
+                "parquet_source",
+                path="path/to/file",
+                additional_filters=additional_filters,
+            )
+        with pytest.raises(mlrun.errors.MLRunInvalidArgumentError, match=message):
+            ParquetSource(
+                "parquet_source",
+                path="path/to/file",
+                additional_filters=back_from_json_serialization,
+            )
+    else:
+        ParquetSource(
+            "parquet_source", path="path/to/file", additional_filters=additional_filters
+        )
+        parquet_source = ParquetSource(
+            "parquet_source",
+            path="path/to/file",
+            additional_filters=back_from_json_serialization,
+        )
+        assert parquet_source.additional_filters == additional_filters
18 changes: 18 additions & 0 deletions tests/datastore/test_targets.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import json
 import os

 import pandas as pd
@@ -111,3 +112,20 @@ def test_write_with_too_many_partitions():
         match="Maximum number of partitions exceeded. To resolve this.*",
     ):
         parquet_target.write_dataframe(df)
+
+
+def test_transform_list_filters_to_tuple():
+    additional_filters = [[("x", "=", 3), ("x", "=", 4), ("x", "=", 5)]]
+    parquet_target = ParquetTarget("parquet_target", path="path/to/file")
+    back_from_json_serialization = json.loads(json.dumps(additional_filters))
+
+    with pytest.raises(
+        mlrun.errors.MLRunInvalidArgumentError,
+        match="additional_filters does not support nested list inside filter tuples except in -in- logic.",
+    ):
+        parquet_target.as_df(additional_filters=additional_filters)
+    with pytest.raises(
+        mlrun.errors.MLRunInvalidArgumentError,
+        match="additional_filters does not support nested list inside filter tuples except in -in- logic.",
+    ):
+        parquet_target.as_df(additional_filters=back_from_json_serialization)
66 changes: 66 additions & 0 deletions tests/datastore/test_utils.py
@@ -0,0 +1,66 @@
+# Copyright 2024 Iguazio
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import json
+
+import pytest
+
+import mlrun.datastore
+import mlrun.datastore.wasbfs
+from mlrun.datastore.utils import transform_list_filters_to_tuple
+
+
+@pytest.mark.parametrize(
+    "additional_filters, message",
+    [
+        ([("x", "=", 3)], ""),
+        (
+            [[("x", "=", 3), ("x", "=", 4), ("x", "=", 5)]],
+            "additional_filters does not support nested list inside filter tuples except in -in- logic.",
+        ),
+        (
+            [[("x", "=", 3), ("x", "=", 4)]],
+            "additional_filters does not support nested list inside filter tuples except in -in- logic.",
+        ),
+        (("x", "=", 3), "mlrun supports additional_filters only as a list of tuples."),
+        ([("x", "in", [3, 4]), ("y", "in", [3, 4])], ""),
+        ([0], "mlrun supports additional_filters only as a list of tuples."),
+        (
+            [("age", "=", float("nan"))],
+            "using NaN in additional_filters is not supported",
+        ),
+        (
+            [("age", "in", [10, float("nan")])],
+            "using NaN in additional_filters is not supported",
+        ),
+        ([("x", "=", "=", 3), ("y", "in", [3, 4])], "illegal filter tuple length"),
+        ([()], ""),
+        ([], ""),
+    ],
+)
+def test_transform_list_filters_to_tuple(additional_filters, message):
+    back_from_json_serialization = json.loads(json.dumps(additional_filters))
+
+    if message:
+        with pytest.raises(mlrun.errors.MLRunInvalidArgumentError, match=message):
+            transform_list_filters_to_tuple(additional_filters)
+        with pytest.raises(mlrun.errors.MLRunInvalidArgumentError, match=message):
+            transform_list_filters_to_tuple(
+                additional_filters=back_from_json_serialization
+            )
+    else:
+        transform_list_filters_to_tuple(additional_filters)
+        result = transform_list_filters_to_tuple(back_from_json_serialization)
+        assert result == additional_filters
17 changes: 14 additions & 3 deletions tests/system/feature_store/test_feature_store.py
@@ -4818,8 +4818,11 @@ def test_merge_different_number_of_entities(self):
         ).to_dataframe()
         assert_frame_equal(expected_all, df, check_dtype=False)

+    @pytest.mark.parametrize("local", [True, False])
     @pytest.mark.parametrize("engine", ["local", "dask"])
-    def test_parquet_filters(self, engine):
+    def test_parquet_filters(self, engine, local):
+        config_parameters = {} if local else {"image": "mlrun/mlrun"}
+        run_config = fstore.RunConfig(local=local, **config_parameters)
         parquet_path = os.path.relpath(str(self.assets_path / "testdata.parquet"))
         df = pd.read_parquet(parquet_path)
         filtered_df = df.query('department == "01e9fe31-76de-45f0-9aed-0f94cc97bca0"')
@@ -4849,8 +4852,10 @@ def test_parquet_filters(self, engine):
             partitioned=True,
             partition_cols=["department"],
         )
-        feature_set.ingest(source=parquet_source, targets=[target])
-        result = target.as_df(additional_filters=("room", "=", 1)).reset_index()
+        feature_set.ingest(
+            source=parquet_source, targets=[target], run_config=run_config
+        )
+        result = target.as_df(additional_filters=[("room", "=", 1)]).reset_index()
         # We want to include patient_id in the comparison,
         # sort the columns alphabetically, and sort the rows by patient_id values.
         result = sort_df(result, "patient_id")
@@ -4860,12 +4865,18 @@
         vec = fstore.FeatureVector(
             name="test-fs-vec", features=["parquet-filters-fs.*"]
         )
+        vec.save()
+        target = ParquetTarget(
+            path=f"v3io:///projects/{self.project_name}/get_offline_features_{run_uuid}",
+        )
         result = (
             fstore.get_offline_features(
                 feature_vector=vec,
                 additional_filters=[("bad", "=", 95)],
                 with_indexes=True,
                 engine=engine,
+                run_config=run_config,
+                target=target,
             )
             .to_dataframe()
             .reset_index()
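
Condensed from the system test above, the remote flow this commit targets, reusing the test's own names (feature_set, parquet_source, target, and vec are assumed to exist as defined there): with local=False the spec travels through serialization to the runner pod, which is presumably where filter tuples previously degraded into lists:

    run_config = fstore.RunConfig(local=False, image="mlrun/mlrun")
    feature_set.ingest(source=parquet_source, targets=[target], run_config=run_config)
    resp = fstore.get_offline_features(
        feature_vector=vec,
        additional_filters=[("bad", "=", 95)],
        run_config=run_config,
        target=target,
    )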
4 changes: 2 additions & 2 deletions tests/system/feature_store/test_spark_engine.py
@@ -380,7 +380,7 @@ def test_parquet_filters(self):
             name="test-fs-vec", features=["parquet-filters-fs.*"]
         )
         vec.save()
-        get_offline_target = ParquetTarget(
+        target = ParquetTarget(
             "mytarget", path=f"{self.output_dir()}-get_offline_features"
         )
         kind = None if self.run_local else "remote-spark"
@@ -391,7 +391,7 @@
                 ("movements", "<", 6),
             ],
             with_indexes=True,
-            target=get_offline_target,
+            target=target,
             engine="spark",
             run_config=fstore.RunConfig(local=self.run_local, kind=kind),
             spark_service=self.spark_service,
