## Building a Machine Learning Pipeline

- Data ingestion
- Model training
- Model testing
- Model packaging
- Model registering

### Data ingestion

Data ingestion is the trigger step for the ML pipeline.
We start by importing the preprocessed data.

In [2]:
import pandas as pd
import numpy as np
import warnings
from math import sqrt
warnings.filterwarnings('ignore')
from azureml.core.run import Run
from azureml.core.experiment import Experiment
from azureml.core.workspace import Workspace
from azureml.core.model import Model
from azureml.core.authentication import ServicePrincipalAuthentication
from azureml.train.automl import AutoMLConfig
import pickle
from matplotlib import pyplot as plt
from matplotlib.pyplot import figure
import mlflow

In [None]:
from azureml.core import Workspace, Dataset

# Identifiers of the target Azure ML workspace; fill these in before running.
subscription_id = ''
resource_group = ''
workspace_name = ''

# Workspace handle used by the following cells (datasets, experiments, models).
workspace = Workspace(
    subscription_id=subscription_id,
    resource_group=resource_group,
    workspace_name=workspace_name,
)

In [None]:
# MLFLOW
# Point the MLflow client at the tracking URI reported by the Azure ML workspace.

uri = workspace.get_mlflow_tracking_uri()
mlflow.set_tracking_uri(uri)

In [None]:
# Import the pre-processed dataset registered in the workspace under this name.

dataset = Dataset.get_by_name (workspace, name='processed_weather_data_portofTurku')

# Show which dataset (name and version) was retrieved.
print(dataset.name, dataset.version)

In [None]:
# Convert the registered dataset to a pandas DataFrame.

df = dataset.to_pandas_dataframe()

# Alternative: load only the first 3 rows of the dataset for a quick preview.
# df = dataset.take(3).to_pandas_dataframe()

In [None]:
# Display the first rows of the DataFrame.
df.head()

## Train test split

In [None]:
import os

# Positional split: the first 77,160 rows form the training set and all
# remaining rows form the validation set. Slicing by position (instead of
# dropping by index label) stays correct even if index labels are not unique.
df_training = df.iloc[:77160]
df_validation = df.iloc[77160:]

# to_csv() does not create missing directories, so make sure the folder exists.
os.makedirs('Dataset', exist_ok=True)

df_training.to_csv('Dataset/training_data.csv', index=False)
df_validation.to_csv('Dataset/validation_data.csv', index=False)

In [None]:
# (rows, columns) of the training split.
df_training.shape

In [None]:
# (rows, columns) of the validation split.
df_validation.shape

### After splitting the data, the two datasets are uploaded to the datastore (connected to the Azure ML workspace) and registered as datasets

In [None]:
import os

# Create the local staging folder. exist_ok=True keeps the cell safe to re-run,
# and os.makedirs works regardless of the shell available on the compute.
os.makedirs('train_val_dataset', exist_ok=True)

In [None]:
# Write the training split (without the index column) to the local staging folder.
df_training.to_csv('train_val_dataset/training_data.csv',index=False)

In [None]:
# Write the validation split (without the index column) to the local staging folder.
df_validation.to_csv('train_val_dataset/validation_data.csv',index=False)

In [None]:
# Default datastore attached to the workspace; used as the upload target below.
datastore = workspace.get_default_datastore()

In [None]:
# Upload the local 'Dataset' folder to the 'train_val_dataset' path on the datastore.
# NOTE(review): the CSVs are also written to a local 'train_val_dataset' folder, but
# the folder uploaded here is 'Dataset' — confirm which source folder is intended.
datastore.upload(src_dir='Dataset', target_path='train_val_dataset')

In [None]:
# Build a tabular dataset from the uploaded training CSV on the datastore.
training_dataset = Dataset.Tabular.from_delimited_files(datastore.path('train_val_dataset/training_data.csv'))

In [None]:
# Build a tabular dataset from the uploaded validation CSV on the datastore.
validation_dataset = Dataset.Tabular.from_delimited_files(datastore.path('train_val_dataset/validation_data.csv'))

In [None]:
# Register the training dataset in the workspace under the name 'training_dataset'.
training_ds = training_dataset.register(workspace=workspace,
                                 name='training_dataset',
                                 description='Dataset to use for ML training')

In [None]:
# Register the validation dataset in the workspace under the name 'validation_dataset'.
validation_ds = validation_dataset.register(workspace=workspace,
                                 name='validation_dataset',
                                 description='Dataset for validation ML models')

### Data ingestion (training dataset)

Start by importing the registered training dataset with the `get_by_name()` function and converting it to a pandas DataFrame with the `to_pandas_dataframe()` function:

In [None]:
# Fetch the registered training dataset; `dataset` now refers to it
# (its name and version are logged with every training run below).
dataset = Dataset.get_by_name (workspace, name='training_dataset')
print(dataset.name, dataset.version)


In [None]:
# Load the training dataset into a pandas DataFrame (replaces the earlier `df`).
df = dataset.to_pandas_dataframe()

In [None]:
# (rows, columns) of the training DataFrame.
df.shape


### Feature Selection and scaling

In [None]:
# Select the seven feature columns and the target column.
# .values yields plain NumPy arrays so that the shapes match when transforming.
X = df[['Temperature_C', 'Humidity', 'Wind_speed_kmph', 'Wind_bearing_degrees', 'Visibility_km', 'Pressure_millibars', 'Current_weather_condition']].values
y = df['Future_weather_condition'].values


In [None]:
# Display the target array.
y

In [None]:
# Split the training dataset into train (75%) and test (25%) sets for ML training.
# random_state is fixed so the split is reproducible.
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)

In [None]:
from sklearn.preprocessing import StandardScaler
# Scaler instance; it is fitted on the training features in the next cell.
ss = StandardScaler()

In [None]:
# Fit the scaler on the training data only (fit_transform), then apply the
# already-fitted scaler to the test data (transform only, no re-fitting).
X_train = ss.fit_transform(X_train)
X_test = ss.transform(X_test)

## Machine learning training and hyperparameter optimization

The output of this step is a trained ML model.

In [None]:
# Create the Azure ML experiment with Experiment() from the Azure SDK and select
# the MLflow experiment. Training runs started below are monitored and logged
# under these experiments in the Azure ML workspace.

myexperiment = Experiment(workspace, "support-vector-machine")
mlflow.set_experiment("mlflow-support-vector-machine")

In [None]:
# hyperparameter tuning to find the best parameters to converge the best model

#from sklearn.svm import SVC
from sklearn import svm, datasets
from sklearn.model_selection import GridSearchCV

In [None]:
# Hyperparameter grid for the search: two kernels x two values of C.
parameters = {'kernel':('linear', 'rbf'), 'C':[1, 10]}

In [None]:
# Base SVC estimator (default settings) handed to the grid search.
svc = svm.SVC()

In [None]:
# Start a run in the Azure ML experiment and an MLflow run.
run = myexperiment.start_logging()

mlflow.start_run()

# Record which dataset (name and version) this run was trained on.
run.log("dataset name", dataset.name)

run.log("dataset Version", dataset.version)


In [None]:
# Grid search over `parameters` using the base SVC estimator.
svc_grid = GridSearchCV(svc, parameters)

In [None]:
%%time
# Run the grid search on the scaled training data (the cell magic reports the time taken).
svc_grid.fit(X_train, y_train)

In [None]:
# With deep=True, get_params() returns the parameters of the GridSearchCV object
# together with those of the wrapped estimator (prefixed with 'estimator__').
# NOTE(review): these are the configured values, not the tuned ones — the
# combination selected by the search is exposed as svc_grid.best_params_.
svc_grid.get_params(deep=True)

In [None]:
from sklearn.svm import SVC

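scikit-learn uses a double underscore (`__`) as the separator for nested parameters. In a `Pipeline`, the prefix is the step name given in the pipeline definition: for example, `classifier__C` tells the grid search to try the provided values for `C`, the regularization weight of the `classifier` step. For a `GridSearchCV` object, `get_params(deep=True)` exposes the wrapped estimator's parameters under the `estimator__` prefix (e.g. `estimator__C`). Note that these are the base estimator's configured values; the values selected by the search are available in `best_params_` after fitting.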

In [3]:
#https://towardsdatascience.com/ml-pipelines-with-grid-search-in-scikit-learn-2539d6b53cfb

In [None]:
# Train a new model with the best hyperparameters found by the grid search.
# best_params_ holds the winning combination; get_params()['estimator__...']
# would only return the values the base estimator was constructed with
# (the SVC defaults), regardless of the search result.
svc = SVC(C=svc_grid.best_params_['C'], kernel=svc_grid.best_params_['kernel'])

In [None]:
# Fit the final SVC on the scaled training data.
svc.fit(X_train, y_train)

In [None]:
# Log the training parameters to the AzureML and MLflow experiments.
# The values are read from the fitted model itself, so the log always matches
# the model that was actually trained.
run.log("C", svc.C)
run.log("Kernel", svc.kernel)

mlflow.log_param("C", svc.C)
mlflow.log_param("Kernel", svc.kernel)

In [None]:
from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

In [None]:
# Predict on the scaled test data with the fitted SVC.

predicted_svc = svc.predict(X_test)

In [None]:
# Accuracy of the SVC predictions on the test set.
acc = accuracy_score(y_test, predicted_svc)

In [None]:
# Macro-averaged F1, precision and recall of the SVC predictions on the test set.
fscore = f1_score(y_test, predicted_svc, average="macro")
precision = precision_score(y_test, predicted_svc, average="macro")
recall = recall_score(y_test, predicted_svc, average="macro")

In [None]:
# Capture the hash of the current Git HEAD commit so it can be logged with the
# run and the results traced back to a code version.

import git
# search_parent_directories=True looks for the repository in parent folders as well.
repo = git.Repo(search_parent_directories=True)
sha = repo.head.object.hexsha

In [None]:
# Log the evaluation results to AzureML and MLflow.

# Azure ML run
run.log("Test_accuracy", acc)
run.log("Precision", precision)
run.log("Recall", recall)
run.log("F-Score", fscore)
run.log("Git-sha", sha)

# MLflow run: metrics must be numeric, so the commit hash is stored as a tag.
mlflow.log_metric("Test_accuracy", acc)
mlflow.log_metric("Precision", precision)
mlflow.log_metric("Recall", recall)
mlflow.log_metric("F-Score", fscore)
mlflow.set_tag("Git-sha", sha)

In [None]:
# Mark the Azure ML run as completed and print its id.
run.complete()
print ("run id:", run.id)

In [None]:
# Close the active MLflow run.
mlflow.end_run()

In [None]:
# Retrieve the metrics logged to the Azure ML run.
run.get_metrics()

In [None]:
# Display the details of the workspace.
workspace.get_details()

In [None]:
import mlflow.sklearn

# Log the fitted SVC as an MLflow scikit-learn model under the 'outputs' artifact path.
# log_model() logs into the active run, or implicitly starts one when none is
# active (the training run has already been ended). End the run afterwards so
# it is not left open; otherwise the next mlflow.start_run() call fails because
# a run is still active.
mlflow.sklearn.log_model(svc, 'outputs')
mlflow.end_run()

### Training with Random Forest Classifier

In [None]:
# Switch to the experiments used for the random forest training runs
# (Azure ML experiment and MLflow experiment).
myexperiment = Experiment(workspace, "random-forest-classifier")
mlflow.set_experiment("mlflow-random-forest-classifier")

In [None]:
from sklearn.ensemble import RandomForestClassifier

In [None]:
# Hyperparameters are chosen manually: a grid search on a random forest
# needs a bigger compute than the one available here.
# random_state is fixed so training is reproducible.

rf = RandomForestClassifier(max_depth=10, random_state=0, n_estimators=100)


In [None]:
# Start a run in the Azure ML experiment and an MLflow run.
run = myexperiment.start_logging()
mlflow.start_run()


# Record which dataset (name and version) this run was trained on.
run.log("dataset name", dataset.name)
run.log("dataset Version", dataset.version)

In [None]:
%%time
# Fit the random forest on the scaled training data (the cell magic reports the time taken).
rf.fit(X_train, y_train)

In [None]:
# Log the training parameters to the AzureML and MLflow experiments.
# The values are read from the estimator instead of being repeated as literals,
# so the log cannot drift from the model configuration.
run.log("max_depth", rf.max_depth)
run.log("random_state", rf.random_state)
run.log("n_estimators", rf.n_estimators)

mlflow.log_param("max_depth", rf.max_depth)
mlflow.log_param("random_state", rf.random_state)
mlflow.log_param("n_estimators", rf.n_estimators)

In [None]:
# Predict on the scaled test data with the fitted random forest.

predicted_rf = rf.predict(X_test)

In [None]:
# Accuracy and macro-averaged F1 / precision / recall of the random forest on the test set.
# These overwrite the metric variables computed earlier for the SVC.
acc = accuracy_score(y_test, predicted_rf)
fscore = f1_score(y_test, predicted_rf, average="macro")
precision = precision_score(y_test, predicted_rf, average="macro")
recall = recall_score(y_test, predicted_rf, average="macro")

In [None]:
# Log the evaluation results to AzureML and MLflow.

# Azure ML run
run.log("Test_accuracy", acc)
run.log("Precision", precision)
run.log("Recall", recall)
run.log("F-Score", fscore)
run.log("Git-sha", sha)

# MLflow run: metrics must be numeric, so the commit hash is stored as a tag.
mlflow.log_metric("Test_accuracy", acc)
mlflow.log_metric("Precision", precision)
mlflow.log_metric("Recall", recall)
mlflow.log_metric("F-Score", fscore)
mlflow.set_tag("Git-sha", sha)

In [None]:
# Mark the Azure ML run as completed and print its id.
run.complete()
print ("run id:", run.id)

In [None]:
# Close the active MLflow run.
mlflow.end_run()

In [None]:
# Retrieve the metrics logged to the Azure ML run.
run.get_metrics()

## Model Packaging 

ONNX offers an open standard for model interoperability. ONNX stands for Open Neural Network Exchange. It provides a serialization standard for importing and exporting models. We will use the ONNX format to serialize the models to avoid compatibility and interoperability issues.

Using ONNX, the trained model is serialized using the skl2onnx library. The model is serialized as the file svc.onnx

In [None]:
# Convert the SVC model into an ONNX format file
import os

from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

# The declared input width must equal the number of features the model was
# trained on (seven feature columns are selected), so take it from X_train
# rather than hard-coding it.
initial_type = [('float_input', FloatTensorType([None, X_train.shape[1]]))]
onx = convert_sklearn(svc, initial_types=initial_type)

# open() does not create missing directories.
os.makedirs("outputs", exist_ok=True)
with open("outputs/svc.onnx", "wb") as f:
    f.write(onx.SerializeToString())

In [None]:
# Convert the RF model into an ONNX format file
import os

from skl2onnx import convert_sklearn
from skl2onnx.common.data_types import FloatTensorType

# The declared input width must equal the number of features the model was
# trained on (seven feature columns are selected), so take it from X_train
# rather than hard-coding it.
initial_type = [('float_input', FloatTensorType([None, X_train.shape[1]]))]
onx = convert_sklearn(rf, initial_types=initial_type)

# open() does not create missing directories.
os.makedirs("outputs", exist_ok=True)
with open("outputs/rf.onnx", "wb") as f:
    f.write(onx.SerializeToString())

## Register these serialized models to the model registry.

A registered model is a logical container for one or more files that make up a model. For instance, a model made up of multiple files can be registered as a single model in the model registry. Downloading the registered model retrieves all of its files. The registered model can be deployed and used for inference on demand.

In [None]:
# Register the serialized SVC (ONNX file) in the Azure ML workspace.
# The tags are derived from the fitted model and its test predictions instead of
# hard-coded literals, so they always describe the model being registered.
model = Model.register(model_path='./outputs/svc.onnx',  # this points to a local file
                       model_name="support-vector-classifier",  # this is the name the model is registered as
                       tags={'dataset': dataset.name,
                             'version': dataset.version,
                             'hyparameter-C': str(svc.C),
                             'testdata-accuracy': str(round(accuracy_score(y_test, predicted_svc), 4))},
                       model_framework=Model.Framework.ONNX,  # the registered file is an ONNX model
                       description="Support vector classifier to predict",
                       workspace=workspace)

print('Name:', model.name)
print('Version:', model.version)

In [None]:
# Register the serialized random forest (ONNX file) in the Azure ML workspace.
# The tags carry the random forest's own hyperparameters (it has no C parameter)
# and the accuracy computed from its test predictions.
model = Model.register(model_path='./outputs/rf.onnx',  # this points to a local file
                       model_name="random-forest-classifier",  # this is the name the model is registered as
                       tags={'dataset': dataset.name,
                             'version': dataset.version,
                             'hyparameter-max_depth': str(rf.max_depth),
                             'hyparameter-n_estimators': str(rf.n_estimators),
                             'testdata-accuracy': str(round(accuracy_score(y_test, predicted_rf), 4))},
                       model_framework=Model.Framework.ONNX,  # the registered file is an ONNX model
                       description="Random forest classifier to predict",
                       workspace=workspace)

print('Name:', model.name)
print('Version:', model.version)

In [None]:
import mlflow.sklearn

In [None]:
# Log the fitted scikit-learn SVC to MLflow under the 'outputs/svc.onnx' artifact path.
# NOTE: this stores the scikit-learn flavor of the model, not the ONNX file.
# log_model() logs into the active run or implicitly starts one; end the run
# afterwards so it is not left open.
mlflow.sklearn.log_model(svc, 'outputs/svc.onnx')
mlflow.end_run()

In [None]:
# Log the fitted scikit-learn random forest to MLflow under the 'outputs/rf.onnx' artifact path.
# NOTE: this stores the scikit-learn flavor of the model, not the ONNX file.
# log_model() logs into the active run or implicitly starts one; end the run
# afterwards so it is not left open.
mlflow.sklearn.log_model(rf, 'outputs/rf.onnx')
mlflow.end_run()

## Save model artefacts

For model inference in real time, a scaler is needed in order to scale the incoming data to the same scale that was used for ML training. We reuse the scaler that was fitted on the training data with `ss.fit_transform(X_train)` and serialize it into a pickle file using `pickle.dump()`. Lastly, we register this pickle file to the workspace so it can be retrieved and used as needed (especially for model inference in the test and production environments).

In [None]:
import os
import pickle

# Serialize the fitted StandardScaler (`ss`, fitted on X_train) so that inference
# code can apply exactly the same scaling to incoming data.
os.makedirs('./outputs', exist_ok=True)  # open() does not create missing directories
with open('./outputs/scaler.pkl', 'wb') as scaler_pkl:
    pickle.dump(ss, scaler_pkl)

In [None]:
# Register the pickled scaler in the Azure ML workspace so that it can be
# retrieved together with the models at inference time.
scaler = Model.register(model_path='./outputs/scaler.pkl',  # this points to a local file
                        model_name="scaler",  # this is the name the model is registered as
                        tags={'dataset': dataset.name, 'version': dataset.version},
                        model_framework=Model.Framework.SCIKITLEARN,  # pickled scikit-learn StandardScaler
                        description="Scaler used for scaling incoming inference data",
                        workspace=workspace)

print('Name:', scaler.name)
print('Version:', scaler.version)

## Both the SVM classifier and Random Forest classifier, along with the serialized scaler, are registered in the model registry. These models can be downloaded and deployed. 