In [None]:
import mlrun

# Get (or create) the MLRun project rooted at the current directory.
project_name = "jw-proj"
project = mlrun.get_or_create_project(
    project_name,
    "./",
)

# Dask

In [None]:
# Dask cluster function: apply the v3io mount and register it in the project.
dask_cluster_name = "dask-cluster"
dask_cluster = mlrun.new_function(
    dask_cluster_name,
    kind="dask",
    image="mlrun/mlrun",
)
dask_cluster.apply(mlrun.mount_v3io())
project.set_function(dask_cluster)

In [None]:
# Range for the number of replicas: min_replicas=1, max_replicas=4.
dask_cluster.spec.min_replicas = 1
dask_cluster.spec.max_replicas = 4

# Use a remote (distributed) Dask cluster, exposed through a NodePort service.
dask_cluster.spec.remote = True
dask_cluster.spec.service_type = "NodePort"

# Per-worker resource *requests* (not limits): 2G memory and 2 CPUs.
dask_cluster.with_worker_requests(mem="2G", cpu="2")

In [None]:
def inc(x):
    """Return ``x + 2``.

    NOTE: despite the name, the step is 2 (not 1); the "dask-try" result
    logged by ``hndlr`` depends on this value.
    """
    incremented = x + 2
    return incremented

In [None]:
def hndlr(context, x=1, y=2):
    """Submit ``inc(x)`` to the run's Dask client and log the result.

    :param context: MLRun run context; must expose ``dask_client``, ``logger``
                    and ``log_result`` (all used below).
    :param x: value passed to ``inc`` on the Dask cluster.
    :param y: only logged; not used in the computation.
    """
    context.logger.info("params: x={},y={}".format(x, y))
    # Keep the input and the Dask future under separate names, and fetch the
    # result once instead of blocking on the future twice.
    future = context.dask_client.submit(inc, x)
    result = future.result()
    print(result)
    context.log_result("dask-try", result)

In [None]:
# Run `hndlr` on the Dask cluster function; watch=False does not wait for the run.
myrun = dask_cluster.run(
    name="dask-try",
    handler=hndlr,
    params={"x": 12, "y": 3},
    watch=False,
)

# Local

In [None]:
%%writefile data-prep.py

import pandas as pd
from sklearn.datasets import load_breast_cancer

import mlrun


@mlrun.handler(outputs=["dataset", "label_column"])
def breast_cancer_generator():
    """
    Build the sklearn breast-cancer dataset as a single DataFrame.

    :returns: a tuple of (DataFrame with the feature columns followed by a
              ``label`` column holding the target, the label column name
              ``"label"``); the handler decorator names these outputs
              ``dataset`` and ``label_column``.
    """
    bunch = load_breast_cancer()
    features = pd.DataFrame(data=bunch.data, columns=bunch.feature_names)
    # Append the target as the last column, named "label".
    dataset = features.assign(label=bunch.target)
    return dataset, "label"

In [None]:
# Register data-prep.py as a job function; breast_cancer_generator is its handler.
data_gen_fn = project.set_function(
    "data-prep.py",
    handler="breast_cancer_generator",
    name="data-prep",
    image="mlrun/mlrun",
    kind="job",
)

In [None]:
# Run data-prep in the notebook process (local=True).
gen_data_run = project.run_function(
    "data-prep",
    watch=False,
    local=True,
)

# Job

In [None]:
# Import the auto_trainer function from the MLRun function hub.
trainer = mlrun.import_function(
    "hub://auto_trainer"
)

In [None]:
# Train a RandomForest on the generated dataset via the hub trainer's `train`
# handler; watch=False does not wait for the run to finish.
trainer_run = project.run_function(
    trainer,
    handler="train",
    inputs={"dataset": gen_data_run.outputs["dataset"]},
    params=dict(
        model_class="sklearn.ensemble.RandomForestClassifier",
        train_test_split_size=0.2,
        label_columns="label",
        model_name="cancer",
    ),
    watch=False,
)

## Scheduling

