# Machine Learning Pipeline

In [1]:
from pyspark.sql import SparkSession, functions as F
import findspark
import pandas as pd

# Raw string literal: the Windows path contains backslashes ("\P", "\S", "\s")
# that are not valid escape sequences. A plain literal triggers an
# invalid-escape warning on current Python versions and would silently change
# meaning if a path segment ever started with e.g. "t" or "n".
# NOTE(review): machine-specific absolute path — adjust to the local Spark install.
findspark.init(r"C:\Program Files\Spark\spark-3.3.1-bin-hadoop3")

In [2]:
# Load the working dataset from parquet. The preview in the next cell shows the
# measurement columns plus additional columns such as Age_Stages and Health_Ratio.
df = pd.read_parquet("datasets/diabetes_extraction.parquet")

In [3]:
# Preview the first rows to check which columns were loaded.
df.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome,Age_Stages,BMI_Stages,Glucose_Health_Stages,Insulin_Health_Stages,BloodPressure_Stages,BP_Health_Stages,BMI_Health_Stages,Health_Ratio
0,6,148.0,72.0,35.0,202.035714,33.6,0.627,50,1,late_adult,obesity,0,0,normal,1,0,25.0
1,1,85.0,66.0,29.0,85.285714,26.6,0.351,31,0,middle_adult,overweight,1,1,normal,1,0,75.0
2,8,183.0,64.0,25.178571,225.25,23.3,0.672,32,1,middle_adult,healthweight,0,0,normal,1,1,50.0
3,1,89.0,66.0,23.0,94.0,28.1,0.167,21,0,early_adult,overweight,1,1,normal,1,0,75.0
4,0,137.0,40.0,35.0,168.0,43.1,2.288,33,1,middle_adult,obesity,1,0,low,0,0,25.0


In [33]:
from feature_utils import get_columns_type

In [34]:
# Second dataset, loaded independently of `df`.
# NOTE(review): judging by the file name and the column listing logged below,
# this appears to hold only the original measurement columns with missing
# values handled — confirm against the notebook that produced the file.
miss_df = pd.read_parquet("datasets/diabets_non_null.parquet")

In [35]:
# Classify the columns of `miss_df`; the call is unpacked into three lists
# (numeric, categoric, cardinal) and the helper logs them.
# NOTE(review): the same three names are reassigned from `df` further down, so
# when the notebook is run top to bottom this result is overwritten before the
# VectorAssembler is built. The execution count suggests this cell was run out
# of order — confirm whether it is still needed.
numeric_cols, categoric_cols, cardinal_cols = get_columns_type(miss_df, categoric_threshold=2)

2023-03-13 01:14:28,724 - logging_utils - DEBUG - get_columns_type executing...
2023-03-13 01:14:28,728 - logging_utils - INFO - 
Numeric Columns: ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome']
Categoric Columns: []
Cardinal Columns: []


In [36]:
# The label must not be fed to the model as a feature: drop it from the list.
numeric_cols = [name for name in numeric_cols if name != "Outcome"]

# Control

In [10]:
# Restrict the frame to the measurement columns, the label, and the two
# *_Stages columns kept for modelling; every other column is dropped.
df = df[
    [
        "Pregnancies",
        "Glucose",
        "BloodPressure",
        "SkinThickness",
        "Insulin",
        "BMI",
        "DiabetesPedigreeFunction",
        "Age",
        "Outcome",
        "BloodPressure_Stages",
        "Insulin_Health_Stages",
    ]
]

## Separating Columns by Type

In [11]:
# Classify the columns of the reduced `df`. The helper logs the three lists it
# returns; per that log, BloodPressure_Stages is the only categoric column here.
numeric_cols, categoric_cols, cardinal_cols = get_columns_type(df, categoric_threshold=2)

2023-03-13 01:12:56,928 - logging_utils - DEBUG - get_columns_type executing...
2023-03-13 01:12:56,930 - logging_utils - INFO - 
Numeric Columns: ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome', 'Insulin_Health_Stages']
Categoric Columns: ['BloodPressure_Stages']
Cardinal Columns: []


