# 1. Pipeline

Do odpalenia kodu wymagana jest biblioteka **azure-ai-ml**. Jeżeli nie jest zainstalowana należy wywołać komendę *pip install azure-ai-ml*

In [1]:
pip show azure-ai-ml

Name: azure-ai-ml
Version: 1.5.0
Summary: Microsoft Azure Machine Learning Client Library for Python
Home-page: https://github.com/Azure/azure-sdk-for-python
Author: Microsoft Corporation
Author-email: azuresdkengsysadmins@microsoft.com
License: MIT License
Location: /anaconda/envs/azureml_py310_sdkv2/lib/python3.10/site-packages
Requires: azure-common, azure-core, azure-mgmt-core, azure-storage-blob, azure-storage-file-datalake, azure-storage-file-share, colorama, isodate, jsonschema, marshmallow, msrest, opencensus-ext-azure, pydash, pyjwt, pyyaml, strictyaml, tqdm, typing-extensions
Required-by: 
Note: you may need to restart the kernel to use updated packages.


### Łączenie się z workspace
Kiedy kod wykonywany jest za pomocą wirtualnej maszyny zarządzanej przez Azure ML używamy domyślnych wartości do połączenia się.

In [2]:
from azure.identity import DefaultAzureCredential, InteractiveBrowserCredential
from azure.ai.ml import MLClient

try:
    credential = DefaultAzureCredential()
    # Check if given credential can get token successfully.
    credential.get_token("https://management.azure.com/.default")
except Exception as ex:
    # Fall back to InteractiveBrowserCredential in case DefaultAzureCredential not work
    credential = InteractiveBrowserCredential()

In [3]:
# Get a handle to workspace
ml_client = MLClient.from_config(credential=credential)

Found the config file in: /config.json


### Tworzenie skryptów
Poniższy kod tworzy folder src, w którym znajdą się skrypty pipeline'u. 
Kolejne 2 bloki kodu zawierają skrypty, które przygotują dane oraz wytrenują model.
Przy trenowaniu modelu używam biblioteki mlflow, aby śledzić modele oraz ich artefakty.

In [4]:
import os

# create a folder for the script files
script_folder = 'src'
os.makedirs(script_folder, exist_ok=True)
print(script_folder, 'folder created')

src folder created


In [47]:
%%writefile $script_folder/prep-data.py
# import libraries
import argparse
import pandas as pd
import numpy as np
from pathlib import Path
from sklearn.preprocessing import MinMaxScaler

def main(args):
    # read data
    df = get_data(args.input_data)

    cleaned_data = clean_data(df)

    scaled_data = scale_data(cleaned_data)

    output_df = scaled_data.to_csv((Path(args.output_data) / "rain.csv"), index = False)

# function that reads the data
def get_data(path):
    df = pd.read_csv(path)
    df.Date = df.Date.astype(np.datetime64)
    df.set_index('Date', inplace=True) 

    # Count the rows and print the result
    row_count = (len(df))
    print('Preparing {} rows of data'.format(row_count))
    
    return df

# function that removes missing values
def clean_data(df):
    location_df = df.query("Location == 'Darwin'")  # Wybieram tylko miasto Darwin z całego datasetu, żeby szybciej się liczyło
    location_df = location_df.drop(columns='Location')  # Wszędzie jest lokacja darwin, więc wyrzucam tą kolumnę
    location_df = location_df.dropna(subset=['RainTomorrow'])  # Wyrzucza wiersze z brakującą wartością zmiennej objaśnianej
    location_df = location_df.replace({"No": False, "Yes": True}).astype(
        {'RainToday': bool, 'RainTomorrow': bool})  # Zmienia typ na bool
    location_df = location_df.ffill()  # Uzupełniam brakujące dane za pomocą danych z poprzedniego dnia
    location_df = pd.get_dummies(
        data=location_df)  # One hot encoding - czyli zamieniam dane kategoryczne na zera i jedynki
    return location_df

