diff --git a/mlrun/feature_store/api.py b/mlrun/feature_store/api.py index eb63cbbad4e..ca894c989ab 100644 --- a/mlrun/feature_store/api.py +++ b/mlrun/feature_store/api.py @@ -337,6 +337,7 @@ def preview( timestamp_key=None, namespace=None, options: InferOptions = None, + verbose=False, ) -> pd.DataFrame: """run the ingestion pipeline with local DataFrame/file data and infer features schema and stats @@ -358,6 +359,7 @@ def preview( :param timestamp_key: timestamp column name :param namespace: namespace or module containing graph classes :param options: schema and stats infer options (:py:class:`~mlrun.feature_store.InferOptions`) + :param verbose: verbose log """ options = options if options is not None else InferOptions.default() if timestamp_key is not None: @@ -384,7 +386,9 @@ def preview( entity_columns, InferOptions.get_common_options(options, InferOptions.Entities), ) - source = init_featureset_graph(source, featureset, namespace, return_df=True) + source = init_featureset_graph( + source, featureset, namespace, return_df=True, verbose=verbose + ) df = infer_from_static_df(source, featureset, entity_columns, options) return df @@ -415,6 +419,7 @@ def deploy_ingestion_service( targets: List[DataTargetBase] = None, name: str = None, run_config: RunConfig = None, + verbose=False, ): """Start real-time ingestion service using nuclio function @@ -433,6 +438,7 @@ def deploy_ingestion_service( :param targets: list of data target objects :param name: name name for the job/function :param run_config: service runtime configuration (function object/uri, resources, etc..) + :param verbose: verbose log """ if isinstance(featureset, str): featureset = get_feature_set_by_uri(featureset) @@ -465,7 +471,7 @@ def deploy_ingestion_service( function.spec.graph_initializer = ( "mlrun.feature_store.ingestion.featureset_initializer" ) - function.verbose = True + function.verbose = function.verbose or verbose if run_config.local: return function.to_mock_server(namespace=get_caller_globals()) return function.deploy() diff --git a/mlrun/feature_store/common.py b/mlrun/feature_store/common.py index 0e47f0412ac..600f9a2766d 100644 --- a/mlrun/feature_store/common.py +++ b/mlrun/feature_store/common.py @@ -67,7 +67,7 @@ def get_feature_set_by_uri(uri, project=None): default_project = project or config.default_project # parse store://.. uri - if mlrun.datastore.is_store_uri(): + if mlrun.datastore.is_store_uri(uri): prefix, new_uri = mlrun.datastore.parse_store_uri(uri) if prefix != StorePrefix.FeatureSet: raise mlrun.errors.MLRunInvalidArgumentError( @@ -85,7 +85,7 @@ def get_feature_vector_by_uri(uri, project=None): default_project = project or config.default_project # parse store://.. uri - if mlrun.datastore.is_store_uri(): + if mlrun.datastore.is_store_uri(uri): prefix, new_uri = mlrun.datastore.parse_store_uri(uri) if prefix != StorePrefix.FeatureVector: raise mlrun.errors.MLRunInvalidArgumentError( diff --git a/mlrun/feature_store/ingestion.py b/mlrun/feature_store/ingestion.py index 051f390e520..71828f0eaa0 100644 --- a/mlrun/feature_store/ingestion.py +++ b/mlrun/feature_store/ingestion.py @@ -32,7 +32,7 @@ def init_featureset_graph( - source, featureset, namespace, targets=None, return_df=True, + source, featureset, namespace, targets=None, return_df=True, verbose=False ): """create storey ingestion graph/DAG from feature set object""" @@ -41,7 +41,7 @@ def init_featureset_graph( # init targets (and table) targets = targets or [] - server = create_graph_server(graph=graph, parameters={}, verbose=True) + server = create_graph_server(graph=graph, parameters={}, verbose=verbose) server.init_states(context=None, namespace=namespace, resource_cache=cache) if graph.engine != "sync": @@ -71,7 +71,8 @@ def init_featureset_graph( target = get_target_driver(target, featureset) size = target.write_dataframe(data) target_status = target.update_resource_status("ready", size=size) - logger.info(f"wrote target: {target_status}") + if verbose: + logger.info(f"wrote target: {target_status}") return data