In [None]:
# %load_ext autoreload
# %autoreload 2
import os
import sys

# Make the repository root importable so that `from src.train import ...` resolves.
# All other paths in this notebook are relative to the parent folder ("../config",
# "../src", "../data"), so the package root is one level above the working directory.
sys.path.append("..")

from src.train import eval_model

from dotenv import load_dotenv

# Read the variables defined in the prod env file before any os.getenv lookup runs.
# NOTE(review): presumably this file supplies subscription_id / resource_group /
# workspace, which are read with os.getenv further down — confirm.
load_dotenv("../config/prod.env")

import yaml

# Load the project configuration into `cfg`.
# yaml.safe_load replaces yaml.load(..., Loader=yaml.FullLoader): the restricted
# loader only builds plain Python containers and scalars, which is all the
# cfg[...] lookups in this notebook rely on.
# NOTE(review): assumes config.yaml contains no Python-specific YAML tags — confirm.
with open("../config/config.yaml", encoding="utf-8") as f:
    cfg = yaml.safe_load(f)

from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential

import pandas as pd
import mlflow

from azure.ai.ml import MLClient, command, Input, dsl
from azure.ai.ml.entities import JobService

from datetime import datetime

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split

# Azure ML workspace coordinates, taken from environment variables
# (os.getenv yields None for any variable that is not set).
subscription_id, resource_group, workspace = (
    os.getenv(env_key)
    for env_key in ("subscription_id", "resource_group", "workspace")
)

# Browser-based sign-in; DefaultAzureCredential() (imported above) can be used instead.
credential = InteractiveBrowserCredential()

# Client handle bound to the workspace identified above.
ml_client = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace,
)

# Single step

In [None]:
import yaml

# Re-read the project configuration so edits to config.yaml are picked up.
# yaml.safe_load replaces yaml.load(..., Loader=yaml.FullLoader): the restricted
# loader only builds plain Python containers and scalars, which is all the
# cfg[...] lookups in this notebook rely on.
# NOTE(review): assumes config.yaml contains no Python-specific YAML tags — confirm.
with open("../config/config.yaml", encoding="utf-8") as f:
    cfg = yaml.safe_load(f)

In [None]:
# Load the sample training data and preview its first two records,
# transposed so that every column is shown as a row.
df = pd.read_csv(filepath_or_buffer="../data/training_data_sample.csv")
df.iloc[:2].transpose()

In [None]:
# Values substituted into the ${{inputs.*}} placeholders of the command line.
# (A remote CSV such as
# "https://azuremlexamples.blob.core.windows.net/datasets/iris.csv"
# could be used as the input path instead of the local sample file.)
train_inputs = {
    "input_csv": Input(type="uri_file", path="../data/training_data_sample.csv"),
    "random_state": 8,
    "predictor_cols": "A,B,C",
    "target_col": cfg["model"]["target_col"],
}

# Environment fetched from the workspace by the name/version given in the config.
train_environment = ml_client.environments.get(
    name=cfg["environment"]["conda"]["name"],
    version=cfg["environment"]["conda"]["version"],
)

# Single-step training job: runs train.py from the local ../src folder.
# Optional arguments not set here: experiment_name, description.
train_job = command(
    code="../src",
    command=(
        "python train.py"
        " --input-csv ${{inputs.input_csv}}"
        " --random-state ${{inputs.random_state}}"
        " --predictor-cols ${{inputs.predictor_cols}}"
        " --target-col ${{inputs.target_col}}"
    ),
    inputs=train_inputs,
    environment=train_environment,
    display_name="model-training",
    compute=cfg["model"]["compute"]["name"],
)

In [None]:
# TODO: this registration currently fails; the root cause has not been identified.
# NOTE(review): the command(...) that builds train_job sets only a display_name,
# not a `name` — possibly the component cannot be registered without one; confirm.
# Register the component behind the training job in the workspace.
component_to_register = train_job.component
train_job_component = ml_client.components.create_or_update(component_to_register)

In [None]:
# Submit the single-step training job to the workspace
# (goes through the jobs operations, like the other submissions in this notebook).
returned_job = ml_client.jobs.create_or_update(train_job)

# Spark single step

In [None]:
from azure.ai.ml import spark, Input, Output

import yaml

# Re-read the project configuration so edits to config.yaml are picked up.
# yaml.safe_load replaces yaml.load(..., Loader=yaml.FullLoader): the restricted
# loader only builds plain Python containers and scalars, which is all the
# cfg[...] lookups in this notebook rely on.
# NOTE(review): assumes config.yaml contains no Python-specific YAML tags — confirm.
with open("../config/config.yaml", encoding="utf-8") as f:
    cfg = yaml.safe_load(f)

In [None]:
# Placeholders: fill in the datastore name and the input file's path on it.
datastore_name = ""
path_on_datastore = ""

# Both URIs share the same azureml:// prefix down to the datastore's "paths" segment.
datastore_paths_root = (
    f"azureml://subscriptions/{subscription_id}"
    f"/resourcegroups/{resource_group}"
    f"/workspaces/{workspace}"
    f"/datastores/{datastore_name}/paths"
)
input_uri = f"{datastore_paths_root}/{path_on_datastore}"
output_uri_folder = f"{datastore_paths_root}/spark/output"