def scale_data(df):
    # Skalowanie danych - zamienia wartości liczbowe na takie ze średnią w 0 i odchyleniem standardowym 1
    scaler = MinMaxScaler()
    num_cols = ['MinTemp','MaxTemp','Rainfall','Evaporation','Sunshine','WindGustSpeed','WindSpeed9am', 
    'WindSpeed3pm', 'Humidity9am', 'Humidity3pm', 'Pressure9am', 'Pressure3pm', 'Cloud9am', 'Cloud3pm',
    'Temp9am', 'Temp3pm']
    df[num_cols] = scaler.fit_transform(df[num_cols])

    return df


def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--input_data", dest='input_data',
                        type=str)
    parser.add_argument("--output_data", dest='output_data',
                        type=str)

    # parse args
    args = parser.parse_args()

    # return args
    return args

# run script
if __name__ == "__main__":
    # add space in logs
    print("\n\n")
    print("*" * 60)

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")

Overwriting src/prep-data.py


In [48]:
%%writefile $script_folder/train-model.py
# import libraries
import mlflow
import glob
import argparse
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.metrics import roc_curve
import matplotlib.pyplot as plt

def main(args):
    # enable autologging
    mlflow.autolog()

    # read data
    df = get_data(args.training_data)

    # split data
    X_train, X_test, y_train, y_test = split_data(df)

    # train model
    model = train_model(args.reg_rate, X_train, X_test, y_train, y_test)

    eval_model(model, X_test, y_test)

# function that reads the data
def get_data(data_path):

    all_files = glob.glob(data_path + "/*.csv")
    df = pd.concat((pd.read_csv(f) for f in all_files), sort=False)
    
    return df

# function that splits the data
def split_data(df):
    print("Splitting data...")
    X = df.drop(columns="RainTomorrow")
    y = df.RainTomorrow

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.20, random_state=101, stratify=y)

    return X_train, X_test, y_train, y_test

# function that trains the model
def train_model(reg_rate, X_train, X_test, y_train, y_test):
    mlflow.log_param("Regularization rate", reg_rate)
    print("Training model...")
    model = LogisticRegression(C=1/reg_rate, solver="liblinear").fit(X_train, y_train)

    mlflow.sklearn.save_model(model, args.model_output)

    return model

# function that evaluates the model
def eval_model(model, X_test, y_test):
    # calculate accuracy
    y_hat = model.predict(X_test)
    acc = np.average(y_hat == y_test)
    print('Accuracy:', acc)

    # calculate AUC
    y_scores = model.predict_proba(X_test)
    auc = roc_auc_score(y_test,y_scores[:,1])
    print('AUC: ' + str(auc))

    # plot ROC curve
    fpr, tpr, thresholds = roc_curve(y_test, y_scores[:,1])
    fig = plt.figure(figsize=(6, 4))
    # Plot the diagonal 50% line
    plt.plot([0, 1], [0, 1], 'k--')
    # Plot the FPR and TPR achieved by our model
    plt.plot(fpr, tpr)
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.savefig("ROC-Curve.png") 

def parse_args():
    # setup arg parser
    parser = argparse.ArgumentParser()

    # add arguments
    parser.add_argument("--training_data", dest='training_data',
                        type=str)
    parser.add_argument("--reg_rate", dest='reg_rate',
                        type=float, default=0.01)
    parser.add_argument("--model_output", dest='model_output',
                        type=str)

    # parse args
    args = parser.parse_args()

    # return args
    return args

# run script
if __name__ == "__main__":
    # add space in logs
    print("\n\n")
    print("*" * 60)

    # parse args
    args = parse_args()

    # run main function
    main(args)

    # add space in logs
    print("*" * 60)
    print("\n\n")

Overwriting src/train-model.py


### Definiowanie komponentów
Poniższy kod utworzy pliki YAML dla każdego komponentu, który będzie stanowił krok pipeline'u. Do zdefiniowania komponentu należy podać metadane, dane wejściowe, wyjście oraz komendę powłoki, która zostanie użyta do uruchomienia komponentu i jej środowisko wykonawcze.

