In [None]:
import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.ai.ml import MLClient, dsl, Input, Output, command, spark
from azure.ai.ml.dsl import pipeline
from azure.ai.ml.entities import AmlCompute, UserIdentityConfiguration
from azure.ai.ml.constants import InputOutputModes
import os 
from dotenv import load_dotenv

# Load the environment variables from .env into os.environ
load_dotenv()

# The workspace coordinates come from the environment. Fail fast with a clear
# message instead of handing None to MLClient when one of them is missing.
_missing_workspace_vars = [
    var_name
    for var_name in ("AZURE_SUBSCRIPTION_ID", "AZURE_RESOURCE_GROUP", "AZURE_WORKSPACE")
    if not os.environ.get(var_name)
]
if _missing_workspace_vars:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_workspace_vars)
        + ". Set them in the environment or in the .env file."
    )

# Authenticate
credential = DefaultAzureCredential()

# Get a handle to the workspace (all three values were validated above)
ml_client = MLClient(
    credential=credential,
    subscription_id=os.environ["AZURE_SUBSCRIPTION_ID"],
    resource_group_name=os.environ["AZURE_RESOURCE_GROUP"],
    workspace_name=os.environ["AZURE_WORKSPACE"],
)


In [None]:

# Read sample data from the repository
df = pd.read_parquet('../data/policies.parquet')

# Storage settings for the container that the pipeline cells read from and write to
container = os.environ.get('BLOB_CONTAINER')
storage_account_name = os.environ.get('STORAGE_ACCOUNT_NAME')
storage_account_key = os.environ.get('STORAGE_ACCOUNT_KEY')

# Without these, the f-string URIs here and in the pipeline cells would silently
# contain the literal text "None" (e.g. abfss://None@None.dfs.core.windows.net).
_missing_storage_vars = [
    var_name
    for var_name, value in (
        ('BLOB_CONTAINER', container),
        ('STORAGE_ACCOUNT_NAME', storage_account_name),
    )
    if not value
]
if _missing_storage_vars:
    raise RuntimeError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_storage_vars)
        + ". Set them in the environment or in the .env file."
    )

# Same container/account (and, below, the same policies.pq path) as the abfss://
# input URI the pipeline uses.
file_uri = f'abfs://{container}@{storage_account_name}.dfs.core.windows.net'

# Set to True to (re)upload the sample data to the path the pipeline reads.
# Off by default so re-running the notebook does not overwrite the blob.
UPLOAD_SAMPLE_DATA = False
if UPLOAD_SAMPLE_DATA:
    # NOTE(review): writing to an abfs:// URI needs an fsspec ADLS backend
    # (e.g. adlfs) installed, plus STORAGE_ACCOUNT_KEY set — confirm both.
    df.to_parquet(
        file_uri + '/policies.pq',
        engine="pyarrow",
        storage_options={'account_key': storage_account_key},
    )

In [None]:
# Name assigned to the compute cluster used by the training step
cpu_compute_target = "dev-cluster"

try:
    # Reuse the compute target if it already exists in the workspace
    cpu_cluster = ml_client.compute.get(cpu_compute_target)
    print(
        f"You already have a cluster named {cpu_compute_target}, we'll reuse it as is."
    )

except Exception as lookup_error:
    # NOTE(review): any failure of the lookup (not only "not found", but also
    # e.g. authentication or network errors) is treated as "cluster missing".
    # Consider narrowing this to azure.core.exceptions.ResourceNotFoundError.
    print(f"Could not get compute target {cpu_compute_target!r}: {lookup_error!r}")
    print("Creating a new cpu compute target...")

    # Describe the Azure Machine Learning compute cluster to create
    cpu_cluster = AmlCompute(
        name=cpu_compute_target,
        # Azure Machine Learning Compute is the on-demand VM service
        type="amlcompute",
        # VM size used for every node
        size="STANDARD_DS3_V2",
        # Minimum number of running nodes when no job is running
        min_instances=0,
        # Maximum number of nodes in the cluster
        max_instances=4,
        # Seconds an idle node keeps running after a job finishes before scale-down
        idle_time_before_scale_down=180,
        # Dedicated or LowPriority. The latter is cheaper but jobs may be terminated
        tier="Dedicated",
    )
    print(
        f"AMLCompute with name {cpu_cluster.name} will be created, with compute size {cpu_cluster.size}"
    )
    # begin_create_or_update starts a long-running operation and returns a poller.
    # .result() waits for provisioning to finish and returns the compute object,
    # so cpu_cluster is the cluster itself and it exists before jobs are submitted.
    cpu_cluster = ml_client.compute.begin_create_or_update(cpu_cluster).result()

