In [73]:
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Workspace
import os
from azure.identity import DefaultAzureCredential
from azure.identity import AzureCliCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.ai.ml.entities import AmlCompute
from azure.ai.ml.entities import Environment
from azure.ai.ml import load_component
from azure.ai.ml import dsl, Input, Output
# from azureml.core import Workspace
# from azureml.core import Experiment

In [74]:
# Notebook-wide configuration: names of the Azure resources used by the cells below.
custom_env = 'mlflow-env'
dependencies_dir = './dependencies'
cpu_cluster_name = 'cpu-cluster-00'
workspace_name = 'de-mlops'
resource_group = "MLOPS"

# Prefer the AZURE_SUBSCRIPTION_ID environment variable so a real subscription
# ID never has to be written into the notebook; when the variable is not set,
# fall back to the placeholder, which must then be edited by hand.
subscription_id = os.environ.get("AZURE_SUBSCRIPTION_ID", "Your Subscription ID")

# Credential object shared by every Azure client created in this notebook.
# NOTE(review): AzureCliCredential presumably needs a prior `az login` — confirm.
credential = AzureCliCredential()

# Creating Resource Group 

In [17]:
# Management client used for the resource-group operation in this section.
resource_client = ResourceManagementClient(
    credential=credential,
    subscription_id=subscription_id,
)

In [18]:
# Create the resource group (or update it if it already exists); the only
# property supplied is its location.
rg_result = resource_client.resource_groups.create_or_update(
    resource_group, {"location": "eastus"}
)

In [19]:
# The resource group was created/updated with a location only (no tags), so
# report its name and region rather than claiming tags were written.
print(f"Provisioned resource group {rg_result.name} in the {rg_result.location} region")

Updated resource group MLOPS with tags


# Creating workspace

In [5]:
# Client scoped to the subscription and resource group only (no workspace yet);
# it is used to create the workspace itself.
ml_client = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
)

In [None]:
# Describe the workspace and submit the creation in one expression;
# .result() blocks until the long-running operation has finished.
ws_basic = ml_client.workspaces.begin_create(
    Workspace(
        name=workspace_name,
        display_name=workspace_name,
        description="DE Assignment",
        location="eastus",
        hbi_workspace=False,
        # tags=dict(purpose="demo"),  # optional, currently disabled
    )
).result()

# Show the created workspace.
print(ws_basic)

# Create Pipeline

## Get Compute

In [79]:
# Rebind ml_client to a client that also targets the workspace; every cell
# after this one talks to that workspace.
ml_client = MLClient(
    credential=credential,
    subscription_id=subscription_id,
    resource_group_name=resource_group,
    workspace_name=workspace_name,
)

In [4]:
# Reuse the compute cluster if the workspace already has one with this name;
# otherwise provision a new one.
try:
    cpu_cluster = ml_client.compute.get(cpu_cluster_name)
    print("reused compute instance")
except Exception:
    # `except Exception` instead of a bare `except`, so KeyboardInterrupt and
    # SystemExit propagate instead of silently starting a cluster creation.
    # NOTE(review): any lookup failure is still treated as "cluster missing";
    # consider narrowing this to the SDK's not-found error.
    cpu_cluster = AmlCompute(
        name=cpu_cluster_name,
        type="amlcompute",
        size="STANDARD_DS3_v2",
        location="eastus",
        min_instances=0,
        max_instances=2,
        idle_time_before_scale_down=120,
    )
    # begin_create_or_update only starts the operation; wait on the result so
    # cpu_cluster holds the provisioned compute (not the poller) and the
    # message below is printed only after creation has completed.
    cpu_cluster = ml_client.begin_create_or_update(cpu_cluster).result()
    print("New compute generated")

reused compute instance


## Creating environment

In [60]:
# Register the job environment: a base image plus the conda specification
# stored in the dependencies folder, under an explicit version.
pipeline_job_env = ml_client.environments.create_or_update(
    Environment(
        name=custom_env,
        version="0.1.7",
        description="Custom environment for Credit Card Defaults pipeline",
        image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
        conda_file=os.path.join(dependencies_dir, "conda.yml"),
    )
)

# Confirm the name and version of the registered environment.
print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name mlflow-env is registered to workspace, the environment version is 0.1.7


## Getting Directory for training

In [61]:
import os

# Folder that holds the training component's files; create it (including any
# missing parents) and do nothing if it is already there.
train_src_dir = "./components/train"
os.makedirs(name=train_src_dir, exist_ok=True)

In [62]:
# Load the component from the train.yml definition inside the training folder.
train_yaml_path = os.path.join(train_src_dir, "train.yml")
train_component = load_component(source=train_yaml_path)

In [63]:
# Register the loaded component in the workspace and rebind the name to the
# object returned by the service, whose name and version are reported below.
train_component = ml_client.create_or_update(train_component)

print(f"Component {train_component.name} with Version {train_component.version} is registered")

[32mUploading train (1.28 MBs): 100%|##########| 1278182/1278182 [00:08<00:00, 156492.62it/s]
[39m



Component train_diabetes_model with Version 2023-11-05-09-14-17-3079252 is registered


In [64]:
# Pipeline definition. The @dsl.pipeline decorator tells the SDK that this
# function defines an Azure ML pipeline.
#
# Inputs:
#   pipeline_job_data_input            -> train component's `training_data`
#   pipeline_job_learning_rate         -> train component's `reg_rate`
#   pipeline_job_registered_model_name -> train component's `registered_model_name`
# Output:
#   pipeline_job_model                 <- the train step's `model` output
@dsl.pipeline(
    compute="serverless",
    description="E2E data_prep-train pipeline",
)
def diabetes_pipeline(
    pipeline_job_data_input,
    pipeline_job_learning_rate,
    pipeline_job_registered_model_name,
):

    # The pipeline has a single step: the train component, invoked like a
    # regular Python call. All three arguments are pipeline inputs (there is
    # no earlier step whose outputs could be consumed).
    train_job = train_component(
        training_data=pipeline_job_data_input,
        reg_rate=pipeline_job_learning_rate,
        registered_model_name=pipeline_job_registered_model_name,
    )

    # A pipeline returns a dictionary of outputs; each key is the identifier
    # of the corresponding pipeline output.
    return {
        "pipeline_job_model": train_job.outputs.model,
    }

In [65]:
# Parameters for this run of the pipeline.
registered_model_name = "diabetes_model"
uri_path = 'URI path to csv file'  # placeholder: point this at the training CSV

# Instantiate the pipeline with the chosen parameters.
pipeline = diabetes_pipeline(
    pipeline_job_registered_model_name=registered_model_name,
    pipeline_job_learning_rate=0.1,
    pipeline_job_data_input=Input(path=uri_path, type="uri_file"),
)

## Running the Job

In [66]:
# Submit the pipeline to the workspace; experiment_name is the project name
# given to this run.
pipeline_job = ml_client.jobs.create_or_update(
    pipeline, experiment_name="e2e_registered_components"
)

In [None]:
# Stream the submitted job, identified by its name.
ml_client.jobs.stream(name=pipeline_job.name)