Skip to content

Commit

Permalink
[Runtimes] Support loading nuclio functions from external code entry (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sahare92 committed May 24, 2021
1 parent 6a61959 commit bb70cad
Show file tree
Hide file tree
Showing 6 changed files with 271 additions and 1 deletion.
1 change: 1 addition & 0 deletions mlrun/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
"dbpath": "", # db/api url
# url to nuclio dashboard api (can be with user & token, e.g. https://username:password@dashboard-url.com)
"nuclio_dashboard_url": "",
"default_nuclio_runtime": "python:3.7",
"nest_asyncio_enabled": "", # enable import of nest_asyncio for corner cases with old jupyter, set "1"
"ui_url": "", # remote/external mlrun UI url (for hyperlinks) (This is deprecated in favor of the ui block)
"remote_host": "",
Expand Down
1 change: 1 addition & 0 deletions mlrun/datastore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ..utils import logger
from .base import DataItem
from .datastore import StoreManager, in_memory_store, uri_to_ipython
from .s3 import parse_s3_bucket_and_key
from .store_resources import get_store_resource, get_store_uri, is_store_uri

store_manager = StoreManager()
Expand Down
15 changes: 15 additions & 0 deletions mlrun/datastore/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import boto3
import fsspec

import mlrun.errors

from .base import DataStore, FileStats, get_range


Expand Down Expand Up @@ -87,3 +89,16 @@ def listdir(self, key):
key_length = len(key)
bucket = self.s3.Bucket(self.endpoint)
return [obj.key[key_length:] for obj in bucket.objects.filter(Prefix=key)]


def parse_s3_bucket_and_key(s3_path):
    """Split an S3 path into its bucket name and object key.

    The ``s3://`` scheme is removed only when it is the prefix of the path, so a
    key that happens to contain the text ``s3://`` is returned untouched.

    :param s3_path: S3 location, e.g. ``s3://my-bucket/path/to/object``
    :return: ``(bucket, key)`` tuple; ``key`` is an empty string when the path
             holds nothing after the bucket name
    :raises mlrun.errors.MLRunInvalidArgumentError: when the path cannot be parsed
             (e.g. it is not a string)
    """
    scheme = "s3://"
    try:
        # strip the scheme as a prefix only - never from the middle of the key
        path = s3_path[len(scheme) :] if s3_path.startswith(scheme) else s3_path
        # everything up to the first "/" is the bucket, the remainder is the key
        bucket, _, key = path.partition("/")
    except Exception as exc:
        # kept broad on purpose: any malformed input (such as a non-string value)
        # is reported to the caller as an invalid argument
        raise mlrun.errors.MLRunInvalidArgumentError(
            f"failed to parse s3 bucket and key. {exc}"
        ) from exc

    return bucket, key
1 change: 1 addition & 0 deletions mlrun/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ def __init__(
):
self.functionSourceCode = functionSourceCode #: functionSourceCode
self.codeEntryType = "" #: codeEntryType
self.codeEntryAttributes = "" #: codeEntryAttributes
self.source = source #: source
self.code_origin = code_origin #: code_origin
self.image = image #: image
Expand Down
154 changes: 153 additions & 1 deletion mlrun/runtimes/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import json
import typing
from datetime import datetime
from os import environ
from os import environ, getenv
from time import sleep

import nuclio
Expand All @@ -26,6 +26,8 @@
from nuclio.deploy import deploy_config, find_dashboard_url, get_deploy_status
from nuclio.triggers import V3IOStreamTrigger

import mlrun.errors
from mlrun.datastore import parse_s3_bucket_and_key
from mlrun.db import RunDBError

from ..config import config as mlconf
Expand Down Expand Up @@ -167,6 +169,113 @@ def add_trigger(self, name, spec):
self.spec.config[f"spec.triggers.{name}"] = spec
return self

def with_source_archive(
    self, source, handler="", runtime="", secrets=None,
):
    """Point the function at a remote code entry (archive, s3 object or git repo)

    The code entry type is derived from the scheme of ``source``, and the function's
    base spec is replaced with a new nuclio config describing that code entry.

    :param source:  full url of the code entry to load the function from
                    (http://, https://, v3io://, v3ios://, s3:// or git://)
    :param handler: the function's handler, optionally prefixed by a working dir inside
                    the archive/git repo, in the form "work/dir#module:handler"
    :param runtime: (optional) the function's runtime, when empty the configured
                    default_nuclio_runtime is used
    :param secrets: (optional) a dictionary of secrets used to fetch the code entry.
                    supported keys (and the env var used when the key is missing):
                    ["V3IO_ACCESS_KEY"       (env: V3IO_ACCESS_KEY),
                     "GIT_USERNAME"          (no env fallback),
                     "GIT_PASSWORD"          (env: GITHUB_TOKEN),
                     "AWS_ACCESS_KEY_ID"     (env: AWS_ACCESS_KEY_ID),
                     "AWS_SECRET_ACCESS_KEY" (env: AWS_SECRET_ACCESS_KEY),
                     "AWS_SESSION_TOKEN"     (env: AWS_SESSION_TOKEN)]
    :return: the function object (self)
    :raises mlrun.errors.MLRunInvalidArgumentError: when the source scheme is not
                    one of the supported code entry types

    Examples::
        git:
            ("git://github.com/org/repo#my-branch",
             handler="path/inside/repo#main:handler",
             secrets={"GIT_PASSWORD": "my-access-token"})
        s3:
            ("s3://my-bucket/path/in/bucket/my-functions-archive",
             handler="path/inside/functions/archive#main:Handler",
             runtime="golang",
             secrets={"AWS_ACCESS_KEY_ID": "some-id", "AWS_SECRET_ACCESS_KEY": "some-secret"})
    """
    entry_type = self._resolve_code_entry_type(source)
    if entry_type == "":
        raise mlrun.errors.MLRunInvalidArgumentError(
            "Couldn't resolve code entry type from source"
        )

    # the handler may carry a working dir prefix ("work/dir#module:handler")
    work_dir, handler = self._resolve_work_dir_and_handler(handler)
    attributes = {"workDir": work_dir} if work_dir != "" else {}

    secrets = {} if secrets is None else secrets

    # fall back to the configured default runtime
    if runtime == "":
        runtime = mlrun.config.config.default_nuclio_runtime

    if entry_type == "archive":
        # v3io:// becomes http:// and v3ios:// becomes https://
        if source.startswith("v3io"):
            source = "http" + source[len("v3io") :]

        access_key = secrets.get("V3IO_ACCESS_KEY", getenv("V3IO_ACCESS_KEY", ""))
        if access_key:
            # NOTE(review): "headers" is nested twice here - confirm this is the
            # shape nuclio expects for archive code entry attributes
            attributes["headers"] = {"headers": {"X-V3io-Session-Key": access_key}}

    elif entry_type == "s3":
        bucket, item_key = parse_s3_bucket_and_key(source)
        attributes["s3Bucket"] = bucket
        attributes["s3ItemKey"] = item_key

        # each credential is taken from secrets, then from the same-named env var
        for attribute_name, secret_name in (
            ("s3AccessKeyId", "AWS_ACCESS_KEY_ID"),
            ("s3SecretAccessKey", "AWS_SECRET_ACCESS_KEY"),
            ("s3SessionToken", "AWS_SESSION_TOKEN"),
        ):
            attributes[attribute_name] = secrets.get(
                secret_name, getenv(secret_name, "")
            )

    elif entry_type == "git":
        # nuclio expects an https:// url rather than a git:// one
        if source.startswith("git://"):
            source = source.replace("git://", "https://")

        source, reference = self._resolve_git_reference_from_source(source)
        if reference:
            attributes["reference"] = reference

        attributes["username"] = secrets.get("GIT_USERNAME", "")
        attributes["password"] = secrets.get(
            "GIT_PASSWORD", getenv("GITHUB_TOKEN", "")
        )

    # keep the resolved handler (without the working dir prefix) on the spec
    self.spec.function_handler = handler

    # build a new nuclio config carrying the code entry details
    base_spec = nuclio.config.new_config()
    for spec_key, spec_value in (
        ("spec.handler", handler),
        ("spec.runtime", runtime),
        ("spec.build.path", source),
        ("spec.build.codeEntryType", entry_type),
        ("spec.build.codeEntryAttributes", attributes),
    ):
        update_in(base_spec, spec_key, spec_value)
    self.spec.base_spec = base_spec

    return self

def with_v3io(self, local="", remote=""):
for key in ["V3IO_FRAMESD", "V3IO_USERNAME", "V3IO_ACCESS_KEY", "V3IO_API"]:
if key in environ:
Expand Down Expand Up @@ -353,6 +462,49 @@ def _get_state(
raise ValueError("function or deploy process not found")
return self.status.state, text, last_log_timestamp

@staticmethod
def _resolve_git_reference_from_source(source):
split_source = source.split("#")

# no reference was passed
if len(split_source) != 2:
return source

reference = split_source[1]
if reference.startswith("refs"):
return split_source, reference

return split_source[0], f"refs/heads/{reference}"

def _resolve_work_dir_and_handler(self, handler):
"""
Resolves a nuclio function working dir and handler inside an archive/git repo
:param handler: a path describing working dir and handler of a nuclio function
:return: (wokring_dir, handler) tuple, as nuclio expects to get it
Example: ("a/b/c#main:Handler) -> ("a/b/c", "main:Handler")
"""
if handler == "":
return "", self.spec.function_handler or "main:handler"

split_handler = handler.split("#")
if len(split_handler) == 1:
return "", handler

return "/".join(split_handler[:-1]), split_handler[-1]

@staticmethod
def _resolve_code_entry_type(source):
if source.startswith("s3://"):
return "s3"
if source.startswith("git://"):
return "git"

for archive_prefix in ["http://", "https://", "v3io://", "v3ios://"]:
if source.startswith(archive_prefix):
return "archive"
return ""

def _get_runtime_env(self):
# for runtime specific env var enrichment (before deploy)
runtime_env = {
Expand Down
100 changes: 100 additions & 0 deletions tests/api/runtimes/test_nuclio.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,3 +235,103 @@ def test_deploy_with_v3io(self, db: Session, client: TestClient):
deploy_nuclio_function(function)
self._assert_deploy_called_basic_config()
self._assert_nuclio_v3io_mount(local_path, remote_path)

def test_load_function_with_source_archive_git(self):
    """A git:// source is stored as an https:// path with a refs/heads reference."""
    function = self._generate_runtime("nuclio")
    function.with_source_archive(
        "git://github.com/org/repo#my-branch",
        handler="path/inside/repo#main:handler",
        secrets={"GIT_PASSWORD": "my-access-token"},
    )

    # the working dir is split off the handler, the branch becomes a full reference
    expected_attributes = {
        "workDir": "path/inside/repo",
        "reference": "refs/heads/my-branch",
        "username": "",
        "password": "my-access-token",
    }
    expected_build = {
        "commands": [],
        "noBaseImagesPull": True,
        "path": "https://github.com/org/repo",
        "codeEntryType": "git",
        "codeEntryAttributes": expected_attributes,
    }
    expected_base_spec = {
        "apiVersion": "nuclio.io/v1",
        "kind": "Function",
        "metadata": {"name": "notebook", "labels": {}, "annotations": {}},
        "spec": {
            "runtime": "python:3.7",
            "handler": "main:handler",
            "env": [],
            "volumes": [],
            "build": expected_build,
        },
    }

    assert function.spec.base_spec == expected_base_spec

def test_load_function_with_source_archive_s3(self):
    """An s3:// source keeps its path and gets bucket, key and credential attributes."""
    function = self._generate_runtime("nuclio")
    function.with_source_archive(
        "s3://my-bucket/path/in/bucket/my-functions-archive",
        handler="path/inside/functions/archive#main:Handler",
        runtime="golang",
        secrets={
            "AWS_ACCESS_KEY_ID": "some-id",
            "AWS_SECRET_ACCESS_KEY": "some-secret",
        },
    )

    # no session token was passed, so an empty one is expected
    expected_attributes = {
        "workDir": "path/inside/functions/archive",
        "s3Bucket": "my-bucket",
        "s3ItemKey": "path/in/bucket/my-functions-archive",
        "s3AccessKeyId": "some-id",
        "s3SecretAccessKey": "some-secret",
        "s3SessionToken": "",
    }
    expected_build = {
        "commands": [],
        "noBaseImagesPull": True,
        "path": "s3://my-bucket/path/in/bucket/my-functions-archive",
        "codeEntryType": "s3",
        "codeEntryAttributes": expected_attributes,
    }
    expected_base_spec = {
        "apiVersion": "nuclio.io/v1",
        "kind": "Function",
        "metadata": {"name": "notebook", "labels": {}, "annotations": {}},
        "spec": {
            "runtime": "golang",
            "handler": "main:Handler",
            "env": [],
            "volumes": [],
            "build": expected_build,
        },
    }

    assert function.spec.base_spec == expected_base_spec

def test_load_function_with_source_archive_v3io(self):
    """A v3ios:// source is stored as an https:// archive with a session key header."""
    function = self._generate_runtime("nuclio")
    function.with_source_archive(
        "v3ios://host.com/container/my-functions-archive.zip",
        handler="path/inside/functions/archive#main:handler",
        secrets={"V3IO_ACCESS_KEY": "ma-access-key"},
    )

    # the access key is expected as a header on the archive code entry
    expected_attributes = {
        "workDir": "path/inside/functions/archive",
        "headers": {"headers": {"X-V3io-Session-Key": "ma-access-key"}},
    }
    expected_build = {
        "commands": [],
        "noBaseImagesPull": True,
        "path": "https://host.com/container/my-functions-archive.zip",
        "codeEntryType": "archive",
        "codeEntryAttributes": expected_attributes,
    }
    expected_base_spec = {
        "apiVersion": "nuclio.io/v1",
        "kind": "Function",
        "metadata": {"name": "notebook", "labels": {}, "annotations": {}},
        "spec": {
            "runtime": "python:3.7",
            "handler": "main:handler",
            "env": [],
            "volumes": [],
            "build": expected_build,
        },
    }

    assert function.spec.base_spec == expected_base_spec

0 comments on commit bb70cad

Please sign in to comment.