# Submit Pipelines
## AML Cluster

In [1]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

credential = DefaultAzureCredential()
ml_client = MLClient(
    credential=credential,
    subscription_id="49b5441f-dda4-47a9-81c4-13272430f4ff",
    resource_group_name="rg-pooya120-dev",
    workspace_name="amlwpooya120dev",
)

### Compute Resource
We search for an existing cluster. If it does not exist, we create a new one.

Check your Compute page in Azure ML to confirm the process is successful.

In [2]:
# Create a compute resource

from azure.ai.ml.entities import AmlCompute

# Name assigned to the compute cluster
cpu_compute_target = "cpu-cluster"

try:
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is.")

except Exception:
    print("Creating a new cpu compute target...")

    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        type="amlcompute",
        size="STANDARD_DS3_V2",
        min_instances=0,
        max_instances=4,
        idle_time_before_scale_down=180,
        tier="Dedicated",
    )
    print(f"{cpu_cluster.name} will be created... compute size {cpu_cluster.size}...", end='')
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster)
    print("Done!")

You already have a cluster named cpu-cluster, we'll reuse it as is.


### Create a Job Environment
We create an environment that has the packages we need installed. We register it for future use.

Steps:

1. Create an empty folder for the config file.
2. Create a `conda.yml` file containing the requirements of your environment.
3. Create an environment according to the `conda.yml` file and register it to the workspace.

Check your Environments page in Azure ML to confirm the process is successful.

In [3]:
import os

dependencies_dir = "./05-pipeline/dependencies"
os.makedirs(dependencies_dir, exist_ok=True)

In [4]:
%%writefile {dependencies_dir}/conda.yml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - xlrd==2.0.1
    - mlflow== 1.26.1
    - azureml-mlflow==1.42.0
    - psutil>=5.8,<5.9
    - tqdm>=4.59,<4.60
    - ipykernel~=6.0
    - matplotlib

Overwriting ./05-pipeline/dependencies/conda.yml


In [5]:
from azure.ai.ml.entities import Environment

custom_env_name = "bike-share-train"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Bike Share pipeline",
    tags={"scikit-learn": "0.24.2"},
    conda_file=os.path.join(dependencies_dir, "conda.yml"),
    image="mcr.microsoft.com/azureml/openmpi3.1.2-ubuntu18.04:latest",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment {pipeline_job_env.name} registered to workspace. "
    f"Environment version is {pipeline_job_env.version}."
)

Environment bike-share-train registered to workspace. Environment version is 2.


### Data Registration
It is easier to pass registered data assets to pipeline steps, rather than pure imports (as shown in Notebook1).

Here, we create the same dataset, and register it to Azure ML.

Note that the file is not downloaded to Azure ML. It still resides at the source. Azure ML provides a uniform file path to access this data, as you will see later. 

Confirm the data asset is registered by visiting the "Data" page of your Azure ML workspace.

In [6]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes


### Pipeline Components
We create the following pipeline components:

1. Data preparation
2. Model training

Each component consists of one or more python files. The files for each component are hosted in separate folders.

For the sake of this demo, we create component 1 using pure python, and component 2 using python + `yml` file.
You can use whichever you like. Also, there is a third way (`yml`+ az cli), which we'll not discuss here.



In [7]:
import os

data_prep_src_dir = "./05-pipeline/components/01_data_prep"
os.makedirs(data_prep_src_dir, exist_ok=True)

train_src_dir = "./05-pipeline/components/02_train"
os.makedirs(train_src_dir, exist_ok=True)

#### Writing data_prep file