In [None]:
# Schedule the same training every hour (cron "0 * * * *").
# Bind the result to its own name: a scheduled submission is not the training
# run above (and may be None), and rebinding `trainer_run` here would break the
# serving cell, which reads trainer_run.outputs["model"].
trainer_schedule = project.run_function(
    trainer,
    inputs={"dataset": gen_data_run.outputs["dataset"]},
    params={
        "model_class": "sklearn.ensemble.RandomForestClassifier",
        "train_test_split_size": 0.2,
        "label_columns": "label",
        "model_name": "cancer",
    },
    handler="train",
    schedule="0 * * * *",
)

# Serving

In [None]:
# The training run was submitted with watch=False, so its outputs may not be
# populated yet; block until it reaches a terminal state (by default this
# raises if the run failed) before reading the model artifact.
trainer_run.wait_for_completion()

serving_fn = mlrun.new_function("serving", image="mlrun/mlrun", kind="serving")
serving_fn.add_model(
    "cancer-classifier",
    model_path=trainer_run.outputs["model"],
    class_name="mlrun.frameworks.sklearn.SklearnModelServer",
)

In [None]:
# Deploy the serving function (holding the cancer-classifier model) via the project.
project.deploy_function(
    serving_fn,
)

In [None]:
# One input row for the V2 infer request: 30 feature values, presumably in the
# load_breast_cancer() feature order the model was trained on (10 "mean",
# 10 "error", 10 "worst" measurements) -- confirm against the training data.
sample = {
    "inputs": [
        [
            # values 1-10
            1.371e01, 2.083e01, 9.020e01, 5.779e02, 1.189e-01,
            1.645e-01, 9.366e-02, 5.985e-02, 2.196e-01, 7.451e-02,
            # values 11-20
            5.835e-01, 1.377e00, 3.856e00, 5.096e01, 8.805e-03,
            3.029e-02, 2.488e-02, 1.448e-02, 1.486e-02, 5.412e-03,
            # values 21-30
            1.706e01, 2.814e01, 1.106e02, 8.970e02, 1.654e-01,
            3.682e-01, 2.678e-01, 1.556e-01, 3.196e-01, 1.151e-01,
        ]
    ]
}

In [None]:
# Send the sample to the model's /v2/models/<name>/infer endpoint.
serving_fn.invoke(path="/v2/models/cancer-classifier/infer", body=sample)

# Handler

In [None]:
import pickle
from mlrun import new_task

!pip install xgboost
from xgboost import XGBClassifier

from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split

In [None]:
def handler(context):
    """Train a small XGBoost classifier on iris; log the model and test features.

    ``context`` is the MLRun run context supplied when the handler runs as a
    task. Side effects, all through ``context``:
      * sets the run label ``category=tests``;
      * logs the pickled classifier as model ``clf_model`` (tag ``fgvndgt1``);
      * logs the held-out feature frame as dataset ``ds`` (tag ``fgvndgt``).
    """
    context.set_label("category", "tests")
    print("artifact_path={}".format(context.artifact_path))
    dataset = load_iris(as_frame=True)
    X, y = dataset.data, dataset.target
    # iris has three classes, so a binary objective is misleading; the
    # scikit-learn wrapper trains a multi-class model for >2 classes, so state
    # the multi-class objective explicitly.
    clf = XGBClassifier(
        n_estimators=2, max_depth=2, learning_rate=1, objective="multi:softprob"
    )
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, train_size=0.8, test_size=0.2, random_state=0
    )
    clf.fit(X_train, y_train)
    context.log_model(
        "clf_model",
        body=pickle.dumps(clf),
        model_file="clf.pkl",
        framework="xgboost",
        # Record the real target column name (the Series name from load_iris)
        # rather than "label", which is not a column of this dataset.
        label_column=y.name,
        tag="fgvndgt1",
    )
    context.log_dataset("ds", df=X_test, tag="fgvndgt")

In [None]:
# Wrap the in-notebook `handler` in a task and run it locally through a job function.
task = new_task(name="kmqrijzfki", project=project_name, handler=handler)
func = mlrun.new_function(
    name="func",
    image="mlrun/mlrun",
    kind="job",
)
func.save()
run_object = mlrun.run_function(
    func,
    base_task=task,
    local=True,
    watch=False,
)

# Nuclio

In [None]:
%%writefile nuclio-func.py

import socket
import pandas as pd