In [12]:
# Exclude the label column so that it is never treated as a feature.
numeric_cols = [name for name in numeric_cols if name != "Outcome"]

In [13]:
# Name of the label column; used below as labelCol for the estimator and the evaluator.
target = "Outcome"

## Create Model

## One Hot Encoder

In [14]:
# One-hot encode the categoric columns found above. drop_first=True omits the
# first level of each encoded column, so k categories become k-1 indicator columns.
df = pd.get_dummies(df, columns=categoric_cols, drop_first=True)

In [15]:
# Preview the frame after encoding to check the new indicator columns.
df.head()

Unnamed: 0,Pregnancies,Glucose,BloodPressure,SkinThickness,Insulin,BMI,DiabetesPedigreeFunction,Age,Outcome,Insulin_Health_Stages,BloodPressure_Stages_hypertension_1,BloodPressure_Stages_hypertension_2,BloodPressure_Stages_low,BloodPressure_Stages_normal,BloodPressure_Stages_prehypertension
0,6,148.0,72.0,35.0,202.035714,33.6,0.627,50,1,0,0,0,0,1,0
1,1,85.0,66.0,29.0,85.285714,26.6,0.351,31,0,1,0,0,0,1,0
2,8,183.0,64.0,25.178571,225.25,23.3,0.672,32,1,0,0,0,0,1,0
3,1,89.0,66.0,23.0,94.0,28.1,0.167,21,0,1,0,0,0,1,0
4,0,137.0,40.0,35.0,168.0,43.1,2.288,33,1,0,0,0,1,0,0


In [16]:
# Re-classify the columns after encoding; per the log below, every column is
# now reported as numeric, including the new indicator columns.
numeric_cols, categoric_cols, cardinal_cols = get_columns_type(df, categoric_threshold=2)

2023-03-13 01:13:05,063 - logging_utils - DEBUG - get_columns_type executing...
2023-03-13 01:13:05,065 - logging_utils - INFO - 
Numeric Columns: ['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI', 'DiabetesPedigreeFunction', 'Age', 'Outcome', 'Insulin_Health_Stages', 'BloodPressure_Stages_hypertension_1', 'BloodPressure_Stages_hypertension_2', 'BloodPressure_Stages_low', 'BloodPressure_Stages_normal', 'BloodPressure_Stages_prehypertension']
Categoric Columns: []
Cardinal Columns: []


In [17]:
# Feature list for the VectorAssembler: every numeric column except the label.
# Uses the `target` constant (defined above) instead of repeating the literal,
# so the label name is kept in one place, consistent with the estimator and
# evaluator cells.
numeric_cols = [name for name in numeric_cols if name != target]

### Vector Assembler

In [18]:
# Local Spark session for this notebook: master "local[2]" and two shuffle
# partitions. getOrCreate() reuses an already running session if there is one.
spark = (
    SparkSession.builder
    .appName("Diabetes Classification")
    .master("local[2]")
    .config("spark.sql.shuffle.partitions", "2")
    .getOrCreate()
)

In [19]:
from pyspark.ml.feature import VectorAssembler

In [37]:
# Pack the feature columns into a single vector column.
# handleInvalid="skip" makes the assembler filter out rows with invalid
# (null/NaN) feature values instead of raising.
assembler = VectorAssembler(
    inputCols=numeric_cols,
    outputCol="unscaled_features",
    handleInvalid="skip",
)

### Scale

In [21]:
from pyspark.ml.feature import RobustScaler

In [38]:
# Scale the assembled vector with RobustScaler (default settings) and expose
# the result as "features", the column the estimator reads.
scaler = RobustScaler(
    inputCol="unscaled_features",
    outputCol="features",
)

### Estimator

In [23]:
from pyspark.ml.classification import GBTClassifier

In [39]:
# Gradient-boosted tree classifier with default hyper-parameters, trained on
# the scaled feature vector against the label column named by `target`.
estimator = GBTClassifier(
    featuresCol="features",
    labelCol=target,
)

