In [0]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
# Create a spark session
spark = SparkSession.builder.appName('classification').getOrCreate()

In [0]:
# File location and type
file_location = "/FileStore/tables/ObesityDataSet.csv"
file_type = "csv"

# Load data
df = spark.read.csv(file_location, header=True, inferSchema=True)

In [0]:
# Display data
df.show()

+------+----+------+------+------------------------------+----+----+---+----------+-----+----+---+---+---+----------+--------------------+-------------------+
|Gender| Age|Height|Weight|family_history_with_overweight|FAVC|FCVC|NCP|      CAEC|SMOKE|CH2O|SCC|FAF|TUE|      CALC|              MTRANS|         NObeyesdad|
+------+----+------+------+------------------------------+----+----+---+----------+-----+----+---+---+---+----------+--------------------+-------------------+
|Female|21.0|  1.62|  64.0|                           yes|  no| 2.0|3.0| Sometimes|   no| 2.0| no|0.0|1.0|        no|Public_Transporta...|      Normal_Weight|
|Female|21.0|  1.52|  56.0|                           yes|  no| 3.0|3.0| Sometimes|  yes| 3.0|yes|3.0|0.0| Sometimes|Public_Transporta...|      Normal_Weight|
|  Male|23.0|   1.8|  77.0|                           yes|  no| 2.0|3.0| Sometimes|   no| 2.0| no|2.0|1.0|Frequently|Public_Transporta...|      Normal_Weight|
|  Male|27.0|   1.8|  87.0|                   

In [0]:
# Check for null values
from pyspark.sql.functions import when, col, count, isnan
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()

+------+---+------+------+------------------------------+----+----+---+----+-----+----+---+---+---+----+------+----------+
|Gender|Age|Height|Weight|family_history_with_overweight|FAVC|FCVC|NCP|CAEC|SMOKE|CH2O|SCC|FAF|TUE|CALC|MTRANS|NObeyesdad|
+------+---+------+------+------------------------------+----+----+---+----+-----+----+---+---+---+----+------+----------+
|     0|  0|     0|     0|                             0|   0|   0|  0|   0|    0|   0|  0|  0|  0|   0|     0|         0|
+------+---+------+------+------------------------------+----+----+---+----+-----+----+---+---+---+----+------+----------+



In [0]:
# Get datatypes of columns to divide cols into categorical and continuous values
df.printSchema()

root
 |-- Gender: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- family_history_with_overweight: string (nullable = true)
 |-- FAVC: string (nullable = true)
 |-- FCVC: double (nullable = true)
 |-- NCP: double (nullable = true)
 |-- CAEC: string (nullable = true)
 |-- SMOKE: string (nullable = true)
 |-- CH2O: double (nullable = true)
 |-- SCC: string (nullable = true)
 |-- FAF: double (nullable = true)
 |-- TUE: double (nullable = true)
 |-- CALC: string (nullable = true)
 |-- MTRANS: string (nullable = true)
 |-- NObeyesdad: string (nullable = true)



In [0]:
# Divide the data into categorical and continous columns
categoricalCols = [item[0] for item in df.dtypes if item[1].startswith('string')]
continuousCols = [i for i in df.columns if i not in categoricalCols]


In [0]:
# Categorical to numerical - Encoding
indexer = StringIndexer(inputCols = categoricalCols, 
                        outputCols = [f"{i}_idx" for i in categoricalCols])
df_idx = indexer.fit(df).transform(df)

In [0]:
df_idx.show()

+------+----+------+------+------------------------------+----+----+---+----------+-----+----+---+---+---+----------+--------------------+-------------------+----------+----------------------------------+--------+--------+---------+-------+--------+----------+--------------+
|Gender| Age|Height|Weight|family_history_with_overweight|FAVC|FCVC|NCP|      CAEC|SMOKE|CH2O|SCC|FAF|TUE|      CALC|              MTRANS|         NObeyesdad|Gender_idx|family_history_with_overweight_idx|FAVC_idx|CAEC_idx|SMOKE_idx|SCC_idx|CALC_idx|MTRANS_idx|NObeyesdad_idx|
+------+----+------+------+------------------------------+----+----+---+----------+-----+----+---+---+---+----------+--------------------+-------------------+----------+----------------------------------+--------+--------+---------+-------+--------+----------+--------------+
|Female|21.0|  1.62|  64.0|                           yes|  no| 2.0|3.0| Sometimes|   no| 2.0| no|0.0|1.0|        no|Public_Transporta...|      Normal_Weight|       1.0|   