def hyper_func2(context, data, p1, p2, p3):
    """Log ``r1 = p2 * p3`` and a small demo DataFrame for one hyper-param run.

    :param context: MLRun context (``logger``, ``log_result``, ``log_dataset``).
    :param data:    input data item; the head of ``data.as_df()`` is printed.
    :param p1:      accepted but unused.
    :param p2:      first factor of ``r1``.
    :param p3:      second factor of ``r1``.
    """
    print(data.as_df().head())
    r1 = p2 * p3
    host = socket.gethostname()
    context.logger.info(f"p2={p2}, p3={p3}, r1={r1} at {host}")
    context.log_result("r1", r1)

    columns = ["first_name", "age", "testScore"]
    people = pd.DataFrame(
        {
            "first_name": ["Jason", "Molly", "Tina", "Jake", "Amy"],
            "age": [42, 52, 36, 24, 73],
            "testScore": [25, 94, 57, 62, 70],
        },
        columns=columns,
    )
    context.log_dataset("mydf", df=people, stats=True)

In [None]:
fn = mlrun.code_to_function(
    name="hyper-tst2",
    filename="nuclio-func.py",
    handler="hyper_func2",
    image="mlrun/mlrun",
    kind="nuclio:mlrun",
)
# Capacity for the hyper-param runs: replicas * workers (2 * 2 = 4) needs to
# match or exceed parallel_runs (4 in the grid-search cell).
fn.spec.replicas = 2
fn.with_http(workers=2)
fn.deploy()

In [None]:
import nest_asyncio

# Allow asyncio event loops to be re-entered inside Jupyter's already-running loop.
# NOTE(review): presumably needed because the hyper-param run against the nuclio
# function drives its parallel requests with asyncio -- confirm.
nest_asyncio.apply()

In [None]:
# Grid search over p2 x p3 (4 * 2 = 8 combinations), 4 runs in parallel.
# selector="r1" picks the best child run by its r1 result; max_errors=3 caps
# the tolerated child-run failures.
grid_params = {"p2": [2, 1, 4, 1], "p3": [10, 20]}
task = mlrun.new_task(
    inputs={"data": "https://s3.wasabisys.com/iguazio/data/iris/iris_dataset.csv"},
    params={"p1": 8},
)
task.with_hyper_params(
    grid_params,
    strategy="grid",
    selector="r1",
    max_errors=3,
    parallel_runs=4,
)
run = fn.run(task, watch=False)

# MPI

In [None]:
%%writefile mpi-func.py

import time

def do_nothing():
    """Placeholder MPI job handler: sleeps for zero seconds and returns a marker."""
    marker = "nevermind"
    time.sleep(0)
    return marker

In [None]:
# MPI job function built from mpi-func.py, with do_nothing as its handler.
mpijob = mlrun.code_to_function(
    filename="mpi-func.py",
    handler="do_nothing",
    image="mlrun/mlrun",
    kind="mpijob",
    name="mpijob-func",
)

In [None]:
# Run the MPI job with 3 replicas.
mpijob.spec.replicas = 3

In [None]:
# Submit the MPI job without waiting for it to finish.
mpijob.run(
    watch=False,
)

## Scheduling

In [None]:
# Schedule the MPI job weekly: cron "0 0 * * 0" = every Sunday at 00:00.
mpijob.run(
    schedule="0 0 * * 0",
)

# Application

In [None]:
# Application runtime using a pre-built Vizro image; the app listens on port 8050.
application = project.set_function(
    kind="application",
    image="kharchukt/vizro",
    name="vizro",
)
application.set_internal_application_port(8050)
application.deploy()  # deploy to Nuclio

In [None]:
# Smoke-test the deployed app: request "/" with verify=False.
application.invoke(
    "/",
    verify=False,
)

# Spark

In [None]:
def my_spark(context=None):
    """No-op placeholder handler: accepts an optional context and returns None."""
    import time

    time.sleep(0)

In [None]:
from mlrun.run import new_function

# Spark job running a PySpark test script from v3io on an IMDB JSON file.
# The generated variable name is kept because the scheduling cell reuses it.
function_pbvtwxlwcd = new_function(
    kind="spark",
    command="/v3io/bigdata/naipi_files/pyspark_basic_tests.py",
    name="tqrcj",
)
function_pbvtwxlwcd.metadata.tag = ""

