Skip to content

Commit

Permalink
[Feature Store] Align feature store to renaming in storey api (#891)
Browse files Browse the repository at this point in the history
  • Loading branch information
katyakats committed Apr 27, 2021
1 parent 70117e5 commit 44d5d5f
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 21 deletions.
8 changes: 4 additions & 4 deletions mlrun/datastore/sources.py
Expand Up @@ -55,7 +55,7 @@ def to_step(
if start_time or end_time:
raise NotImplementedError("BaseSource does not support filtering by time")

return storey.Source()
return storey.SyncEmitSource()

def get_table_object(self):
"""get storey Table object"""
Expand Down Expand Up @@ -102,7 +102,7 @@ def to_step(
raise NotImplementedError("CSVSource does not support filtering by time")

attributes = self.attributes or {}
return storey.ReadCSV(
return storey.CSVSource(
paths=self.path,
header=True,
build_dict=True,
Expand Down Expand Up @@ -143,7 +143,7 @@ def to_step(
import storey

attributes = self.attributes or {}
return storey.ReadParquet(
return storey.ParquetSource(
paths=self.path,
key_field=self.key_field or key_field,
time_field=self.time_field or time_field,
Expand Down Expand Up @@ -248,7 +248,7 @@ def to_step(
if start_time or end_time:
raise NotImplementedError("Source does not support filtering by time")

return storey.Source(
return storey.SyncEmitSource(
key_field=self.key_field or key_field,
time_field=self.time_field or time_field,
full_event=True,
Expand Down
20 changes: 10 additions & 10 deletions mlrun/datastore/targets.py
Expand Up @@ -390,10 +390,10 @@ def add_writer_state(
)

graph.add_step(
name=self.name or "WriteToParquet",
name=self.name or "ParquetTarget",
after=after,
graph_shape="cylinder",
class_name="storey.WriteToParquet",
class_name="storey.ParquetTarget",
path=self._target_path,
columns=column_list,
index_cols=key_columns,
Expand Down Expand Up @@ -434,10 +434,10 @@ def add_writer_state(
)

graph.add_step(
name=self.name or "WriteToCSV",
name=self.name or "CSVTarget",
after=after,
graph_shape="cylinder",
class_name="storey.WriteToCSV",
class_name="storey.CSVTarget",
path=self._target_path,
columns=column_list,
header=True,
Expand Down Expand Up @@ -489,10 +489,10 @@ def add_writer_state(
column_list = [col for col in column_list if col in aggregate_features]

graph.add_step(
name=self.name or "WriteToTable",
name=self.name or "NoSqlTarget",
after=after,
graph_shape="cylinder",
class_name="storey.WriteToTable",
class_name="storey.NoSqlTarget",
columns=column_list,
table=table,
**self.attributes,
Expand Down Expand Up @@ -546,10 +546,10 @@ def add_writer_state(
)

graph.add_step(
name=self.name or "WriteToStream",
name=self.name or "StreamTarget",
after=after,
graph_shape="cylinder",
class_name="storey.WriteToV3IOStream",
class_name="storey.StreamTarget",
columns=column_list,
storage=V3ioDriver(webapi=endpoint),
stream_path=uri,
Expand Down Expand Up @@ -581,8 +581,8 @@ def add_writer_state(
)

graph.add_step(
name=self.name or "WriteToTSDB",
class_name="storey.WriteToTSDB",
name=self.name or "TSDBTarget",
class_name="storey.TSDBTarget",
after=after,
graph_shape="cylinder",
path=uri,
Expand Down
4 changes: 2 additions & 2 deletions mlrun/feature_store/retrieval/online.py
Expand Up @@ -58,13 +58,13 @@ def _build_feature_vector_graph(

def init_feature_vector_graph(vector):
try:
from storey import Source
from storey import SyncEmitSource
except ImportError as exc:
raise ImportError(f"storey not installed, use pip install storey, {exc}")

feature_set_objects, feature_set_fields = vector.parse_features(False)
graph = _build_feature_vector_graph(vector, feature_set_fields, feature_set_objects)
graph.set_flow_source(Source())
graph.set_flow_source(SyncEmitSource())
server = create_graph_server(graph=graph, parameters={})

cache = ResourceCache()
Expand Down
4 changes: 2 additions & 2 deletions mlrun/serving/states.py
Expand Up @@ -869,7 +869,7 @@ def process_step(state, step, root):
if hasattr(state, "async_object"):
if state.kind == StateKinds.queue:
if state.path:
state._async_object = storey.WriteToV3IOStream(
state._async_object = storey.StreamTarget(
storey.V3ioDriver(), state.path
)
else:
Expand All @@ -891,7 +891,7 @@ def process_step(state, step, root):
self._wait_for_result = True

# todo: allow source array (e.g. data->json loads..)
source = self._source or storey.Source()
source = self._source or storey.SyncEmitSource()
for next_state in self._start_states:
next_step = source.to(next_state.async_object)
process_step(next_state, next_step, self)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Expand Up @@ -51,4 +51,4 @@ fsspec~=0.9.0
v3iofs~=0.1.5
# 3.4 and above failed building in some images - see https://github.com/pyca/cryptography/issues/5771
cryptography~=3.3.2
storey~=0.4.5; python_version >= '3.7'
storey~=0.4.6; python_version >= '3.7'
4 changes: 2 additions & 2 deletions tests/system/feature_store/test_feature_store.py
Expand Up @@ -342,7 +342,7 @@ def test_right_not_ordered_pandas_asof_merge(self):
assert res.shape[0] == left.shape[0]

def test_read_csv(self):
from storey import ReadCSV, ReduceToDataFrame, build_flow
from storey import CSVSource, ReduceToDataFrame, build_flow

csv_path = str(self.results_path / _generate_random_name() / ".csv")
targets = [CSVTarget("mycsv", path=csv_path)]
Expand All @@ -354,7 +354,7 @@ def test_read_csv(self):
)

# reading csv file
controller = build_flow([ReadCSV(csv_path), ReduceToDataFrame()]).run()
controller = build_flow([CSVSource(csv_path), ReduceToDataFrame()]).run()
termination_result = controller.await_termination()

expected = pd.DataFrame(
Expand Down

0 comments on commit 44d5d5f

Please sign in to comment.