In [1]:
from azure.ai.ml import MLClient
from azure.identity import DefaultAzureCredential

# authenticate
credential = DefaultAzureCredential()

SUBSCRIPTION="bc3b3ce8-f4bb-4520-b098-ab59eb6b957e"
RESOURCE_GROUP="lv.developer-rg"
WS_NAME="iu_workspace"
# get a handle to the workspace
ml_client = MLClient(
    credential=credential,
    subscription_id=SUBSCRIPTION,
    resource_group_name=RESOURCE_GROUP,
    workspace_name=WS_NAME,
)

In [2]:
# verify that the handle works correctly
ws = ml_client.workspaces.get(WS_NAME)
print(ws.location,":", ws.resource_group)

eastus2 : lv.developer-rg


In [3]:
# get a handle of the data asset and print URI
chicago_crime_data = ml_client.data.get(name="chicago-crime", version="initial")
print(f"Data asset URI: {chicago_crime_data.path}")

Data asset URI: azureml://subscriptions/bc3b3ce8-f4bb-4520-b098-ab59eb6b957e/resourcegroups/lv.developer-rg/workspaces/iu_workspace/datastores/workspaceblobstore/paths/LocalUpload/6663458963178a7f00c5dfcc8f95824b/Crimes_-_2001_to_Present.csv


In [4]:
import os

dependencies_dir = "./dependencies"
os.makedirs(dependencies_dir, exist_ok=True)

In [5]:
%%writefile {dependencies_dir}/conda.yaml
name: model-env
channels:
  - conda-forge
dependencies:
  - python=3.8
  - numpy=1.21.2
  - pip=21.2.4
  - scikit-learn=0.24.2
  - scipy=1.7.1
  - pandas>=1.1,<1.2
  - pip:
    - inference-schema[numpy-support]==1.3.0
    - xlrd==2.0.1
    - mlflow== 2.4.1
    - azureml-mlflow==1.51.0

Overwriting ./dependencies/conda.yaml


In [6]:
from azure.ai.ml.entities import Environment

custom_env_name = "aml-scikit-learn"

pipeline_job_env = Environment(
    name=custom_env_name,
    description="Custom environment for Credit Card Defaults pipeline",
    tags={"scikit-learn": "0.24.2"},
    conda_file=os.path.join(dependencies_dir, "conda.yaml"),
    image="mcr.microsoft.com/azureml/openmpi4.1.0-ubuntu20.04:latest",
    version="0.2.0",
)
pipeline_job_env = ml_client.environments.create_or_update(pipeline_job_env)

print(
    f"Environment with name {pipeline_job_env.name} is registered to workspace, the environment version is {pipeline_job_env.version}"
)

Environment with name aml-scikit-learn is registered to workspace, the environment version is 0.2.0


In [7]:
import os

data_preprocessing_src_dir = "./components/data_preprocessing"
os.makedirs(data_preprocessing_src_dir, exist_ok=True)

In [8]:
%%writefile {data_preprocessing_src_dir}/data_preprocessing.py
import os
import argparse
import pandas as pd
import logging
import mlflow


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--raw_data", type=str, help="path to input data")
    parser.add_argument("--preprocessed_data", type=str, help="path to output data")
    args = parser.parse_args()

    # start Logging
    mlflow.start_run()

    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.raw_data)

    crime_df = pd.read_csv(args.raw_data)
    
    mlflow.log_metric("num_samples_original_data:", crime_df.shape[0])
    mlflow.log_metric("num_features_original_data", crime_df.shape[1])
    
    crime_df.drop(
        [
            'Case Number',
            'Ward',
            'Community Area',
            'District',
            'Location Description',
            'Description',
            'Location',
            'Block',
            'IUCR',
            'Beat',
            'FBI Code',
            'ID'
        ],
        axis=1,
        inplace=True
    )
    crime_df = crime_df[~crime_df['X Coordinate'].isna()]
    
    mlflow.log_metric("num_samples_preprocessed_data:", crime_df.shape[0])
    mlflow.log_metric("num_features_preprocessed_data", crime_df.shape[1])

    # output path mounted as folder -> add filename to the path
    crime_df.to_csv(os.path.join(args.preprocessed_data, "crime_data_preprocessed.csv"), index=False)

    # stop Logging
    mlflow.end_run()


if __name__ == "__main__":
    main()

Overwriting ./components/data_preprocessing/data_preprocessing.py


In [9]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

