## Setup

In [None]:
import pandas as pd
import numpy as np
import os

from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient, Input
from azure.ai.ml.dsl import pipeline
from azure.ai.ml import load_component

# Later cells resolve paths such as "./SDK-V2/..." and "<cwd>/data/..." against the
# working directory, so move up one level to the folder that contains "SDK-V2".
# The guard makes the cell idempotent: re-running it no longer climbs another level
# (which would break every relative path used below).
if not os.path.isdir("SDK-V2"):
    os.chdir("../")
print(os.getcwd())

In [None]:
try:
    credential = DefaultAzureCredential()
    # Request a management-plane token up front so a credential that cannot
    # authenticate fails here rather than later inside an SDK call.
    credential.get_token("https://management.azure.com/.default")
except Exception:
    # Deliberately broad: any failure to obtain a token with DefaultAzureCredential
    # falls back to an interactive browser sign-in.
    credential = InteractiveBrowserCredential()

In [None]:
# Workspace handle built from the local workspace config file, authenticated
# with the credential chosen in the previous cell.
ml_client = MLClient.from_config(credential=credential)

# Name of the compute cluster the pipeline runs on; it is expected to already
# exist in the workspace. Uncomment the next line to check that it does:
cluster_name = "cpu-cluster"
# print(ml_client.compute.get(cluster_name))

In [None]:
# Show the workspace client as this cell's output (quick sanity check of the handle).
ml_client

In [None]:
from azure.ai.ml.entities import AmlCompute, Environment

# Custom runtime: a base Docker image with the project's conda specification on top.
env_docker_conda = Environment(
    name="data-model-drift-env",
    description="Environment created from a Docker image plus Conda environment.",
    image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04",
    conda_file="./SDK-V2/conda_yamls/env_cli.yml",
)
# Create the environment in the workspace (or update it if it already exists).
ml_client.environments.create_or_update(env_docker_conda)

# Name under which the environment was created.
env_name = env_docker_conda.name

In [None]:
# Re-imported so this cell also runs on its own.
from azure.ai.ml import Input

parent_dir = os.getcwd()

# To read the data from a datastore (e.g. Blob storage) instead, use:
# pred_maintenance_input_remote = Input(
#     type="uri_folder",
#     path="azureml://datastores/workspaceblobstore/paths/data_drift/inputs/",  # adjust to your datastore path
# )

# Read the raw predictive-maintenance data from a local folder under the working directory.
pred_maintenance_input_local = Input(
    type="uri_folder",
    path=os.path.join(parent_dir, "data", "data_raw", "predictive_maintenance"),
)

In [None]:
# Show the working directory that the data input and component paths are built from.
parent_dir

In [None]:
# Paths (appended to the working directory) of the custom component specifications.
prep_yml = "/SDK-V2/prep_data.yml"
drift_yml = "/SDK-V2/data_drift.yml"
drift_db_yml = "/SDK-V2/data_drift_db.yml"

# Drift threshold shared by both drift components; defined once so they cannot diverge.
drift_threshold = 0.01

print(f"{parent_dir}{prep_yml}")
# 1. Load the components from their YAML specifications.
prepare_data = load_component(source=f"{parent_dir}{prep_yml}")
measure_data_drift = load_component(source=f"{parent_dir}{drift_yml}")
collect_data_drift_values = load_component(source=f"{parent_dir}{drift_db_yml}")


# 2. Construct the pipeline: prepare the raw data, then measure drift and store
#    the drift values, both computed from the prepared data.
#    pipeline_job_input: folder input with the raw data (fed to prepare_data's "input_path").
#    Returns a mapping of pipeline output names to step outputs.
@pipeline()
def data_drift_preprocess(pipeline_job_input):
    # Keyword names (input_path, transformed_data_path, threshold) must match the
    # inputs declared in each component's .yml file.
    transform_data = prepare_data(input_path=pipeline_job_input)
    # Both downstream steps consume the "output_path" output of the preparation step.
    drift_detect = measure_data_drift(
        transformed_data_path=transform_data.outputs.output_path,
        threshold=drift_threshold,
    )
    save_drift_db = collect_data_drift_values(
        transformed_data_path=transform_data.outputs.output_path,
        threshold=drift_threshold,
    )
    return {
        "pipeline_job_prepped_data": transform_data.outputs.output_path,
        "pipeline_job_detect_data_drift": drift_detect.outputs.drift_plot_path,
        "pipeline_job_store_data_drift": save_drift_db.outputs.drift_db_path,
    }