function_pbvtwxlwcd.with_executor_limits(cpu="2200m")
function_pbvtwxlwcd.with_driver_limits(cpu="2500m")
project.set_function(function_pbvtwxlwcd)
function_pbvtwxlwcd.with_executor_requests(mem="1G", cpu="0.1")
function_pbvtwxlwcd.with_driver_requests(mem="1G", cpu="0.1")
function_pbvtwxlwcd.spec.replicas = 2
function_pbvtwxlwcd.with_igz_spark()
function_pbvtwxlwcd.spec.args = ["--json-path=/v3io/bigdata/imdb_movies.json"]
# Extra MySQL connector jar; setdefault avoids a KeyError if the spec has no
# "jars" entry yet (presumably with_igz_spark() normally provides one).
function_pbvtwxlwcd.spec.deps.setdefault("jars", []).append(
    "local:///spark/3rd_party/mysql-connector-java-8.0.13.jar"
)
sr = function_pbvtwxlwcd.run(
    artifact_path="/User/artifacts", watch=False, notifications=[]
)

## Scheduling

In [None]:
# Schedule the Spark job monthly: cron "0 0 1 * *" = 00:00 on the 1st of each month.
sr = function_pbvtwxlwcd.run(
    schedule="0 0 1 * *",
    notifications=[],
    artifact_path="/User/artifacts",
)

# Databricks

In [None]:
import os
from mlrun.runtimes.function_reference import FunctionReference

In [None]:
# If using a Databricks data store, for example, provide the credentials.
# Placeholders are only filled in when a variable is not already set, so real
# credentials exported before starting the kernel are not clobbered.
# Replace the placeholders (or export real values) before running the jobs.
os.environ.setdefault("DATABRICKS_HOST", "DATABRICKS_HOST")
os.environ.setdefault("DATABRICKS_TOKEN", "DATABRICKS_TOKEN")
os.environ.setdefault("DATABRICKS_CLUSTER_ID", "DATABRICKS_CLUSTER_ID")

In [None]:
def add_databricks_env(function):
    """Copy Databricks connection settings from ``os.environ`` into a function's env.

    ``DATABRICKS_HOST`` is required (a missing value raises ``KeyError``).
    ``DATABRICKS_CLUSTER_ID`` is optional and is skipped when unset, instead of
    being added with a ``None`` value. Entries are appended to
    ``function.spec.env`` as ``{"name": ..., "value": ...}`` dicts. The token
    is not handled by this helper.
    """
    job_env = {
        "DATABRICKS_HOST": os.environ["DATABRICKS_HOST"],
        "DATABRICKS_CLUSTER_ID": os.environ.get("DATABRICKS_CLUSTER_ID"),
    }

    for name, val in job_env.items():
        if val is None:
            # Optional variable not set: don't emit an env entry without a value.
            continue
        function.spec.env.append({"name": name, "value": val})

In [None]:
# Store the token as a project secret rather than in the function env.
secrets = {"DATABRICKS_TOKEN": os.environ["DATABRICKS_TOKEN"]}
project.set_secrets(secrets)

# Source of a trivial Databricks task that prints its keyword arguments.
code = """
def print_kwargs(**kwargs):
    print(f"kwargs: {kwargs}")
"""

function_ref = FunctionReference(
    name="databricks-function",
    kind="databricks",
    image="mlrun/mlrun",
    code=code,
)
function = function_ref.to_function()
add_databricks_env(function=function)

run = function.run(
    handler="print_kwargs",
    project=project_name,
    params=dict(
        param1="value1",
        param2="value2",
        # task_parameters={"timeout_minutes": 1},
    ),
    watch=False,
)

## Scheduling

In [None]:
# Schedule the Databricks task yearly: cron "0 0 1 1 *" = 00:00 on January 1st.
run = function.run(
    schedule="0 0 1 1 *",
    handler="print_kwargs",
    project=project_name,
    params=dict(
        param1="value1",
        param2="value2",
        # task_parameters={"timeout_minutes": 1},
    ),
)