In [49]:
%%writefile prep-data.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: prep_data
display_name: Prepare training data
version: 1
type: command
inputs:
  input_data: 
    type: uri_file
outputs:
  output_data:
    type: uri_folder
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
command: >-
  python prep-data.py 
  --input_data ${{inputs.input_data}}
  --output_data ${{outputs.output_data}}

Overwriting prep-data.yml


In [50]:
%%writefile train-model.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
name: train_model
display_name: Train a decision tree classifier model
version: 1
type: command
inputs:
  training_data: 
    type: uri_folder
  reg_rate:
    type: number
    default: 0.01
outputs:
  model_output:
    type: mlflow_model
code: ./src
environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
command: >-
  python train-model.py 
  --training_data ${{inputs.training_data}} 
  --reg_rate ${{inputs.reg_rate}} 
  --model_output ${{outputs.model_output}} 

Overwriting train-model.yml


### Ładowanie komponentów

In [51]:
from azure.ai.ml import load_component

parent_dir = ""

prep_data = load_component(source=parent_dir + "./prep-data.yml")
train_decision_tree = load_component(source=parent_dir + "./train-model.yml")

### Ładowanie danych
Dane do trenowania modelu mogą znajdować się w trzech formatach:
* URI_FILE - jeden plik
* URI_FOLDER - folder z plikami
* MLTABLE - typ danych specyficzny dla azure, za jego pomocą można odczytać wiele plików w różnych formatach. Jest on wymagany do użycia narzędzi AutoML, jednak kiedy próbowałem tworzyć go w sposób programistyczny to nie chiał działać. Może go jednak również stworzyć za pomocą GUI.  

Po wykonaniu poniższego kodu stworzy się data asset. Jeżeli stworzymy kolejny asset o takiej samej nazwie będą one wersjonowane zaczynając od liczby 1, więc do tego możemy się dostać za pomocą "*azureml:rain-local:1*", gdybyśmy chcieli uruchomić kolejną wersję tego datasetu to analogicznie będzie to "*azureml:rain-local:2*". Testowo stworzyłem kilka wersji danych co zaprezentowane jest na poniższym zdjęciu.
![Wersjonowanie danych](images/dataAssets.png)

In [52]:
from azure.ai.ml.entities import Data
from azure.ai.ml.constants import AssetTypes

my_path = './data/weatherAUS.csv'

my_data = Data(
    path=my_path,
    type=AssetTypes.URI_FILE,
    description="Rain in Australia dataset from kaggle",
    name="rain-local"
)

ml_client.data.create_or_update(my_data)

