Skip to content

Commit

Permalink
[Feature store] Support purging targets (#1013)
Browse files Browse the repository at this point in the history
  • Loading branch information
urihoenig committed Jun 14, 2021
1 parent da048d1 commit 617ff28
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 3 deletions.
3 changes: 3 additions & 0 deletions mlrun/datastore/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,9 @@ def to_dict(self):
"options": self.options,
}

def rm(self, path, recursive=False, maxdepth=None):
self.get_filesystem().rm(path=path, recursive=recursive, maxdepth=maxdepth)


def _drop_reserved_columns(df):
cols_to_drop = []
Expand Down
9 changes: 6 additions & 3 deletions mlrun/datastore/targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class BaseStoreTarget(DataTargetBase):

def __init__(
self,
name: str = kind,
name: str = "",
path=None,
attributes: typing.Dict[str, str] = None,
after_step=None,
Expand All @@ -284,7 +284,7 @@ def __init__(
)
after_step = after_step or after_state

self.name = name
self.name = name or self.kind
self.path = str(path) if path is not None else None
self.after_step = after_step
self.attributes = attributes or {}
Expand Down Expand Up @@ -446,6 +446,9 @@ def add_writer_state(
"""add storey writer state to graph"""
self.add_writer_step(graph, after, features, key_columns, timestamp_key)

def purge(self):
self._get_store().rm(self._target_path, recursive=True)

def as_df(
self,
columns=None,
Expand Down Expand Up @@ -477,7 +480,7 @@ class ParquetTarget(BaseStoreTarget):

def __init__(
self,
name: str = kind,
name: str = "",
path=None,
attributes: typing.Dict[str, str] = None,
after_step=None,
Expand Down
28 changes: 28 additions & 0 deletions mlrun/datastore/v3io.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,31 @@ def listdir(self, key):

# todo: full = key, size, last_modified
return [obj.key[subpath_length:] for obj in response.output.contents]

def rm(self, path, recursive=False, maxdepth=None):
""" Recursive rm file/folder
Workaround for v3io-fs not supporting recursive directory removal """

fs = self.get_filesystem()
if isinstance(path, str):
path = [path]
maxdepth = maxdepth if not maxdepth else maxdepth - 1
to_rm = set()
path = [fs._strip_protocol(p) for p in path]
for p in path:
if recursive:
find_out = fs.find(p, maxdepth=maxdepth, withdirs=True, detail=True)
rec = set(
sorted(
[
f["name"] + ("/" if f["type"] == "directory" else "")
for f in find_out.values()
]
)
)
to_rm |= rec
if p not in to_rm and (recursive is False or fs.exists(p)):
p = p + ("/" if fs.isdir(p) else "")
to_rm.add(p)
for p in reversed(list(sorted(to_rm))):
fs.rm_file(p)
6 changes: 6 additions & 0 deletions mlrun/feature_store/feature_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
TargetTypes,
default_target_names,
get_offline_target,
get_target_driver,
validate_target_list,
validate_target_placement,
)
Expand Down Expand Up @@ -343,6 +344,11 @@ def set_targets(
if default_final_step:
self.spec.graph.final_step = default_final_step

def purge(self):
for target in self.spec.targets:
driver = get_target_driver(target_spec=target, resource=self)
driver.purge()

def has_valid_source(self):
"""check if object's spec has a valid (non empty) source definition"""
source = self.spec.source
Expand Down
32 changes: 32 additions & 0 deletions tests/system/feature_store/test_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
ParquetTarget,
TargetTypes,
get_default_prefix_for_target,
get_target_driver,
)
from mlrun.feature_store import Entity, FeatureSet
from mlrun.feature_store.feature_set import aggregates_step
Expand Down Expand Up @@ -869,6 +870,37 @@ def test_featureset_uri(self):
stocks_set.save()
fs.ingest(stocks_set.uri, stocks)

def test_purge(self):
key = "patient_id"
fset = fs.FeatureSet("purge", entities=[Entity(key)], timestamp_key="timestamp")
path = os.path.relpath(str(self.assets_path / "testdata.csv"))
source = CSVSource("mycsv", path=path, time_field="timestamp",)
fset.set_targets(
targets=[
CSVTarget(),
ParquetTarget(partitioned=True, partition_cols=["timestamp"]),
NoSqlTarget(),
],
with_defaults=False,
)
fs.ingest(fset, source)

verify_purge(fset)


def verify_purge(fset):
for target in fset.spec.targets:
driver = get_target_driver(target_spec=target, resource=fset)
filesystem = driver._get_store().get_filesystem(False)
assert filesystem.exists(driver._target_path)

fset.purge()

for target in fset.spec.targets:
driver = get_target_driver(target_spec=target, resource=fset)
filesystem = driver._get_store().get_filesystem(False)
assert not filesystem.exists(driver._target_path)


def verify_target_list_fail(targets, with_defaults=None):
feature_set = fs.FeatureSet(name="target-list-fail", entities=[fs.Entity("ticker")])
Expand Down

0 comments on commit 617ff28

Please sign in to comment.