In [24]:
%%writefile {data_prep_src_dir}/data_prep.py
import os
import argparse
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import logging
import mlflow


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--data", type=str, help="path to input data")
    parser.add_argument("--test_train_ratio", type=float, required=False, default=0.25)
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    args = parser.parse_args()

    # Start Logging
    mlflow.start_run()

    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.data)

    df = pd.read_csv(args.data, index_col=0)


      
    df['hr_sin'] =  df['hr'].apply(lambda x: np.sin(2 * np.pi * x / 24))
    df['hr_cos'] =  df['hr'].apply(lambda x: np.cos(2 * np.pi * x / 24))
    df['temp2'] = (df['temp'] - 0.6)**2
    # X = df[['hr', 'hr_sin', 'hr_cos', 'temp', 'temp2', 'hum', 'windspeed','workingday', 'weathersit' ]]
    # y = df['cnt']

    df = df[['hr', 'hr_sin', 'hr_cos', 'temp', 'temp2', 'hum', 'windspeed','workingday', 'weathersit', 'hr_sin','hr_cos','temp2','cnt']]

    mlflow.log_metric("num_samples", df.shape[0])
    mlflow.log_metric("num_features", df.shape[1] - 1)


    train_df, test_df = train_test_split(
        df,
        test_size=args.test_train_ratio,
    )

    # output paths are mounted as folder, therefore, we are adding a filename to the path
    train_df.to_csv(os.path.join(args.train_data, "data.csv"), index=False)

    test_df.to_csv(os.path.join(args.test_data, "data.csv"), index=False)

    # Stop Logging
    mlflow.end_run()


if __name__ == "__main__":
    main()

Overwriting ./05-pipeline/components/01_data_prep/data_prep.py


#### Writing train file

In [25]:
%%writefile {train_src_dir}/train.py
import argparse
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import classification_report, mean_squared_error, r2_score
import os
import pandas as pd
import mlflow
import numpy as np 


def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])


# Start Logging
mlflow.start_run()

# enable autologging
mlflow.sklearn.autolog()

os.makedirs("./outputs", exist_ok=True)


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    parser.add_argument("--n_estimators", required=False, default=100, type=int)
    parser.add_argument("--learning_rate", required=False, default=0.1, type=float)
    parser.add_argument("--registered_model_name", type=str, help="model name")
    parser.add_argument("--model", type=str, help="path to model file")
    args = parser.parse_args()

    # paths are mounted as folder, therefore, we are selecting the file from folder
    train_df = pd.read_csv(select_first_file(args.train_data))
    test_df = pd.read_csv(select_first_file(args.test_data))

    y_train = train_df.pop('cnt')
    y_test = test_df.pop('cnt')
    X_train = train_df.values
    X_test = test_df.values

    print(f"Training with data of shape {X_train.shape}")

    # clf = GradientBoostingRegressor( n_estimators=n_estimators, learning_rate=learning_rate)
    clf = RandomForestRegressor( n_estimators=args.n_estimators, )
    clf.fit(X_train, y_train)
    y_pred = clf.predict(X_test)
    
    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mlflow.log_metric('rmse',rmse)
    
    mlflow.log_metric('r2_score',r2_score(y_test, y_pred))

    # Registering the model to the workspace
    print("Registering the model via MLFlow")
    mlflow.sklearn.log_model(
        sk_model=clf,
        registered_model_name=args.registered_model_name,
        artifact_path=args.registered_model_name,
    )

    # Saving the model to a file
    mlflow.sklearn.save_model(
        sk_model=clf,
        path=os.path.join(args.model, "trained_model"),
    )

    # Stop Logging
    mlflow.end_run()


if __name__ == "__main__":
    main()

Overwriting ./05-pipeline/components/02_train/train.py


#### Writing train component `yml` file

In [26]:
%%writefile {train_src_dir}/train.yml
# <component>
name: train_bike_share_model
display_name: 'Bike Share Pipeline - Train Model'
# version: 1 # Not specifying a version will automatically update the version
type: command
inputs:
  train_data: 
    type: uri_folder
  test_data: 
    type: uri_folder
  learning_rate:
    type: number
  n_estimators:
    type: number   
  registered_model_name:
    type: string
outputs:
  model:
    type: uri_folder
code: .
environment:
  # for this step, we'll use an AzureML curated environment
  azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1
command: >-
  python train.py 
  --train_data ${{inputs.train_data}} 
  --test_data ${{inputs.test_data}} 
  --learning_rate ${{inputs.learning_rate}}
  --registered_model_name ${{inputs.registered_model_name}} 
  --model ${{outputs.model}}
