Skip to content

Commit

Permalink
[Runtimes] Fix "pass" mode, add a way to pass task params into comman…
Browse files Browse the repository at this point in the history
…d line, enhance Mpijob runtime (#992)
  • Loading branch information
yaronha committed Jun 16, 2021
1 parent 85977b9 commit 5bce6e6
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 164 deletions.
112 changes: 73 additions & 39 deletions docs/runtimes/code-archive.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,50 +16,66 @@
"* Zip/Tar archives e.g. https://github.com/mlrun/mlrun/raw/run-cli/examples/archive.zip\n",
"* File share e.g. `/User/mycode` (this require adding a file share to the function) \n",
"\n",
"The archive will be set as the working dir for the function and the file to execute should be set using the `command` parameter (with the relative path inside the archive)"
"The archive will be set as the working dir for the function and the file/params to execute should be set using the `command` parameter (with the relative path inside the archive)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Examples:\n",
"\n",
"**Using the CLI:**"
"#### Run from zip using the CLI"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 2,
"metadata": {},
"outputs": [],
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"> 2021-06-15 11:52:45,847 [warning] Failed resolving version info. Ignoring and using defaults\n",
"> 2021-06-15 11:52:48,460 [warning] Unable to parse server or client version. Assuming compatible: {'server_version': '0.6.4', 'client_version': 'unstable'}\n",
"> 2021-06-15 11:52:48,469 [info] starting run tst1 uid=ce4a3eab42ff43e0885d82ec27762949 DB=http://mlrun-api:8080\n",
"> 2021-06-15 11:52:48,612 [info] Job is running in the background, pod: tst1-6zpsv\n",
"> 2021-06-15 11:52:51,885 [info] extracting source from https://github.com/mlrun/mlrun/raw/development/examples/archive.zip to /mlrun/code\n",
"Run: tst1 (uid=ce4a3eab42ff43e0885d82ec27762949)\n",
"my line\n",
"got text: some text\n",
"> 2021-06-15 11:52:51,957 [info] run executed, status=completed\n",
"final state: completed\n",
"> 2021-06-15 11:52:54,715 [info] run executed, status=completed\n"
]
}
],
"source": [
"!python -m mlrun run --name tst1 --watch --source https://github.com/mlrun/mlrun/development/examples/archive.zip --handler handler --image mlrun/mlrun myfunc.py"
"!python -m mlrun run --name tst1 --watch --source https://github.com/mlrun/mlrun/raw/development/examples/archive.zip --handler handler --image mlrun/mlrun myfunc.py"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Using code from Git:**"
"#### Using code from Git"
]
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"> 2021-04-27 15:21:51,071 [info] starting run archive-my_func uid=29e426cba2774884af0f92f0070e7abd DB=http://mlrun-api:8080\n",
"> 2021-04-27 15:21:51,301 [info] Job is running in the background, pod: archive-my-func-59wlc\n",
"extracting source from git://github.com/mlrun/ci-demo.git#master to ./ (/mlrun/dump)\n",
"Run: archive-my_func (uid=29e426cba2774884af0f92f0070e7abd)\n",
"> 2021-06-15 11:58:59,002 [info] starting run archive-my_func uid=0b6195fbf1844880a829d61505bd9a38 DB=http://mlrun-api:8080\n",
"> 2021-06-15 11:58:59,468 [info] Job is running in the background, pod: archive-my-func-8frkp\n",
"> 2021-06-15 11:59:02,726 [info] extracting source from git://github.com/mlrun/ci-demo.git#master to /mlrun/code\n",
"Run: archive-my_func (uid=0b6195fbf1844880a829d61505bd9a38)\n",
"Params: p1=1, p2=a-string\n",
"> 2021-04-27 15:21:57,520 [info] running function\n",
"> 2021-04-27 15:21:57,591 [info] run executed, status=completed\n",
"> 2021-06-15 11:59:02,764 [info] running function\n",
"> 2021-06-15 11:59:02,797 [info] run executed, status=completed\n",
"final state: completed\n"
]
},
Expand Down Expand Up @@ -234,12 +250,12 @@
" <tbody>\n",
" <tr>\n",
" <td>default</td>\n",
" <td><div title=\"29e426cba2774884af0f92f0070e7abd\"><a href=\"https://dashboard.default-tenant.app.yh30.iguazio-c0.com/mlprojects/default/jobs/monitor/29e426cba2774884af0f92f0070e7abd/overview\" target=\"_blank\" >...070e7abd</a></div></td>\n",
" <td><div title=\"0b6195fbf1844880a829d61505bd9a38\"><a href=\"https://dashboard.default-tenant.app.yh35.iguazio-cd0.com/mlprojects/default/jobs/monitor/0b6195fbf1844880a829d61505bd9a38/overview\" target=\"_blank\" >...05bd9a38</a></div></td>\n",
" <td>0</td>\n",
" <td>Apr 27 15:21:57</td>\n",
" <td>Jun 15 11:59:02</td>\n",
" <td>completed</td>\n",
" <td>archive-my_func</td>\n",
" <td><div class=\"dictlist\">v3io_user=admin</div><div class=\"dictlist\">kind=job</div><div class=\"dictlist\">owner=admin</div><div class=\"dictlist\">host=archive-my-func-59wlc</div><div class=\"dictlist\">framework=sklearn</div></td>\n",
" <td><div class=\"dictlist\">v3io_user=admin</div><div class=\"dictlist\">kind=job</div><div class=\"dictlist\">owner=admin</div><div class=\"dictlist\">host=archive-my-func-8frkp</div><div class=\"dictlist\">framework=sklearn</div></td>\n",
" <td></td>\n",
" <td></td>\n",
" <td><div class=\"dictlist\">accuracy=2</div><div class=\"dictlist\">loss=3</div></td>\n",
Expand All @@ -248,12 +264,12 @@
" </tbody>\n",
"</table>\n",
"</div></div>\n",
" <div id=\"result4d0186fb-pane\" class=\"right-pane block hidden\">\n",
" <div id=\"resultbd29ada1-pane\" class=\"right-pane block hidden\">\n",
" <div class=\"pane-header\">\n",
" <span id=\"result4d0186fb-title\" class=\"pane-header-title\">Title</span>\n",
" <span onclick=\"closePanel(this)\" paneName=\"result4d0186fb\" class=\"close clickable\">&times;</span>\n",
" <span id=\"resultbd29ada1-title\" class=\"pane-header-title\">Title</span>\n",
" <span onclick=\"closePanel(this)\" paneName=\"resultbd29ada1\" class=\"close clickable\">&times;</span>\n",
" </div>\n",
" <iframe class=\"fileview\" id=\"result4d0186fb-body\"></iframe>\n",
" <iframe class=\"fileview\" id=\"resultbd29ada1-body\"></iframe>\n",
" </div>\n",
"</div>\n"
],
Expand All @@ -269,34 +285,24 @@
"output_type": "stream",
"text": [
"to track results use .show() or .logs() or in CLI: \n",
"!mlrun get run 29e426cba2774884af0f92f0070e7abd --project default , !mlrun logs 29e426cba2774884af0f92f0070e7abd --project default\n",
"> 2021-04-27 15:22:00,541 [info] run executed, status=completed\n"
"!mlrun get run 0b6195fbf1844880a829d61505bd9a38 --project default , !mlrun logs 0b6195fbf1844880a829d61505bd9a38 --project default\n",
"> 2021-06-15 11:59:05,633 [info] run executed, status=completed\n"
]
},
{
"data": {
"text/plain": [
"<mlrun.model.RunObject at 0x7ff2436a71d0>"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import mlrun\n",
"from mlrun.platforms import auto_mount\n",
"fn = mlrun.new_function('archive', kind='job', image='mlrun/mlrun', command='./myfunc.py')\n",
"fn.from_source_archive('git://github.com/mlrun/ci-demo.git#master')\n",
"fn = mlrun.new_function('archive', kind='job', image='mlrun/mlrun', command='./myfunc.py',\n",
" source='git://github.com/mlrun/ci-demo.git#master')\n",
"run = fn.run(handler='my_func')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Using code from file share:**"
"#### Using code from file share"
]
},
{
Expand Down Expand Up @@ -541,13 +547,41 @@
],
"source": [
"import mlrun\n",
"fn = mlrun.new_function('archive', kind='job', image='mlrun/mlrun', command='./code.py')\n",
"fn = mlrun.new_function('archive', kind='job', image='mlrun/mlrun', command='./code.py',\n",
" source='/User/sample2')\n",
"# add shared volume mount so the function will have access to the mounted code\n",
"fn.apply(auto_mount())\n",
"fn.from_source_archive('/User/sample2')\n",
"run = fn.run(handler='handler')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Inject parameters into command line\n",
"\n",
"the function `command` parameter is the command we execute inside the container (the archive path will be set as the working directory), users can pass arguments in the command and also inject the job/task parameters into the command at runtime (by using `{}` around the parameter).\n",
"\n",
"for example we can define a function with command template and pass a parameter during the run:\n",
"\n",
"```python\n",
"fn = mlrun.new_function('withargs', kind='job', image='mlrun/mlrun', command=\"main.py --myarg {myarg}\",\n",
" source='git://github.com/org/repo')\n",
"run = fn.run(params={'myarg': 'xx'})\n",
"```\n",
"\n",
"\n",
"#### Execute non Python code\n",
"\n",
"By default MLRun will try and execute Python code, users can run any other code by specifying the **Pass** (passthrough) mode (`mode=\"pass\"`), in the `pass` mode the `command` is used as is, for example:\n",
"\n",
"```python\n",
"fn = mlrun.new_function('withargs', kind='job', image='mlrun/mlrun', command=\"bash main.sh --myarg {myarg}\",\n",
" source='git://github.com/org/repo', mode='pass')\n",
"run = fn.run(params={'myarg': 'xx'})\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down
23 changes: 17 additions & 6 deletions docs/runtimes/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use in your projects.

**Functions** (function objects) can be created by using any of the following methods:

- **{py:func}`~mlrun.run.new_function`** - creates a function "from scratch" or from another function or from code repository/archive.
- **{py:func}`~mlrun.code_to_function`** - creates a function from local or remote source code (single file) or from a notebook (code will be embedded in the function object).
- **{py:func}`~mlrun.run.new_function`** - creates a function for local run or from container, from code repository/archive, from function spec.
- **{py:func}`~mlrun.code_to_function`** - creates a function from local or remote source code (single file) or from a notebook (code file will be embedded in the function object).
- **{py:func}`~mlrun.import_function`** - imports a function from a local or remote YAML function-configuration file or
from a function object in the MLRun database (using a DB address of the format `db://<project>/<name>[:<tag>]`)
or from the function marketplace (e.g. `hub://describe`).
Expand Down Expand Up @@ -86,23 +86,34 @@ the build configuration using the {py:meth}`~mlrun.runtimes.KubejobRuntime.build
fn = mlrun.new_function('archive', kind='job', command='./myfunc.py')
fn.build_config(base_image='mlrun/mlrun', source='git://github.com/org/repo.git#master',
commands=[pip install pandas])
# deploy (build the container with the extra build commands/packages)
fn.deploy()

# run the function (specify the function handler to execute)
run_results = fn.run(handler='my_func', params={"x": 100})

The `command='./myfunc.py'` specifies the command we execute in the function container/workdir, by default we call python
with the specified command, you can specify `mode="pass"` to execute the command as is (e.g. for binary code), you can
template (`{..}`) in the command to pass the task parameters as arguments for the execution command (e.g. `mycode.py --x {xparam}` will
substitute the `{xparam}` with the value of the `xparam` parameter)

when doing iterative development with multiple code files and packages the 3rd option is the most efficient, we want
to make small code changes and re-run our job without building containers etc.

the `local`, `job` and `remote-spark` runtimes support dynamic load from archive or file shares (other runtimes will
the `local`, `job`, `mpijob` and `remote-spark` runtimes support dynamic load from archive or file shares (other runtimes will
be added later), this is enabled by setting the `spec.build.source=<archive>` and `spec.build.load_source_on_run=True`
or simply by using the {py:meth}`~mlrun.runtimes.KubejobRuntime.with_source_archive` method. in the CLI we use the `--source` flag.
or simply by setting the `source` attribute in `new_function`). in the CLI we use the `--source` flag.

fn = mlrun.new_function('archive', kind='job', image='mlrun/mlrun', command='./myfunc.py')
fn.with_source_archive('git://github.com/mlrun/ci-demo.git#master')
fn = mlrun.new_function('archive', kind='job', image='mlrun/mlrun', command='./myfunc.py',
source='git://github.com/mlrun/ci-demo.git#master')
run_results = fn.run(handler='my_func', params={"x": 100})

see more details and examples on [**running jobs with code from Archives or shares**](./code-archive.ipynb)

For executing non-python code, set `mode="pass"` (passthrough) and specify the full execution `command`, e.g.:

new_function(... command="bash main.sh --myarg xx", mode="pass")

## Submitting Tasks/Jobs To Functions

MLRun batch Function objects support a {py:meth}`~mlrun.runtimes.BaseRuntime.run` method for invoking a job over them, the run method
Expand Down
65 changes: 42 additions & 23 deletions mlrun/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.

import json
import pathlib
import traceback
from ast import literal_eval
from base64 import b64decode, b64encode
Expand Down Expand Up @@ -117,7 +118,7 @@ def main():
@click.option(
"--handler", default="", help="invoke function handler inside the code file"
)
@click.option("--mode", help="special run mode noctx | pass")
@click.option("--mode", help="special run mode ('pass' for using the command as is)")
@click.option("--schedule", help="cron schedule")
@click.option("--from-env", is_flag=True, help="read the spec from the env var")
@click.option("--dump", is_flag=True, help="dump run results as YAML")
Expand Down Expand Up @@ -206,6 +207,17 @@ def run(
if db:
mlconf.dbpath = db

# remove potential quotes from command
eval_url = py_eval(url)
url = eval_url if isinstance(eval_url, str) else url
url_file = url
url_args = ""
if url:
split = url.split(maxsplit=1)
url_file = split[0]
if len(split) > 1:
url_args = split[1]

if func_url or kind or image:
if func_url:
runtime = func_url_to_runtime(func_url)
Expand All @@ -217,14 +229,14 @@ def run(
runtime = {"kind": kind, "spec": {"image": image}}

if kind not in ["", "local", "dask"] and url:
if path.isfile(url) and url.endswith(".py"):
with open(url) as fp:
if url_file and path.isfile(url_file):
with open(url_file) as fp:
body = fp.read()
based = b64encode(body.encode("utf-8")).decode("utf-8")
logger.info(f"packing code at {url}")
logger.info(f"packing code at {url_file}")
update_in(runtime, "spec.build.functionSourceCode", based)
url = ""
update_in(runtime, "spec.command", "")
url = f"main{pathlib.Path(url_file).suffix} {url_args}"
update_in(runtime, "spec.build.code_origin", url_file)
elif runtime:
runtime = py_eval(runtime)
if not isinstance(runtime, dict):
Expand All @@ -240,9 +252,30 @@ def run(
code = b64decode(code).decode("utf-8")
if kfp:
print(f"code:\n{code}\n")
with open("main.py", "w") as fp:
suffix = pathlib.Path(url_file).suffix if url else ".py"
if suffix != ".py" and mode != "pass" and url_file != "{codefile}":
print(
f"command/url ({url}) must specify a .py file when not in 'pass' mode"
)
exit(1)
if mode == "pass":
if "{codefile}" in url:
url_file = "codefile"
url = url.replace("{codefile}", url_file)
elif suffix == ".sh":
url_file = "codefile.sh"
url = f"bash {url_file} {url_args}".strip()
else:
print(
"error, command must be specified with '{codefile}' in it "
"(to determine the position of the code file)"
)
exit(1)
else:
url_file = "main.py"
url = f"{url_file} {url_args}".strip()
with open(url_file, "w") as fp:
fp.write(code)
url = url or "main.py"

if url:
if not name and not runtime:
Expand Down Expand Up @@ -284,25 +317,11 @@ def run(
pprint(runobj.to_dict())

try:
fn = new_function(runtime=runtime, kfp=kfp, mode=mode)
fn = new_function(runtime=runtime, kfp=kfp, mode=mode, source=source)
if workdir:
fn.spec.workdir = workdir
if auto_mount:
fn.apply(auto_mount_modifier())
if source:
supported_runtimes = [
"",
"local",
RuntimeKinds.job,
RuntimeKinds.remotespark,
]
if fn.kind not in supported_runtimes:
print(
f"source flag only works with the {','.join(supported_runtimes)} runtimes"
)
exit(1)
fn.spec.build.source = source
fn.spec.build.load_source_on_run = True
fn.is_child = from_env and not kfp
resp = fn.run(runobj, watch=watch, schedule=schedule, local=local)
if resp and dump:
Expand Down
4 changes: 2 additions & 2 deletions mlrun/kfpops.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ def mlrun_op(
:param in_path: default input path/url (prefix) for inputs
:param out_path: default output path/url (prefix) for artifacts
:param rundb: path for rundb (or use 'MLRUN_DBPATH' env instead)
:param mode: run mode, e.g. 'noctx' for pushing params as args
:param handler code entry-point/handler name
:param mode: run mode, e.g. 'pass' for using the command without mlrun wrapper
:param handler code entry-point/hanfler name
:param job_image name of the image user for the job
:param verbose: add verbose prints/logs
:param scrape_metrics: whether to add the `mlrun/scrape-metrics` label to this run's resources
Expand Down

0 comments on commit 5bce6e6

Please sign in to comment.