# **Course Project**
## Big Data and Distributed Processing
---
### **Student**: Kaio Vinícius Cândido de Souza
### **Dataset**: [Body performance Data](https://www.kaggle.com/kukuroo3/body-performance-data), available on Kaggle.
### **Goal**: Carry out a complete data science cycle in Spark.


In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

## 1. Project Setup

In [28]:
import findspark
findspark.init('spark-3.2.0-bin-hadoop3.2')

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.functions import col

from pyspark.ml.classification import DecisionTreeClassifier, RandomForestClassifier, LogisticRegression, FMClassifier

from pyspark.ml.evaluation import RegressionEvaluator, MulticlassClassificationEvaluator, BinaryClassificationEvaluator
spark = SparkSession.builder.getOrCreate()

print(spark)

<pyspark.sql.session.SparkSession object at 0x7f0b546ae590>


## 2. Data Exploration

In [3]:
df = spark.read.csv('/content/bodyPerformance.csv', header=True, inferSchema=True, nullValue='NA')

# Rename columns whose names contain spaces, hyphens, or symbols
body_performance_df = (
    df.withColumnRenamed("body fat_%", "body_fat")
      .withColumnRenamed("sit and bend forward_cm", "sit_bend_forward")
      .withColumnRenamed("sit-ups counts", "sit_ups_counts")
      .withColumnRenamed("broad jump_cm", "broad_jump_cm")
)

In [36]:
body_performance_df.show(10)
body_performance_df.groupBy('class').count().show()

+---+------+---------+---------+--------+---------+--------+---------+----------------+--------------+-------------+-----+
|age|gender|height_cm|weight_kg|body_fat|diastolic|systolic|gripForce|sit_bend_forward|sit_ups_counts|broad_jump_cm|class|
+---+------+---------+---------+--------+---------+--------+---------+----------------+--------------+-------------+-----+
| 27|     M|    172.3|    75.24|    21.3|       80|     130|     54.9|            18.4|            60|          217|    C|
| 25|     M|    165.0|     55.8|    15.7|       77|     126|     36.4|            16.3|            53|          229|    A|
| 31|     M|    179.6|     78.0|    20.1|       92|     152|     44.8|            12.0|            49|          181|    C|
| 32|     M|    174.5|     71.1|    18.4|       76|     147|     41.4|            15.2|            53|          219|    B|
| 28|     M|    173.8|     67.7|    17.1|       70|     127|     43.5|            27.1|            45|          217|    B|
| 36|     F|    

This is the base of our dataset. It has four classes that represent physical fitness "performance", from A to D, with A being the best rating. The goal is to prepare the data so that we can train a model that classifies a person's performance based on their physical fitness measurements.

In [5]:
print('Casting columns to integer type')
# Cast every column that holds whole numbers to integer
for name in ['age', 'diastolic', 'systolic', 'sit_ups_counts', 'broad_jump_cm']:
    body_performance_df = body_performance_df.withColumn(name, col(name).cast("integer"))
body_performance_df.printSchema()

Casting columns to integer type
root
 |-- age: integer (nullable = true)
 |-- gender: string (nullable = true)
 |-- height_cm: double (nullable = true)
 |-- weight_kg: double (nullable = true)
 |-- body_fat: double (nullable = true)
 |-- diastolic: integer (nullable = true)
 |-- systolic: integer (nullable = true)
 |-- gripForce: double (nullable = true)
 |-- sit_bend_forward: double (nullable = true)
 |-- sit_ups_counts: integer (nullable = true)
 |-- broad_jump_cm: integer (nullable = true)
 |-- class: string (nullable = true)



In [6]:
body_performance_df.show(10)

+---+------+---------+---------+--------+---------+--------+---------+----------------+--------------+-------------+-----+
|age|gender|height_cm|weight_kg|body_fat|diastolic|systolic|gripForce|sit_bend_forward|sit_ups_counts|broad_jump_cm|class|
+---+------+---------+---------+--------+---------+--------+---------+----------------+--------------+-------------+-----+
| 27|     M|    172.3|    75.24|    21.3|       80|     130|     54.9|            18.4|            60|          217|    C|
| 25|     M|    165.0|     55.8|    15.7|       77|     126|     36.4|            16.3|            53|          229|    A|
| 31|     M|    179.6|     78.0|    20.1|       92|     152|     44.8|            12.0|            49|          181|    C|
| 32|     M|    174.5|     71.1|    18.4|       76|     147|     41.4|            15.2|            53|          219|    B|
| 28|     M|    173.8|     67.7|    17.1|       70|     127|     43.5|            27.1|            45|          217|    B|
| 36|     F|    

#### 2.1 Indexing gender and class.

In [7]:
print('Indexing "gender"')
indexer = StringIndexer(inputCol='gender', outputCol='gender_idx')
indexer_model = indexer.fit(body_performance_df)
df_g_indexed = indexer_model.transform(body_performance_df)
df_g_indexed.select('gender', 'gender_idx').distinct().show()

print('Indexing "class"')
indexer = StringIndexer(inputCol='class', outputCol='class_idx')
indexer_model = indexer.fit(df_g_indexed)
body_df_indexed = indexer_model.transform(df_g_indexed)
body_df_indexed.select('class', 'class_idx').distinct().show()

Indexing "gender"
+------+----------+
|gender|gender_idx|
+------+----------+
|     F|       1.0|
|     M|       0.0|
+------+----------+

Indexing "class"
+-----+---------+
|class|class_idx|
+-----+---------+
|    B|      3.0|
|    C|      0.0|
|    D|      1.0|
|    A|      2.0|
+-----+---------+
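
The index assignment above follows from how `StringIndexer` orders labels: by default it sorts them by descending frequency, so the most frequent label gets index 0. The sketch below mimics that rule in plain Python. The helper name `frequency_index` and the toy label list are hypothetical, and the alphabetical tie-breaking is an assumption about how equal counts are resolved.

```python
from collections import Counter

def frequency_index(labels):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(labels)
    # Sort by descending count; break ties alphabetically (assumed rule).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical toy column, not the real dataset.
toy_gender = ["M", "F", "M", "M", "F"]
gender_to_idx = frequency_index(toy_gender)
print(gender_to_idx)

# Inverting the mapping recovers the original label from an index.
idx_to_gender = {idx: label for label, idx in gender_to_idx.items()}
print(idx_to_gender[0.0])
```

Because the indices reflect frequency rather than the A-to-D ordering, `class_idx` is best read as an arbitrary category code, not as a ranking.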



Dropping the original string columns (`gender` and `class`) now that their indexed versions exist.

In [16]:
body_df_indexed = body_df_indexed.drop('gender', 'class')
body_df_indexed.show(10)

+---+---------+---------+--------+---------+--------+---------+----------------+--------------+-------------+----------+---------+
|age|height_cm|weight_kg|body_fat|diastolic|systolic|gripForce|sit_bend_forward|sit_ups_counts|broad_jump_cm|gender_idx|class_idx|
+---+---------+---------+--------+---------+--------+---------+----------------+--------------+-------------+----------+---------+
| 27|    172.3|    75.24|    21.3|       80|     130|     54.9|            18.4|            60|          217|       0.0|      0.0|
| 25|    165.0|     55.8|    15.7|       77|     126|     36.4|            16.3|            53|          229|       0.0|      2.0|
| 31|    179.6|     78.0|    20.1|       92|     152|     44.8|            12.0|            49|          181|       0.0|      0.0|
| 32|    174.5|     71.1|    18.4|       76|     147|     41.4|            15.2|            53|          219|       0.0|      3.0|
| 28|    173.8|     67.7|    17.1|       70|     127|     43.5|            27.1|   

#### 2.2 Reducing the training data to two columns (features and class).

In [18]:
cols_to_use = ['age','height_cm','weight_kg','body_fat','diastolic', 'systolic', 'gripForce', 'sit_bend_forward', 'sit_ups_counts', 'broad_jump_cm', 'gender_idx']
vec = VectorAssembler(inputCols=cols_to_use, outputCol='features')
body_df_vec = vec.transform(body_df_indexed)
body_df_vec.select('features', 'class_idx').show(truncate=False)

+-----------------------------------------------------------+---------+
|features                                                   |class_idx|
+-----------------------------------------------------------+---------+
|[27.0,172.3,75.24,21.3,80.0,130.0,54.9,18.4,60.0,217.0,0.0]|0.0      |
|[25.0,165.0,55.8,15.7,77.0,126.0,36.4,16.3,53.0,229.0,0.0] |2.0      |
|[31.0,179.6,78.0,20.1,92.0,152.0,44.8,12.0,49.0,181.0,0.0] |0.0      |
|[32.0,174.5,71.1,18.4,76.0,147.0,41.4,15.2,53.0,219.0,0.0] |3.0      |
|[28.0,173.8,67.7,17.1,70.0,127.0,43.5,27.1,45.0,217.0,0.0] |3.0      |
|[36.0,165.4,55.4,22.0,64.0,119.0,23.8,21.0,27.0,153.0,1.0] |3.0      |
|[42.0,164.5,63.7,32.2,72.0,135.0,22.7,0.8,18.0,146.0,1.0]  |1.0      |
|[33.0,174.9,77.2,36.9,84.0,137.0,45.9,12.3,42.0,234.0,0.0] |3.0      |
|[54.0,166.8,67.5,27.6,85.0,165.0,40.4,18.6,34.0,148.0,0.0] |0.0      |
|[28.0,185.0,84.6,14.4,81.0,156.0,57.9,12.1,55.0,213.0,0.0] |3.0      |
|[42.0,169.2,65.4,19.3,63.0,110.0,43.5,16.0,68.0,211.0,0.0] |2.0
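
Conceptually, `VectorAssembler` concatenates the selected columns of each row, in the given order, into one numeric vector. A minimal plain-Python sketch of that idea follows, using the first row displayed earlier in this notebook. The helper `assemble` is hypothetical and ignores details such as sparse vectors and null handling.

```python
def assemble(row, input_cols):
    """Concatenate the chosen columns of one row into a list of floats."""
    return [float(row[name]) for name in input_cols]

cols_to_use = ['age', 'height_cm', 'weight_kg', 'body_fat', 'diastolic',
               'systolic', 'gripForce', 'sit_bend_forward',
               'sit_ups_counts', 'broad_jump_cm', 'gender_idx']

# First row of the dataset as displayed earlier in this notebook.
first_row = {
    'age': 27, 'height_cm': 172.3, 'weight_kg': 75.24, 'body_fat': 21.3,
    'diastolic': 80, 'systolic': 130, 'gripForce': 54.9,
    'sit_bend_forward': 18.4, 'sit_ups_counts': 60, 'broad_jump_cm': 217,
    'gender_idx': 0.0, 'class_idx': 0.0,
}

features = assemble(first_row, cols_to_use)
print(features)
```

Note that `class_idx` is deliberately left out of `cols_to_use`: it is the label the models predict, so it must not appear among the features.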

## 3. Training and testing the models

### 3.1 DecisionTreeClassifier

In [31]:
body_train, body_test = body_df_vec.randomSplit([0.8,0.2], seed=123)
print('Decision tree Classifier')
dt_body_model = DecisionTreeClassifier(featuresCol='features', labelCol='class_idx')
preds_dt_body = dt_body_model.fit(body_train).transform(body_test)
preds_dt_body.select('class_idx', 'prediction', 'probability').show(truncate=False)

multi_eval = MulticlassClassificationEvaluator().setLabelCol('class_idx')
prec_dt_body = round(multi_eval.evaluate(preds_dt_body, {multi_eval.metricName: 'weightedPrecision'}),2)
recall_dt_body = round(multi_eval.evaluate(preds_dt_body, {multi_eval.metricName: 'weightedRecall'}),2)

print('Confusion matrix')
preds_dt_body.groupBy('class_idx', 'prediction').count().show()

print(f'Precision: {prec_dt_body}')
print(f'Recall: {recall_dt_body}')

Decision tree Classifier
+---------+----------+--------------------------------------------------------------------------------+
|class_idx|prediction|probability                                                                     |
+---------+----------+--------------------------------------------------------------------------------+
|1.0      |0.0       |[0.5570032573289903,0.20358306188925082,0.0,0.23941368078175895]                |
|3.0      |2.0       |[0.09014675052410902,0.011879804332634521,0.6995108315863033,0.1984626135569532]|
|1.0      |1.0       |[0.0,1.0,0.0,0.0]                                                               |
|0.0      |0.0       |[0.5570032573289903,0.20358306188925082,0.0,0.23941368078175895]                |
|0.0      |0.0       |[0.5570032573289903,0.20358306188925082,0.0,0.23941368078175895]                |
|2.0      |2.0       |[0.09014675052410902,0.011879804332634521,0.6995108315863033,0.1984626135569532]|
|0.0      |3.0       |[0.19526627218934
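
The `weightedPrecision` and `weightedRecall` metrics used in this section average the per-class values, weighting each class by its share of the true labels. The sketch below reproduces that calculation in plain Python from `(label, prediction, count)` triples, the same long format that `groupBy('class_idx', 'prediction').count()` produces. The counts are made up for illustration and are not results from this notebook; the function name `weighted_metrics` is hypothetical.

```python
from collections import defaultdict

def weighted_metrics(triples):
    """Return (weighted precision, weighted recall) from (label, prediction, count) triples."""
    true_total = defaultdict(int)   # rows per true label
    pred_total = defaultdict(int)   # rows per predicted label
    correct = defaultdict(int)      # correctly classified rows per label
    for label, prediction, count in triples:
        true_total[label] += count
        pred_total[prediction] += count
        if label == prediction:
            correct[label] += count

    n = sum(true_total.values())
    precision = recall = 0.0
    for label, support in true_total.items():
        weight = support / n
        # A class that is never predicted contributes a precision of 0.
        if pred_total[label]:
            precision += weight * correct[label] / pred_total[label]
        recall += weight * correct[label] / support
    return precision, recall

# Hypothetical two-class counts, for illustration only.
toy_counts = [(0.0, 0.0, 40), (0.0, 1.0, 10), (1.0, 0.0, 20), (1.0, 1.0, 30)]
prec, rec = weighted_metrics(toy_counts)
print(round(prec, 2), round(rec, 2))
```

Weighted recall computed this way equals overall accuracy, since each class weight cancels the per-class denominator.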

### 3.2 LogisticRegression

In [32]:
logrec_model = LogisticRegression(featuresCol='features', labelCol='class_idx')
preds_log_body = logrec_model.fit(body_train).transform(body_test)
preds_log_body.select('class_idx', 'prediction', 'probability').show(truncate=False)

multi_eval = MulticlassClassificationEvaluator().setLabelCol('class_idx')
prec_log_body = round(multi_eval.evaluate(preds_log_body, {multi_eval.metricName: 'weightedPrecision'}),2)
recall_log_body = round(multi_eval.evaluate(preds_log_body, {multi_eval.metricName: 'weightedRecall'}),2)

print('Confusion matrix')
preds_log_body.groupBy('class_idx', 'prediction').count().show()

print(f'Precision: {prec_log_body}')
print(f'Recall: {recall_log_body}')


+---------+----------+------------------------------------------------------------------------------------+
|class_idx|prediction|probability                                                                         |
+---------+----------+------------------------------------------------------------------------------------+
|1.0      |1.0       |[0.13399581293409707,0.8599506994832524,2.3157619353976277E-5,0.006030329963296604] |
|3.0      |3.0       |[0.28616968371074225,0.09808450794069674,0.1633729270774801,0.45237288127108094]    |
|1.0      |1.0       |[0.003597412727590517,0.9963854627820823,2.997062997752001E-9,1.7121493264173092E-5]|
|0.0      |1.0       |[0.174419327922463,0.8007617075474422,4.2020932699061037E-4,0.02439875520310417]    |
|0.0      |1.0       |[0.35215814368750437,0.5931511798160712,0.001245165572132867,0.053445510924291444]  |
|2.0      |3.0       |[0.18793684536872593,0.01940990832991869,0.3387435642406383,0.45390968206071713]    |
|0.0      |3.0       |[0.299

### 3.3 RandomForestClassifier

In [33]:
rfc = RandomForestClassifier(featuresCol='features', labelCol='class_idx', numTrees=20)
preds_rfc_body = rfc.fit(body_train).transform(body_test)
preds_rfc_body.select('class_idx', 'prediction', 'probability').show(truncate=False)

multi_eval = MulticlassClassificationEvaluator().setLabelCol('class_idx')
prec_rfc_body = round(multi_eval.evaluate(preds_rfc_body, {multi_eval.metricName: 'weightedPrecision'}),2)
recall_rfc_body = round(multi_eval.evaluate(preds_rfc_body, {multi_eval.metricName: 'weightedRecall'}),2)

print('Confusion matrix')
preds_rfc_body.groupBy('class_idx', 'prediction').count().show()

print(f'Precision: {prec_rfc_body}')
print(f'Recall: {recall_rfc_body}')

+---------+----------+---------------------------------------------------------------------------------+
|class_idx|prediction|probability                                                                      |
+---------+----------+---------------------------------------------------------------------------------+
|1.0      |0.0       |[0.513516963506931,0.3397368070765225,0.026820889736996312,0.11992533967955024]  |
|3.0      |2.0       |[0.14124072985898004,0.04870292777929774,0.5454544942670634,0.2646018480946589]  |
|1.0      |1.0       |[0.07141615318364399,0.9050523654107314,0.004406883707380007,0.0191245976982445] |
|0.0      |1.0       |[0.41891694999828794,0.436951750110956,0.019800796673282713,0.12433050321747341] |
|0.0      |0.0       |[0.3599738820634944,0.2169209423919407,0.0977080615403588,0.3253971140042061]    |
|2.0      |2.0       |[0.1453104974587661,0.05198510640965226,0.5108128379137058,0.2918915582178759]   |
|0.0      |3.0       |[0.33531996265452,0.1188539755559

## 4. Conclusion


---

We can observe that the `RandomForestClassifier` reached 64% precision (weighted precision on the test split). From this, we conclude that it was the best of the three classification methods evaluated on this dataset.

The dataset required some adjustments, such as indexing the "*gender*" and "*class*" columns, because columns of type `string` are not accepted for vectorization or for model training.