### Pipeline

In [25]:
from pyspark.ml import Pipeline

In [40]:
# Chain the stages in execution order: assemble -> scale -> classify.
pipeline_obj = Pipeline(stages=[assembler, scaler, estimator])

### Model

In [27]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

In [41]:
# areaUnderROC is BinaryClassificationEvaluator's default metric; it is spelled
# out here so it is clear that the printed "Score" is ROC AUC, not accuracy.
evaluator = BinaryClassificationEvaluator(
    labelCol=target,
    metricName="areaUnderROC",
)

In [29]:
def get_score(df, evaluator, feature_cols=None):
    """Fit the pipeline on a train split of ``df`` and score it on the test split.

    The stages are taken from the notebook-level ``pipeline_obj``, but the
    VectorAssembler is re-bound to the feature columns of the frame actually
    passed in. The original version used the assembler's fixed input columns,
    so scoring a frame with a different column set (e.g. the original CSV
    after the engineered frame) only worked when cells had been re-run out of
    order.

    Parameters
    ----------
    df : pandas.DataFrame
        Input data; must contain the label column named by the global ``target``.
    evaluator : pyspark.ml.evaluation.Evaluator
        Evaluator applied to the transformed test split.
    feature_cols : list of str, optional
        Columns to assemble into the feature vector. Defaults to every
        numeric/boolean column of ``df`` except ``target``.

    Returns
    -------
    pyspark.sql.DataFrame
        The test split after ``PipelineModel.transform`` (includes predictions).

    Raises
    ------
    ValueError
        If no feature columns are available.
    """
    if feature_cols is None:
        candidate_cols = df.select_dtypes(include=["number", "bool"]).columns
        feature_cols = [name for name in candidate_cols if name != target]
    feature_cols = list(feature_cols)
    if not feature_cols:
        raise ValueError("get_score: no feature columns to assemble")

    spark_df = spark.createDataFrame(df)

    # 80/20 split with a fixed seed so repeated runs use the same seed value.
    train_df, test_df = spark_df.randomSplit([.8, .2], seed=142)

    # Copy the assembler with this frame's columns; the other stages are reused
    # as they are. The notebook-level pipeline_obj itself is left untouched.
    stages = [
        stage.copy({stage.inputCols: feature_cols})
        if isinstance(stage, VectorAssembler)
        else stage
        for stage in pipeline_obj.getStages()
    ]
    frame_pipeline = Pipeline(stages=stages)

    pipeline_model = frame_pipeline.fit(train_df)
    transformed_df = pipeline_model.transform(test_df)

    print("Score: ", evaluator.evaluate(transformed_df))

    return transformed_df

# Final Data

In [30]:
# Train and score on the prepared frame `df`; get_score prints the score and
# returns the transformed test split.
tranformed_df = get_score(df, evaluator)

  for column, series in pdf.iteritems():


Score:  0.8594669435790931


In [31]:
# Compare true labels with predicted labels for the first rows of the test split.
tranformed_df.select("Outcome", "prediction").show()

+-------+----------+
|Outcome|prediction|
+-------+----------+
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       1.0|
|      1|       1.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      1|       1.0|
|      0|       0.0|
|      0|       0.0|
|      1|       1.0|
+-------+----------+
only showing top 20 rows



# Original Data

In [43]:
# Baseline for comparison: run the same get_score routine and evaluator on the
# original CSV. Note that this rebinds `tranformed_df`.
tranformed_df = get_score(pd.read_csv("datasets/diabetes.csv"), evaluator)

  for column, series in pdf.iteritems():


Score:  0.8413379073756432


In [44]:
# Shut down the Spark session; Spark DataFrames created above are unusable afterwards.
spark.stop()

<div class="alert alert-block alert-info"> <b>Final:</b> The score (area under the ROC curve, the evaluator's default metric) increased from <b>0.8413379073756432</b> to <b>0.8594669435790931</b>. </div>