# Tutorial: Data Verification Pipeline User Guide

## Prerequisites

* If you opened this notebook from Azure Machine Learning studio, you need a compute instance to run the code. If you don't have a compute instance, select **Create compute** on the toolbar to first create one.  You can use all the default settings.  

    ![Create compute](./media/create-compute.png)

* If you're seeing this notebook elsewhere, complete [Create resources you need to get started](https://docs.microsoft.com/azure/machine-learning/quickstart-create-resources) to create an Azure Machine Learning workspace and a compute instance.

## Set your kernel

* If your compute instance is stopped, start it now.  
        
    ![Start compute](./media/start-compute.png)

* Once your compute instance is running, make sure that the kernel, found on the top right, is `Python 3.10 - SDK v2`.  If not, use the dropdown to select this kernel.

    ![Set the kernel](./media/set-kernel.png)


## Set up the pipeline resources

The Azure Machine Learning framework can be used from CLI, Python SDK, or studio interface. In this example, you use the Azure Machine Learning Python SDK v2 to create a pipeline. 

Before creating the pipeline, you need the following resources:

* The data assets to verify
* The software environment to run the pipeline
* A compute resource to run the job

## Create handle to workspace

Before you dive into the code, you need a way to access the workspace. You'll create `ml_client` as a handle to the workspace, then use it to manage resources and jobs.

In the next cell, please enter your Subscription ID, Resource Group name and Workspace name. To find these values:

1. In the upper right Azure Machine Learning studio toolbar, select your workspace name.
1. Copy the values for workspace, resource group, and subscription ID into the code.
1. Copy one value at a time: copy a value, close the panel, paste it, then return for the next one.

In [1]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# from dotenv import load_dotenv
# import os
#
# load_dotenv("./.env")
# for k in os.environ.keys():
#     if "AZURE" in k:
#         print("#####:",k,os.environ.get(k))

# authenticate
credential = DefaultAzureCredential()
# Get a handle to the workspace
ml_client = MLClient(
    credential=credential,
    subscription_id="<SUBSCRIPTION_ID>",
    resource_group_name="<RESOURCE_GROUP>",
    workspace_name="<AML_WORKSPACE_NAME>",
    cloud='AzureChinaCloud',  # this workspace is in Azure China; omit for the default public cloud
)

> [!NOTE]
> Creating MLClient will not connect to the workspace. The client initialization is lazy: it waits until the first time it needs to make a call (in this notebook, that happens when the compute cluster is looked up, two code cells from here).
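
The commented-out lines in the cell above hint at loading settings from a `.env` file. As a minimal sketch of that idea, the helper below collects the three workspace values from environment variables instead of hard-coding them. The variable names (`AZURE_SUBSCRIPTION_ID`, `AZURE_RESOURCE_GROUP`, `AZURE_ML_WORKSPACE`) and the function name are assumptions made for illustration, not names required by the SDK.

```python
import os

# Hypothetical environment variable names; adjust them to your own setup.
REQUIRED_VARS = {
    "subscription_id": "AZURE_SUBSCRIPTION_ID",
    "resource_group_name": "AZURE_RESOURCE_GROUP",
    "workspace_name": "AZURE_ML_WORKSPACE",
}


def workspace_settings_from_env(env=None):
    """Return MLClient keyword arguments read from environment variables.

    Raises KeyError listing every missing variable, so a misconfigured
    notebook fails before any call to Azure is attempted.
    """
    env = os.environ if env is None else env
    missing = [var for var in REQUIRED_VARS.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    return {arg: env[var] for arg, var in REQUIRED_VARS.items()}
```

The returned dictionary could then be unpacked into the constructor, for example `MLClient(credential=credential, cloud='AzureChinaCloud', **workspace_settings_from_env())`.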

## Register data from an upload file

If you have been following along with the other tutorials in this series and already registered the data, you can fetch the same dataset from the workspace using `data_asset = ml_client.data.get("<DATA ASSET NAME>", version="<VERSION>")` and skip this section. To learn more about working with data, or if you would rather complete the data tutorial first, see [Upload, access and explore your data in Azure Machine Learning](https://learn.microsoft.com/azure/machine-learning/tutorial-explore-data).

Azure Machine Learning uses a `Data` object to register a reusable definition of data and to consume data within a pipeline. In the next cell, you define three data assets from local files as one example. Data from other sources can be defined as well.

In [2]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes


TSR_origin_data = Data(
    name="TSR_origin_data",
    path='./data/label_train.txt', # path to origin data
    type=AssetTypes.URI_FILE,
    description="TSR metadata",
    tags={"source_type": "file"},
    version="1.0.0",
)


Requirments_data = Data(
    name="TSR_requirements_data",
    path='./data/Data-TSRClassifier.json', # path to requirements_data
    type=AssetTypes.URI_FILE,
    description="TSR requirement data",
    tags={"source_type": "file"},
    version="1.0.0",
)

Label_data = Data(
    name="TSR_label_data",
    path='./data/class.txt', # path to label data
    type=AssetTypes.URI_FILE,
    description="TSR label data",
    tags={"source_type": "file"},
    version="1.0.0",
)

# credit_data = ml_client.data.get(name="creditcard_defaults",version="1.0.1")
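
The cell above only defines the `Data` objects locally. If you want them stored as versioned assets in the workspace, one possible approach is to pass each object to `ml_client.data.create_or_update`. The helper name `register_data_assets` is hypothetical, and the sketch takes the client as a parameter so that it can be read and tried without a workspace connection.

```python
def register_data_assets(ml_client, assets):
    """Register each Data object and return the registered assets by name.

    `ml_client` is expected to be an azure.ai.ml.MLClient; `assets` is an
    iterable of azure.ai.ml.entities.Data objects such as TSR_origin_data.
    """
    registered = {}
    for asset in assets:
        # create_or_update uploads the local file and records the asset
        # under the name and version set on the Data object.
        registered[asset.name] = ml_client.data.create_or_update(asset)
    return registered
```

With the objects from this notebook, a call might look like `register_data_assets(ml_client, [TSR_origin_data, Requirments_data, Label_data])`.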

## Create a compute resource to run your pipeline

Each step of an Azure Machine Learning pipeline can use a different compute resource for running the specific job of that step. It can be single or multi-node machines with Linux or Windows OS, or a specific compute fabric like Spark.

In this section, you provision a Linux [compute cluster](https://docs.microsoft.com/azure/machine-learning/how-to-create-attach-compute-cluster?tabs=python). See the [full list of VM sizes and prices](https://azure.microsoft.com/en-ca/pricing/details/machine-learning/).

For this tutorial, you only need a basic cluster, so use a Standard_DS3_v2 model with 4 vCPU cores and 14 GB of RAM, and create an Azure Machine Learning Compute.

> [!TIP]
> If you already have a compute cluster, replace "cpu-cluster" in the next code block with the name of your cluster. This keeps you from creating another one.


In [3]:
from azure.ai.ml.entities import AmlCompute

# Name assigned to the compute cluster
cpu_compute_target = "cpu-cluster"

try:
    # let's see if the compute target already exists
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception:
    print("Creating a new cpu compute target...")

    # Let's create the Azure Machine Learning compute object with the intended parameters
    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure Machine Learning Compute is the on-demand VM service
        type="amlcompute",
        # VM Family
        size="STANDARD_DS3_V2",
        # Minimum running nodes when there is no job running
        min_instances=0,
        # Nodes in cluster
        max_instances=4,
        # Seconds a node stays idle after a job finishes before it is scaled down
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but there is a chance of job termination
        tier="Dedicated",
    )
    print(
        f"AMLCompute with name {cpu_cluster.name} will be created, with compute size {cpu_cluster.size}"
    )
    # Now, we pass the object to MLClient's begin_create_or_update method
    # and wait for the long-running operation to finish
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster).result()

You already have a cluster named cpu-cluster, we'll reuse it as is.


## Create a job environment for pipeline steps

So far, you've created a development environment on the compute instance, your development machine. You also need an environment to use for each step of the pipeline. Each step can have its own environment, or you can use some common environments for multiple steps.

In this example, you create a conda environment for your jobs, using a conda yaml file.
First, create a directory to store the file in.

In [4]:
import os

dependencies_dir = "./dependencies"
os.makedirs(dependencies_dir, exist_ok=True)

Now, create the file in the dependencies directory.

In [5]:
%%writefile {dependencies_dir}/conda.yml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.10
  - numpy=1.25.0
  - pip=23.1.2
  - requests==2.25.1
  - pip:
    - mlflow==2.4.1
    - azureml-mlflow==1.52.0

Overwriting ./dependencies/conda.yml


Use the yaml file to create and register this custom environment in your workspace:

In [6]:
from azure.ai.ml.entities import Environment

custom_env_name = "aml-data-verification"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for data verification pipeline",
    tags={"requests": "2.25.1"},
    conda_file=os.path.join(dependencies_dir, "conda.yml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version="0.1.0",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name aml-data-verification is registered to workspace, the environment version is 0.1.0


## Build the pipeline

Now that you have all assets required to run your pipeline, it's time to build the pipeline itself.

Azure Machine Learning pipelines are reusable ML workflows that usually consist of several components. The typical life of a component is:

- Write the yaml specification of the component, or create it programmatically using `ComponentMethod`.
- Optionally, register the component with a name and version in your workspace, to make it reusable and shareable.
- Load that component from the pipeline code.
- Implement the pipeline using the component's inputs, outputs and parameters.
- Submit the pipeline

### Create component 1: data prepare (using programmatic definition)

Let's start by creating the first component. This component handles the preprocessing of the data. The preprocessing task is performed in the *data_prepare.py* Python file.

First create a source folder for the data_prep component:

In [7]:
import os

data_prep_src_dir = "./components/data_prepare"
os.makedirs(data_prep_src_dir, exist_ok=True)

This script converts the original metadata into the format required by the ProSafeAI template. Azure Machine Learning can mount datasets as folders on the compute, so the script defines an auxiliary `select_first_file` function for locating the data file inside a mounted input folder.

[MLflow](https://learn.microsoft.com/azure/machine-learning/concept-mlflow) is used to log the parameters and metrics during the pipeline run.

In [8]:
%%writefile {data_prep_src_dir}/data_prepare.py
import os
import json
import argparse
import mlflow


def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])

def scan_data():

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--format_data", type=str, help="path to format data")
    args = parser.parse_args()

    # Start Logging
    mlflow.start_run()

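    # The input may arrive as a single file or as a folder that contains it
    data_path = select_first_file(args.data) if os.path.isdir(args.data) else args.data
    print("input data:", data_path)

    with open(data_path, "r", encoding="utf-8") as f:
        # Skip blank lines so that every entry unpacks into (name, json)
        data = [line.strip().split("\t") for line in f if line.strip()]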

    results = []

    mlflow.log_metric("num_samples", len(data))

    for pic_name, value in data:
        sub_dict = json.loads(value)

        info = {
            "image_name": pic_name,
            # splitext also handles names that contain more than one dot
            "image_format": os.path.splitext(pic_name)[1].lstrip("."),
            "class": sub_dict.get("class"),
            "Snowfall_intensity": sub_dict.get("Snowfall_intensity"),
            "Fog_intensity": sub_dict.get("Fog_intensity"),
            "Rain_quantity": sub_dict.get("Rain_quantity"),
            "dataset": sub_dict.get("dataset"),
            "augmentation": sub_dict.get("augmentation"),
            "Illuminance": sub_dict.get("Illuminance"),
        }

        results.append(info)

    with open(os.path.join(args.format_data, "TSR_format_metadata.json"), "w", encoding="utf-8") as fw:
        json.dump(results, fw)

    print('format_data: ', os.path.join(args.format_data, "TSR_format_metadata.json"))

    # Stop Logging
    mlflow.end_run()


if __name__ == "__main__":
    scan_data()

Overwriting ./components/data_prepare/data_prepare.py
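
To make the expected input format concrete, here is a small standalone sketch that applies the same kind of conversion to a single line. It assumes, as the script does, that each line holds an image name and a JSON object separated by a tab, and it uses `os.path.splitext` to extract the extension. The sample file name and attribute values are invented for illustration, and only a subset of the attributes is shown.

```python
import json
import os


def convert_line(line):
    """Convert one tab-separated metadata line into a flat record."""
    pic_name, value = line.strip().split("\t")
    attributes = json.loads(value)
    return {
        "image_name": pic_name,
        # splitext keeps working when the name contains more than one dot
        "image_format": os.path.splitext(pic_name)[1].lstrip("."),
        "class": attributes.get("class"),
        "dataset": attributes.get("dataset"),
        "Illuminance": attributes.get("Illuminance"),
    }


# Hypothetical sample line, not taken from the real dataset
sample = 'sign_0001.v2.png\t{"class": "stop", "dataset": "train"}'
record = convert_line(sample)
print(record)
```

Keys that are absent from the JSON object, such as `Illuminance` in this sample, come back as `None` because the lookup uses `dict.get`.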


Now that you have a script that can perform the desired task, create an Azure Machine Learning Component from it.

Use the general purpose `CommandComponent` that can run command line actions. This command line action can directly call system commands or run a script. The inputs/outputs are specified on the command line via the `${{ ... }}` notation.

In [9]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

data_prepare_component = command(
    name="data_prep_metadata",
    display_name="Data preparation for metadata",
    description="converts the original metadata into the format required by the ProSafeAI template",
    inputs={
        "data": Input(type="uri_folder"),
    },
    outputs=dict(
        format_data=Output(type="uri_folder", mode="rw_mount"),
    ),
    # The source folder of the component
    code=data_prep_src_dir,
    command="""python data_prepare.py \
            --data ${{inputs.data}}  --format_data ${{outputs.format_data}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)
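
As a rough mental model of the `${{ ... }}` notation, the sketch below substitutes placeholder names with concrete values in a command string. This is only an illustration written for this guide; it is not how Azure Machine Learning resolves inputs and outputs internally, and the function name and example paths are hypothetical.

```python
import re

PLACEHOLDER = re.compile(r"\$\{\{\s*(inputs|outputs)\.(\w+)\s*\}\}")


def render_command(template, inputs, outputs):
    """Replace ${{inputs.x}} and ${{outputs.y}} with the given values."""
    values = {"inputs": inputs, "outputs": outputs}

    def substitute(match):
        kind, name = match.group(1), match.group(2)
        # A KeyError here means the command references an undeclared name
        return str(values[kind][name])

    return PLACEHOLDER.sub(substitute, template)


command_template = (
    "python data_prepare.py --data ${{inputs.data}} "
    "--format_data ${{outputs.format_data}}"
)
print(
    render_command(
        command_template,
        inputs={"data": "/mnt/in/label_train.txt"},  # hypothetical path
        outputs={"format_data": "/mnt/out/format_data"},  # hypothetical path
    )
)
```

The point of the sketch is that every placeholder in the command must correspond to a key declared under `inputs` or `outputs` of the component.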

Optionally, register the component in the workspace for future reuse.

In [10]:
# Register the component in the workspace
data_prepare_component = ml_client.create_or_update(data_prepare_component.component)

# Confirm the registration
print(
    f"Component {data_prepare_component.name} with Version {data_prepare_component.version} is registered"
)

Component data_prep_metadata with Version 2023-11-30-09-18-09-0237058 is registered


### Create component 2: data_import_verification (using yaml definition)

The second component imports the formatted data into the ProSafeAI backend and returns the data version. It then runs the data verification task and returns the report. Use Azure Machine Learning logging capabilities to record and visualize the progress.


You used the `CommandComponent` class to create your first component. This time you use a yaml definition for the second component. Each method has its own advantages. A yaml definition can be checked in alongside the code, which gives a readable change history. The programmatic method using `CommandComponent` can be easier to work with thanks to built-in class documentation and code completion.

Create the directory for this component:

In [11]:
import os

import_verification_src_dir = "./components/data_import_verification"
os.makedirs(import_verification_src_dir, exist_ok=True)

Create the import script in the directory:

In [12]:
%%writefile {import_verification_src_dir}/data_import_verification.py

import mlflow
import argparse
import os

# print(os.getcwd())
# /mnt/azureml/cr/j/17e5ad93670542eca36bcdfacb8571ea/exe/wd

from prosafeAI_sdk.data_verification import DataVerification

def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    print(files[0])

    return os.path.join(path, files[0])


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--format_data", type=str, help="path to format data")
    parser.add_argument("--label_data", type=str, help="path to label data")
    parser.add_argument("--table_id", type=int, help="table id in ProSafeAI backend")
    parser.add_argument("--version_comments", type=str, help="version comments in ProSafeAI backend")
    parser.add_argument("--username", type=str, help="username in ProSafeAI backend")
    parser.add_argument("--password", type=str, help="password in ProSafeAI backend")
    parser.add_argument("--requirements_data", type=str, help="path to requirements json")
    parser.add_argument("--task_name", type=str, help="task_name in ProSafeAI backend")
    parser.add_argument("--task_report", type=str, help="path to task report")
    args = parser.parse_args()


    # Start Logging
    mlflow.start_run()

    data_verification = DataVerification(username=args.username, password=args.password)
    data_version = data_verification.import_metadata(
            args.table_id, select_first_file(args.format_data), 
            args.label_data, version_comments=args.version_comments
        )

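    if isinstance(data_version, int):
        task_result = data_verification.run_data_verification(
            args.table_id,
            data_version,
            args.requirements_data,
            task_name=args.task_name,
            task_report=args.task_report
        )
        print("data verification result:", task_result)
    else:
        # import_metadata did not return an integer version, so verification is skipped
        print("metadata import did not return a data version:", data_version)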

    # mlflow.log_metric("table_id", args.table_id)
    # mlflow.log_metric("table_version", data_version)
    # mlflow.log_metric("version_comments", args.version_comments)

    # Stop Logging
    mlflow.end_run()


if __name__ == "__main__":
    main()

Overwriting ./components/data_import_verification/data_import_verification.py


Next, create the yaml file describing the component:

In [13]:
%%writefile {import_verification_src_dir}/data_import_verification.yml
# <component>
name: data_import_verification_task
display_name: Import format metadata into ProsafeAI backend and run data verification task
# version: 1 # Not specifying a version will automatically update the version
type: command
inputs:
  format_data:
    type: uri_folder
  table_id:
    type: number
  label_data:
    type: uri_folder     
  version_comments:
    type: string
  username:
    type: string
  password:
    type: string
  requirements_data:
    type: uri_folder
  task_name: 
    type: string
outputs:
  task_report:
    type: uri_folder
code: .
environment:
  # for this step, we reuse the custom environment registered earlier
  azureml:aml-data-verification:0.1.0
command: >-
  python data_import_verification.py 
  --format_data ${{inputs.format_data}} 
  --label_data ${{inputs.label_data}}
  --username ${{inputs.username}} 
  --password ${{inputs.password}}
  --version_comments ${{inputs.version_comments}}
  --table_id ${{inputs.table_id}}
  --requirements_data ${{inputs.requirements_data}}
  --task_name ${{inputs.task_name}}
  --task_report ${{outputs.task_report}}

Overwriting ./components/data_import_verification/data_import_verification.yml
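
Each flag in the `command` section must match an argument that the script's parser declares. The sketch below rebuilds a reduced version of that parser and feeds it a hand-written argument list, which is one quick way to check the flag names locally before submitting a job. The argument values are placeholders chosen for illustration.

```python
import argparse


def build_parser():
    """Reduced copy of the parser in data_import_verification.py."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--format_data", type=str)
    parser.add_argument("--label_data", type=str)
    parser.add_argument("--table_id", type=int)
    parser.add_argument("--task_name", type=str)
    parser.add_argument("--task_report", type=str)
    return parser


# Hypothetical values standing in for what the job would pass at run time
argv = [
    "--format_data", "/mnt/format_data",
    "--label_data", "/mnt/class.txt",
    "--table_id", "1",
    "--task_name", "MLOps_pipeline",
    "--task_report", "/mnt/task_report",
]
args = build_parser().parse_args(argv)
print(args.table_id, args.task_name)
```

Because `--table_id` is declared with `type=int`, the string `"1"` arrives in the script as the integer `1`.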


Now create and register the component. Registering it allows you to re-use it in other pipelines. Also, anyone else with access to your workspace can use the registered component.

In [14]:
# importing the Component Package
from azure.ai.ml import load_component

# Loading the component from the yml file
data_import_verification_component = load_component(source=os.path.join(import_verification_src_dir, "data_import_verification.yml"))

# Register the component in the workspace
data_import_verification_component = ml_client.create_or_update(data_import_verification_component)

# Confirm the registration
print(
    f"Component {data_import_verification_component.name} with Version {data_import_verification_component.version} is registered"
)

Uploading data_import_verification (0.01 MBs): 100%|██████████| 14500/14500 [00:00<00:00, 83779.19it/s]



Component data_import_verification_task with Version 2023-11-30-09-18-11-9654202 is registered


## Create the pipeline from components

Now that both your components are defined and registered, you can start implementing the pipeline.

The Python functions returned by `load_component()` work as any regular Python function that we use within a pipeline to call each step.

To code the pipeline, you use a specific `@dsl.pipeline` decorator that identifies the Azure Machine Learning pipelines. In the decorator, we can specify the pipeline description and default resources like compute and storage. Like a Python function, pipelines can have inputs. You can then create multiple instances of a single pipeline with different inputs.

Here, the pipeline takes the input metadata, the label data, the requirements file, and the ProSafeAI backend settings as input variables. You then call the components and connect them via their input/output identifiers. The outputs of each step can be accessed via the `.outputs` property.

In [15]:
# the dsl decorator tells the sdk that we are defining an Azure Machine Learning pipeline
from azure.ai.ml import dsl, Input, Output


@dsl.pipeline(
    compute=cpu_compute_target,
    description="E2E data_prep-import-verification pipeline",
)
def data_verification_pipeline(
    pipeline_job_data_input,
    pipeline_job_label_data,
    pipeline_job_table_id,
    pipeline_job_version_comments,
    pipeline_job_username,
    pipeline_job_password,
    pipeline_job_requirements_json,
    pipeline_job_task_name
):
    # call data_prepare_component like a Python function with its own inputs
    data_prepare_job = data_prepare_component(
        data=pipeline_job_data_input,
    )

    # call data_import_verification_component like a Python function with its own inputs
    data_import_verification_job = data_import_verification_component(
        format_data=data_prepare_job.outputs.format_data,  # note: using outputs from previous step
        table_id=pipeline_job_table_id, 
        label_data=pipeline_job_label_data, 
        version_comments=pipeline_job_version_comments,  # note: using a pipeline input as parameter
        username=pipeline_job_username,
        password=pipeline_job_password,
        requirements_data=pipeline_job_requirements_json,
        task_name=pipeline_job_task_name
    )

    # a pipeline returns a dictionary of outputs
    # the keys become the pipeline output identifiers
    return {
        "pipeline_job_format_data": data_prepare_job.outputs.format_data,
        # "pipeline_job_data_version": data_import_job.outputs.data_version
    }

Now use your pipeline definition to instantiate a pipeline with your dataset.

In [16]:
# Let's instantiate the pipeline with the parameters of our choice
pipeline = data_verification_pipeline(
    pipeline_job_data_input=Input(type="uri_file", path=TSR_origin_data.path),
    pipeline_job_label_data=Input(type="uri_file", path=Label_data.path),
    pipeline_job_table_id=1,
    pipeline_job_version_comments="MLOps_pipeline",
    pipeline_job_username="<USERNAME>",
    pipeline_job_password="<PASSWORD>",
    pipeline_job_requirements_json=Input(type="uri_file", path=Requirments_data.path),
    pipeline_job_task_name="MLOps_pipeline"
)

## Submit the job 

It's now time to submit the job to run in Azure Machine Learning. This time you use `create_or_update`  on `ml_client.jobs`.

Here you also pass an experiment name. An experiment is a container for all the iterations one does on a certain project. All the jobs submitted under the same experiment name would be listed next to each other in Azure Machine Learning studio.

Once completed, the verification task report is written to the `task_report` output of the second step.

In [17]:
# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    # Project's name
    experiment_name="E2E data_prep-import-verification pipeline",
)
ml_client.jobs.stream(pipeline_job.name)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


RunId: frank_animal_kfnx3m6bmf
Web View: https://studio.ml.azure.cn/runs/frank_animal_kfnx3m6bmf?wsid=/subscriptions/e0b78dee-edf0-469b-8785-8056794f5e74/resourcegroups/rg-prosafeai-cn/workspaces/rg-prosafeai-cn

Streaming logs/azureml/executionlogs.txt

[2023-11-30 09:18:18Z] Submitting 1 runs, first five are: 64ce4fa4:7cba5140-adb7-4268-bc68-ecc5b1b3c6ff
[2023-11-30 09:18:23Z] Completing processing run id 7cba5140-adb7-4268-bc68-ecc5b1b3c6ff.
[2023-11-30 09:18:24Z] Submitting 1 runs, first five are: 430108b7:fefaf68f-a0f4-495a-9308-831738fc5493
[2023-11-30 09:20:26Z] Completing processing run id fefaf68f-a0f4-495a-9308-831738fc5493.

Execution Summary
RunId: frank_animal_kfnx3m6bmf
Web View: https://studio.ml.azure.cn/runs/frank_animal_kfnx3m6bmf?wsid=/subscriptions/e0b78dee-edf0-469b-8785-8056794f5e74/resourcegroups/rg-prosafeai-cn/workspaces/rg-prosafeai-cn
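
After the run finishes, you may want the generated files on your local machine. One option is the `download` method on `ml_client.jobs`. The sketch below wraps it in a helper (the helper name and the default target directory are hypothetical) and asks for the `pipeline_job_format_data` output that the pipeline function returns.

```python
def download_format_data(ml_client, job_name, download_path="./pipeline_outputs"):
    """Download the pipeline's format data output to a local folder.

    `ml_client` is expected to be an azure.ai.ml.MLClient and `job_name`
    the name of a completed pipeline job, for example pipeline_job.name.
    """
    ml_client.jobs.download(
        name=job_name,
        output_name="pipeline_job_format_data",
        download_path=download_path,
    )
    return download_path
```

In this notebook, a call might look like `download_format_data(ml_client, pipeline_job.name)`.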

