## Synopsis

This notebook demonstrates how to orchestrate a machine learning workflow with PySpark's built-in `Pipeline` feature and gives a brief, code-driven introduction to ML pipelines. Pipelines are an important part of MLOps best practice: they reduce manual intervention in the code and make results reproducible.

### Dataset
The dataset used in the example below was taken from Kaggle: [link](https://www.kaggle.com/datasets/kamilpytlak/personal-key-indicators-of-heart-disease) 
Description: 2020 annual CDC survey data on the health status of 400k adults, in the context of heart disease. (Additional information is available at the Kaggle link.)

Scope of Notebook:

1. **Pipelines**: Using the PySpark `Pipeline` feature, with each transformation defined as a pipeline stage.
(Stages include categorical encoding, regex extraction, ordinal mapping, column dropping, and vectorization of input features.) 

2. **Model Experiments**: Automated model experiments (with hyperparameter tuning) to select the best model.

Out of Scope:

1. This exercise is an introduction to ML pipelines; preliminary steps such as EDA, feature selection, and outlier removal are not covered in depth.


#### Data Versioning: (Hugging Face)
`https://huggingface.co/datasets/mozay22/heart_disease/tree/main` 

#### Code Repo: (GitHub)
`https://github.com/mohdtaher2022/ML_Ops_Practices` 


#### Pre-Requisites

Python, SQL, PySpark, ML Lifecycle, Statistics

Note: This notebook was executed on a remote server, so file paths may need to be changed when running it on another host. The installed Python packages are listed at the end of the notebook so that the environment can be reproduced.






### Setting up Spark Infrastructure and Pre-requisite Libraries  

In [None]:
%%capture
!apt-get install openjdk-8-jdk

In [None]:
import os
#Set the JAVA_HOME env variable
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-8-openjdk-amd64"

In [None]:
%%capture
!echo $JAVA_HOME
!pip install pyspark==3.0.0
!pip install -q findspark
!pip install datasets

### Importing Modules

In [None]:
# Locate the Spark installation (the Spark session itself is created later)
import findspark
findspark.init()

#import the necessary dependencies
import sys
import os
import operator
import json

# Importing Specific Dataset of Heart Disease
from datasets import load_dataset


# data wrangling
import numpy as np
import pandas as pd
import pyspark
from pyspark import SparkContext, SparkConf

import pyspark.sql.types  as st
import pyspark.sql.functions  as sf
from pyspark.sql.functions import rand 
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession, types as T, functions as F
from pyspark.sql.functions import udf
pd.options.display.html.table_schema = True

# machine learning
from pyspark.ml import Pipeline
from pyspark.ml.pipeline import Transformer
from pyspark.ml.classification import RandomForestClassifier, RandomForestClassificationModel , GBTClassifier \
, GBTClassificationModel, LogisticRegression, LogisticRegressionModel
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics ,BinaryClassificationMetrics
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.stat import ChiSquareTest


# Sklearn Model
from sklearn.metrics import accuracy_score, classification_report, precision_score, recall_score, roc_auc_score
from sklearn.metrics import *

# Visuals
import plotly.express as px

# Ignore Warnings
import warnings
warnings.filterwarnings('ignore')

# misc
import math
from operator import add
from functools import reduce
from datetime import datetime
import operator
import re
import random

# Dropping the display of Scientific Notations.
# for pandas 
pd.set_option('display.float_format', lambda x: '%.3f' % x)
# for Numpys
np.set_printoptions(suppress=True,formatter={'float_kind':'{:16.3f}'.format}, linewidth=130)

### Downloading Dataset

In [None]:
# Cloning Data from Hugging Face.
!git lfs install
!git clone https://huggingface.co/datasets/mozay22/heart_disease

Error: Failed to call git rev-parse --git-dir --show-toplevel: "fatal: not a git repository (or any of the parent directories): .git\n"
Git LFS initialized.
fatal: destination path 'heart_disease' already exists and is not an empty directory.


In [None]:
# Paths used by the shell commands in the cells below
os.environ['dir'] = os.getcwd()
os.environ['repo'] = 'heart_disease'
os.environ['file_1'] = 'train_df.zip'
os.environ['file_2'] = 'validation_df.zip'

In [None]:
%%capture
!unzip $dir/$repo/$file_1 -d output/

### Loading Code Modules from Git. 

In [None]:
# Cloning modules from Github.
!rm -rf ML_Ops_Practices/
!git clone https://github.com/mohdtaher2022/ML_Ops_Practices.git

Cloning into 'ML_Ops_Practices'...
remote: Enumerating objects: 239, done.
remote: Counting objects: 100% (21/21), done.
remote: Compressing objects: 100% (21/21), done.
remote: Total 239 (delta 5), reused 0 (delta 0), pack-reused 218
Receiving objects: 100% (239/239), 22.22 MiB | 20.52 MiB/s, done.
Resolving deltas: 100% (74/74), done.


#### Loading all Modules into the Main Environment
Any module used within the PySpark environment must first be loaded into the main kernel environment.

In [None]:
# Functions
helper_func = open(os.path.join(os.getcwd(),'ML_Ops_Practices/Utilities/Functions/helper_functions.py')).read()
pipeline_func = open(os.path.join(os.getcwd(),'ML_Ops_Practices/Utilities/Functions/pipeline_func.py')).read()
retrain_n_validation = open(os.path.join(os.getcwd(),'ML_Ops_Practices/Utilities/Functions/retrain_n_validation.py')).read()
# Configs
feature_eng_configs = open(os.path.join(os.getcwd(),'ML_Ops_Practices/Utilities/Configs/feature_eng_config.py')).read()
model_params = open(os.path.join(os.getcwd(),'ML_Ops_Practices/Utilities/Configs/model_params.py')).read()
# Variables
variables = open(os.path.join(os.getcwd(),'ML_Ops_Practices/Utilities/Variables/env_var.py')).read()

# Environment Variables
exec(variables) ;
# Model_scores , train_test_split_spark, extract_prob_udf, input_features, sensivity_specificity
exec(helper_func); exec(retrain_n_validation) ;  
# one_hot_encoder_pipeline, ordinal_label_mapping_pipeline, extract_regex_expr_pipeline, drop_columns_pipeline, data_type_col
exec(pipeline_func) ; 
# Model Parameters.
exec(model_params) ; 
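
The cell above reads each repository file as text and runs it with `exec`, which defines the file's functions and variables directly in the notebook namespace. The following is a minimal sketch of that mechanism using a throwaway file. The function `add_one` and the temporary path are hypothetical and are not part of the repository; an explicit namespace dictionary is used here only to make the defined names easy to inspect.

```python
import os
import tempfile

# Hypothetical module source standing in for one of the repository's helper files.
source = "def add_one(x):\n    return x + 1\n"

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "helper_functions.py")
    with open(path, "w") as fh:
        fh.write(source)

    # Read the file as plain text, exactly as the notebook cell does.
    with open(path) as fh:
        code = fh.read()

# Execute the text into an explicit namespace so the defined names are visible.
namespace = {}
exec(code, namespace)

add_one = namespace["add_one"]
print(add_one(41))  # prints 42
```

Calling `exec(code)` without a namespace argument, as the notebook does, places the names in the current global scope instead.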

#### (Alternate Approach to Download Dataset File)
Import the data using the `datasets` library from Hugging Face.

In [None]:
# dataset = load_dataset("mozay22/heart_disease", data_files= "heart_disease_diagnosis.csv")
# dataset_to_pandas = pd.DataFrame(dataset['train'])
# print("Dimensions of Dataset: ", dataset_to_pandas.shape)

### Loading Spark Session, Spark Context & Dataframe

In [None]:
# Building App using Spark Session
spark = SparkSession.builder \
    .master('local[*]') \
    .config("spark.driver.memory", "20g") \
    .appName('my-cool-app') \
    .getOrCreate()
sc=spark.sparkContext
# Use 5 partitions for shuffle operations (joins, aggregations); this sets partitions, not nodes
spark.conf.set("spark.sql.shuffle.partitions", "5")

# To read from Local File
load_path = os.path.join(os.getcwd(),"output/content/train_df.parquet")
df = spark.read.parquet(load_path)
df.show(5)

+------------+-----+-------+---------------+------+--------------+------------+-----------+------+-----------+-----+--------+----------------+---------+---------+------+-------------+----------+------------+
|HeartDisease|  BMI|Smoking|AlcoholDrinking|Stroke|PhysicalHealth|MentalHealth|DiffWalking|   Sex|AgeCategory| Race|Diabetic|PhysicalActivity|GenHealth|SleepTime|Asthma|KidneyDisease|SkinCancer|capture_date|
+------------+-----+-------+---------------+------+--------------+------------+-----------+------+-----------+-----+--------+----------------+---------+---------+------+-------------+----------+------------+
|          No|32.12|    Yes|             No|    No|           0.0|         0.0|        Yes|Female|80 or older|White|      No|              No|     Good|      9.0|    No|           No|       Yes|  2020-01-01|
|          No|28.25|     No|             No|    No|           0.0|        14.0|         No|Female|      60-64|White|      No|             Yes|Very good|      6.0|    No

In [None]:
# Data shape as (rows, columns); PySpark DataFrames have no built-in shape method
(df.count(), len(df.columns))

(199728, 19)

### Evaluating Data Types 
Identify columns whose data types are incorrectly mapped and change them where needed.

In [None]:
# Checking the Datatypes
(df.dtypes)

[('HeartDisease', 'string'),
 ('BMI', 'double'),
 ('Smoking', 'string'),
 ('AlcoholDrinking', 'string'),
 ('Stroke', 'string'),
 ('PhysicalHealth', 'double'),
 ('MentalHealth', 'double'),
 ('DiffWalking', 'string'),
 ('Sex', 'string'),
 ('AgeCategory', 'string'),
 ('Race', 'string'),
 ('Diabetic', 'string'),
 ('PhysicalActivity', 'string'),
 ('GenHealth', 'string'),
 ('SleepTime', 'double'),
 ('Asthma', 'string'),
 ('KidneyDisease', 'string'),
 ('SkinCancer', 'string'),
 ('capture_date', 'date')]

### Evaluating Categorical Data Health

Inspect the categorical columns to decide how each should be treated.

In [None]:
# Extract string Columns details
string_cols = data_type_col(df = df)

# Collect the distinct values of each categorical feature to spot unexpected categories
col_details = {}
for col in string_cols:
  types = df.select(col).distinct().rdd.flatMap(lambda x: x).collect()
  col_details[col] = types

# Checking anomalies: (column name, categories)
for col in (col_details):
  print(col, col_details[col])

HeartDisease ['Yes', 'No']
Smoking ['Yes', 'No']
AlcoholDrinking ['Yes', 'No']
Stroke ['Yes', 'No']
DiffWalking ['Yes', 'No']
Sex ['Female', 'Male']
AgeCategory ['50-54', '70-74', '18-24', '40-44', '45-49', '65-69', '25-29', '35-39', '30-34', '55-59', '80 or older', '60-64', '75-79']
Race ['Black', 'Hispanic', 'White', 'Other', 'American Indian/Alaskan Native', 'Asian']
Diabetic ['No, borderline diabetes', 'Yes', 'Yes (during pregnancy)', 'No']
PhysicalActivity ['Yes', 'No']
GenHealth ['Good', 'Fair', 'Very good', 'Excellent', 'Poor']
Asthma ['Yes', 'No']
KidneyDisease ['Yes', 'No']
SkinCancer ['Yes', 'No']


#### Binary Category features.

In [None]:
# Segregate the features that have exactly two categories.
# These are one-hot encoded separately from the multi-class features.
two_categories = []
for _col_ in col_details:
  if len(col_details[_col_]) == 2:
    two_categories.append(_col_)
print(two_categories)

['HeartDisease', 'Smoking', 'AlcoholDrinking', 'Stroke', 'DiffWalking', 'Sex', 'PhysicalActivity', 'Asthma', 'KidneyDisease', 'SkinCancer']
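
A binary column needs only one indicator column, because the second would be redundant. The following plain-Python sketch illustrates that idea on a list of row dictionaries. It is an illustration under that assumption only, not the repository's `one_hot_encoder_pipeline`, and the helper name `one_hot_binary` is hypothetical.

```python
def one_hot_binary(rows, column, baseline):
    """Replace `column` with one 0/1 indicator per non-baseline category."""
    categories = sorted({row[column] for row in rows} - {baseline})
    encoded = []
    for row in rows:
        # Copy every field except the original categorical column.
        new_row = {k: v for k, v in row.items() if k != column}
        for cat in categories:
            new_row[f"{column}_{cat}"] = int(row[column] == cat)
        encoded.append(new_row)
    return encoded


rows = [{"Smoking": "Yes", "Sex": "Female"}, {"Smoking": "No", "Sex": "Male"}]
out = one_hot_binary(rows, "Smoking", baseline="No")
out = one_hot_binary(out, "Sex", baseline="Male")
print(out)
# [{'Smoking_Yes': 1, 'Sex_Female': 1}, {'Smoking_Yes': 0, 'Sex_Female': 0}]
```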


### Feature Engineering Pipeline Steps.

* Step 1: One Hot Encoding for Binary Variables.
* Step 2: One Hot Encoding for multi-class variables.
* Step 3: Ordinal Mapping to categorical Variable using Mapping definition.
* Step 4: Apply Regex condition to extract Number from text.
* Step 5: Drop Unnecessary columns.
* Step 6: Create Vector based on input features.

In [None]:
# Listing Two categories
two_categories = ['HeartDisease', 'Smoking', 'AlcoholDrinking', 'Stroke', 'DiffWalking', 
                  'Sex', 'PhysicalActivity', 'Asthma', 'KidneyDisease', 'SkinCancer']

# Baseline category to exclude for each binary column, so that only the remaining category (e.g. 'Yes') is encoded.
# PhysicalHealth and MentalHealth are numeric columns and are not part of two_categories.
binary_indicator = {'HeartDisease': ['No'],
                    'Smoking': ['No'],
                    'AlcoholDrinking': ['No'],
                    'Stroke': ['No'],
                    'PhysicalHealth': ['No'],
                    'MentalHealth': ['No'],
                    'DiffWalking': ['No'],
                    'Sex': ['Male'],
                    'PhysicalActivity': ['No'],
                    'Asthma': ['No'],
                    'KidneyDisease': ['No'],
                    'SkinCancer': ['No']}

# Ordinal setup:
# numerical mapping for the ordered categorical columns
ordinal_dict = {"GenHealth": {'Excellent': 5.0,'Very good': 4.5,
                              'Good': 4,'Fair': 3,'Poor': 2},
                'Diabetic' : { 'No, borderline diabetes': 1 , 'No': 0, 'Yes': 3} }

# Pipeline Steps
# excluding categories from One Hot Encoding 
excludng_categories = {'Diabetic' :  ['No, borderline diabetes', 'No', 'Yes'],
                       'Race': ['Other']}

# Step 1 : One Hot encoding for two level categories Yes or No's
step_1_one_hot_enc=  one_hot_encoder_pipeline(col_list = two_categories, exclude_cat = binary_indicator, drop_cols = two_categories,
                                              drop_orignal =True,drop_last_col= True)
# Step 2: One Hot Encoding for women who are diabetic during pregnancy, as this is not comparable to other diabetic conditions.
# Excluding 'Other' from Race as it is not interpretable from a business perspective.
step_2_diabetic_enc_pregnancy = one_hot_encoder_pipeline(col_list = ['Diabetic', 'Race'],exclude_cat = excludng_categories,
                                                         drop_orignal =True, drop_cols = ['Race'], drop_last_col= False)
# Step 3: Ordinal mapping (for GenHealth and Diabetic)
step_3_ordinal_mapping = ordinal_label_mapping_pipeline(mapping_ = ordinal_dict, drop_orignal =True,
                                                        replace_na = True, replace_na_val = 0)

# Step 4: Regex expression to extract the first two characters (the lower age bound) from the binned age category column
exp_ = '^.{0,2}'
step_4_regex = extract_regex_expr_pipeline(cols_ = ['AgeCategory'] , expr = exp_, first_value = True)

# Step 5: Dropping unnecessary columns.
step_5_drop_columns = drop_columns_pipeline( cols = ['capture_date'])


# Create model input Feature vector
exc_cols = ['HeartDisease_Yes', 'capture_date']
create_feature_pipeline = create_features_and_transform_pipeline(exc_cols = exc_cols)
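
To make steps 3 and 4 concrete, the sketch below applies the same regex (`^.{0,2}`) and the same mapping dictionaries to single values in plain Python. The helper names `age_to_number` and `ordinal_map` are hypothetical, and the fallback to `0` for unmapped values is an assumption based on the `replace_na = True, replace_na_val = 0` arguments above; the repository functions may behave differently.

```python
import re

# Same expression as exp_ in the cell above: at most the first two characters.
AGE_PATTERN = re.compile(r"^.{0,2}")


def age_to_number(age_category):
    """Extract the leading two characters of an age bin and convert them to float."""
    match = AGE_PATTERN.match(age_category)
    return float(match.group(0))


def ordinal_map(value, mapping, na_value=0.0):
    """Look up a category in the mapping; unmapped categories fall back to na_value."""
    return float(mapping.get(value, na_value))


gen_health = {"Excellent": 5.0, "Very good": 4.5, "Good": 4, "Fair": 3, "Poor": 2}
diabetic = {"No, borderline diabetes": 1, "No": 0, "Yes": 3}

print(age_to_number("80 or older"))                     # 80.0
print(age_to_number("18-24"))                           # 18.0
print(ordinal_map("Very good", gen_health))             # 4.5
print(ordinal_map("Yes (during pregnancy)", diabetic))  # 0.0
```

Under this assumption, `Yes (during pregnancy)` receives `0` in the ordinal column and is represented separately by its own one-hot indicator from step 2.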

#### Feature Engineering Pipeline Execution

In [None]:
# Pipeline Execution for Feature Engineering Steps
Feature_eng_Pipeline =  Pipeline(stages=[step_1_one_hot_enc, step_2_diabetic_enc_pregnancy, 
                                     step_3_ordinal_mapping, step_4_regex, step_5_drop_columns,create_feature_pipeline])
Featpip = Feature_eng_Pipeline.fit(df)
df_transform = Featpip.transform(df)

#### Comparing Both Data Frames

In [None]:
print('---------------------------------------------------Original DF---------------------------------------------------')
df.show(3)
print('---------------------------------------------------Transformed DF---------------------------------------------------')
df_transform.show(3)


+------------+-----+-------+---------------+------+--------------+------------+-----------+------+-----------+-----+--------+----------------+---------+---------+------+-------------+----------+------------+
|HeartDisease|  BMI|Smoking|AlcoholDrinking|Stroke|PhysicalHealth|MentalHealth|DiffWalking|   Sex|AgeCategory| Race|Diabetic|PhysicalActivity|GenHealth|SleepTime|Asthma|KidneyDisease|SkinCancer|capture_date|
+------------+-----+-------+---------------+------+--------------+------------+-----------+------+-----------+-----+--------+----------------+---------+---------+------+-------------+----------+------------+
|          No|32.12|    Yes|             No|    No|           0.0|         0.0|        Yes|Female|80 or older|White|      No|              No|     Good|      9.0|    No|           No|       Yes|  2020-01-01|
|          No|28.25|     No|             No|    No|           0.0|        14.0|         No|Female|      60-64|White|      No|             Yes|Very good|      6.0|    N

In [None]:
# Checking data types again on the transformed DF
df_transform.dtypes

[('BMI', 'double'),
 ('PhysicalHealth', 'double'),
 ('MentalHealth', 'double'),
 ('SleepTime', 'double'),
 ('HeartDisease_Yes', 'int'),
 ('Smoking_Yes', 'int'),
 ('AlcoholDrinking_Yes', 'int'),
 ('Stroke_Yes', 'int'),
 ('DiffWalking_Yes', 'int'),
 ('Sex_Female', 'int'),
 ('PhysicalActivity_Yes', 'int'),
 ('Asthma_Yes', 'int'),
 ('KidneyDisease_Yes', 'int'),
 ('SkinCancer_Yes', 'int'),
 ('Diabetic_Yes_during_pregnancy', 'int'),
 ('Race_American_IndianAlaskan_Native', 'int'),
 ('Race_Asian', 'int'),
 ('Race_Black', 'int'),
 ('Race_Hispanic', 'int'),
 ('Race_White', 'int'),
 ('GenHealth_transformed', 'float'),
 ('Diabetic_transformed', 'float'),
 ('AgeCategory_transformed', 'float'),
 ('features', 'vector')]

In [None]:
# Observing Target Variable Distribution
df_transform.groupBy('HeartDisease_Yes').agg(F.count('HeartDisease_Yes').alias('category_count')).show()

+----------------+--------------+
|HeartDisease_Yes|category_count|
+----------------+--------------+
|               0|        182592|
|               1|         17136|
+----------------+--------------+
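
The counts above show a strongly imbalanced target. A quick calculation, using only the two counts from the table, gives the positive rate and the accuracy that a trivial "always predict No" model would reach; this is a useful reference point when reading accuracy figures later.

```python
# Counts taken from the groupBy output above.
counts = {0: 182592, 1: 17136}

total = sum(counts.values())
positive_rate = counts[1] / total
majority_baseline_accuracy = counts[0] / total

print(f"total rows:        {total}")                           # 199728
print(f"positive rate:     {positive_rate:.4f}")               # 0.0858
print(f"majority baseline: {majority_baseline_accuracy:.4f}")  # 0.9142
```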



In [None]:
# Splitting data into train and test sets.
# The repartition column keeps the train/test split consistent across runs.
# Ideally it is the most unique column in the dataframe, such as a primary key;
# BMI is used here because the dataframe has no unique key identifier.
train_, test_ =  train_test_split_spark(df = df_transform, train_split = 0.5, test_split = 0.5,
                                        dep_var = 'HeartDisease_Yes',
                                        repartition_col = 'BMI')
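
One common way to make a split reproducible is to derive each row's assignment from a stable hash of a key column, so that the same key always lands on the same side. The sketch below shows only that reproducibility aspect in plain Python; it does not stratify by the target and is not the implementation of `train_test_split_spark`. The helper names and the sample rows are hypothetical.

```python
import hashlib


def stable_bucket(key, buckets=100):
    """Map a key to a bucket in [0, buckets) with a hash that is stable across runs."""
    # Python's built-in hash() is salted per process for strings, so md5 is used instead.
    digest = hashlib.md5(str(key).encode("utf-8")).hexdigest()
    return int(digest, 16) % buckets


def split_rows(rows, key, train_split=0.5):
    """Assign each row to train or test based on the bucket of its key value."""
    cutoff = int(train_split * 100)
    train, test = [], []
    for row in rows:
        target = train if stable_bucket(row[key]) < cutoff else test
        target.append(row)
    return train, test


# Hypothetical rows with unique BMI values.
rows = [{"BMI": round(18.0 + 0.25 * i, 2), "HeartDisease_Yes": int(i % 10 == 0)} for i in range(200)]

train_a, test_a = split_rows(rows, key="BMI")
train_b, test_b = split_rows(list(reversed(rows)), key="BMI")

# Membership does not depend on row order.
same_membership = {r["BMI"] for r in train_a} == {r["BMI"] for r in train_b}
print(same_membership)  # True
```

With a non-unique key such as BMI, all rows that share a value end up on the same side, which is why a primary-key-like column is preferable.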

#### Saving Test Data for Monitoring.


In [None]:
%%capture
test_.drop(*['features']).repartition(1).write.format("parquet").mode("append").save("test_df.parquet")
# Zipping Data
!zip -r test_df.zip /content/test_df.parquet

### Model Experiments. 

Because `save_model = True`, the best trained model of each model category is saved, based on the training configuration `train_config`.

In [None]:
# Model Scores
label_dictionary_ = {0.0: "Heart Disease No", 1.0: "Heart Disease Yes"}
# GBTClassifier, RandomForestClassifier, LogisticRegression
model_list_= [GBTClassifier,LogisticRegression]


model_experiment = Binary_model_Experiment( model_list = model_list_ ,train_df = train_,test_df =  test_,
                                           y_label =  'HeartDisease_Yes',input_feature = 'features' , 
                                          configs_ = train_config, label_dictionary = label_dictionary_, 
                                          save_model = True, model_location =  os.path.join(os.getcwd(),'saved_models'))

Iter Model : GBTClassifier
Iter Model : LogisticRegression


### Experiments Results

In [None]:
model_experiment['train_results'].show()

+-------------+-----------------+--------------------+------------+-------------------+--------------------+
|Model_version|   Label_category|         Metric_name|capture_date|      GBTClassifier|  LogisticRegression|
+-------------+-----------------+--------------------+------------+-------------------+--------------------+
|            1| Weighted_Overall|            Accuracy|  2022-11-15| 0.9098583056979198|  0.9157170133654909|
|            1| Weighted_Overall|Weighted F(0.5) S...|  2022-11-15|  0.878837398705619|  0.8629843650069577|
|            1| Heart Disease No|              recall|  2022-11-15| 0.9832624487597947|   0.998648247667403|
|            1|Heart Disease Yes|          F1 Measure|  2022-11-15| 0.1926192619261926|0.056898684358484206|
|            1| Heart Disease No|          F1 Measure|  2022-11-15| 0.9522643818849449|  0.9558873805403756|
|            1| Weighted_Overall|     Weighted recall|  2022-11-15| 0.9098583056979198|  0.9157170133654909|
|            1| Wei

### Insights

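The experiment results indicate that `LogisticRegression` performed better than `GBTClassifier` on the two metrics used for selection: `avg PR curve area` and `AUC ROC curve`. Note that in the rows shown above its F1 measure for the `Heart Disease Yes` class is low (about 0.06, against about 0.19 for `GBTClassifier`), so the choice of model threshold, covered in a later section, matters.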
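
The results table reports per-class recall and F1 along with an F(0.5) score. As a reminder of how such metrics relate, the sketch below computes precision, recall, and F-beta from confusion-matrix counts. The counts are hypothetical and are not taken from the experiment.

```python
def precision_recall_fbeta(tp, fp, fn, beta=1.0):
    """Return (precision, recall, F-beta) for one class from its confusion counts."""
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    if precision == 0.0 and recall == 0.0:
        return precision, recall, 0.0
    b2 = beta ** 2
    fbeta = (1 + b2) * precision * recall / (b2 * precision + recall)
    return precision, recall, fbeta


# Hypothetical counts for the positive class.
precision, recall, f1 = precision_recall_fbeta(tp=30, fp=20, fn=70, beta=1.0)
_, _, f_half = precision_recall_fbeta(tp=30, fp=20, fn=70, beta=0.5)

print(f"precision={precision:.2f} recall={recall:.2f} F1={f1:.2f} F0.5={f_half:.2f}")
# precision=0.60 recall=0.30 F1=0.40 F0.5=0.50
```

A beta below 1 weights precision more heavily than recall, which is why F(0.5) is higher than F1 in this example.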

#### Saving Models

In [None]:
%%capture
# This Model will be passed to the Repo under model artifacts.
!zip -r saved_models_1.zip /content/saved_models
from google.colab import files
files.download('saved_models_1.zip')

#### Loading Saved Model within Environment
(The trained models are saved to repo location and are loaded from there) 

In [None]:
%%capture
os.environ['model_path'] = 'ML_Ops_Practices/Models'
os.environ['model_file'] = 'saved_models_1.zip'
!unzip $dir/$model_path/$model_file -d output/

### Selecting Model Threshold (Cut-off)

Inspect the precision-recall curve and fix the model cut-off for production.

In [None]:
recall_pr_dict = recall_precision_selection(model_ = 'LogisticRegression', 
                                            model_location = os.path.join(os.getcwd(),'output/content/saved_models'),
                                            df = test_ , return_graph = True, select_metric = 'precision',
                                            threshold = 0.4, print_cut_off = True)

To attain  precision  0.4  the LogisticRegression Model threshold cut-off has to be set to  0.22
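
The idea behind selecting a cut-off for a target precision can be sketched as a scan over candidate thresholds. The plain-Python example below returns the lowest threshold that reaches the target, which keeps recall as high as possible. It is an illustration only, not the repository's `recall_precision_selection`; the function name, scores, and labels are hypothetical.

```python
def threshold_for_precision(scores, labels, target):
    """Return the lowest threshold whose precision is at least `target`, or None."""
    for threshold in sorted(set(scores)):
        predicted = [s >= threshold for s in scores]
        tp = sum(1 for p, y in zip(predicted, labels) if p and y == 1)
        fp = sum(1 for p, y in zip(predicted, labels) if p and y == 0)
        if (tp + fp) and tp / (tp + fp) >= target:
            return threshold
    return None


# Hypothetical predicted probabilities and true labels.
scores = [0.05, 0.10, 0.20, 0.30, 0.40, 0.60, 0.80]
labels = [0, 0, 0, 1, 0, 1, 1]

cut_off = threshold_for_precision(scores, labels, target=0.7)
print(cut_off)  # 0.3
```

Precision is not guaranteed to increase monotonically with the threshold, so every candidate is checked rather than stopping at a midpoint.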


#### Saving Model Artifacts

In [None]:
# Saving Model threshold.
with open("recall_pr_threshold.json", "w") as outfile:
    json.dump(recall_pr_dict, outfile)

# Store the categorical and continuous feature names as JSON.
# Rule of thumb: a variable with at most two distinct values is categorical, otherwise continuous.
# This rule holds for this dataset but has to be verified for other datasets.
exclude_cols= exc_cols_ + ['features']
input_feat = [feature for feature in test_.columns if feature not in exclude_cols]

cat_cols = []
cont_cols = []
for col in input_feat:
  # Count distinct values on the cluster instead of collecting them to the driver
  col_len = test_.select(col).distinct().count()
  if col_len > 2:
    cont_cols.append(col)
  else:
    cat_cols.append(col)

# Saving the categorical and continuous variable lists.
col_dtypes_json = {'category_col': cat_cols ,
                   'Cont_cols' : cont_cols }

with open("feature_dtypes.json", "w") as outfile:
    json.dump(col_dtypes_json, outfile)
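
Saved artifacts like these are typically read back at scoring time. The sketch below writes a threshold file to a temporary directory, reloads it, and applies the cut-off to a few probabilities. The dictionary layout (`model`, `metric`, `cut_off`) is an assumption for illustration; the actual structure of `recall_pr_dict` is defined by the repository code and may differ.

```python
import json
import os
import tempfile

# Hypothetical artifact layout; 0.22 is the cut-off reported in the previous section.
artifact = {"model": "LogisticRegression", "metric": "precision", "cut_off": 0.22}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "recall_pr_threshold.json")
    with open(path, "w") as outfile:
        json.dump(artifact, outfile)
    with open(path) as infile:
        loaded = json.load(infile)


def classify(probability, cut_off):
    """Label a case positive when its predicted probability reaches the cut-off."""
    return int(probability >= cut_off)


predictions = [classify(p, loaded["cut_off"]) for p in (0.10, 0.22, 0.57)]
print(predictions)  # [0, 1, 1]
```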

#### Packages and versions installed within the Python environment.
(For cross-checking package versions only)

In [None]:
!pip freeze

findspark==2.0.1
json==2.0.9
numpy==1.21.6
pandas==1.3.5
pyspark==3.0.0
re==2.2.1
absl-py==1.3.0
aeppl==0.0.33
aesara==2.7.9
aiohttp==3.8.3
aiosignal==1.3.1
alabaster==0.7.12
albumentations==1.2.1
altair==4.2.0
appdirs==1.4.4
arviz==0.12.1
astor==0.8.1
astropy==4.3.1
astunparse==1.6.3
async-timeout==4.0.2
asynctest==0.13.0
atari-py==0.2.9
atomicwrites==1.4.1
attrs==22.1.0
audioread==3.0.0
autograd==1.5
Babel==2.11.0
backcall==0.2.0
beautifulsoup4==4.6.3
bleach==5.0.1
blis==0.7.9
bokeh==2.3.3
branca==0.6.0
bs4==0.0.1
CacheControl==0.12.11
cached-property==1.5.2
cachetools==5.2.0
catalogue==2.0.8
certifi==2022.9.24
cffi==1.15.1
cftime==1.6.2
chardet==3.0.4
charset-normalizer==2.1.1
click==7.1.2
clikit==0.6.2
cloudpickle==1.5.0
cmake==3.22.6
cmdstanpy==1.0.8
colorcet==3.0.1
colorlover==0.3.0
community==1.0.0b1
confection==0.0.3
cons==0.4.5
contextlib2==0.5.5
convertdate==2.4.0
crashtest==0.3.1
crcmod==1.7
cufflinks==0.17.3
cvxopt==1.3.0
cvxpy==1.2.2
cycler==0.11.0
cymem==2.0.7
Cython==0.2