In [5]:
import pathlib
import uuid
import os
from unittest import mock

import pytest

import mlflow
from mlflow.exceptions import MlflowException
from mlflow.utils.file_utils import path_to_local_file_uri, mkdir, local_file_uri_to_path
from collections import namedtuple
from mlflow.utils.os import is_windows


Artifact = namedtuple("Artifact", ["uri", "content"])


@pytest.fixture()
def run_with_artifact(tmp_path):
    artifact_path = "test"
    artifact_content = "content"
    local_path = tmp_path.joinpath("file.txt")
    local_path.write_text(artifact_content)
    with mlflow.start_run() as run:
        mlflow.log_artifact(local_path, artifact_path)

    return (run, artifact_path, artifact_content)


def test_download_artifacts_with_uri(run_with_artifact):
    run, artifact_path, artifact_content = run_with_artifact
    run_uri = f"runs:/{run.info.run_id}/{artifact_path}"
    actual_uri = str(pathlib.PurePosixPath(run.info.artifact_uri) / artifact_path)
    for uri in (run_uri, actual_uri):
        download_output_path = mlflow.artifacts.download_artifacts(artifact_uri=uri)
        downloaded_artifact_path = next(pathlib.Path(download_output_path).iterdir())
        assert downloaded_artifact_path.read_text() == artifact_content


def test_download_artifacts_with_run_id_and_path(run_with_artifact):
    run, artifact_path, artifact_content = run_with_artifact
    download_output_path = mlflow.artifacts.download_artifacts(
        run_id=run.info.run_id, artifact_path=artifact_path
    )
    downloaded_artifact_path = next(pathlib.Path(download_output_path).iterdir())
    assert downloaded_artifact_path.read_text() == artifact_content


def test_download_artifacts_with_run_id_no_path(run_with_artifact):
    run, artifact_path, _ = run_with_artifact
    artifact_relative_path_top_level_dir = pathlib.PurePosixPath(artifact_path).parts[0]
    downloaded_output_path = mlflow.artifacts.download_artifacts(run_id=run.info.run_id)
    downloaded_artifact_directory_name = next(pathlib.Path(downloaded_output_path).iterdir()).name
    assert downloaded_artifact_directory_name == artifact_relative_path_top_level_dir


@pytest.mark.parametrize("dst_subdir_path", [None, "doesnt_exist_yet/subdiir"])
def test_download_artifacts_with_dst_path(run_with_artifact, tmp_path, dst_subdir_path):
    run, artifact_path, _ = run_with_artifact
    dst_path = tmp_path / dst_subdir_path if dst_subdir_path else tmp_path

    download_output_path = mlflow.artifacts.download_artifacts(
        run_id=run.info.run_id, artifact_path=artifact_path, dst_path=dst_path
    )
    assert pathlib.Path(download_output_path).samefile(dst_path / artifact_path)


def test_download_artifacts_throws_for_invalid_arguments():
    with pytest.raises(MlflowException, match="Exactly one of"):
        mlflow.artifacts.download_artifacts(
            run_id="run_id", artifact_path="path", artifact_uri="uri"
        )

    with pytest.raises(MlflowException, match="Exactly one of"):
        mlflow.artifacts.download_artifacts()

    with pytest.raises(MlflowException, match="`artifact_path` cannot be specified"):
        mlflow.artifacts.download_artifacts(artifact_path="path", artifact_uri="uri")


@pytest.fixture()
def run_with_text_artifact():
    artifact_path = "test/file.txt"
    artifact_content = "This is a sentence"
    with mlflow.start_run() as run:
        mlflow.log_text(artifact_content, artifact_path)

    artifact_uri = str(pathlib.PurePosixPath(run.info.artifact_uri) / artifact_path)
    return Artifact(artifact_uri, artifact_content)


@pytest.fixture()
def run_with_json_artifact():
    artifact_path = "test/config.json"
    artifact_content = {"mlflow-version": "0.28", "n_cores": "10"}
    with mlflow.start_run() as run:
        mlflow.log_dict(artifact_content, artifact_path)

    artifact_uri = str(pathlib.PurePosixPath(run.info.artifact_uri) / artifact_path)
    return Artifact(artifact_uri, artifact_content)


@pytest.fixture()
def run_with_image_artifact():
    from PIL import Image

    artifact_path = "test/image.png"
    image = Image.new("RGB", (100, 100))
    with mlflow.start_run() as run:
        mlflow.log_image(image, artifact_path)

    artifact_uri = str(pathlib.PurePosixPath(run.info.artifact_uri) / artifact_path)
    return Artifact(artifact_uri, image)