In [0]:
# Display the value counts of each categorical columns
for i in [i for i in df_idx.columns if '_idx' in i]:
    df_idx.groupBy(i).count().show()

+----------+-----+
|Gender_idx|count|
+----------+-----+
|       0.0| 1068|
|       1.0| 1043|
+----------+-----+

+----------------------------------+-----+
|family_history_with_overweight_idx|count|
+----------------------------------+-----+
|                               0.0| 1726|
|                               1.0|  385|
+----------------------------------+-----+

+--------+-----+
|FAVC_idx|count|
+--------+-----+
|     0.0| 1866|
|     1.0|  245|
+--------+-----+

+--------+-----+
|CAEC_idx|count|
+--------+-----+
|     0.0| 1765|
|     1.0|  242|
|     3.0|   51|
|     2.0|   53|
+--------+-----+

+---------+-----+
|SMOKE_idx|count|
+---------+-----+
|      0.0| 2067|
|      1.0|   44|
+---------+-----+

+-------+-----+
|SCC_idx|count|
+-------+-----+
|    0.0| 2015|
|    1.0|   96|
+-------+-----+

+--------+-----+
|CALC_idx|count|
+--------+-----+
|     0.0| 1401|
|     1.0|  639|
|     3.0|    1|
|     2.0|   70|
+--------+-----+

+----------+-----+
|MTRANS_idx|count|
+----

In [0]:
# Assemble all independent features into one vector
featuresAssembler = VectorAssembler(inputCols = ['Age',
                            'Height',
                            'Weight',
                            'FCVC',
                            'NCP',
                            'CH2O',
                            'FAF',
                            'TUE',
                            'Gender_idx',
                            'family_history_with_overweight_idx',
                            'FAVC_idx',
                            'CAEC_idx',
                            'SMOKE_idx',
                            'SCC_idx',
                            'CALC_idx',
                            'MTRANS_idx'],
                outputCol = "Independent_Variables")
features = featuresAssembler.transform(df_idx)

In [0]:
features.show()

+------+----+------+------+------------------------------+----+----+---+----------+-----+----+---+---+---+----------+--------------------+-------------------+----------+----------------------------------+--------+--------+---------+-------+--------+----------+--------------+---------------------+
|Gender| Age|Height|Weight|family_history_with_overweight|FAVC|FCVC|NCP|      CAEC|SMOKE|CH2O|SCC|FAF|TUE|      CALC|              MTRANS|         NObeyesdad|Gender_idx|family_history_with_overweight_idx|FAVC_idx|CAEC_idx|SMOKE_idx|SCC_idx|CALC_idx|MTRANS_idx|NObeyesdad_idx|Independent_Variables|
+------+----+------+------+------------------------------+----+----+---+----------+-----+----+---+---+---+----------+--------------------+-------------------+----------+----------------------------------+--------+--------+---------+-------+--------+----------+--------------+---------------------+
|Female|21.0|  1.62|  64.0|                           yes|  no| 2.0|3.0| Sometimes|   no| 2.0| no|0.0|1.0|

In [0]:
finalData = features.select(["Independent_variables", "NObeyesdad_idx"])
finalData.show()