In [None]:
# Compute resources requested for the Spark job.
spark_resources = {
    "instance_type": "Standard_E4S_V3",
    "runtime_version": "3.3.0",
}

# One input file and one output folder, both referenced by datastore URI.
spark_inputs = {
    "input_uri": Input(type="uri_file", path=input_uri, mode="direct"),
}
spark_outputs = {
    "output": Output(type="uri_folder", path=output_uri_folder, mode="direct"),
}

# Spark job running spark_step.py; `args` wires the input/output above into
# the script's command-line flags. No environment is specified.
spark_job = spark(
    display_name="Test job from serverless Spark with VNet using Data Store",
    code="../src/components/spark_step",
    entry={"file": "spark_step.py"},
    driver_cores=1,
    driver_memory="2g",
    executor_cores=1,
    executor_memory="1g",
    executor_instances=1,
    resources=spark_resources,
    inputs=spark_inputs,
    outputs=spark_outputs,
    args="--input_uri ${{inputs.input_uri}} --output ${{outputs.output}}",
)

# Submit the job and print the identifier of the created job.
returned_spark_job = ml_client.jobs.create_or_update(spark_job)
print(returned_spark_job.id)

# With pipeline job

In [None]:
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import command
from azure.ai.ml import Input, Output
from azure.ai.ml import load_component

# Root folder of the component definitions loaded below
# (relative to the notebook's working directory).
components_src_dir = "../src/components/"

In [None]:
# Locate each component's YAML spec under components_src_dir ...
spark_step_yaml = os.path.join(components_src_dir, "spark_step/spark_step.yaml")
data_prep_yaml = os.path.join(components_src_dir, "data_prep/data_prep.yaml")
train_yaml = os.path.join(components_src_dir, "train/train.yaml")

# ... and load the three component definitions from those files.
spark_data_prep = load_component(source=spark_step_yaml)
data_prep_component = load_component(source=data_prep_yaml)
train_component = load_component(source=train_yaml)

In [None]:
# Register (or update) the loaded components in the workspace.
# data_prep_component and train_component are rebound to what the call returns.
spark_data_prep_component = ml_client.components.create_or_update(spark_data_prep)
data_prep_component = ml_client.components.create_or_update(data_prep_component)
train_component = ml_client.components.create_or_update(train_component)

In [None]:
# Two-step training pipeline: a data-preparation step followed by a training step.
# NOTE(review): this description is deliberately kept in `#` comments. The DSL
# decorator may pick up the function's docstring, annotations and local variable
# names as pipeline metadata (description, input types, step names) — confirm
# before adding a docstring/annotations or renaming the locals.
@dsl.pipeline(
    compute="serverless",  # "serverless" value runs pipeline on serverless compute
    description="",
)
def model_training_pipeline(
    pipeline_job_data_input,  # forwarded to the data-prep step as `input_csv`
    pipeline_job_predictor_cols,  # forwarded to the train step as `predictor_cols`
    pipeline_job_target_col,  # forwarded to the train step as `target_col`
    pipeline_job_random_state,  # forwarded to the train step as `random_state`
):
    # Step 1: call the data-prep component like a Python function with its own inputs.
    data_prep_job = data_prep_component(
        input_csv=pipeline_job_data_input,
    )

    # Step 2: call the train component like a Python function with its own inputs.
    train_job = train_component(
        train_data=data_prep_job.outputs.train_data,  # note: using outputs from previous step
        test_data=data_prep_job.outputs.test_data,  # note: using outputs from previous step
        predictor_cols=pipeline_job_predictor_cols,  # note: using a pipeline input as parameter
        target_col=pipeline_job_target_col,
        random_state=pipeline_job_random_state,
    )

    # A pipeline returns a dictionary of outputs;
    # each key is the identifier of a pipeline-level output.
    return {
        "pipeline_job_model": train_job.outputs.model,
    }

In [None]:
# Look up the training data asset by the name/version given in the config.
training_data = ml_client.data.get(
    name=cfg["data"]["training"]["name"], version=cfg["data"]["training"]["version"]
)

# Instantiate the pipeline with a value for every parameter.
# model_training_pipeline declares no defaults and forwards the predictor and
# target columns to the train step, so they must be supplied here as well.
pipeline_job = model_training_pipeline(
    pipeline_job_data_input=Input(type="uri_file", path=training_data.path),
    pipeline_job_predictor_cols="A,B,C",  # same columns as the single-step job
    pipeline_job_target_col=cfg["model"]["target_col"],
    pipeline_job_random_state=8,
)

In [None]:
# Send the pipeline job to the workspace and rebind `pipeline_job` to the returned job.
# The experiment name is left as an empty placeholder.
pipeline_job = ml_client.jobs.create_or_update(
    job=pipeline_job,
    experiment_name="",
)
# Bare expression so the notebook renders the submitted job.
pipeline_job

In [None]:
# Follow the submitted pipeline job's output until it finishes.
ml_client.jobs.stream(name=pipeline_job.name)