version: 1.0.4
# </component>

Overwriting ./05-pipeline/components/02_train/train.yml


#### Creating and Registering Components

In [27]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

DataPrepComponent = command(
    name="bike_share_data_prep",
    display_name="Bike Share Pipeline - Data preparation",
    description="reads a csv input, split the input to train and test",
    inputs={
        "data": Input(type="uri_folder"),
        "test_train_ratio": Input(type="number", default=0.3),
    },
    outputs={
        "train_data": Output(type="uri_folder", mode="rw_mount"),
        "test_data": Output(type="uri_folder", mode="rw_mount"),
    },
    # The source folder of the component
    code=data_prep_src_dir,
    command="""python data_prep.py \
            --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} \
            --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
    version="2.0.6",
).component # `command` is a component builder, to get the actual component, you must use command(...).component


# importing the Component Package
from azure.ai.ml import load_component

# Loading the component from the yml file
TrainComponent = load_component(source=os.path.join(train_src_dir, "train.yml"))

In [28]:
# Now we register the components to the workspace
DataPrepComponent = ml_client.create_or_update(DataPrepComponent)
TrainComponent = ml_client.create_or_update(TrainComponent)

for c in [DataPrepComponent, TrainComponent]:
    print(f"Component {c.name} with Version {c.version} is registered")

[32mUploading 01_data_prep (0.0 MBs):   0%|          | 0/2112 [00:00<?, ?it/s][32mUploading 01_data_prep (0.0 MBs): 100%|██████████| 2112/2112 [00:00<00:00, 70334.11it/s]
[39m



Component bike_share_data_prep with Version 2.0.6 is registered
Component train_bike_share_model with Version 1.0.4 is registered


### Pipelines 

Now that components are ready, we can create a pipeline. 

A pipeline consists of steps (jobs). Each job is an instance of a component. 

For instance, you can create a component, `Component1`, which takes two numbers `a` and `b` and returns their sum. 

```py
Component1 = command(
    inputs={
        "a": Input(type="number"),
        "b": Input(type="number"),
    },
    outputs={
        "result": Output(type="number"),
    },
    command="python calc.py -a ${{inputs.a}} -b ${{inputs.b}}", ...
)
```

You can create jobs from this component, by calling the component and providing the inputs:
```py
job1 = Component1(a=5, b=6)
job2 = Component1(a=7, b=8)
```

You can then create a pipeline that consists of these two jobs.

```py
@dsl.pipeline(compute=cpu_compute_target)
def MyPipeline():
    job1 = Component1(a=5, b=6)
    job2 = Component1(a=7, b=8)
    return {
        "job1": job1.outputs.result,
        "job2": job2.outputs.result,
    }
```

In this pipeline, jobs `job1` and `job2` are independent and can be executed in parallel.

But you can get more creative. Let's say, your pipeline consists of `job1` that adds `5` and `6`. And `job2` that adds result of `job1` with `9`.

```py
@dsl.pipeline(compute=cpu_compute_target)
def MyPipeline():
    job1 = Component1(a=5, b=6)
    job2 = Component1(a=job1.outputs.result, b=8)
    return {
        "job1": job1.outputs.result,
        "job2": job2.outputs.result,
    }
```

If your initial values must be customized per pipeline, you can modify the above code to:
```py
@dsl.pipeline(compute=cpu_compute_target)
def MyPipeline(n1, n2, n3):
    job1 = Component1(a=n1, b=n2)
    job2 = Component1(a=job1.outputs.result, b=n3)
    return {
        "job1": job1.outputs.result,
        "job2": job2.outputs.result,
    }
```

Finally to submit a pipeline for execution, create an instance of the pipeline, with your inputs:
```py
my_pipeline = MyPipeline(5,6,8)
my_pipeline_job = ml_client.jobs.create_or_update(my_pipeline,...)
```


### Create Pipeline

In [29]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output


@dsl.pipeline(
    compute=cpu_compute_target,
    description="Pipeline: Bike Share",
)
def BikeSharePipeline(
    pipeline_job_data_input,
    pipeline_job_test_train_ratio,
    pipeline_job_learning_rate,
    pipeline_job_n_estimators,
    pipeline_job_registered_model_name,
):
    data_prep_job = DataPrepComponent(
        data=pipeline_job_data_input,
        test_train_ratio=pipeline_job_test_train_ratio,
    )

    train_job = TrainComponent(
        train_data=data_prep_job.outputs.train_data, 
        test_data=data_prep_job.outputs.test_data, 
        learning_rate=pipeline_job_learning_rate, 
        n_estimators=pipeline_job_n_estimators,
        registered_model_name=pipeline_job_registered_model_name,
    )

    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "pipeline_job_train_data": data_prep_job.outputs.train_data,
        "pipeline_job_test_data": data_prep_job.outputs.test_data,
    }

### Instantiate and Submit
Create a pipeline instance and submit it.

After the job is submitted, you can check the status by going to the Experiments page of Azure ML, or to the printed link.

In [30]:
registered_model_name = "pipeline_bike_share_model"

pipeline = BikeSharePipeline(
    pipeline_job_data_input=Input(type="uri_file", path='azureml:hourly:1'),
    pipeline_job_test_train_ratio=0.25,
    pipeline_job_learning_rate=0.05,
    pipeline_job_n_estimators=80,
    pipeline_job_registered_model_name=registered_model_name,
)



# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="bike-share-exp",
)
print(pipeline_job.studio_url)

https://ml.azure.com/runs/affable_oxygen_qn0rf6p1dv?wsid=/subscriptions/49b5441f-dda4-47a9-81c4-13272430f4ff/resourcegroups/rg-pooya120-dev/workspaces/amlwpooya120dev&tid=16b3c013-d300-468d-ac64-7eda0820b6d3


### Pipeline with Hyper-parameter Sweep

In [15]:
# the dsl decorator tells the sdk that we are defining an Azure ML pipeline
from azure.ai.ml import dsl, Input, Output
from azure.ai.ml.sweep import Uniform, Choice

@dsl.pipeline(
    compute=cpu_compute_target,
    description="Bike Share Pipeline: Hyper Param Sweep",
)
def CreditDefaultsPipelineWithSweep(
    pipeline_job_data_input,
    pipeline_job_test_train_ratio,
    pipeline_job_registered_model_name,
):
    data_prep_job = DataPrepComponent(
        data=pipeline_job_data_input,
        test_train_ratio=pipeline_job_test_train_ratio,
    )

    train_job = TrainComponent(
        train_data=data_prep_job.outputs.train_data, 
        test_data=data_prep_job.outputs.test_data, 
        learning_rate=0.1, 
        n_estimators=Choice([10,20,50,100,150]), 
        registered_model_name=pipeline_job_registered_model_name,
    )

    sweep_step = train_job.sweep(
        primary_metric="training_f1_score",
        goal="minimize",
        sampling_algorithm="random",
        compute="cpu-cluster",
    )
    sweep_step.set_limits(max_total_trials=5, max_concurrent_trials=5, timeout=7200)


    # a pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "sweep_result": sweep_step.outputs.model
    }

In [16]:
registered_model_name = "pipeline_credit_defaults_model_sweep"

pipeline = CreditDefaultsPipelineWithSweep(
    pipeline_job_data_input=Input(type="uri_file", path='azureml:hourly:1'),
    pipeline_job_test_train_ratio=0.25,
    pipeline_job_registered_model_name=registered_model_name,
)

# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    experiment_name="bike-share-exp",
)
print(pipeline_job.studio_url)

https://ml.azure.com/runs/crimson_peach_77fj9t0837?wsid=/subscriptions/49b5441f-dda4-47a9-81c4-13272430f4ff/resourcegroups/rg-pooya120-dev/workspaces/amlwpooya120dev&tid=16b3c013-d300-468d-ac64-7eda0820b6d3


### Next Steps
If you are interested in deploying this model as an endpoint, [click here](https://github.com/Azure/azureml-examples/blob/main/tutorials/e2e-ds-experience/e2e-ml-workflow.ipynb).