Skip to content

Commit

Permalink
[Feature Store] Saving overwrite value for scheduled ingest (#1262)
Browse files Browse the repository at this point in the history
  • Loading branch information
katyakats committed Aug 29, 2021
1 parent 37c2075 commit 4b8d0f9
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 6 deletions.
14 changes: 10 additions & 4 deletions mlrun/feature_store/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ def ingest(
# remote job execution
run_config = run_config.copy() if run_config else RunConfig()
source, run_config.parameters = set_task_params(
featureset, source, targets, run_config.parameters, infer_options
featureset, source, targets, run_config.parameters, infer_options, overwrite
)
name = f"{featureset.metadata.name}_ingest"
return run_ingestion_job(
Expand All @@ -268,9 +268,13 @@ def ingest(
raise mlrun.errors.MLRunInvalidArgumentError(
"cannot specify mlrun_context with feature set or source"
)
featureset, source, targets, infer_options = context_to_ingestion_params(
mlrun_context
)
(
featureset,
source,
targets,
infer_options,
overwrite,
) = context_to_ingestion_params(mlrun_context)
if not source:
raise mlrun.errors.MLRunInvalidArgumentError(
"data source was not specified"
Expand Down Expand Up @@ -628,11 +632,13 @@ def set_task_params(
targets: List[DataTargetBase] = None,
parameters: dict = None,
infer_options: InferOptions = InferOptions.Null,
overwrite=None,
):
"""convert ingestion parameters to dict, return source + params dict"""
source = source or featureset.spec.source
parameters = parameters or {}
parameters["infer_options"] = infer_options
parameters["overwrite"] = overwrite
parameters["featureset"] = featureset.uri
if source:
parameters["source"] = source.to_dict()
Expand Down
3 changes: 2 additions & 1 deletion mlrun/feature_store/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,13 @@ def context_to_ingestion_params(context):
source = get_source_from_dict(source)
elif featureset.spec.source.to_dict():
source = get_source_from_dict(featureset.spec.source.to_dict())
overwrite = context.get_param("overwrite", None)

targets = context.get_param("targets", None)
if not targets:
targets = featureset.spec.targets
targets = [get_target_driver(target, featureset) for target in targets]
return featureset, source, targets, infer_options
return featureset, source, targets, infer_options, overwrite


def _add_data_steps(
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,4 @@ fsspec~=0.9.0
v3iofs~=0.1.7
# 3.4 and above failed building in some images - see https://github.com/pyca/cryptography/issues/5771
cryptography~=3.3.2
storey~=0.7.4; python_version >= '3.7'
storey~=0.7.5; python_version >= '3.7'
46 changes: 46 additions & 0 deletions tests/system/feature_store/test_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -1057,6 +1057,52 @@ def test_schedule_on_filtered_by_time(self, partitioned):
assert len(resp.to_dataframe() == 4)
assert "uri" not in resp.to_dataframe() and "katya" not in resp.to_dataframe()

def test_overwrite_single_file(self):
    """Check a scheduled remote ingest with ``overwrite=True`` into one parquet file.

    A two-row frame is first ingested into a parquet target so that it can
    serve as the source of a second feature set. That second feature set is
    then ingested remotely on a cron schedule with ``overwrite=True``, and
    the offline features read back through a feature vector are expected to
    contain exactly the two source rows.
    """
    data = pd.DataFrame(
        {
            "time": [
                pd.Timestamp("2021-01-10 10:00:00"),
                pd.Timestamp("2021-01-10 11:00:00"),
            ],
            "first_name": ["moshe", "yosi"],
            "data": [2000, 10],
        }
    )
    # writing down a remote source
    target2 = ParquetTarget()
    data_set = fs.FeatureSet("data", entities=[Entity("first_name")])
    fs.ingest(data_set, data, targets=[target2])

    # path of the parquet written above; it becomes the scheduled source
    path = data_set.status.targets[0].path

    # the job will be scheduled every minute
    cron_trigger = "*/1 * * * *"

    source = ParquetSource("myparquet", schedule=cron_trigger, path=path)

    feature_set = fs.FeatureSet(
        name="overwrite", entities=[fs.Entity("first_name")], timestamp_key="time",
    )

    targets = [ParquetTarget(path="v3io:///bigdata/bla.parquet")]

    fs.ingest(
        feature_set,
        source,
        overwrite=True,
        run_config=fs.RunConfig(local=False).apply(mlrun.mount_v3io()),
        targets=targets,
    )
    # presumably long enough for the every-minute schedule to fire once
    sleep(60)
    features = ["overwrite.*"]
    vec = fs.FeatureVector("svec", features)

    # check offline
    resp = fs.get_offline_features(vec)
    # Compare the row count itself. The previous form took len() of an
    # element-wise boolean frame, so it passed for any non-empty result.
    assert len(resp.to_dataframe()) == 2

    # NOTE(review): trailing wait kept from the original; presumably it lets
    # the scheduled run settle before the test ends - confirm it is needed.
    sleep(30)

@pytest.mark.parametrize(
"fixed_window_type",
[FixedWindowType.CurrentOpenWindow, FixedWindowType.LastClosedWindow],
Expand Down

0 comments on commit 4b8d0f9

Please sign in to comment.