data_preprocessing_component = command(
    name="data_preprocessing_chicago_crime",
    display_name="Data preprocessing",
    description="remove unecessary columns",
    inputs={
        "raw_data": Input(type="uri_folder"),
    },
    outputs=dict(
        preprocessed_data=Output(type="uri_folder", mode="rw_mount"),
    ),
    # source folder of the component
    code=data_preprocessing_src_dir,
    command="""python data_preprocessing.py \
            --raw_data ${{inputs.raw_data}} \
            --preprocessed_data ${{outputs.preprocessed_data}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

In [10]:
# register the component to the workspace
data_preprocessing_component = ml_client.create_or_update(data_preprocessing_component.component)

print(
    f"Component {data_preprocessing_component.name} with Version {data_preprocessing_component.version} is registered"
)

[32mUploading data_preprocessing (0.0 MBs): 100%|██████████| 3125/3125 [00:00<00:00, 42482.60it/s]
[39m



Component data_preprocessing_chicago_crime with Version 2024-04-30-19-59-40-8716971 is registered


In [11]:
import os

feature_engineering_src_dir = "./components/feature_engineering"
os.makedirs(feature_engineering_src_dir, exist_ok=True)

In [12]:
%%writefile {feature_engineering_src_dir}/feature_engineering.py
import os
import argparse
import pandas as pd
from sklearn import preprocessing
from sklearn.model_selection import train_test_split
import logging
import mlflow

def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])


# start Logging
mlflow.start_run()


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--preprocessed_data", type=str, help="path to input data")
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    args = parser.parse_args()

    print(" ".join(f"{k}={v}" for k, v in vars(args).items()))

    print("input data:", args.preprocessed_data)
    
    crime_df = pd.read_csv(select_first_file(args.preprocessed_data))
    
    print(f"log 1 - crime_df.shape: {crime_df.shape}")
    print(f"log 2 - crime_df.columns: {crime_df.columns}")

    mlflow.log_metric("num_samples_preprocessed_data:", crime_df.shape[0])
    mlflow.log_metric("num_features_preprocessed_data", crime_df.shape[1])
    
    crime_df['Date'] = crime_df['Date'].str[0:10]
    crime_df['Datetime'] = pd.to_datetime(crime_df['Date'], format='%m/%d/%Y')
    crime_df['Day Name'] = crime_df['Datetime'].dt.day_name()
    
    crime_df['Day'] = crime_df['Date'].str[3:5]
    crime_df['Month'] = crime_df['Date'].str[0:2]
    
    crime_df['Updated On Day'] = crime_df['Updated On'].str[3:5]
    crime_df['Updated On Month'] = crime_df['Updated On'].str[0:2]
    crime_df['Updated On Year'] = crime_df['Updated On'].str[6:10]
    
    crime_df.drop(
        [
            'Date',
            'Updated On',
            'Datetime',
        ],
        axis=1,
        inplace=True
    )
    
    mlflow.log_metric("num_samples_feature_engineered_data:", crime_df.shape[0])
    mlflow.log_metric("num_features_feature_engineered_data", crime_df.shape[1])
    
    label_encoder = preprocessing.LabelEncoder()
    label_encoder.fit(crime_df['Primary Type'])
    crime_df['Primary Type Cat'] = label_encoder.transform(crime_df['Primary Type'])
    label_encoder.fit(crime_df['Arrest'])
    crime_df['Arrest Cat'] = label_encoder.transform(crime_df['Arrest'])
    label_encoder.fit(crime_df['Domestic'])
    crime_df['Domestic Cat'] = label_encoder.transform(crime_df['Domestic'])
    label_encoder.fit(crime_df['Day Name'])
    crime_df['Day Name Cat'] = label_encoder.transform(crime_df['Day Name'])
    
    df_hotencoded = crime_df[
        [
            'Primary Type Cat',
            'Domestic Cat',
            'Day Name Cat',
            'Day',
            'Month',
            'Year',
            'Updated On Day',
            'Updated On Month',
            'Updated On Year',
            'X Coordinate',
            'Y Coordinate',
            'Latitude',
            'Longitude',
            'Arrest Cat'
        ]
    ]
    
    mlflow.log_metric("num_samples_hotencoded_data:", crime_df.shape[0])
    mlflow.log_metric("num_features_hotencoded_data", crime_df.shape[1])
    
    train_df, test_df = train_test_split(
        df_hotencoded,
        test_size=0.2,
    )

    # output paths mounted as folder -> add filename to the path
    train_df.to_csv(os.path.join(args.train_data, "training_data.csv"), index=False)
    test_df.to_csv(os.path.join(args.test_data, "testing_data.csv"), index=False)

    # Stop Logging
    mlflow.end_run()


if __name__ == "__main__":
    main()

Overwriting ./components/feature_engineering/feature_engineering.py


In [13]:
from azure.ai.ml import command
from azure.ai.ml import Input, Output

feature_engineering_component = command(
    name="feature_engineering_chicago_crime",
    display_name="Feature engineering",
    description="create new relevant attributes",
    inputs={
        "preprocessed_data": Input(type="uri_folder"),
    },
    outputs=dict(
        train_data=Output(type="uri_folder", mode="rw_mount"),
        test_data=Output(type="uri_folder", mode="rw_mount"),
    ),
    # source folder of the component
    code=feature_engineering_src_dir,
    command="""python feature_engineering.py \
            --preprocessed_data ${{inputs.preprocessed_data}} \
            --train_data ${{outputs.train_data}} --test_data ${{outputs.test_data}} \
            """,
    environment=f"{pipeline_job_env.name}:{pipeline_job_env.version}",
)

In [14]:
# register the component to the workspace
feature_engineering_component = ml_client.create_or_update(feature_engineering_component.component)

print(
    f"Component {feature_engineering_component.name} with Version {feature_engineering_component.version} is registered"
)

[32mUploading feature_engineering (0.01 MBs): 100%|██████████| 5584/5584 [00:00<00:00, 74088.00it/s]
[39m



Component feature_engineering_chicago_crime with Version 2024-04-30-19-59-43-0964239 is registered


In [15]:
import os

train_src_dir = "./components/train"
os.makedirs(train_src_dir, exist_ok=True)

In [16]:
%%writefile {train_src_dir}/train.py
import argparse
from sklearn.preprocessing import StandardScaler
from sklearn.svm import LinearSVC
from sklearn.metrics import classification_report
import os
import pandas as pd
import mlflow


def select_first_file(path):
    """Selects first file in folder, use under assumption there is only one file in folder
    Args:
        path (str): path to directory or file to choose
    Returns:
        str: full path of selected file
    """
    files = os.listdir(path)
    return os.path.join(path, files[0])


# start Logging
mlflow.start_run()

# enable autologging
mlflow.sklearn.autolog()

os.makedirs("./outputs", exist_ok=True)


def main():
    """Main function of the script."""

    # input and output arguments
    parser = argparse.ArgumentParser()
    parser.add_argument("--train_data", type=str, help="path to train data")
    parser.add_argument("--test_data", type=str, help="path to test data")
    parser.add_argument("--registered_model_name", type=str, help="model name")
    parser.add_argument("--model", type=str, help="path to model file")
    args = parser.parse_args()

    # paths mounted as folder -> select file from folder
    train_df = pd.read_csv(select_first_file(args.train_data))
    test_df = pd.read_csv(select_first_file(args.test_data))

    X_train = train_df[
        [
            'Primary Type Cat',
            'Domestic Cat',
            'Day Name Cat',
            'Day',
            'Month',
            'Year',
            'Updated On Day',
            'Updated On Month',
            'Updated On Year',
            'X Coordinate',
            'Y Coordinate',
            'Latitude',
            'Longitude',
        ]
    ].to_numpy()
    
    y_train = train_df[
        [
           'Arrest Cat' 
        ]
    ].to_numpy()
    
    X_test = test_df[
        [
           'Primary Type Cat',
            'Domestic Cat',
            'Day Name Cat',
            'Day',
            'Month',
            'Year',
            'Updated On Day',
            'Updated On Month',
            'Updated On Year',
            'X Coordinate',
            'Y Coordinate',
            'Latitude',
            'Longitude',
        ]
    ].to_numpy()

    y_test = test_df[
        [
           'Arrest Cat' 
        ]
    ].to_numpy()
    
    y_train = y_train.ravel()
    y_test = y_test.ravel()
    
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.transform(X_test)
    
    
    svm = LinearSVC(C=0.0001)
    svm.fit(X_train, y_train)

    y_pred = svm.predict(X_test)

    print(classification_report(y_test, y_pred))

    # registering the model to the workspace
    print("Registering the model via MLFlow")
    mlflow.sklearn.log_model(
        sk_model=svm,
        registered_model_name=args.registered_model_name,
        artifact_path=args.registered_model_name,
    )

    # saving the model to a file
    mlflow.sklearn.save_model(
        sk_model=svm,
        path=os.path.join(args.model, "trained_model"),
    )

    # stop Logging
    mlflow.end_run()


if __name__ == "__main__":
    main()

Overwriting ./components/train/train.py


In [17]:
%%writefile {train_src_dir}/train.yml
# <component>
name: train_chicago_crime_model
display_name: Train Chicago Crime Model
# version: 1 # Not specifying a version will automatically update the version
type: command
inputs:
  train_data: 
    type: uri_folder
  test_data: 
    type: uri_folder    
  registered_model_name:
    type: string
outputs:
  model:
    type: uri_folder
code: .
environment:
  # use an AzureML curate environment
  azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu:1
command: >-
  python train.py 
  --train_data ${{inputs.train_data}} 
  --test_data ${{inputs.test_data}} 
  --registered_model_name ${{inputs.registered_model_name}} 
  --model ${{outputs.model}}
# </component>

Overwriting ./components/train/train.yml


In [18]:
# importing the Component Package
from azure.ai.ml import load_component

# loading the component from the yml file
train_component = load_component(source=os.path.join(train_src_dir, "train.yml"))

# register component to the workspace
train_component = ml_client.create_or_update(train_component)

print(
    f"Component {train_component.name} with Version {train_component.version} is registered"
)

[32mUploading train (0.0 MBs): 100%|██████████| 3760/3760 [00:00<00:00, 60618.32it/s]
[39m



Component train_chicago_crime_model with Version 2024-04-30-19-59-45-3124006 is registered


In [19]:
from azure.ai.ml import dsl, Input, Output

# dsl decorator tells the sdk that we are defining an Azure Machine Learning pipeline
@dsl.pipeline(
    compute="serverless",  # "serverless" value runs pipeline on serverless compute
    description="E2E data_perp-train pipeline",
)
def crime_chicago_pipeline(
    pipeline_job_data_preprocessing_input,
    pipeline_job_registered_model_name,
):
    # using data_preprocessing_function like a python call with its own inputs
    data_preprocessing_job = data_preprocessing_component(
        raw_data=pipeline_job_data_preprocessing_input,
    )
    
    # using feature_engineering_function like a python call with its own inputs and reading output of previous component
    feature_engineering_job = feature_engineering_component(
        preprocessed_data=data_preprocessing_job.outputs.preprocessed_data, # note: using outputs from previous step
    )

    # using train_func like a python call with its own inputs and reading output of previous component
    train_job = train_component(
        train_data=feature_engineering_job.outputs.train_data,  # note: using outputs from previous step
        test_data=feature_engineering_job.outputs.test_data,  # note: using outputs from previous step
        registered_model_name=pipeline_job_registered_model_name,
    )

    # pipeline returns a dictionary of outputs
    # keys will code for the pipeline output identifier
    return {
        "pipeline_job_train_data": feature_engineering_job.outputs.train_data,
        "pipeline_job_test_data": feature_engineering_job.outputs.test_data,
    }

In [20]:
registered_model_name = "chicago_crime_model"

# Let's instantiate the pipeline with the parameters of our choice
pipeline = crime_chicago_pipeline(
    pipeline_job_data_preprocessing_input=Input(type="uri_file", path=chicago_crime_data.path),
    pipeline_job_registered_model_name=registered_model_name,
)

In [21]:
# submit the pipeline job
pipeline_job = ml_client.jobs.create_or_update(
    pipeline,
    # Project's name
    experiment_name="e2e_registered_components",
)
ml_client.jobs.stream(pipeline_job.name)

Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


RunId: gray_head_nq24vc4513
Web View: https://ml.azure.com/runs/gray_head_nq24vc4513?wsid=/subscriptions/bc3b3ce8-f4bb-4520-b098-ab59eb6b957e/resourcegroups/lv.developer-rg/workspaces/iu_workspace

Streaming logs/azureml/executionlogs.txt

[2024-04-30 19:59:50Z] Submitting 1 runs, first five are: 8d1b49c4:6026b606-7296-4f09-a452-9fc3e4d8db61
[2024-04-30 20:05:27Z] Completing processing run id 6026b606-7296-4f09-a452-9fc3e4d8db61.
[2024-04-30 20:05:27Z] Submitting 1 runs, first five are: 428442d0:e84c798b-f811-4222-9500-2000593054ef
[2024-04-30 20:12:09Z] Completing processing run id e84c798b-f811-4222-9500-2000593054ef.
[2024-04-30 20:12:09Z] Submitting 1 runs, first five are: d7962a9c:92703662-a09d-41b8-ae28-4e7f5a53a463
[2024-04-30 20:15:45Z] Completing processing run id 92703662-a09d-41b8-ae28-4e7f5a53a463.

Execution Summary
RunId: gray_head_nq24vc4513
Web View: https://ml.azure.com/runs/gray_head_nq24vc4513?wsid=/subscriptions/bc3b3ce8-f4bb-4520-b098-ab59eb6b957e/resourcegroups/lv