+---------------------+--------------+
|Independent_variables|NObeyesdad_idx|
+---------------------+--------------+
| [21.0,1.62,64.0,2...|           5.0|
| [21.0,1.52,56.0,3...|           5.0|
| [23.0,1.8,77.0,2....|           5.0|
| [27.0,1.8,87.0,3....|           3.0|
| (16,[0,1,2,3,4,5,...|           4.0|
| (16,[0,1,2,3,4,5,...|           5.0|
| (16,[0,1,2,3,4,5,...|           5.0|
| (16,[0,1,2,3,4,5,...|           5.0|
| (16,[0,1,2,3,4,5,...|           5.0|
| (16,[0,1,2,3,4,5,...|           5.0|
| (16,[0,1,2,3,4,5,...|           0.0|
| [21.0,1.72,80.0,2...|           4.0|
| (16,[0,1,2,3,4,5,...|           5.0|
| [41.0,1.8,99.0,2....|           0.0|
| (16,[0,1,2,3,4,5,...|           5.0|
| [22.0,1.7,66.0,3....|           5.0|
| (16,[0,1,2,3,4,5,...|           4.0|
| [29.0,1.53,78.0,2...|           0.0|
| [30.0,1.71,82.0,3...|           4.0|
| (16,[0,1,2,3,4,5,...|           3.0|
+---------------------+--------------+
only showing top 20 rows



In [0]:
# Split the data into train and test

train_data, test_data = finalData.randomSplit([0.7, 0.3], seed=42)

In [0]:
# Initialize Random forest classifier
rf = RandomForestClassifier(labelCol="NObeyesdad_idx", featuresCol="Independent_variables")

In [0]:
# Define the hyperparameter grid
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10, 20, 30]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

# Create the cross-validator
cross_validator = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=MulticlassClassificationEvaluator(labelCol="NObeyesdad_idx", metricName="accuracy"),
                          numFolds=5, seed=42)

# Train the model with the best hyperparameters
cv_model = cross_validator.fit(train_data)

In [0]:
# Feature importance
best_rf_model = cv_model.bestModel
importances = best_rf_model.featureImportances
feature_list = ['Age',
                'Height',
                'Weight',
                'FCVC',
                'NCP',
                'CH2O',
                'FAF',
                'TUE',
                'Gender',
                'family_history_with_overweight',
                'FAVC',
                'CAEC',
                'SMOKE',
                'SCC',
                'CALC',
                'MTRANS']

print("Feature Importances:")
for feature, importance in zip(feature_list, importances):
    print(f"{feature}: {importance:.4f}")

Feature Importances:
Age: 0.0947
Height: 0.1081
Weight: 0.3346
FCVC: 0.0870
NCP: 0.0523
CH2O: 0.0419
FAF: 0.0420
TUE: 0.0450
Gender: 0.0566
family_history_with_overweight: 0.0251
FAVC: 0.0167
CAEC: 0.0313
SMOKE: 0.0021
SCC: 0.0063
CALC: 0.0313
MTRANS: 0.0251


In [0]:
# Make predictions on the test data
predictions = cv_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="NObeyesdad_idx", metricName="accuracy")

# Evaluate the model
accuracy = evaluator.evaluate(predictions)
print("Test set accuracy = {:.2f}".format(accuracy))

Test set accuracy = 0.95


In [0]:
# create a feature important scores dataframe for visualization

feature_imp_map = {feature_list[i]: round(importances[i], 4) for i in range(len(feature_list))}
feature_map = [(k,)+(float(v),) for k,v in feature_imp_map.items()]
feature_df = spark.createDataFrame(feature_map, schema=['Features', 'FeatureImpScores'])


In [0]:
import plotly.express as px

pyfeature_df = feature_df.toPandas()
x = pyfeature_df['Features']
y = pyfeature_df['FeatureImpScores']

fig = px.bar(pyfeature_df, x, y, color = x, title = "Feature Importance")
fig.show()


In [0]:
# Confusion Matrix
cm_dt_result = predictions.crosstab("prediction", "NObeyesdad_idx")
cm_dt_result = cm_dt_result.toPandas()
cm_dt_result

Unnamed: 0,prediction_NObeyesdad_idx,0.0,1.0,2.0,3.0,4.0,5.0,6.0
0,1.0,0,83,1,0,0,0,0
1,0.0,96,0,0,0,2,0,0
2,5.0,0,0,0,3,0,86,5
3,6.0,0,0,0,0,0,0,75
4,4.0,2,0,0,4,64,1,0
5,2.0,0,1,74,0,1,0,0
6,3.0,1,0,1,68,3,6,0