Data({'skip_validation': False, 'mltable_schema_url': None, 'referenced_uris': None, 'type': 'uri_file', 'is_anonymous': False, 'auto_increment_version': False, 'name': 'rain-local', 'description': 'Rain in Australia dataset from kaggle', 'tags': {}, 'properties': {}, 'print_as_yaml': True, 'id': '/subscriptions/a09bae00-b244-4265-9803-cfcc15f34f12/resourceGroups/rg-asi/providers/Microsoft.MachineLearningServices/workspaces/mlw-asi/data/rain-local/versions/3', 'Resource__source_path': None, 'base_path': '/mnt/batch/tasks/shared/LS_root/mounts/clusters/ci-asi/code/Users/s20636', 'creation_context': <azure.ai.ml.entities._system_data.SystemData object at 0x7fa49be72f20>, 'serialize': <msrest.serialization.Serializer object at 0x7fa49be73250>, 'version': '3', 'latest_version': None, 'path': 'azureml://subscriptions/a09bae00-b244-4265-9803-cfcc15f34f12/resourcegroups/rg-asi/workspaces/mlw-asi/datastores/workspaceblobstore/paths/LocalUpload/58e3442047b54f8bbbed5b42f21dd66b/weatherAUS.csv', 

### Tworzenie pipeline'u
Łączymy oba komponenty. Danymi wejściowymi do modelu jest wcześniej stworzony data asset. Następnie wyjście pierwszego komponentu jest danymi wejściowymi do kolejnego.

In [53]:
from azure.ai.ml import Input
from azure.ai.ml.constants import AssetTypes
from azure.ai.ml.dsl import pipeline

@pipeline()
def rain_classification(pipeline_job_input):
    clean_data = prep_data(input_data=pipeline_job_input)
    train_model = train_decision_tree(training_data=clean_data.outputs.output_data)

    return {
        "pipeline_job_transformed_data": clean_data.outputs.output_data,
        "pipeline_job_trained_model": train_model.outputs.model_output,
    }

pipeline_job = rain_classification(Input(type=AssetTypes.URI_FILE, path="azureml:rain-local:1"))
     

In [54]:
print(pipeline_job)

display_name: rain_classification
type: pipeline
inputs:
  pipeline_job_input:
    type: uri_file
    path: azureml:rain-local:1
outputs:
  pipeline_job_transformed_data:
    type: uri_folder
  pipeline_job_trained_model:
    type: mlflow_model
jobs:
  clean_data:
    type: command
    inputs:
      input_data:
        path: ${{parent.inputs.pipeline_job_input}}
    outputs:
      output_data: ${{parent.outputs.pipeline_job_transformed_data}}
    component:
      $schema: https://azuremlschemas.azureedge.net/latest/commandComponent.schema.json
      name: prep_data
      version: '1'
      display_name: Prepare training data
      type: command
      inputs:
        input_data:
          type: uri_file
      outputs:
        output_data:
          type: uri_folder
      command: python prep-data.py  --input_data ${{inputs.input_data}} --output_data
        ${{outputs.output_data}}
      environment: azureml:AzureML-sklearn-0.24-ubuntu18.04-py37-cpu@latest
      code: /mnt/batch/tasks/s

In [55]:
# change the output mode
pipeline_job.outputs.pipeline_job_transformed_data.mode = "upload"
pipeline_job.outputs.pipeline_job_trained_model.mode = "upload"
# set pipeline level compute
pipeline_job.settings.default_compute = "cluster-ASI"
# set pipeline level datastore
pipeline_job.settings.default_datastore = "workspaceblobstore"

### Uruchamianie pipeline'u 

In [56]:
# submit job to workspace
pipeline_job = ml_client.jobs.create_or_update(
    pipeline_job, experiment_name="pipeline_rain"
)
pipeline_job

[32mUploading src (0.01 MBs):   0%|          | 0/5889 [00:00<?, ?it/s][32mUploading src (0.01 MBs): 100%|██████████| 5889/5889 [00:00<00:00, 138656.43it/s]
[39m



Experiment,Name,Type,Status,Details Page
pipeline_rain,red_apricot_579vyxzl52,pipeline,Preparing,Link to Azure Machine Learning studio


Pipeline wykonywał się około 5 minut. Po jego pomyślnym zakończeniu mamy dostęp do wytrenowanego modelu oraz artefaktów.
![Pipeline](images/pipeline1.png)
Możemy również zlecić cron job, który będzie uruchamiał pipeline co określony okres czasu.
![Pipeline](images/pipeline2.png)


# AutoML

Aby stworzyć model AutoML w Azure potrzebny jest dataset w formacie MLTable, stworzyłem go za pomocą GUI, ponieważ rozwiązanie używające kodu nie chciało działać i po kilkunastu próbach poddałem się. Poniżej pokazany jest graficzny interfejs do tworzenia data assetów.
![MLTable](images/mltable.png)


Po stworzeniu zbioru danych w odpowiednim formacie należy przejść do modułu **Automated ML** po lewej stronie interfejsu. Następnie wybieramy dane oraz konfigurujemy zadanie. W tym przypadku wybieramy klasyfikację oraz podajemy sposób walidacji modeli.
![Automl](images/automl1.png)
![Automl](images/automl2.png)
![Automl](images/automl3.png)

Eksperyment wykonywał się 3 godziny 9 minut na klastrze złożonym z dwóch jednostek Standard_DS11_v2 (2 cores, 14 GB RAM, 28 GB disk). Łącznie zostało przetestowanych około 50 modeli. Większość z nich trenowała się poniżej 2 minut, ale jeden z nich zakończył się błędem z powodu timeoutu po godzinie.
![Automl](images/automl4.png)
Po kliknięciu w jeden z modeli możemy zobaczyć jego metryki oraz objaśnienie działania zawierające najważniejsze feature'y.
![Automl](images/automl5.png)


Oczywiście każdy z modeli jest automatycznie zapisywany.Aby model mógł być wersjonowany musimy go zarejestrować.
![Versioning](images/register1.png)
![Versioning](images/register2.png)
W zakładce **Models** po lewej stronie interfejsu mamy dostęp do wszystkich wersjonowanych modeli. Ich działanie jest podobne do wersjonowania datasetów, które zostało wcześniej opisane.
![Versioning](images/models1.png)
## Deployment
Po wybraniu zarejestrowanego modelu można go wdrożyć jako endpoint. 
![Deployment](images/deploy1.png)
![Deployment](images/deploy2.png)
Endpoint uruchamiał się około 20 minut. Po tym czasie można przetestować jego działanie. Aby używać endpointu należy pobrać klucz. Bez niego nie da się autoryzować dostępu.
![Endpoint](images/endpoint1.png)

Poniżej znajduje się kod, za pomocą którego można użyć endpointu. Został on wygenerowany przez azure.  Jednak endpoint został wyłączony po jego przetestowaniu (z powodu stałych opłat za każdą godzinę użytkowania) dlatego ten kod nie będzie działał.
~~~
import urllib.request
import json
import os
import ssl

def allowSelfSignedHttps(allowed):
    # bypass the server certificate verification on client side
    if allowed and not os.environ.get('PYTHONHTTPSVERIFY', '') and getattr(ssl, '_create_unverified_context', None):
        ssl._create_default_https_context = ssl._create_unverified_context

allowSelfSignedHttps(True) # this line is needed if you use self-signed certificate in your scoring service.

# Request data goes here
# The example below assumes JSON formatting which may be updated
# depending on the format your endpoint expects.
# More information can be found here:
# https://docs.microsoft.com/azure/machine-learning/how-to-deploy-advanced-entry-script
data =  {
  "input_data": {
    "columns": [
      "Rainfall",
      "Evaporation",
      "Sunshine",
      "WindGustSpeed",
      "Humidity3pm",
      "Pressure3pm",
      "Cloud3pm",
      "Temp3pm",
      "RainToday"
    ],
    "index": [],
    "data": []
  }
}

body = str.encode(json.dumps(data))

url = 'https://mlw-asi-psrel.northeurope.inference.ml.azure.com/score'
# Replace this with the primary/secondary key or AMLToken for the endpoint
api_key = ''
if not api_key:
    raise Exception("A key should be provided to invoke the endpoint")

# The azureml-model-deployment header will force the request to go to a specific deployment.
# Remove this header to have the request observe the endpoint traffic rules
headers = {'Content-Type':'application/json', 'Authorization':('Bearer '+ api_key), 'azureml-model-deployment': 'raininaustraliamodel-1' }

req = urllib.request.Request(url, body, headers)

try:
    response = urllib.request.urlopen(req)

    result = response.read()
    print(result)
except urllib.error.HTTPError as error:
    print("The request failed with status code: " + str(error.code))

    # Print the headers - they include the requert ID and the timestamp, which are useful for debugging the failure
    print(error.info())
    print(error.read().decode("utf8", 'ignore'))
~~~

Endpoint możemy też monitorować.
![Endpoint](images/endpoint2.png)
Isnieje również opcja automatycznego skalowania w zależności od metryk takich jak na przykład zużycie procesora. 
![Endpoint](images/autoscale1.png)