# Build the pipeline job; its only input is the folder that holds the raw data.
pipeline_job = data_drift_preprocess(pred_maintenance_input_local)

# Output delivery mode for every pipeline output ("rw_mount" is the alternative shown originally).
pipeline_job.outputs.pipeline_job_prepped_data.mode = "upload"
pipeline_job.outputs.pipeline_job_detect_data_drift.mode = "upload"
pipeline_job.outputs.pipeline_job_store_data_drift.mode = "upload"

# Pipeline-level defaults: run on the cluster named in `cluster_name` instead of
# repeating the literal, and write outputs to the workspace blob datastore.
pipeline_job.settings.default_compute = cluster_name
pipeline_job.settings.default_datastore = "workspaceblobstore"

In [None]:
# Submit the pipeline under this experiment; the job returned by the service
# replaces the local definition.
experiment_name = "data_drift_experiment"
pipeline_job = ml_client.jobs.create_or_update(pipeline_job, experiment_name=experiment_name)
# Display the submitted job as the cell output.
pipeline_job

## Register components

In [None]:
# Register only the data-preparation component, loaded fresh from its YAML spec.
parent_dir = os.getcwd()
prep_yml = "/SDK-V2/prep_data.yml"

prepare_data = load_component(source=parent_dir + prep_yml)
prepare_data_comp = ml_client.components.create_or_update(prepare_data)


In [None]:
# Re-load the components from their YAML files before registering them, to avoid
# a "reserved name" error when registering the objects already used above.
parent_dir = os.getcwd()

# Paths (appended to the working directory) of the custom component specifications.
prep_yml = "/SDK-V2/prep_data.yml"
drift_yml = "/SDK-V2/data_drift.yml"
drift_db_yml = "/SDK-V2/data_drift_db.yml"

# 1. Load the components.
prepare_data = load_component(source=f"{parent_dir}{prep_yml}")
measure_data_drift = load_component(source=f"{parent_dir}{drift_yml}")
collect_data_drift_values = load_component(source=f"{parent_dir}{drift_db_yml}")

# 2. Create (register) the components in the workspace.
# To reuse versions that are already registered instead, fetch them, e.g.:
#     ml_client.components.get(name="prepare_drift_data", version="1")
#     ml_client.components.get(name="measure_data_drift", version="1")
#     ml_client.components.get(name="save_data_drift_values", version="1")
prepare_data_comp = ml_client.components.create_or_update(prepare_data)
measure_data_drift_comp = ml_client.components.create_or_update(measure_data_drift)
collect_data_drift_values_comp = ml_client.components.create_or_update(collect_data_drift_values)

# Report name/version from the objects returned by the workspace, not the locally
# loaded specs, so the message reflects what was actually registered. One line per
# component (the original multi-argument print inserted stray spaces around "\n").
for registered in (prepare_data_comp, measure_data_drift_comp, collect_data_drift_values_comp):
    print(f"Component {registered.name} with Version {registered.version} is registered")

In [None]:
# Build (but do not attach) a cron schedule that starts now and fires at minutes
# 0 and 10 of every hour, with the cron expression evaluated in AUS Eastern Standard Time.
from datetime import datetime

from dateutil import tz
from azure.ai.ml.constants import TimeZone
from azure.ai.ml.entities import (
    CronSchedule,
    RecurrencePattern,
    RecurrenceSchedule,
    ScheduleStatus,
)

# Timezone-aware "now"; gettz() with no argument gives the machine's local zone.
schedule_start_time = datetime.now(tz=tz.gettz())

cron_schedule = CronSchedule(
    status=ScheduleStatus.ENABLED,
    time_zone=TimeZone.AUS_EASTERN_STANDARD_TIME,
    start_time=schedule_start_time,
    expression="0,10 * * * *",
)
# To run the pipeline on this schedule, attach it before submitting:
# pipeline_job.schedule = cron_schedule

## Submit Jobs via CLI V2

**PLEASE EXECUTE THE FOLLOWING COMMANDS IN THE TERMINAL OUTSIDE THIS NOTEBOOK**

If you get the error "the refresh token has expired", run `az login` to authenticate in the CLI.

In [None]:
#!conda activate azureml_py310_sdkv2
#!az ml job create --file pipeline.yml