diff --git a/docs/runtimes/functions.md b/docs/runtimes/functions.md index af1ab0a370c..70acd1b1a22 100644 --- a/docs/runtimes/functions.md +++ b/docs/runtimes/functions.md @@ -24,8 +24,8 @@ use in your projects. **Functions** (function objects) can be created by using any of the following methods: - **{py:func}`~mlrun.run.new_function`** - creates a function for local run or from container, from code repository/archive, from function spec. -- **{py:func}`~mlrun.code_to_function`** - creates a function from local or remote source code (single file) or from a notebook (code file will be embedded in the function object). -- **{py:func}`~mlrun.import_function`** - imports a function from a local or remote YAML function-configuration file or +- **{py:func}`~mlrun.run.code_to_function`** - creates a function from local or remote source code (single file) or from a notebook (code file will be embedded in the function object). +- **{py:func}`~mlrun.run.import_function`** - imports a function from a local or remote YAML function-configuration file or from a function object in the MLRun database (using a DB address of the format `db:///[:]`) or from the function marketplace (e.g. `hub://describe`). @@ -142,7 +142,7 @@ Run object has the following methods/properties: - `outputs` — returns a dictionary of the run results and artifact paths. - `logs(watch=True)` — returns the latest logs. Use `Watch=False` to disable the interactive mode in running jobs. -- `artifact(key)` — returns full artifact details for the provided key. +- `artifact(key)` — returns artifact for the provided key (as {py:class}`~mlrun.datastore.DataItem` object). - `output(key)` — returns a specific result or an artifact path for the provided key. 
- `wait_for_completion()` — wait for async run to complete - `refresh()` — refresh run state from the db/service diff --git a/docs/store/artifacts.md b/docs/store/artifacts.md index 7ec36a9642b..5f3b3097803 100644 --- a/docs/store/artifacts.md +++ b/docs/store/artifacts.md @@ -90,7 +90,7 @@ they host common and object specific metadata such as: * type specific attributes Artifacts can be obtained via the SDK through type specific APIs or using generic artifact APIs such as: -* {py:func}`~mlrun.run.get_data_item` - get the {py:class}`~mlrun.datastore.DataItem` object for reading/downloading the artifact content +* {py:func}`~mlrun.run.get_dataitem` - get the {py:class}`~mlrun.datastore.DataItem` object for reading/downloading the artifact content * {py:func}`~mlrun.datastore.get_store_resource` - get the artifact object example artifact URLs: @@ -162,13 +162,15 @@ get_data_run = run_local(name='get_data', artifact_path=artifact_path) ``` -The dataset location is returned in the `outputs` field, therefore you can get the location by calling `get_data_run.outputs['iris_dataset']` and use the `get_dataitem` function to get the dataset itself. +The dataset location is returned in the `outputs` field, therefore you can get the location by calling `get_data_run.artifact('iris_dataset')` to get the dataset itself. ``` python # Read your data set -from mlrun.run import get_dataitem -dataset = get_dataitem(get_data_run.outputs['iris_dataset']) +get_data_run.artifact('iris_dataset').as_df() + +# Visualize an artifact in Jupyter (image, html, df, ..) +get_data_run.artifact('confusion-matrix').show() ``` Call `dataset.meta.stats` to obtain the data statistics. You can also get the data as a Pandas Dataframe by calling the `dataset.as_df()`. 
diff --git a/docs/store/datastore.md b/docs/store/datastore.md index 3135e285752..af856ce4cae 100644 --- a/docs/store/datastore.md +++ b/docs/store/datastore.md @@ -56,19 +56,28 @@ Note that in order to call our function with an `input` we used the `inputs` dic a simple parameter we used the `params` dictionary attribute. the input value is the specific item uri (per data store schema) as explained above. +Reading the data results from our run: +we can easily get a run output artifact as a `DataItem` (allowing us to view/use the artifact) using: + +```python +# read the data locally as a Dataframe +prep_data_run.artifact('cleaned_data').as_df() +``` + The {py:class}`~mlrun.datastore.DataItem` support multiple convenience methods such as: -* **get**, **put** - to read/write data -* **download**, **upload** - to download/upload files -* **as_df** - to convert the data to a DataFrame object +* **get()**, **put()** - to read/write data +* **download()**, **upload()** - to download/upload files +* **as_df()** - to convert the data to a DataFrame object * **local** - to get a local file link to the data (will be downloaded locally if needed) -* **listdir**, **stat** - file system like methods +* **listdir()**, **stat** - file system like methods * **meta** - access to the artifact metadata (in case of an artifact uri) +* **show()** - will visualize the data in Jupyter (as image, html, etc.) 
Check the **{py:class}`~mlrun.datastore.DataItem`** class documentation for details -In order to get a DataItem object from a url use {py:func}`~mlrun.run.get_data_item` or -{py:func}`~mlrun.run.get_data_object` (returns the `DataItem.get()`), for example: +In order to get a DataItem object from a url use {py:func}`~mlrun.run.get_dataitem` or +{py:func}`~mlrun.run.get_object` (returns the `DataItem.get()`), for example: - df = mlrun.get_data_item('s3://demo-data/mydata.csv').as_df() - print(mlrun.get_data_object('https://my-site/data.json')) + df = mlrun.get_dataitem('s3://demo-data/mydata.csv').as_df() + print(mlrun.get_object('https://my-site/data.json')) diff --git a/docs/tutorial/02-model-training.ipynb b/docs/tutorial/02-model-training.ipynb index 489a9c7b567..d9bfa26ece8 100644 --- a/docs/tutorial/02-model-training.ipynb +++ b/docs/tutorial/02-model-training.ipynb @@ -672,8 +672,7 @@ ], "source": [ "# Display HTML output artifacts\n", - "from IPython.display import display, HTML\n", - "display(HTML(filename=train_run.outputs['confusion-matrix']))" + "train_run.artifact('confusion-matrix').show()" ] }, { @@ -696,7 +695,7 @@ } ], "source": [ - "display(HTML(filename=train_run.outputs['roc-multiclass']))" + "train_run.artifact('roc-multiclass').show()" ] }, { @@ -722,7 +721,7 @@ "outputs": [], "source": [ "# Read your data set\n", - "df = mlrun.run.get_dataitem(train_run.outputs['test_set']).as_df()" + "df = train_run.artifact('test_set').as_df()" ] }, { @@ -1416,7 +1415,7 @@ ], "source": [ "# Display the `histograms` artifact\n", - "display(HTML(describe_run.outputs['histograms']))" + "describe_run.artifact('histograms').show()" ] }, { @@ -1440,7 +1439,7 @@ ], "source": [ "# Display the `imbalance` artifact\n", - "display(HTML(filename=describe_run.outputs['imbalance']))" + "describe_run.artifact('imbalance').show()" ] }, { @@ -1464,7 +1463,7 @@ ], "source": [ "# Display the `correlation` artifact\n", - 
"display(HTML(filename=describe_run.outputs['correlation']))" + "describe_run.artifact('correlation').show()" ] }, { @@ -1821,8 +1820,8 @@ "print(f'Test Accuracy: {test_run.outputs[\"accuracy\"]}')\n", "\n", "# Display HTML output artifacts\n", - "display(HTML(filename=test_run.outputs['confusion-matrix']))\n", - "display(HTML(filename=test_run.outputs['roc-multiclass']))" + "test_run.artifact('confusion-matrix').show()\n", + "test_run.artifact('roc-multiclass').show()" ] }, { diff --git a/mlrun/datastore/base.py b/mlrun/datastore/base.py index 7d51e36b743..e472485c7c2 100644 --- a/mlrun/datastore/base.py +++ b/mlrun/datastore/base.py @@ -17,13 +17,14 @@ from tempfile import mktemp import fsspec +import orjson import pandas as pd import pyarrow.parquet as pq import requests import urllib3 import mlrun.errors -from mlrun.utils import logger +from mlrun.utils import is_ipython, logger verify_ssl = False if not verify_ssl: @@ -221,7 +222,29 @@ class DataItem: - """Data input/output class abstracting access to various local/remote data sources""" + """Data input/output class abstracting access to various local/remote data sources + + DataItem objects are passed into functions and can be used inside the function, when a function run completes + users can access the run data via the run.artifact(key) which returns a DataItem object. + users can also convert a data url (e.g. s3://bucket/key.csv) to a DataItem using `mlrun.get_dataitem(url)`. 
+ + Example:: + + # using data item inside a function + def my_func(context, data: DataItem): + df = data.as_df() + + + # reading run results using DataItem (run.artifact()) + train_run = train_iris_func.run(inputs={'dataset': dataset}, + params={'label_column': 'label'}) + + train_run.artifact('confusion-matrix').show() + test_set = train_run.artifact('test_set').as_df() + + # create and use DataItem from uri + data = mlrun.get_dataitem('http://xyz/data.json').get() + """ def __init__( self, @@ -276,20 +299,38 @@ def url(self): """DataItem url e.g. /dir/path, s3://bucket/path""" return self._url - def get(self, size=None, offset=0): - """read all or a range and return the content""" - return self._store.get(self._path, size=size, offset=offset) + def get(self, size=None, offset=0, encoding=None): + """read all or a byte range and return the content + + :param size: number of bytes to get + :param offset: fetch from offset (in bytes) + :param encoding: encoding (e.g. "utf-8") for converting bytes to str + """ + body = self._store.get(self._path, size=size, offset=offset) + if encoding and isinstance(body, bytes): + body = body.decode(encoding) + return body def download(self, target_path): - """download to the target dir/path""" + """download to the target dir/path + + :param target_path: local target path for the downloaded item + """ self._store.download(self._path, target_path) def put(self, data, append=False): - """write/upload the data, append is only supported by some datastores""" + """write/upload the data, append is only supported by some datastores + + :param data: data (bytes/str) to write + :param append: append data to the end of the object, NOT SUPPORTED BY SOME OBJECT STORES! 
+ """ self._store.put(self._path, data, append=append) def upload(self, src_path): - """upload the source file (src_path) """ + """upload the source file (src_path) + + :param src_path: source file path to read from and upload + """ self._store.upload(self._path, src_path) def stat(self): @@ -339,6 +380,38 @@ def as_df( **kwargs, ) + def show(self, format=None): + """show the data object content in Jupyter + + :param format: format to use (when there is no/wrong suffix), e.g. 'png' + """ + if not is_ipython: + logger.warning( + "Jupyter/IPython was not detected, .show() will only display inside Jupyter" + ) + return + + from IPython import display + + suffix = self.suffix.lower() + if format: + suffix = "." + format + + if suffix in [".jpg", ".png", ".gif"]: + display.display(display.Image(self.get(), format=suffix[1:])) + elif suffix in [".htm", ".html"]: + display.display(display.HTML(self.get(encoding="utf-8"))) + elif suffix in [".csv", ".pq", ".parquet"]: + display.display(self.as_df()) + elif suffix in [".yaml", ".txt", ".py"]: + display.display(display.Pretty(self.get(encoding="utf-8"))) + elif suffix == ".json": + display.display(display.JSON(orjson.loads(self.get()))) + elif suffix == ".md": + display.display(display.Markdown(self.get(encoding="utf-8"))) + else: + logger.error(f"unsupported show() format {suffix} for {self.url}") + def __str__(self): return self.url diff --git a/mlrun/db/httpdb.py b/mlrun/db/httpdb.py index d496e81940f..a842a8e8702 100644 --- a/mlrun/db/httpdb.py +++ b/mlrun/db/httpdb.py @@ -398,7 +398,7 @@ def list_runs( start_time_to: datetime = None, last_update_time_from: datetime = None, last_update_time_to: datetime = None, - ): + ) -> RunList: """ Retrieve a list of runs, filtered by various options. Example:: @@ -523,7 +523,7 @@ def list_artifacts( until=None, iter: int = None, best_iteration: bool = False, - ): + ) -> ArtifactList: """ List artifacts filtered by various parameters. 
Examples:: diff --git a/mlrun/lists.py b/mlrun/lists.py index 9a3524d5404..0285f464f5e 100644 --- a/mlrun/lists.py +++ b/mlrun/lists.py @@ -11,17 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +from typing import List import pandas as pd +import mlrun + +from .artifacts import Artifact, dict_to_artifact from .config import config from .render import artifacts_to_html, runs_to_html -from .utils import flatten, get_in +from .utils import flatten, get_artifact_target, get_in class RunList(list): def to_rows(self): + """return the run list as flattened rows""" rows = [] head = [ "project", @@ -57,6 +61,7 @@ def to_rows(self): return [head] + rows def to_df(self, flat=False): + """convert the run list to a dataframe""" rows = self.to_rows() df = pd.DataFrame(rows[1:], columns=rows[0]) # .set_index('iter') df["start"] = pd.to_datetime(df["start"]) @@ -69,6 +74,7 @@ def to_df(self, flat=False): return df def show(self, display=True, classes=None, short=False): + """show the run list as a table in Jupyter""" html = runs_to_html(self.to_df(), display, classes=classes, short=short) if not display: return html @@ -80,6 +86,7 @@ def __init__(self, *args): self.tag = "" def to_rows(self): + """return the artifact list as flattened rows""" rows = [] head = { "tree": "", @@ -102,6 +109,7 @@ def to_rows(self): return [head.keys()] + rows def to_df(self, flat=False): + """convert the artifact list to a dataframe""" rows = self.to_rows() df = pd.DataFrame(rows[1:], columns=rows[0]) df["updated"] = pd.to_datetime(df["updated"]) @@ -113,6 +121,7 @@ def to_df(self, flat=False): return df def show(self, display=True, classes=None): + """show the artifact list as a table in Jupyter""" df = self.to_df() if self.tag != "*": df.drop("tree", axis=1, inplace=True) @@ -120,6 +129,19 @@ def show(self, display=True, classes=None): if not display: return 
html + def objects(self) -> List[Artifact]: + """return as a list of artifact objects""" + return [dict_to_artifact(artifact) for artifact in self] + + def dataitems(self) -> List["mlrun.DataItem"]: + """return as a list of DataItem objects""" + dataitems = [] + for item in self: + artifact = get_artifact_target(item) + if artifact: + dataitems.append(mlrun.get_dataitem(artifact)) + return dataitems + class FunctionList(list): def __init__(self): diff --git a/mlrun/model.py b/mlrun/model.py index 3c75cae34b5..04961c55f5c 100644 --- a/mlrun/model.py +++ b/mlrun/model.py @@ -692,6 +692,7 @@ def __init__( super().__init__(spec, metadata) self._status = None self.status = status + self.outputs_wait_for_completion = True @classmethod def from_template(cls, template: RunTemplate): @@ -707,9 +708,11 @@ def status(self, status): def output(self, key): """return the value of a specific result or artifact by key""" + if self.outputs_wait_for_completion: + self.wait_for_completion() if self.status.results and key in self.status.results: return self.status.results.get(key) - artifact = self.artifact(key) + artifact = self._artifact(key) if artifact: return get_artifact_target(artifact, self.metadata.project) return None @@ -726,6 +729,8 @@ def ui_url(self) -> str: def outputs(self): """return a dict of outputs, result values and artifact uris""" outputs = {} + if self.outputs_wait_for_completion: + self.wait_for_completion() if self.status.results: outputs = {k: v for k, v in self.status.results.items()} if self.status.artifacts: @@ -733,8 +738,19 @@ def outputs(self): outputs[a["key"]] = get_artifact_target(a, self.metadata.project) return outputs - def artifact(self, key): - """return artifact metadata by key""" + def artifact(self, key) -> "mlrun.DataItem": + """return artifact DataItem by key""" + if self.outputs_wait_for_completion: + self.wait_for_completion() + artifact = self._artifact(key) + if artifact: + uri = get_artifact_target(artifact, self.metadata.project) + 
if uri: + return mlrun.get_dataitem(uri) + return None + + def _artifact(self, key): + """return artifact metadata dict by key (raw entry from run status)""" + if self.status.artifacts: + for a in self.status.artifacts: + if a["key"] == key: return a return None @@ -747,6 +763,8 @@ def uid(self): def state(self): """current run state""" + if self.status.state in mlrun.runtimes.constants.RunStates.terminal_states(): + return self.status.state self.refresh() return self.status.state or "unknown" @@ -787,7 +805,7 @@ def logs(self, watch=True, db=None): print(f"final state: {state}") return state - def wait_for_completion(self, sleep=3, timeout=0): + def wait_for_completion(self, sleep=3, timeout=0, raise_on_failure=True): """wait for async run to complete""" total_time = 0 while True: @@ -800,6 +818,11 @@ raise mlrun.errors.MLRunTimeoutError( "Run did not reach terminal state on time" ) + if raise_on_failure and state != mlrun.runtimes.constants.RunStates.completed: + self.logs(watch=False) + raise mlrun.errors.MLRunRuntimeError( + f"task {self.metadata.name} did not complete (state={state})" + ) return state @staticmethod diff --git a/tests/rundb/test_httpdb.py b/tests/rundb/test_httpdb.py index 4d40f962324..e04dcd412e5 100644 --- a/tests/rundb/test_httpdb.py +++ b/tests/rundb/test_httpdb.py @@ -255,12 +255,14 @@ def test_artifacts(create_server): server: Server = create_server() db = server.conn prj, uid, key, body = "p9", "u19", "k802", "tomato" - artifact = Artifact(key, body) + artifact = Artifact(key, body, target_path="a.txt") db.store_artifact(key, artifact, uid, project=prj) db.store_artifact(key, artifact, uid, project=prj, iter=42) artifacts = db.list_artifacts(project=prj, tag="*") assert len(artifacts) == 2, "bad number of artifacts" + assert artifacts.objects()[0].key == key, "not a valid artifact object" + assert artifacts.dataitems()[0].url, "not a valid artifact dataitem" artifacts = db.list_artifacts(project=prj, tag="*", iter=0) assert len(artifacts) == 1, 
"bad number of artifacts" diff --git a/tests/test_run.py b/tests/test_run.py index 35e30af8993..8881d05c4b6 100644 --- a/tests/test_run.py +++ b/tests/test_run.py @@ -116,6 +116,7 @@ def test_with_params(): assert result.output("accuracy") == 16, "failed to run" assert result.status.artifacts[0].get("key") == "chart", "failed to run" + assert result.artifact("chart").url, "failed to return artifact data item" @pytest.mark.skipif(not has_secrets(), reason="no secrets")