Skip to content

Commit

Permalink
[API] Fix bugs when using feature vectors and in real time ingestion (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
yaronha committed Jun 30, 2021
1 parent 2297e22 commit 6612bb4
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 8 deletions.
15 changes: 11 additions & 4 deletions mlrun/execution.py
Expand Up @@ -233,7 +233,14 @@ def get_meta(self):

@classmethod
def from_dict(
cls, attrs: dict, rundb="", autocommit=False, tmp="", host=None, log_stream=None
cls,
attrs: dict,
rundb="",
autocommit=False,
tmp="",
host=None,
log_stream=None,
is_api=False,
):
"""create execution context from dict"""

Expand Down Expand Up @@ -269,15 +276,15 @@ def from_dict(

self._init_dbs(rundb)

if spec:
# init data related objects (require DB & Secrets to be set first)
if spec and not is_api:
# init data related objects (require DB & Secrets to be set first), skip when running in the api service
self._data_stores.from_dict(spec)
if inputs and isinstance(inputs, dict):
for k, v in inputs.items():
if v:
self._set_input(k, v)

if host:
if host and not is_api:
self.set_label("host", host)

start = get_in(attrs, "status.start_time")
Expand Down
2 changes: 1 addition & 1 deletion mlrun/feature_store/api.py
Expand Up @@ -433,7 +433,7 @@ def deploy_ingestion_service(
featureset, source, targets, run_config.parameters
)

name = name or f"{featureset.metadata.name}_ingest"
name = name or f"{featureset.metadata.name}-ingest"
if not run_config.function:
function_ref = featureset.spec.function.copy()
if function_ref.is_empty():
Expand Down
4 changes: 3 additions & 1 deletion mlrun/runtimes/base.py
Expand Up @@ -435,7 +435,9 @@ def run(
"warning!, Api url not set, " "trying to exec remote runtime locally"
)

execution = MLClientCtx.from_dict(runspec.to_dict(), db, autocommit=False)
execution = MLClientCtx.from_dict(
runspec.to_dict(), db, autocommit=False, is_api=self._is_api_server
)
self._pre_run(runspec, execution) # hook for runtime specific prep

# create task generator (for child runs) from spec
Expand Down
18 changes: 16 additions & 2 deletions mlrun/runtimes/function.py
Expand Up @@ -371,6 +371,8 @@ def add_vault_config_to_spec(self):
def deploy(
self, dashboard="", project="", tag="", verbose=False,
):
# todo: verify that the function name is normalized

verbose = verbose or self.verbose
if verbose:
self.set_env("MLRUN_LOG_LEVEL", "DEBUG")
Expand Down Expand Up @@ -738,6 +740,11 @@ def deploy_nuclio_function(function: RemoteRuntime, dashboard="", watch=False):
spec.set_config("spec.resources", function.spec.resources)
if function.spec.no_cache:
spec.set_config("spec.build.noCache", True)
if function.spec.build.functionSourceCode:
spec.set_config(
"spec.build.functionSourceCode", function.spec.build.functionSourceCode
)

if function.spec.replicas:
spec.set_config("spec.minReplicas", function.spec.replicas)
spec.set_config("spec.maxReplicas", function.spec.replicas)
Expand All @@ -746,9 +753,16 @@ def deploy_nuclio_function(function: RemoteRuntime, dashboard="", watch=False):
spec.set_config("spec.maxReplicas", function.spec.max_replicas)

dashboard = dashboard or mlconf.nuclio_dashboard_url
if function.spec.base_spec:
if function.spec.base_spec or function.spec.build.functionSourceCode:
config = function.spec.base_spec
if not config:
# if base_spec was not set (when not using code_to_function) and we have base64 code
# we create the base spec with essential attributes
config = nuclio.config.new_config()
update_in(config, "spec.handler", handler or "main:handler")

config = nuclio.config.extend_config(
function.spec.base_spec, spec, tag, function.spec.build.code_origin
config, spec, tag, function.spec.build.code_origin
)
update_in(config, "metadata.name", function.metadata.name)
update_in(config, "spec.volumes", function.spec.generate_nuclio_volumes())
Expand Down

0 comments on commit 6612bb4

Please sign in to comment.