def test_load_text(run_with_text_artifact):
    artifact = run_with_text_artifact
    assert mlflow.artifacts.load_text(artifact.uri) == artifact.content


def test_load_dict(run_with_json_artifact):
    artifact = run_with_json_artifact
    assert mlflow.artifacts.load_dict(artifact.uri) == artifact.content


def test_load_json_invalid_json(run_with_text_artifact):
    artifact = run_with_text_artifact
    with pytest.raises(mlflow.exceptions.MlflowException, match="Unable to form a JSON object"):
        mlflow.artifacts.load_dict(artifact.uri)


def test_load_image(run_with_image_artifact):
    from PIL import Image

    artifact = run_with_image_artifact
    assert isinstance(mlflow.artifacts.load_image(artifact.uri), Image.Image)


def test_load_image_invalid_image(run_with_text_artifact):
    artifact = run_with_text_artifact
    with pytest.raises(
        mlflow.exceptions.MlflowException, match="Unable to form a PIL Image object"
    ):
        mlflow.artifacts.load_image(artifact.uri)


@pytest.fixture()
def text_artifact(tmp_path):
    artifact_name = "test.txt"
    artifacts_root_tmp = mkdir(tmp_path.joinpath(str(uuid.uuid4())))
    test_artifact_path = artifacts_root_tmp.joinpath(artifact_name)
    test_artifact_path.write_text("test")
    artifact_return_type = namedtuple(
        "artifact_return_type", ["tmp_path", "artifact_path", "artifact_name"]
    )
    return artifact_return_type(artifacts_root_tmp, test_artifact_path, artifact_name)


def _assert_artifact_uri(tracking_uri, expected_artifact_uri, test_artifact, run_id):
    mlflow.log_artifact(test_artifact.artifact_path)
    artifact_uri = mlflow.artifacts.download_artifacts(
        run_id=run_id, artifact_path=test_artifact.artifact_name, tracking_uri=tracking_uri
    )
    assert artifact_uri == expected_artifact_uri


def test_default_relative_artifact_uri_resolves(text_artifact):
    tracking_uri = path_to_local_file_uri(text_artifact.tmp_path.joinpath("mlruns"))
    mlflow.set_tracking_uri(tracking_uri)
    experiment_id = mlflow.create_experiment("test_exp_a", "test_artifacts_root")
    with mlflow.start_run(experiment_id=experiment_id) as run:
        _assert_artifact_uri(
            tracking_uri,
            str(
                pathlib.Path.cwd().joinpath(
                    "test_artifacts_root",
                    run.info.run_id,
                    "artifacts",
                    text_artifact.artifact_name,
                )
            ),
            text_artifact,
            run.info.run_id,
        )


def test_custom_relative_artifact_uri_resolves(text_artifact):
    tracking_uri = path_to_local_file_uri(text_artifact.tmp_path.joinpath("tracking"))
    artifacts_root_path = text_artifact.tmp_path.joinpath("test_artifacts")
    artifacts_root_uri = path_to_local_file_uri(artifacts_root_path)
    mlflow.set_tracking_uri(tracking_uri)
    experiment_id = mlflow.create_experiment("test_exp_b", artifacts_root_uri)
    with mlflow.start_run(experiment_id=experiment_id) as run:
        _assert_artifact_uri(
            tracking_uri,
            str(
                artifacts_root_path.joinpath(
                    run.info.run_id, "artifacts", text_artifact.artifact_name
                )
            ),
            text_artifact,
            run.info.run_id,
        )


def test_artifact_logging_resolution_works_with_non_root_working_directory(text_artifact):
    original_cwd = pathlib.Path.cwd()
    new_cwd = text_artifact.tmp_path.joinpath("some_location")
    new_cwd.mkdir()
    tracking_uri = mlflow.get_tracking_uri()
    experiment_id = mlflow.create_experiment("test_exp_c", "some_path")
    os.chdir(new_cwd)

    with mlflow.start_run(experiment_id=experiment_id) as run:
        _assert_artifact_uri(
            tracking_uri,
            str(
                original_cwd.joinpath(
                    "some_path",
                    run.info.run_id,
                    "artifacts",
                    text_artifact.artifact_name,
                )
            ),
            text_artifact,
            run.info.run_id,
        )
    os.chdir(original_cwd)


