diff --git a/mlrun/datastore/sources.py b/mlrun/datastore/sources.py index 88e84f59d65..0299c9b5c53 100644 --- a/mlrun/datastore/sources.py +++ b/mlrun/datastore/sources.py @@ -55,7 +55,7 @@ def to_step( if start_time or end_time: raise NotImplementedError("BaseSource does not support filtering by time") - return storey.Source() + return storey.SyncEmitSource() def get_table_object(self): """get storey Table object""" @@ -102,7 +102,7 @@ def to_step( raise NotImplementedError("CSVSource does not support filtering by time") attributes = self.attributes or {} - return storey.ReadCSV( + return storey.CSVSource( paths=self.path, header=True, build_dict=True, @@ -143,7 +143,7 @@ def to_step( import storey attributes = self.attributes or {} - return storey.ReadParquet( + return storey.ParquetSource( paths=self.path, key_field=self.key_field or key_field, time_field=self.time_field or time_field, @@ -248,7 +248,7 @@ def to_step( if start_time or end_time: raise NotImplementedError("Source does not support filtering by time") - return storey.Source( + return storey.SyncEmitSource( key_field=self.key_field or key_field, time_field=self.time_field or time_field, full_event=True, diff --git a/mlrun/datastore/targets.py b/mlrun/datastore/targets.py index 38653774525..3270b9c91f8 100644 --- a/mlrun/datastore/targets.py +++ b/mlrun/datastore/targets.py @@ -390,10 +390,10 @@ def add_writer_state( ) graph.add_step( - name=self.name or "WriteToParquet", + name=self.name or "ParquetTarget", after=after, graph_shape="cylinder", - class_name="storey.WriteToParquet", + class_name="storey.ParquetTarget", path=self._target_path, columns=column_list, index_cols=key_columns, @@ -434,10 +434,10 @@ def add_writer_state( ) graph.add_step( - name=self.name or "WriteToCSV", + name=self.name or "CSVTarget", after=after, graph_shape="cylinder", - class_name="storey.WriteToCSV", + class_name="storey.CSVTarget", path=self._target_path, columns=column_list, header=True, @@ -489,10 +489,10 @@ def add_writer_state( column_list = [col for col in column_list if col in aggregate_features] graph.add_step( - name=self.name or "WriteToTable", + name=self.name or "NoSqlTarget", after=after, graph_shape="cylinder", - class_name="storey.WriteToTable", + class_name="storey.NoSqlTarget", columns=column_list, table=table, **self.attributes, @@ -546,10 +546,10 @@ def add_writer_state( ) graph.add_step( - name=self.name or "WriteToStream", + name=self.name or "StreamTarget", after=after, graph_shape="cylinder", - class_name="storey.WriteToV3IOStream", + class_name="storey.StreamTarget", columns=column_list, storage=V3ioDriver(webapi=endpoint), stream_path=uri, @@ -581,8 +581,8 @@ def add_writer_state( ) graph.add_step( - name=self.name or "WriteToTSDB", - class_name="storey.WriteToTSDB", + name=self.name or "TSDBTarget", + class_name="storey.TSDBTarget", after=after, graph_shape="cylinder", path=uri, diff --git a/mlrun/feature_store/retrieval/online.py b/mlrun/feature_store/retrieval/online.py index 242fffd98ea..22f9570372c 100644 --- a/mlrun/feature_store/retrieval/online.py +++ b/mlrun/feature_store/retrieval/online.py @@ -58,13 +58,13 @@ def _build_feature_vector_graph( def init_feature_vector_graph(vector): try: - from storey import Source + from storey import SyncEmitSource except ImportError as exc: raise ImportError(f"storey not installed, use pip install storey, {exc}") feature_set_objects, feature_set_fields = vector.parse_features(False) graph = _build_feature_vector_graph(vector, feature_set_fields, feature_set_objects) - graph.set_flow_source(Source()) + graph.set_flow_source(SyncEmitSource()) server = create_graph_server(graph=graph, parameters={}) cache = ResourceCache() diff --git a/mlrun/serving/states.py b/mlrun/serving/states.py index 46be2177e7c..31dfcd5597a 100644 --- a/mlrun/serving/states.py +++ b/mlrun/serving/states.py @@ -869,7 +869,7 @@ def process_step(state, step, root): if hasattr(state, "async_object"): if state.kind == StateKinds.queue: if state.path: - state._async_object = storey.WriteToV3IOStream( + state._async_object = storey.StreamTarget( storey.V3ioDriver(), state.path ) else: @@ -891,7 +891,7 @@ def process_step(state, step, root): self._wait_for_result = True # todo: allow source array (e.g. data->json loads..) - source = self._source or storey.Source() + source = self._source or storey.SyncEmitSource() for next_state in self._start_states: next_step = source.to(next_state.async_object) process_step(next_state, next_step, self) diff --git a/requirements.txt b/requirements.txt index b0eaace63b8..6f686962ee2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -51,4 +51,4 @@ fsspec~=0.9.0 v3iofs~=0.1.5 # 3.4 and above failed builidng in some images - see https://github.com/pyca/cryptography/issues/5771 cryptography~=3.3.2 -storey~=0.4.5; python_version >= '3.7' +storey~=0.4.6; python_version >= '3.7' diff --git a/tests/system/feature_store/test_feature_store.py b/tests/system/feature_store/test_feature_store.py index 390cc429702..c6634bc7720 100644 --- a/tests/system/feature_store/test_feature_store.py +++ b/tests/system/feature_store/test_feature_store.py @@ -342,7 +342,7 @@ def test_right_not_ordered_pandas_asof_merge(self): assert res.shape[0] == left.shape[0] def test_read_csv(self): - from storey import ReadCSV, ReduceToDataFrame, build_flow + from storey import CSVSource, ReduceToDataFrame, build_flow csv_path = str(self.results_path / _generate_random_name() / ".csv") targets = [CSVTarget("mycsv", path=csv_path)] @@ -354,7 +354,7 @@ def test_read_csv(self): ) # reading csv file - controller = build_flow([ReadCSV(csv_path), ReduceToDataFrame()]).run() + controller = build_flow([CSVSource(csv_path), ReduceToDataFrame()]).run() termination_result = controller.await_termination() expected = pd.DataFrame(