Skip to content

Commit

Permalink
[Feature Store] Make verbose configurable (#1109)
Browse files Browse the repository at this point in the history
(cherry picked from commit ffec067)
  • Loading branch information
yaronha authored and Hedingber committed Jul 12, 2021
1 parent c4ce9f9 commit 7c8efc6
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 7 deletions.
10 changes: 8 additions & 2 deletions mlrun/feature_store/api.py
Expand Up @@ -337,6 +337,7 @@ def preview(
timestamp_key=None,
namespace=None,
options: InferOptions = None,
verbose=False,
) -> pd.DataFrame:
"""run the ingestion pipeline with local DataFrame/file data and infer features schema and stats
Expand All @@ -358,6 +359,7 @@ def preview(
:param timestamp_key: timestamp column name
:param namespace: namespace or module containing graph classes
:param options: schema and stats infer options (:py:class:`~mlrun.feature_store.InferOptions`)
:param verbose: verbose log
"""
options = options if options is not None else InferOptions.default()
if timestamp_key is not None:
Expand All @@ -384,7 +386,9 @@ def preview(
entity_columns,
InferOptions.get_common_options(options, InferOptions.Entities),
)
source = init_featureset_graph(source, featureset, namespace, return_df=True)
source = init_featureset_graph(
source, featureset, namespace, return_df=True, verbose=verbose
)

df = infer_from_static_df(source, featureset, entity_columns, options)
return df
Expand Down Expand Up @@ -415,6 +419,7 @@ def deploy_ingestion_service(
targets: List[DataTargetBase] = None,
name: str = None,
run_config: RunConfig = None,
verbose=False,
):
"""Start real-time ingestion service using nuclio function
Expand All @@ -433,6 +438,7 @@ def deploy_ingestion_service(
:param targets: list of data target objects
:param name: name name for the job/function
:param run_config: service runtime configuration (function object/uri, resources, etc..)
:param verbose: verbose log
"""
if isinstance(featureset, str):
featureset = get_feature_set_by_uri(featureset)
Expand Down Expand Up @@ -465,7 +471,7 @@ def deploy_ingestion_service(
function.spec.graph_initializer = (
"mlrun.feature_store.ingestion.featureset_initializer"
)
function.verbose = True
function.verbose = function.verbose or verbose
if run_config.local:
return function.to_mock_server(namespace=get_caller_globals())
return function.deploy()
Expand Down
4 changes: 2 additions & 2 deletions mlrun/feature_store/common.py
Expand Up @@ -67,7 +67,7 @@ def get_feature_set_by_uri(uri, project=None):
default_project = project or config.default_project

# parse store://.. uri
if mlrun.datastore.is_store_uri():
if mlrun.datastore.is_store_uri(uri):
prefix, new_uri = mlrun.datastore.parse_store_uri(uri)
if prefix != StorePrefix.FeatureSet:
raise mlrun.errors.MLRunInvalidArgumentError(
Expand All @@ -85,7 +85,7 @@ def get_feature_vector_by_uri(uri, project=None):
default_project = project or config.default_project

# parse store://.. uri
if mlrun.datastore.is_store_uri():
if mlrun.datastore.is_store_uri(uri):
prefix, new_uri = mlrun.datastore.parse_store_uri(uri)
if prefix != StorePrefix.FeatureVector:
raise mlrun.errors.MLRunInvalidArgumentError(
Expand Down
7 changes: 4 additions & 3 deletions mlrun/feature_store/ingestion.py
Expand Up @@ -32,7 +32,7 @@


def init_featureset_graph(
source, featureset, namespace, targets=None, return_df=True,
source, featureset, namespace, targets=None, return_df=True, verbose=False
):
"""create storey ingestion graph/DAG from feature set object"""

Expand All @@ -41,7 +41,7 @@ def init_featureset_graph(

# init targets (and table)
targets = targets or []
server = create_graph_server(graph=graph, parameters={}, verbose=True)
server = create_graph_server(graph=graph, parameters={}, verbose=verbose)
server.init_states(context=None, namespace=namespace, resource_cache=cache)

if graph.engine != "sync":
Expand Down Expand Up @@ -71,7 +71,8 @@ def init_featureset_graph(
target = get_target_driver(target, featureset)
size = target.write_dataframe(data)
target_status = target.update_resource_status("ready", size=size)
logger.info(f"wrote target: {target_status}")
if verbose:
logger.info(f"wrote target: {target_status}")

return data

Expand Down

0 comments on commit 7c8efc6

Please sign in to comment.