@pytest.mark.skipif(not is_windows(), reason="This test only passes on Windows")
def test_log_artifact_windows_path_with_hostname(text_artifact):
    experiment_test_1_artifact_location = r"\\my_server\my_path\my_sub_path\1"
    experiment_test_1_id = mlflow.create_experiment(
        "test_exp_d", experiment_test_1_artifact_location
    )
    with mlflow.start_run(experiment_id=experiment_test_1_id) as run:
        with mock.patch("shutil.copyfile") as copyfile_mock, mock.patch(
            "os.path.exists", return_value=True
        ) as exists_mock:
            mlflow.log_artifact(text_artifact.artifact_path)
            copyfile_mock.assert_called_once()
            exists_mock.assert_called_once()
            local_path = mlflow.artifacts.download_artifacts(
                run_id=run.info.run_id, artifact_path=text_artifact.artifact_name
            )
            assert (
                rf"{experiment_test_1_artifact_location}\{run.info.run_id}"
                rf"\artifacts\{text_artifact.artifact_name}" == local_path
            )

    experiment_test_2_artifact_location = "file://my_server/my_path/my_sub_path"
    experiment_test_2_id = mlflow.create_experiment(
        "test_exp_e", experiment_test_2_artifact_location
    )
    with mlflow.start_run(experiment_id=experiment_test_2_id) as run:
        with mock.patch("shutil.copyfile") as copyfile_mock, mock.patch(
            "os.path.exists", return_value=True
        ) as exists_mock:
            mlflow.log_artifact(text_artifact.artifact_path)
            copyfile_mock.assert_called_once()
            exists_mock.assert_called_once()
            local_path = mlflow.artifacts.download_artifacts(
                run_id=run.info.run_id, artifact_path=text_artifact.artifact_name
            )
            assert (
                local_file_uri_to_path(experiment_test_2_artifact_location)
                + rf"\{run.info.run_id}\artifacts\{text_artifact.artifact_name}"
                == local_path
            )

ModuleNotFoundError: No module named 'mlflow.utils.os'

In [None]:
import pathlib
import uuid
import os
from unittest import mock
import pathlib
import uuid
import os
from unittest import mock

import pytest

import mlflow
from mlflow.exceptions import MlflowException
from mlflow.utils.file_utils import path_to_local_file_uri, mkdir, local_file_uri_to_path
from collections import namedtuple
from mlflow.utils.os import is_windows
import pytest

import mlflow
from mlflow.exceptions import MlflowException
from mlflow.utils.file_utils import path_to_local_file_uri, mkdir, local_file_uri_to_path
from collections import namedtuple
from mlflow.utils.os import is_windows

In [3]:
!pip install pathlib
!pip install uuid
!pip install os 

Collecting uuid
  Downloading uuid-1.30.tar.gz (5.8 kB)
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Building wheels for collected packages: uuid
  Building wheel for uuid (setup.py): started
  Building wheel for uuid (setup.py): finished with status 'done'
  Created wheel for uuid: filename=uuid-1.30-py3-none-any.whl size=6480 sha256=27c8fb4072ed62551f6d5cc79a9325e5238c564cf9e90eea8634a443d9e122e3
  Stored in directory: c:\users\nallag\appdata\local\pip\cache\wheels\05\d7\b4\4795d29c6decfffbf64c63e58b6c8b8bbfd4751488617dcd7a
Successfully built uuid
Installing collected packages: uuid
Successfully installed uuid-1.30


ERROR: Could not find a version that satisfies the requirement os (from versions: none)
ERROR: No matching distribution found for os


In [4]:
!pip install mlflow

Collecting mlflow
  Downloading mlflow-2.1.1-py3-none-any.whl (16.7 MB)
     --------------------------------------- 16.7/16.7 MB 11.3 MB/s eta 0:00:00
Collecting protobuf<5,>=3.12.0
  Downloading protobuf-4.22.0-cp39-cp39-win_amd64.whl (420 kB)
     -------------------------------------- 420.6/420.6 kB 8.7 MB/s eta 0:00:00
Collecting databricks-cli<1,>=0.8.7
  Downloading databricks-cli-0.17.4.tar.gz (82 kB)
     ---------------------------------------- 82.3/82.3 kB 4.5 MB/s eta 0:00:00
  Preparing metadata (setup.py): started
  Preparing metadata (setup.py): finished with status 'done'
Collecting alembic<2
  Downloading alembic-1.9.4-py3-none-any.whl (210 kB)
     -------------------------------------- 210.5/210.5 kB 6.5 MB/s eta 0:00:00
Collecting querystring-parser<2
  Downloading querystring_parser-1.2.4-py2.py3-none-any.whl (7.9 kB)
Collecting gitpython<4,>=2.1.0
  Downloading GitPython-3.1.31-py3-none-any.whl (184 kB)
     ------------------------------------- 184.3/184.3 kB 11.

In [7]:
!pip install utils.os

ERROR: Could not find a version that satisfies the requirement utils.os (from versions: none)
ERROR: No matching distribution found for utils.os
