In [1]:
from azure.ai.ml import command, Input, MLClient, UserIdentityConfiguration, ManagedIdentityConfiguration
from azure.identity import DefaultAzureCredential
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes, InputOutputModes
from azure.ai.ml.dsl import pipeline
from dotenv import load_dotenv
import pandas as pd
import os

# specify the details of your subscription
SUBSCRIPTION_ID = "e5615bfe-b43b-41ce-bccb-b78867c2ce63"
RESOURCE_GROUP = "rg-dp100-demo-001"
WORKSPACE_NAME = "mlw-dp100-demo"
DATASTORE_NAME = "blobdatastore2"

# get a handle to the subscription
load_dotenv("python.env")

ml_client = MLClient(DefaultAzureCredential(), 
                     subscription_id=SUBSCRIPTION_ID, 
                     resource_group_name=RESOURCE_GROUP,
                     workspace_name=WORKSPACE_NAME)

In [2]:
envs = ml_client.environments.list()
# we can print the name and latest version of the environments
print([(env.name, env.latest_version) for env in envs])
env = ml_client.environments.get(name="titanic-env", version=2)

[('titanic-env', '5'), ('CliV2AnonymousEnvironment', '0'), ('pytorch-env', '1'), ('testenv-conda-002', '1'), ('testenv-conda', '1'), ('testenv', '1'), ('AzureML-AI-Studio-Development', '1'), ('AzureML-ACPT-pytorch-1.13-py38-cuda11.7-gpu', '10'), ('AzureML-ACPT-pytorch-1.12-py38-cuda11.6-gpu', '14'), ('AzureML-ACPT-pytorch-1.12-py39-cuda11.6-gpu', '14'), ('AzureML-ACPT-pytorch-1.11-py38-cuda11.5-gpu', '14'), ('AzureML-ACPT-pytorch-1.11-py38-cuda11.3-gpu', '17'), ('AzureML-responsibleai-0.21-ubuntu20.04-py38-cpu', '7'), ('AzureML-responsibleai-0.20-ubuntu20.04-py38-cpu', '9'), ('AzureML-tensorflow-2.5-ubuntu20.04-py38-cuda11-gpu', '27'), ('AzureML-tensorflow-2.6-ubuntu20.04-py38-cuda11-gpu', '26'), ('AzureML-tensorflow-2.7-ubuntu20.04-py38-cuda11-gpu', '26'), ('AzureML-sklearn-1.0-ubuntu20.04-py38-cpu', '36'), ('AzureML-pytorch-1.10-ubuntu18.04-py38-cuda11-gpu', '36'), ('AzureML-pytorch-1.9-ubuntu18.04-py37-cuda11-gpu', '44'), ('AzureML-pytorch-1.8-ubuntu18.04-py37-cuda11-gpu', '43'), ('

### 0. Try Reading the Data from Registered Datastore

- Main Reference: 
    - Using `command()`: https://learn.microsoft.com/en-us/azure/machine-learning/how-to-read-write-data-v2?view=azureml-api-2&tabs=python
    - Using Pandas: https://learn.microsoft.com/en-us/azure/machine-learning/tutorial-pipeline-python-sdk?view=azureml-api-2 
- The path to datastore must follow this format - note the "paths" constant: `azureml://datastores/<data_store_name>/paths/<subfolder/file.extension>`
- Note that the datatore was set up to connect to a specific container name, thus the container name is alrealdy treated as the root folder and not included in the path above. If the container name is specified, or any wrong path is provided, StreamNotFound error will be thrown, indicating that the data is not found with the (wrong) path.
- The compute target must be a compute cluster, else if using compute instance, the `UserError` of not being the owner of the compute will arise (unknown reason)
- For simplicity, if using a custom environment, use the `Environment` instance as the input to the command environment argument, instead of an address.

In [3]:
# # Method 1: with command - sending the reading of the file to the compute
# datastore_path = "azureml://datastores/blobdatastore2/paths/titanic_train.csv"
# "azureml://datastores/[a-zA-Z0-9_]+/paths/.*"
# data_type = AssetTypes.URI_FILE
# mode = InputOutputModes.RO_MOUNT
# identity = ManagedIdentityConfiguration()
# env = ml_client.environments.get(name="testenv", version=1)

# inputs = {
#     "input_data": Input(type=data_type, path=datastore_path, mode=mode)
# }
# # This command job uses the head Linux command to print the first 10 lines of the file
# job = command(
#     command="head ${{inputs.input_data}}",
#     inputs=inputs,
#     environment=env,
#     compute="vmcluster-ml-dev",
#     identity=identity,
# )
# # Submit the command
# ml_client.jobs.create_or_update(job)

In [4]:
# Method 2: we can directly read the data from the datastore using the long-form URI:
PATH = 'titanic.csv'
uri = f'azureml://subscriptions/{SUBSCRIPTION_ID}/resourcegroups/{RESOURCE_GROUP}/workspaces/{WORKSPACE_NAME}/datastores/{DATASTORE_NAME}/paths/{PATH}'
df = pd.read_csv(uri)
df

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.2500,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.9250,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1000,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.0500,,S
...,...,...,...,...,...,...,...,...,...,...,...,...
886,887,0,2,"Montvila, Rev. Juozas",male,27.0,0,0,211536,13.0000,,S
887,888,1,1,"Graham, Miss. Margaret Edith",female,19.0,0,0,112053,30.0000,B42,S
888,889,0,3,"Johnston, Miss. Catherine Helen ""Carrie""",female,,1,2,W./C. 6607,23.4500,,S
889,890,1,1,"Behr, Mr. Karl Howell",male,26.0,0,0,111369,30.0000,C148,C


### Getting the common env for all steps

In [5]:
pipeline_job_env = ml_client.environments.get(name="titanic-env", version=5)
f"{pipeline_job_env.name}:{pipeline_job_env.version}"

'titanic-env:5'

### 1. Creating the 1st Component - data_prep

In [6]:
from azure.ai.ml import Input, Output
from azure.ai.ml import command

data_prep_src_dir = "./dp100"
data_prep_component = command(name="data_prep_titanic_survival",
                              display_name="Data preparation for training",
                              description="reads input, split the input to train and test",

                              inputs={"data": Input(type="uri_folder"),
                                      "test_train_ratio": Input(type="number"),
                                     },

                              outputs={"train_data": Output(type="uri_folder", mode="rw_mount"),
                                        "test_data": Output(type="uri_folder", mode="rw_mount"),
                                      },

                              code=data_prep_src_dir,
                              
                              command="""python data_prep.py \
                                        --data ${{inputs.data}} --test_train_ratio ${{inputs.test_train_ratio}} \
                                        --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
                                        """,
                              environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
                              )


In [7]:
#  Optional:
# Now we register the component to the workspace
data_prep_component = ml_client.create_or_update(data_prep_component.component)

# Create (register) the component in your workspace
print(
    f"Component {data_prep_component.name} with Version {data_prep_component.version} is registered"
)

[32mUploading dp100 (0.01 MBs): 100%|##########| 6515/6515 [00:00<00:00, 9861.54it/s] 
[39m



Component data_prep_titanic_survival with Version 2024-01-20-16-29-31-9840505 is registered


### 2. Create component 2: training (using yaml definition)

In [8]:
from azure.ai.ml import Input, Output
from azure.ai.ml import command


train_src_dir = "./dp100"
train_component = command(name = "train_model_titanic_survival",
                          display_name ="Training model",
                          description = "reads input, split the input to train and test",

                          inputs = {"train_data": Input(type="uri_folder"),
                                  "test_data": Input(type="uri_folder"),
                                  "C": Input(type="number"),
                                  "registered_model_name": Input(type="string")},

                          outputs = {"model": Output(type="uri_folder")},

    code = train_src_dir,

    command = """
                python train.py \
                --train_data ${{inputs.train_data}} \
                --test_data ${{inputs.test_data}} \
                --C ${{inputs.C}} \
                --registered_model_name ${{inputs.registered_model_name}} \
                --model ${{outputs.model}}
             """,

    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

In [9]:
#  Optional:
# Now we register the component to the workspace
train_component = ml_client.create_or_update(train_component.component)

# Create (register) the component in your workspace
print(f"Component {train_component.name} with Version {train_component.version} is registered")

Component train_model_titanic_survival with Version 2024-01-20-16-29-34-1196192 is registered


### 3. Create pipeline from the components

- `@dsl.pipeline` decorator identifies the subsequent function defines a Azure Machine Learning pipeline
- the pipeline function can return output ports. For more info about I/O ports of the components and the pipeline, refer to: https://learn.microsoft.com/en-us/azure/machine-learning/how-to-manage-inputs-outputs-pipeline?view=azureml-api-2&tabs=cli 

In [10]:
cluster = ml_client.compute.get("vmcluster-ml-dev")
cluster

AmlCompute({'type': 'amlcompute', 'created_on': None, 'provisioning_state': 'Succeeded', 'provisioning_errors': None, 'name': 'vmcluster-ml-dev', 'description': None, 'tags': None, 'properties': {}, 'print_as_yaml': True, 'id': '/subscriptions/e5615bfe-b43b-41ce-bccb-b78867c2ce63/resourceGroups/rg-dp100-demo-001/providers/Microsoft.MachineLearningServices/workspaces/mlw-dp100-demo/computes/vmcluster-ml-dev', 'Resource__source_path': None, 'base_path': 'd:\\Repositories\\GitHub\\dp-100', 'creation_context': None, 'serialize': <msrest.serialization.Serializer object at 0x000002259C67BE50>, 'resource_id': None, 'location': 'japaneast', 'size': 'STANDARD_D2_V3', 'min_instances': 0, 'max_instances': 2, 'idle_time_before_scale_down': 120.0, 'identity': None, 'ssh_public_access_enabled': False, 'ssh_settings': None, 'network_settings': None, 'tier': 'dedicated', 'enable_node_public_ip': True, 'subnet': None})

In [11]:
# the dsl decorator tells the sdk that we are defining an Azure Machine Learning pipeline
# the compute parameter specifies the compute target to run the pipeline on
# "serverless" if running on serverless compute, this is the default if not specified

from azure.ai.ml import dsl

@dsl.pipeline(
    compute=cluster,  
    description="[with project package] E2E dataprep-train pipeline",)

def titanic_survival_pipeline(pipeline_job_data_input,
                              pipeline_job_test_train_ratio,
                              pipeline_job_C,
                              pipeline_job_registered_model_name,):
    
    # calling the 2 components above in the correct order
    # the outputs of these components can be accessed as attributes of the component
    data_prep_job = data_prep_component(data=pipeline_job_data_input,
                                        test_train_ratio=pipeline_job_test_train_ratio,)

    train_job = train_component(train_data=data_prep_job.outputs.train_data, 
                                test_data=data_prep_job.outputs.test_data,
                                C=pipeline_job_C,
                                registered_model_name=pipeline_job_registered_model_name,)

    return {
        "pipeline_job_train_data": data_prep_job.outputs.train_data,
        "pipeline_job_test_data": data_prep_job.outputs.test_data,
        "pipeline_job_model": train_job.outputs.model,
    }

In [12]:
# get a handle of the data asset and print the URI
titanic_data = ml_client.data.get(name="titanic", version=1)
file_path = os.path.join(titanic_data.path, "titanic.csv")
print(f"Data asset file URI: {file_path}")

Data asset file URI: azureml://subscriptions/e5615bfe-b43b-41ce-bccb-b78867c2ce63/resourcegroups/rg-dp100-demo-001/workspaces/mlw-dp100-demo/datastores/blobdatastore2/paths/titanic.csv


In [13]:
registered_model_name = "titanic_survival_model_C_0.5"

# Let's instantiate the pipeline with the parameters of our choice
pipeline = titanic_survival_pipeline(
    pipeline_job_data_input=Input(type="uri_file", path=file_path),
    pipeline_job_test_train_ratio=0.2,
    pipeline_job_C=0.5,
    pipeline_job_registered_model_name=registered_model_name,
)

In [14]:
# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(pipeline, experiment_name="titanic_survival_pipeline_on_cluster",)
ml_client.jobs.stream(pipeline_job.name)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


RunId: wheat_beach_nbfwwr7xkj
Web View: https://ml.azure.com/runs/wheat_beach_nbfwwr7xkj?wsid=/subscriptions/e5615bfe-b43b-41ce-bccb-b78867c2ce63/resourcegroups/rg-dp100-demo-001/workspaces/mlw-dp100-demo

Streaming logs/azureml/executionlogs.txt

[2024-01-20 16:29:42Z] Submitting 1 runs, first five are: 61dd4ce7:4e8d1743-85cb-403b-b6ac-a5c918e39461
[2024-01-20 16:35:59Z] Completing processing run id 4e8d1743-85cb-403b-b6ac-a5c918e39461.
[2024-01-20 16:36:00Z] Submitting 1 runs, first five are: 30e6bde9:092ecb4f-2d70-4d51-bb30-e6c94b655c42
[2024-01-20 16:37:04Z] Completing processing run id 092ecb4f-2d70-4d51-bb30-e6c94b655c42.

Execution Summary
RunId: wheat_beach_nbfwwr7xkj
Web View: https://ml.azure.com/runs/wheat_beach_nbfwwr7xkj?wsid=/subscriptions/e5615bfe-b43b-41ce-bccb-b78867c2ce63/resourcegroups/rg-dp100-demo-001/workspaces/mlw-dp100-demo

