In [None]:
#importing required libraries
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
#initializing the Spark session
# The builder is configured first and the session is obtained in a second step;
# the application is registered under the name "RandomForestExample".
session_builder = SparkSession.builder.appName("RandomForestExample")
spark = session_builder.getOrCreate()

In [None]:
!pip install pymongo

[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m
Collecting pymongo
  Downloading pymongo-4.6.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (677 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 677.2/677.2 kB 6.6 MB/s eta 0:00:00
Collecting dnspython<3.0.0,>=1.16.0
  Downloading dnspython-2.6.1-py3-none-any.whl (307 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 307.7/307.7 kB 10.2 MB/s eta 0:00:00
Installing collected packages: dnspython, pymongo
Successfully installed dnspython-2.6.1 pymongo-4.6.2
[43mNote: you may need to restart the kernel using dbutils.library.restartPython() to use updated packages.[0m


In [None]:
#initializing connection to the Mongo database
# Credentials are taken from the MONGO_USER / MONGO_PASSWORD environment
# variables instead of being typed into the notebook, so they are never saved
# or committed with it. A missing variable raises KeyError before any network
# call is attempted.
import os
from urllib.parse import quote_plus

from pymongo import MongoClient

# User name and password must be percent-escaped to be valid inside a MongoDB URI
# (characters such as '@', ':' or '/' would otherwise break parsing).
mongo_user = quote_plus(os.environ["MONGO_USER"])
mongo_password = quote_plus(os.environ["MONGO_PASSWORD"])

client = MongoClient(
    f"mongodb+srv://{mongo_user}:{mongo_password}@cluster0.fclxeov.mongodb.net/"
    "?retryWrites=true&w=majority&appName=Cluster0"
)

In [None]:
#Obtaining the dataset
# Handles to the 'Training' database and, inside it, the 'Obesity_project'
# collection that the following cells read from.
db_train = client['Training']
collections = db_train['Obesity_project']


In [None]:
# Count every document in the collection (the empty filter {} matches all of them).
collections.count_documents({})

20758

In [None]:
# Pull every document of the collection into driver memory as a list of dicts.
# The Spark schema used for this data declares `_id` as a string column, while
# PyMongo normally hands `_id` back as a bson ObjectId (assumes default,
# server-generated ids -- confirm for this collection). Converting it to its
# string form here keeps the rows consistent with that schema.
train_data = [
    {**doc, "_id": str(doc["_id"])} if "_id" in doc else doc
    for doc in collections.find()
]

In [None]:
#Defining the schema of the dataset
import pyspark.sql.types as T

# Columns are grouped by type; the groups are concatenated below in the same
# order the fields are expected to appear in the schema.
key_columns = [("_id", T.StringType()), ("id", T.IntegerType())]
string_columns = [
    "Gender",
    "family_history_with_overweight",
    "FAVC",
    "CAEC",
    "SMOKE",
    "CALC",
    "MTRANS",
    "NObeyesdad",
    "SCC",
]
float_columns = ["Age", "Height", "Weight", "FCVC", "NCP", "CH2O", "FAF", "TUE"]

column_specs = (
    key_columns
    + [(column, T.StringType()) for column in string_columns]
    + [(column, T.FloatType()) for column in float_columns]
)

# Every field is declared nullable.
schema = T.StructType(
    [T.StructField(column, dtype, True) for column, dtype in column_specs]
)

# Build the Spark DataFrame from the documents loaded from MongoDB.
df = spark.createDataFrame(train_data, schema=schema)

In [None]:
# Print column names, types and nullability of the freshly created DataFrame.
df.printSchema()

root
 |-- _id: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- family_history_with_overweight: string (nullable = true)
 |-- FAVC: string (nullable = true)
 |-- CAEC: string (nullable = true)
 |-- SMOKE: string (nullable = true)
 |-- CALC: string (nullable = true)
 |-- MTRANS: string (nullable = true)
 |-- NObeyesdad: string (nullable = true)
 |-- SCC: string (nullable = true)
 |-- Age: float (nullable = true)
 |-- Height: float (nullable = true)
 |-- Weight: float (nullable = true)
 |-- FCVC: float (nullable = true)
 |-- NCP: float (nullable = true)
 |-- CH2O: float (nullable = true)
 |-- FAF: float (nullable = true)
 |-- TUE: float (nullable = true)



In [None]:
# Preview the first three rows.
df.show(3)

+---+------+---------+--------+---------+------------------------------+----+--------+--------+----------+-----+--------+---+--------+--------+---------+--------------------+-------------------+
| id|Gender|      Age|  Height|   Weight|family_history_with_overweight|FAVC|    FCVC|     NCP|      CAEC|SMOKE|    CH2O|SCC|     FAF|     TUE|     CALC|              MTRANS|         NObeyesdad|
+---+------+---------+--------+---------+------------------------------+----+--------+--------+----------+-----+--------+---+--------+--------+---------+--------------------+-------------------+
|  0|  Male|24.443011|1.699998| 81.66995|                           yes| yes|     2.0|2.983297| Sometimes|   no|2.763573| no|     0.0|0.976473|Sometimes|Public_Transporta...|Overweight_Level_II|
|  1|Female|     18.0|    1.56|     57.0|                           yes| yes|     2.0|     3.0|Frequently|   no|     2.0| no|     1.0|     1.0|       no|          Automobile|      Normal_Weight|
|  2|Female|     18.0| 1.

In [None]:
#Indexing categorical variables in the dataset
# Each string column listed here gets a numeric companion column whose name is
# the original name with an "_X" suffix.
index_inputs = [
    'Gender',
    'family_history_with_overweight',
    'FAVC',
    'CAEC',
    'SMOKE',
    'CALC',
    'MTRANS',
    'NObeyesdad',
    'SCC',
]
index_outputs = [f"{column}_X" for column in index_inputs]

indexer = StringIndexer(inputCols=index_inputs, outputCols=index_outputs)
indexer_model = indexer.fit(df)
df_indexed = indexer_model.transform(df)

In [None]:
# Inspect the schema after indexing to confirm the added "_X" columns.
df_indexed.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- family_history_with_overweight: string (nullable = true)
 |-- FAVC: string (nullable = true)
 |-- FCVC: double (nullable = true)
 |-- NCP: double (nullable = true)
 |-- CAEC: string (nullable = true)
 |-- SMOKE: string (nullable = true)
 |-- CH2O: double (nullable = true)
 |-- SCC: string (nullable = true)
 |-- FAF: double (nullable = true)
 |-- TUE: double (nullable = true)
 |-- CALC: string (nullable = true)
 |-- MTRANS: string (nullable = true)
 |-- NObeyesdad: string (nullable = true)
 |-- Gender_X: double (nullable = false)
 |-- family_history_with_overweight_X: double (nullable = false)
 |-- FAVC_X: double (nullable = false)
 |-- CAEC_X: double (nullable = false)
 |-- SMOKE_X: double (nullable = false)
 |-- CALC_X: double (nullable = false)
 |-- MTRANS_X: double (nullable = false)
 |-- NOb

In [None]:
#One-hot encoding required variables in the dataset
# For each stem below the indexed column "<stem>_X" is encoded into "<stem>_Y".
# Note: "NObeyesdad_Y" is produced here, but the visible downstream cells use
# "NObeyesdad_X" as the label and do not read the encoded version.
encode_stems = ["CAEC", "CALC", "MTRANS", "NObeyesdad"]
encoder = OneHotEncoder(
    inputCols=[f"{stem}_X" for stem in encode_stems],
    outputCols=[f"{stem}_Y" for stem in encode_stems],
)
encoder_model = encoder.fit(df_indexed)
df_encoded = encoder_model.transform(df_indexed)


In [None]:
# Inspect the schema after one-hot encoding to confirm the added "_Y" columns.
df_encoded.printSchema()

root
 |-- id: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Height: double (nullable = true)
 |-- Weight: double (nullable = true)
 |-- family_history_with_overweight: string (nullable = true)
 |-- FAVC: string (nullable = true)
 |-- FCVC: double (nullable = true)
 |-- NCP: double (nullable = true)
 |-- CAEC: string (nullable = true)
 |-- SMOKE: string (nullable = true)
 |-- CH2O: double (nullable = true)
 |-- SCC: string (nullable = true)
 |-- FAF: double (nullable = true)
 |-- TUE: double (nullable = true)
 |-- CALC: string (nullable = true)
 |-- MTRANS: string (nullable = true)
 |-- NObeyesdad: string (nullable = true)
 |-- Gender_X: double (nullable = false)
 |-- family_history_with_overweight_X: double (nullable = false)
 |-- FAVC_X: double (nullable = false)
 |-- CAEC_X: double (nullable = false)
 |-- SMOKE_X: double (nullable = false)
 |-- CALC_X: double (nullable = false)
 |-- MTRANS_X: double (nullable = false)
 |-- NOb

In [None]:
#Creating a vector assembler for running the classification
# The inputs are grouped by how they were prepared; the groups are concatenated
# in this order to form the single 'Features' vector column.
onehot_features = ['MTRANS_Y', 'CALC_Y', 'CAEC_Y']
indexed_features = [
    'SCC_X',
    'SMOKE_X',
    'FAVC_X',
    'family_history_with_overweight_X',
    'Gender_X',
]
numeric_features = ['TUE', 'FAF', 'CH2O', 'NCP', 'FCVC', 'Weight', 'Height', 'Age']

feature_assembler = VectorAssembler(
    inputCols=onehot_features + indexed_features + numeric_features,
    outputCol='Features',
)
output = feature_assembler.transform(df_encoded)

In [None]:
# Keep only what the classifier needs: the assembled feature vector and the
# indexed target column.
model_columns = ['Features', 'NObeyesdad_X']
finalized_data = output.select(*model_columns)

In [None]:
# Preview the assembled feature vectors next to the indexed label.
finalized_data.show()

+--------------------+------------+
|            Features|NObeyesdad_X|
+--------------------+------------+
|(22,[0,4,6,13,14,...|         5.0|
|(22,[1,5,7,14,15,...|         2.0|
|(22,[0,5,6,14,15,...|         4.0|
|(22,[0,4,6,14,15,...|         0.0|
|(22,[0,4,6,13,14,...|         5.0|
|(22,[0,4,6,13,14,...|         4.0|
|(22,[1,4,6,13,14,...|         1.0|
|(22,[1,4,6,13,15,...|         1.0|
|[1.0,0.0,0.0,0.0,...|         6.0|
|(22,[0,4,6,14,16,...|         0.0|
|(22,[0,4,6,15,16,...|         6.0|
|(22,[2,5,11,13,14...|         2.0|
|(22,[0,5,6,13,14,...|         3.0|
|(22,[0,4,6,14,15,...|         0.0|
|(22,[2,5,6,11,12,...|         2.0|
|(22,[0,4,6,13,14,...|         1.0|
|(22,[1,5,8,11,16,...|         5.0|
|(22,[0,4,6,12,15,...|         2.0|
|(22,[0,5,6,13,14,...|         5.0|
|(22,[0,4,6,13,14,...|         1.0|
+--------------------+------------+
only showing top 20 rows



In [None]:
#Running the Random Forest algorithm after splitting data into test and train
# Both the train/test split and the forest draw random numbers. The same fixed
# seed is passed to each so that re-running the notebook does not silently
# change the split, the fitted model, or the accuracy reported further down.
SEED = 42

# 70% of the rows go to training, 30% are held out for testing.
(trainingData, testData) = finalized_data.randomSplit([0.7, 0.3], seed=SEED)

rf = RandomForestClassifier(
    labelCol="NObeyesdad_X",
    featuresCol="Features",
    maxDepth=3,
    numTrees=100,
    seed=SEED,
)
classifier = rf.fit(trainingData)


In [None]:
# Score the held-out rows with the fitted model. Despite its name, `predictions`
# is not itself the scored DataFrame: later cells read the rows through its
# `.predictions` attribute.
predictions = classifier.evaluate(testData)

In [None]:
# Preview five scored test rows.
predictions.predictions.show(5)

+--------------------+------------+--------------------+--------------------+----------+
|            Features|NObeyesdad_X|       rawPrediction|         probability|prediction|
+--------------------+------------+--------------------+--------------------+----------+
|(22,[0,4,6,9,11,1...|         4.0|[10.7468252695954...|[0.10746825269595...|       4.0|
|(22,[0,4,6,9,11,1...|         4.0|[10.7431002803368...|[0.10743100280336...|       4.0|
|(22,[0,4,6,9,11,1...|         5.0|[1.47512822730725...|[0.01475128227307...|       3.0|
|(22,[0,4,6,9,11,1...|         5.0|[22.0241283828055...|[0.22024128382805...|       0.0|
|(22,[0,4,6,9,11,1...|         6.0|[3.79126558455488...|[0.03791265584554...|       2.0|
+--------------------+------------+--------------------+--------------------+----------+
only showing top 5 rows



In [None]:
#Evaluating to check testing accuracy
# The evaluator compares the indexed label column with the model's
# "prediction" column and reports the "accuracy" metric.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="NObeyesdad_X",
    predictionCol="prediction",
)
scored_test = predictions.predictions
accuracy = evaluator.evaluate(scored_test)


In [None]:
# Display the accuracy computed on the held-out test split.
print(accuracy)

0.8906319796954315