In [None]:
# Spark component: runs spark_feature_eng.py from ../src, reading the raw_data
# file input and writing the training_data file output (both in direct mode).
spark_data_engineering = spark(
    name="data_engineering_spark",
    code="../src",
    entry={"file": "spark_feature_eng.py"},
    inputs=dict(raw_data=Input(type="uri_file", mode="direct")),
    outputs=dict(training_data=Output(type="uri_file", mode="direct")),
    # Spark sizing: 2 cores / 8g for the driver and for each of the 2 executors
    driver_cores=2,
    driver_memory="8g",
    executor_cores=2,
    executor_memory="8g",
    executor_instances=2,
    # Command-line arguments for the entry script, bound to the component I/O
    args=(
        "--raw_data ${{inputs.raw_data}}"
        " --training_data ${{outputs.training_data}}"
    ),
)

# Command component: runs train.py from ../src in the sklearn-1.0 environment
# from the azureml registry to train and register the claims model.
model_training = command(
    name="model_training",
    display_name="Model training and registration",
    inputs={
        "data": Input(type="uri_file"),
        "test_train_ratio": Input(type="number", default=0.2),
        "n_estimators": Input(type="number", default=100),
        "learning_rate": Input(type="number", default=0.1),
        "registered_model_name": Input(type="string"),
    },
    code='../src',
    # Built with implicit string concatenation so the whole invocation is one
    # command line. A triple-quoted string with a trailing "\" on only some lines
    # leaves real newlines between flags, so the flags after them would not be
    # passed to train.py.
    command=(
        "python train.py"
        " --data ${{inputs.data}}"
        " --test_train_ratio ${{inputs.test_train_ratio}}"
        " --n_estimators ${{inputs.n_estimators}}"
        " --learning_rate ${{inputs.learning_rate}}"
        " --registered_model_name ${{inputs.registered_model_name}}"
    ),
    environment="azureml://registries/azureml/environments/sklearn-1.0/labels/latest",
)

In [None]:
# Pipeline wiring: the Spark feature-engineering step feeds the training step.
# Only `#` comments are used here (no docstring, no type annotations) because
# dsl.pipeline inspects this function — presumably annotations could change the
# pipeline's input definitions; confirm before adding any.
@dsl.pipeline(
    description="Model training pipeline for claims prediction",
)
def training_pipeline(spark_input_data):
    # NOTE(review): node names presumably derive from these local variable names
    # (spark_step, training_step) — confirm before renaming them.
    spark_step = spark_data_engineering(raw_data=spark_input_data)
    # Pass the input in direct mode
    spark_step.inputs.raw_data.mode = InputOutputModes.DIRECT
    # Write the training data to a fixed path in the same storage container
    # (container@storage_account_name) that holds the raw input.
    spark_step.outputs.training_data = Output(
        type="uri_file",
        path=f'abfss://{container}@{storage_account_name}.dfs.core.windows.net/training_dataset',
    )
    spark_step.outputs.training_data.mode = InputOutputModes.DIRECT
    # Run the Spark step with a user identity — presumably the submitter's,
    # used to access the abfss paths above; confirm.
    spark_step.identity = UserIdentityConfiguration()
    # Spark resources: VM instance type and Spark runtime version. No compute is
    # attached to this step, presumably so it runs on serverless Spark — confirm.
    spark_step.resources = {
        "instance_type": "standard_e16s_v3",
        "runtime_version": "3.3",
    }
    # Train on the Spark step's output and register the model as "claims_prediction"
    training_step = model_training(data=spark_step.outputs.training_data,
                                   registered_model_name="claims_prediction")
    # The training step runs on the AmlCompute cluster named in the compute cell
    training_step.compute=cpu_compute_target



# NOTE(review): this rebinds `pipeline`, shadowing the `pipeline` decorator
# imported from azure.ai.ml.dsl. The submit cell relies on this name, so rename
# both together if changing it.
pipeline = training_pipeline(
    spark_input_data=Input(
        type="uri_file",
        path=f'abfss://{container}@{storage_account_name}.dfs.core.windows.net/policies.pq',
    )
)

In [None]:
# Submit the pipeline job under the project's "claims_training_pipeline" experiment
pipeline_job = ml_client.jobs.create_or_update(job=pipeline, experiment_name="claims_training_pipeline")

In [None]:
# Follow the submitted pipeline job; blocks until the job completes
ml_client.jobs.stream(name=pipeline_job.name)