# **_Big Data e Processamento Distribuído_**
## *Projeto de Disciplina*
### 02 de Janeiro de 2022
___________________________
### **Luiz Flavio Pereira** <br> **Ana Luiza Gouveia**
___________________________
</br>

**Valor:** 50 pontos <br>
**Entrega:**  16/01/22 até 23h59 <br>
**Formato:** Jupyter Notebook <br><br>

**Objetivo:** realizar um ciclo de ciência de dados completo no Spark. 
 
Nesse projeto, deverá ser realizado um ciclo completo de ciência de dados utilizando o PySpark. Isso significa os dados deverão ser explorados  e  preparados; um modelo de aprendizado de máquina deverá ser treinado e deverá ser feita a análise dos resultados obtidos. <br><br>

Todo o projeto deve ser construído em um único Notebook. Nele deverão conter além do código, as análises, explicações e motivações para a escolha do dataset e do algoritmo de aprendizado de máquina. <br>IMPORTANTE: Todos os imports utilizados deverão ser colocados no 
início do Notebook.<br><br>
 
**_Parte I: Exploração de Dados_**

Deverão ser utilizadas as funcionalidades de RDD e/ou Dataframes para análise e limpeza dos dados. Como essa tarefa é dependente de cada conjunto de dados, não há um modelo rígido a seguir. Deverão ser realizadas no mínimo 2 análises (estatísticas, análise com gráficos, 
etc.) e 3 transformações (filtragem, remoção de características, remoção/troca de valores nulos, normalizações, etc). As transformações devem ser pautadas no que for descoberto ao analisar os dados. Por exemplo: normalização dos valores por discrepância de magnitude entre 
características. <br><br>
 
**_Parte II: Criação de um Modelo e Análise de Resultados_**

Nessa etapa, deverá ser rodado um algoritmo da biblioteca MLlib do Spark para aprender um modelo  de  aprendizado  de  máquina  com  os  dados  que  vocês  acabaram  de  organizar. A escolha do algoritmo deverá ser motivada, que deve ser um dos disponíveis dentro da MLlib do 
Spark. Além disso, os dados deverão ser divididos utilizando-se alguma metodologia de validação (cross-validation, 60-40, 80-20, etc), e o resultado deverá ser validado e a performance do seu modelo analisada.

______________________
______________________
## _Configuração do ambiente para execução no Google Colab_

In [None]:
# Atualizacao necessária para a execucao no Google Colab

!apt-get update

0% [Working]            Ign:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:4 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:5 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Hit:9 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:12 https://cloud.r-proj

In [None]:
# Etapa necessaria para a execucao no Google Colab

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz
!pip install -q findspark

___________________________
## _Importação das bibliotecas necessárias à execução do programa_

Importação das bibliotecas do Spark e  outras necessárias à execução do programa

In [None]:
import findspark
findspark.init('spark-3.2.0-bin-hadoop3.2')

import pyspark

sc = pyspark.SparkContext(appName='Projeto_BDPD')

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [None]:
import pandas as pd
import plotly.express as px
from plotly.subplots import make_subplots

from pyspark.sql.functions import round, col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression, RandomForestRegressor, DecisionTreeRegressor, GBTRegressor
from pyspark.ml.evaluation import RegressionEvaluator

import warnings
warnings.filterwarnings("ignore")

_______________
## _Clonagem da pasta com arquivos no GitHub_

Por praticidade, o dataset foi disponibilizado no repositorio do GitHub

In [None]:
!git clone https://github.com/lflaviop/ProjetoBDPD.git

Cloning into 'ProjetoBDPD'...
remote: Enumerating objects: 7, done.[K
remote: Counting objects: 100% (7/7), done.[K
remote: Compressing objects: 100% (7/7), done.[K
remote: Total 7 (delta 2), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (7/7), done.


_______________
## _Leitura do arquivo CSV, transformação e análise das features do dataframe_

O dataset escolhido foi o [Medical Insurance Cost Predictor](https://www.kaggle.com/rajgupta2019/medical-insurance-cost-prediction), que possui uma lista com algumas características de algumas pessoas e seus respectivos gastos com saúde.

O objetivo do trabalho é criar um algoritmo capaz de prever os gastos com saúde de um determinado usuário, à partir de uma série de características do mesmo. Para isso deverá ser utilizado um algoritmo de Regressão.

In [None]:
# Leitura do dataset e transformacao em dataframe

med_insu_df = spark.read.csv('/content/ProjetoBDPD/medical_insurance_train.csv', inferSchema=True, header=True, nullValue='NA')
med_insu_df.show(10)

+------------------+------+------------------+------+---------+--------+------------------+
|               age|   sex|               bmi|smoker|   region|children|           charges|
+------------------+------+------------------+------+---------+--------+------------------+
|              21.0|  male|            25.745|    no|northeast|       2|        3279.86855|
|  36.9769779971995|female| 25.74416484562688|   yes|southeast|       3|21454.494238637613|
|              18.0|  male|             30.03|    no|southeast|       1|         1720.3537|
|              37.0|  male|30.676891267593227|    no|northeast|       3| 6801.437541833969|
|              58.0|  male|             32.01|    no|southeast|       1|        11946.6259|
|              46.0|  male|             26.62|    no|southeast|       1|         7742.1098|
|25.221730645797628|  male|31.192647361633732|    no|northeast|       4| 21736.32814453227|
| 29.48644271452622|female|24.222614152328703|    no|northwest|       2| 4916.95

As features do dataset são: idade (age), sexo (sex), índice de massa corporal - IMC (bmi), se o usuário é fumante (smoker), região do usuário (region), filhos (children) e despesas com saúde (charges)

In [None]:
# Apresentacao de algumas caracterisitcas das features do dataframe

med_insu_df.summary().show()

+-------+------------------+------+-----------------+------+---------+------------------+------------------+
|summary|               age|   sex|              bmi|smoker|   region|          children|           charges|
+-------+------------------+------+-----------------+------+---------+------------------+------------------+
|  count|              3630|  3630|             3630|  3630|     3630|              3630|              3630|
|   mean| 38.88703646824275|  null|30.62965244990723|  null|     null|2.5035812672176307|12784.808643545062|
| stddev|12.151029434929693|  null| 5.44130707750024|  null|     null|1.7125678955250763|10746.166742743266|
|    min|              18.0|female|            15.96|    no|northeast|                 0|         1121.8739|
|    25%|              29.0|  null|26.69452631025602|  null|     null|                 1| 5654.818261783543|
|    50%| 39.17015215006738|  null|             30.2|  null|     null|                 3| 9441.516533188485|
|    75%| 48.348754

In [None]:
# As features age, bmi e charges vieram com muitos dígitos após a casa decimal. Essa etapa tem por
# intencao tornar o dataframe mais amigavel atraves do arredondamento dos valores destas features

med_insu_df = med_insu_df.withColumn("age_rounded", round(col('age'),0))
med_insu_df = med_insu_df.withColumn("bmi_rounded", round(col('bmi'),1))
med_insu_df = med_insu_df.withColumn("charges_rounded", round(col('charges'),2))

cols_to_drop = ['age', 'bmi', 'charges']

med_insu_df = med_insu_df.drop(*cols_to_drop)

med_insu_df = med_insu_df.withColumnRenamed('age_rounded', 'age') \
                         .withColumnRenamed('bmi_rounded', 'bmi') \
                         .withColumnRenamed('charges_rounded', 'charges')
med_insu_df = med_insu_df.select(['age', 'sex', 'smoker', 'bmi', 'children', 'region', 'charges'])
med_insu_df.show(5)

+----+------+------+----+--------+---------+--------+
| age|   sex|smoker| bmi|children|   region| charges|
+----+------+------+----+--------+---------+--------+
|21.0|  male|    no|25.7|       2|northeast| 3279.87|
|37.0|female|   yes|25.7|       3|southeast|21454.49|
|18.0|  male|    no|30.0|       1|southeast| 1720.35|
|37.0|  male|    no|30.7|       3|northeast| 6801.44|
|58.0|  male|    no|32.0|       1|southeast|11946.63|
+----+------+------+----+--------+---------+--------+
only showing top 5 rows



_______________
## _Análise gráfica das features do dataframe_

Nesta etapa as principais informações do dataframe serão analisadas através de gráficos com o intuito de se extrair informações uteis ao desenvolvimento do algoritmo. Foi escolhida a biblioteca **Plotly** pelo fato dela ser interativa, o que facilita a análise do gráfico.

In [None]:
med_insu_pd = med_insu_df.toPandas()

In [None]:
fig = px.histogram(med_insu_pd, x='charges', color='smoker', opacity=0.7, barmode='overlay', histnorm='probability density', marginal='box', 
                 color_discrete_sequence=['blue', 'orange'],  width=1000, height=600, nbins=75) 
fig.update_layout(font_color="black", xaxis_title='Despesa médica ($)', yaxis=dict(title='Densidade de Probabilidade', gridcolor='darkgray', zerolinecolor='white'),
                  legend=dict(orientation="h", yanchor="bottom", y=1, xanchor="right", x=1))
fig.update_layout(title_text='Distribuição das despesas pelo status de fumante ou não', title_x=0.5)
fig.update_xaxes(showgrid=True, zerolinecolor='white', gridcolor='darkgray')

fig.show()

Pode se perceber claramente que os não-fumantes possuem uma despesa muito menor com saúde. Conclui-se que adoecem menos que os fumantes. Com excessão de alguns outliers, quase a totalidade dos não-fumantes tem uma despesa até 20 mil dólares.

In [None]:
med_insu_pd_cp = med_insu_pd.copy()

med_insu_pd_cp["age_group"] = ['18 a 29' if i<30 else '30 a 44' if (i>=30)&(i<45) else '45 a 59' if (i>=45)&(i<60) else '60 ou mais' for i in med_insu_pd['age']]
med_insu_pd_cp = med_insu_pd_cp.groupby(['age_group','smoker'])['charges'].mean()
med_insu_pd_cp = med_insu_pd_cp.rename('charges').reset_index().sort_values('smoker', ascending=True)

fig = px.bar(med_insu_pd_cp, x='age_group', y='charges', color='smoker', text='charges', 
             opacity=0.75, barmode='group', color_discrete_sequence=['blue','orange'], width=1000, height=600)
fig.update_traces(texttemplate='$%{text:,.0f}', textposition='outside', marker_line=dict(width=1, color='#303030'))
fig.update_layout(font_color='black',bargroupgap=0.05, bargap=0.3, legend=dict(orientation="h", yanchor="bottom", y=1, xanchor="right", x=1),
                  xaxis=dict(title='Idade (anos)', showgrid=False), yaxis=dict(title='Despesa médica ($)', showgrid=False, zerolinecolor='white', showline=True, linecolor='white', linewidth=2))
fig.update_layout(title_text='Despesa média por idade e por status de fumante', title_x=0.5)
fig.show()

Independentemente da faixa de idade, os fumantes sempre possuem despesa média muito maior que os não fumantes.

In [None]:
fig = px.scatter(med_insu_pd, x='bmi', y='charges', color='smoker', size='charges',
                 color_discrete_sequence=['blue', 'orange'], width=1000, height=600)
fig.update_layout(legend=dict(orientation="h", yanchor="bottom", y=1, xanchor="right", x=1),
                  font_color='black', xaxis=dict(title='Índice de massa corporal (IMC)',showgrid=True, gridcolor='darkgray'), 
                  yaxis=dict(title='Despesa médica ($)',showgrid=True, zerolinecolor='white',
                             showline=True, linecolor='#E5E5EA', linewidth=2, gridcolor='darkgray'))
fig.update_layout(title_text='Despesa média em função do IMC e do status de fumante', title_x=0.5)
fig.show()

Os não-fumantes possuem despesa inferior aos fumantes mesmo quando o seu IMC possui valor elevado. Os fumantes com IMC acima do normal possuem considerável despesa médica.

In [None]:
fig = px.scatter(med_insu_pd, x='age', y='charges', color='smoker', size='charges',
                 color_discrete_sequence=['blue', 'orange'], width=1000, height=600)
fig.update_layout(legend=dict(orientation="h", yanchor="bottom", y=1, xanchor="right", x=1),
                  font_color='black', xaxis=dict(title='Idade (anos)',showgrid=True, gridcolor='darkgray'), 
                  yaxis=dict(title='Despesa médica ($)',showgrid=True, zerolinecolor='white',
                             showline=True, linecolor='#E5E5EA', linewidth=2, gridcolor='darkgray'))
fig.update_layout(title_text='Relação entre as despesas com saúde e os status de fumante da pessoa', title_x=0.5)
fig.show()

Confirma-se aqui que a despesa dos fumantes é sempre superior a dos não-fumantes, nesse caso mesmo entre os mais jovens. Também, como era de se esperar, pode se observar que o custo com saúde cresce com o aumento da idade.

_______________
## _Análise e tratamento das variáveis categóricas_

Nesta etapa as variáveis categóricas serão analisadas e tratadas para que se adequem ao formato exigido para tratamento no algoritmo.

In [None]:
# Verificacao do quantitativo das variaveis categoricas

med_insu_df.groupBy('sex').count().show()
med_insu_df.groupBy('smoker').count().show()
med_insu_df.groupBy('region').count().show()

+------+-----+
|   sex|count|
+------+-----+
|female| 1601|
|  male| 2029|
+------+-----+

+------+-----+
|smoker|count|
+------+-----+
|    no| 3070|
|   yes|  560|
+------+-----+

+---------+-----+
|   region|count|
+---------+-----+
|northwest|  911|
|southeast| 1021|
|northeast|  848|
|southwest|  850|
+---------+-----+



Verifica-se que existe um desbalanço muito grande entre os fumantes e não-fumantes, mas por se tratar de uma feature de muita relevância, ela será mantida. Na vida real, outras estratégias podem ser adotadas, tais como a obtenção de mais dados sobre não fumantes.

In [None]:
# Aqui as features categoricas serao indexadas, gerando-se um indice numerico para cada uma.

indexer = StringIndexer(inputCols=['sex', 'smoker', 'region'], outputCols=['sex_idx', 'smoker_idx', 'region_idx'])
med_insu_idx = indexer.fit(med_insu_df).transform(med_insu_df)
med_insu_idx.select('sex', 'sex_idx').distinct().show()
med_insu_idx.select('smoker', 'smoker_idx').distinct().show()
med_insu_idx.select('region', 'region_idx').distinct().show()

+------+-------+
|   sex|sex_idx|
+------+-------+
|  male|    0.0|
|female|    1.0|
+------+-------+

+------+----------+
|smoker|smoker_idx|
+------+----------+
|    no|       0.0|
|   yes|       1.0|
+------+----------+

+---------+----------+
|   region|region_idx|
+---------+----------+
|northeast|       3.0|
|northwest|       1.0|
|southeast|       0.0|
|southwest|       2.0|
+---------+----------+



In [None]:
# As features region e children possuem mais de dois valores. Deixa-las em indices pode enviesar o algoritmo
# que pode atribuir importancia maior aos pesos maiores. Assim sera feito o OneHotEncoder para eliminar
# essa possibilidade.

onehot = OneHotEncoder(inputCols=['region_idx', 'children'], outputCols=['region_oh', 'children_oh'])

med_insu_oh = onehot.fit(med_insu_idx).transform(med_insu_idx)
med_insu_oh.select('region_idx', 'region_oh').distinct().show()
med_insu_oh.select('children', 'children_oh').distinct().show()

+----------+-------------+
|region_idx|    region_oh|
+----------+-------------+
|       3.0|    (3,[],[])|
|       2.0|(3,[2],[1.0])|
|       1.0|(3,[1],[1.0])|
|       0.0|(3,[0],[1.0])|
+----------+-------------+

+--------+-------------+
|children|  children_oh|
+--------+-------------+
|       2|(5,[2],[1.0])|
|       3|(5,[3],[1.0])|
|       1|(5,[1],[1.0])|
|       0|(5,[0],[1.0])|
|       5|    (5,[],[])|
|       4|(5,[4],[1.0])|
+--------+-------------+



In [None]:
# Limpeza do dataframe para melhor visualizacao, com a eliminacao das features desnecessarias

cols_to_drop = ['sex', 'smoker', 'children', 'region', 'region_idx']

med_insu_oh = med_insu_oh.drop(*cols_to_drop)
med_insu_oh.show(5)

+----+----+--------+-------+----------+-------------+-------------+
| age| bmi| charges|sex_idx|smoker_idx|    region_oh|  children_oh|
+----+----+--------+-------+----------+-------------+-------------+
|21.0|25.7| 3279.87|    0.0|       0.0|    (3,[],[])|(5,[2],[1.0])|
|37.0|25.7|21454.49|    1.0|       1.0|(3,[0],[1.0])|(5,[3],[1.0])|
|18.0|30.0| 1720.35|    0.0|       0.0|(3,[0],[1.0])|(5,[1],[1.0])|
|37.0|30.7| 6801.44|    0.0|       0.0|    (3,[],[])|(5,[3],[1.0])|
|58.0|32.0|11946.63|    0.0|       0.0|(3,[0],[1.0])|(5,[1],[1.0])|
+----+----+--------+-------+----------+-------------+-------------+
only showing top 5 rows



In [None]:
# Renomeacao das features para melhor visualizacao

med_insu_df_ready = med_insu_oh.withColumnRenamed('sex_idx', 'sex') \
                               .withColumnRenamed('smoker_idx', 'smoker') \
                               .withColumnRenamed('region_oh', 'region') \
                               .withColumnRenamed('children_oh', 'children')                           

med_insu_df_ready = med_insu_df_ready.select(['age', 'sex', 'smoker', 'bmi', 'children', 'region', 'charges'])
med_insu_df_ready.show(5)

+----+---+------+----+-------------+-------------+--------+
| age|sex|smoker| bmi|     children|       region| charges|
+----+---+------+----+-------------+-------------+--------+
|21.0|0.0|   0.0|25.7|(5,[2],[1.0])|    (3,[],[])| 3279.87|
|37.0|1.0|   1.0|25.7|(5,[3],[1.0])|(3,[0],[1.0])|21454.49|
|18.0|0.0|   0.0|30.0|(5,[1],[1.0])|(3,[0],[1.0])| 1720.35|
|37.0|0.0|   0.0|30.7|(5,[3],[1.0])|    (3,[],[])| 6801.44|
|58.0|0.0|   0.0|32.0|(5,[1],[1.0])|(3,[0],[1.0])|11946.63|
+----+---+------+----+-------------+-------------+--------+
only showing top 5 rows



In [None]:
# Criacao de nova coluna copiada da feature age, para apresentacao futura nos graficos de predicao.
# Isso se faz necessario, uma vez que a feature age devera ser normalizada tendo seus valores absolutos reduzidos

med_insu_df_ready = med_insu_df_ready.withColumn("age_unscaled", med_insu_df_ready['age'])

med_insu_df_ready = med_insu_df_ready.select(['age_unscaled', 'age', 'sex', 'smoker', 'bmi', 'children', 'region', 'charges'])
med_insu_df_ready.show(5)

+------------+----+---+------+----+-------------+-------------+--------+
|age_unscaled| age|sex|smoker| bmi|     children|       region| charges|
+------------+----+---+------+----+-------------+-------------+--------+
|        21.0|21.0|0.0|   0.0|25.7|(5,[2],[1.0])|    (3,[],[])| 3279.87|
|        37.0|37.0|1.0|   1.0|25.7|(5,[3],[1.0])|(3,[0],[1.0])|21454.49|
|        18.0|18.0|0.0|   0.0|30.0|(5,[1],[1.0])|(3,[0],[1.0])| 1720.35|
|        37.0|37.0|0.0|   0.0|30.7|(5,[3],[1.0])|    (3,[],[])| 6801.44|
|        58.0|58.0|0.0|   0.0|32.0|(5,[1],[1.0])|(3,[0],[1.0])|11946.63|
+------------+----+---+------+----+-------------+-------------+--------+
only showing top 5 rows



_______________
## _Etapa de padronização das features fora de escala_

As features age e bmi serão normalizadas. A feature charge será a feature alvo e não será alterada.
O Spark não é tão prático para essa tarefa quanto o Scikit Learn, por isso será realizada através dos passos seguintes.

In [None]:
# Transformação das features age e bmi em vetores para entrada no transformador Standard Scaler

cols_to_use = ['age', 'bmi']
cols_to_not_use = ['age_unscaled', 'sex', 'smoker', 'children', 'region', 'charges']

vec = VectorAssembler(inputCols=cols_to_use, outputCol='features_to_scale')
med_insu_vec = vec.transform(med_insu_df_ready)
med_insu_vec.show(5)

+------------+----+---+------+----+-------------+-------------+--------+-----------------+
|age_unscaled| age|sex|smoker| bmi|     children|       region| charges|features_to_scale|
+------------+----+---+------+----+-------------+-------------+--------+-----------------+
|        21.0|21.0|0.0|   0.0|25.7|(5,[2],[1.0])|    (3,[],[])| 3279.87|      [21.0,25.7]|
|        37.0|37.0|1.0|   1.0|25.7|(5,[3],[1.0])|(3,[0],[1.0])|21454.49|      [37.0,25.7]|
|        18.0|18.0|0.0|   0.0|30.0|(5,[1],[1.0])|(3,[0],[1.0])| 1720.35|      [18.0,30.0]|
|        37.0|37.0|0.0|   0.0|30.7|(5,[3],[1.0])|    (3,[],[])| 6801.44|      [37.0,30.7]|
|        58.0|58.0|0.0|   0.0|32.0|(5,[1],[1.0])|(3,[0],[1.0])|11946.63|      [58.0,32.0]|
+------------+----+---+------+----+-------------+-------------+--------+-----------------+
only showing top 5 rows



In [None]:
# Divisao do dataframe em Teste e treino.
# A literatura sugere a divisao antes da padronizacao a fim de se fazer o fit apenas nos dados de treino
# e a transformacao nos dados de treino e teste.

med_insu_train, med_insu_test = med_insu_vec.randomSplit([0.8,0.2], seed=123)

In [None]:
med_insu_train.groupBy('sex').count().show()
med_insu_test.groupBy('sex').count().show()

med_insu_train.groupBy('smoker').count().show()
med_insu_test.groupBy('smoker').count().show()

+---+-----+
|sex|count|
+---+-----+
|0.0| 1613|
|1.0| 1293|
+---+-----+

+---+-----+
|sex|count|
+---+-----+
|0.0|  416|
|1.0|  308|
+---+-----+

+------+-----+
|smoker|count|
+------+-----+
|   0.0| 2457|
|   1.0|  449|
+------+-----+

+------+-----+
|smoker|count|
+------+-----+
|   0.0|  613|
|   1.0|  111|
+------+-----+



In [None]:
# Etapa de padronizacao

scaler = StandardScaler(inputCol="features_to_scale", outputCol="scaled_features")
model = scaler.fit(med_insu_train.select("features_to_scale"))

med_insu_train_scaled = model.transform(med_insu_train)
med_insu_test_scaled = model.transform(med_insu_test)

med_insu_train_scaled.show(5)
med_insu_test_scaled.show(5)

+------------+----+---+------+----+-------------+-------------+-------+-----------------+--------------------+
|age_unscaled| age|sex|smoker| bmi|     children|       region|charges|features_to_scale|     scaled_features|
+------------+----+---+------+----+-------------+-------------+-------+-----------------+--------------------+
|        18.0|18.0|0.0|   0.0|16.0|(5,[0],[1.0])|    (3,[],[])| 1694.8|      [18.0,16.0]|[1.47204828185173...|
|        18.0|18.0|0.0|   0.0|21.5|(5,[0],[1.0])|    (3,[],[])|1702.46|      [18.0,21.5]|[1.47204828185173...|
|        18.0|18.0|0.0|   0.0|23.0|(5,[0],[1.0])|    (3,[],[])|1704.57|      [18.0,23.0]|[1.47204828185173...|
|        18.0|18.0|0.0|   0.0|23.1|(5,[0],[1.0])|    (3,[],[])| 1704.7|      [18.0,23.1]|[1.47204828185173...|
|        18.0|18.0|0.0|   0.0|23.2|(5,[0],[1.0])|(3,[0],[1.0])|1121.87|      [18.0,23.2]|[1.47204828185173...|
+------------+----+---+------+----+-------------+-------------+-------+-----------------+--------------------+
o

In [None]:
# Funcao para conversao dos dados em formato amigavel de dataframe

def extract(row):
    return (row.age_unscaled, row.sex, row.smoker, row.children, row.region, row.charges,) + tuple(row.scaled_features.toArray().tolist())

med_insu_df_train_sc = med_insu_train_scaled.select(*cols_to_not_use, "scaled_features").rdd.map(extract).toDF(cols_to_not_use + cols_to_use)
med_insu_df_test_sc = med_insu_test_scaled.select(*cols_to_not_use, "scaled_features").rdd.map(extract).toDF(cols_to_not_use + cols_to_use)
  
med_insu_df_train_sc.show(5)
med_insu_df_test_sc.show(5)

+------------+---+------+-------------+-------------+-------+-----------------+------------------+
|age_unscaled|sex|smoker|     children|       region|charges|              age|               bmi|
+------------+---+------+-------------+-------------+-------+-----------------+------------------+
|        18.0|0.0|   0.0|(5,[0],[1.0])|    (3,[],[])| 1694.8|1.472048281851737|2.9597526075634732|
|        18.0|0.0|   0.0|(5,[0],[1.0])|    (3,[],[])|1702.46|1.472048281851737| 3.977167566413417|
|        18.0|0.0|   0.0|(5,[0],[1.0])|    (3,[],[])|1704.57|1.472048281851737| 4.254644373372493|
|        18.0|0.0|   0.0|(5,[0],[1.0])|    (3,[],[])| 1704.7|1.472048281851737| 4.273142827169765|
|        18.0|0.0|   0.0|(5,[0],[1.0])|(3,[0],[1.0])|1121.87|1.472048281851737| 4.291641280967036|
+------------+---+------+-------------+-------------+-------+-----------------+------------------+
only showing top 5 rows

+------------+---+------+-------------+-------------+--------+-----------------+----

In [None]:
# Organizacao do dataframe

med_insu_df_train_sc = med_insu_df_train_sc.select(['age_unscaled', 'age', 'sex', 'smoker', 'bmi', 'children', 'region', 'charges'])
med_insu_df_test_sc = med_insu_df_test_sc.select(['age_unscaled', 'age', 'sex', 'smoker', 'bmi', 'children', 'region', 'charges'])
med_insu_df_train_sc.show(5)
med_insu_df_test_sc.show(5)

+------------+-----------------+---+------+------------------+-------------+-------------+-------+
|age_unscaled|              age|sex|smoker|               bmi|     children|       region|charges|
+------------+-----------------+---+------+------------------+-------------+-------------+-------+
|        18.0|1.472048281851737|0.0|   0.0|2.9597526075634732|(5,[0],[1.0])|    (3,[],[])| 1694.8|
|        18.0|1.472048281851737|0.0|   0.0| 3.977167566413417|(5,[0],[1.0])|    (3,[],[])|1702.46|
|        18.0|1.472048281851737|0.0|   0.0| 4.254644373372493|(5,[0],[1.0])|    (3,[],[])|1704.57|
|        18.0|1.472048281851737|0.0|   0.0| 4.273142827169765|(5,[0],[1.0])|    (3,[],[])| 1704.7|
|        18.0|1.472048281851737|0.0|   0.0| 4.291641280967036|(5,[0],[1.0])|(3,[0],[1.0])|1121.87|
+------------+-----------------+---+------+------------------+-------------+-------------+-------+
only showing top 5 rows

+------------+-----------------+---+------+------------------+-------------+--------

_______________
## _Vetorização e preparação dos dados para o algoritmo_

Aqui o dataframe será preparado para o formato adequado para a entrada no algoritmo

In [None]:
cols_to_use = ['age', 'sex', 'smoker', 'bmi', 'children', 'region']

vec = VectorAssembler(inputCols=cols_to_use, outputCol='features')

med_insu_train_scaled_vec = vec.transform(med_insu_df_train_sc)
med_insu_test_scaled_vec = vec.transform(med_insu_df_test_sc)

med_insu_train_scaled_vec.select('features', 'charges').show(truncate=False)
med_insu_test_scaled_vec.select('features', 'charges').show(truncate=False)

+-------------------------------------------------------------+--------+
|features                                                     |charges |
+-------------------------------------------------------------+--------+
|(12,[0,3,4],[1.472048281851737,2.9597526075634732,1.0])      |1694.8  |
|(12,[0,3,4],[1.472048281851737,3.977167566413417,1.0])       |1702.46 |
|(12,[0,3,4],[1.472048281851737,4.254644373372493,1.0])       |1704.57 |
|(12,[0,3,4],[1.472048281851737,4.273142827169765,1.0])       |1704.7  |
|(12,[0,3,4,9],[1.472048281851737,4.291641280967036,1.0,1.0]) |1121.87 |
|(12,[0,3,5,9],[1.472048281851737,4.347136642358851,1.0,1.0]) |1711.34 |
|(12,[0,3,4],[1.472048281851737,4.402632003750667,1.0])       |1705.62 |
|(12,[0,3,5,9],[1.472048281851737,4.5136227265342965,1.0,1.0])|1712.47 |
|(12,[0,3,5,10],[1.472048281851737,4.55061963412884,1.0,1.0]) |2210.19 |
|(12,[0,3,5,10],[1.472048281851737,4.698607264507014,1.0,1.0])|2241.77 |
|(12,[0,3,5,10],[1.472048281851737,4.84659489488518

_______________
## _Treinamento, teste e predição nos algoritmos_

O problema se trata de um problema de regressão. Dentre os algoritmos de regressão disponibilizados pelo Spark, foram escolhidos os seguintes:
* Linear Regression
* Random Forest
* Decision Trees
* Gradient Boost Trees

In [None]:
# Lista com as metricas disponiveis para analise do algoritmo
metrics = ['mse', 'rmse', 'mae', 'r2']

In [None]:
# Regressao Linear

print('Resultado e comparacao da predicao e do valor real')
preds_lr = LinearRegression(labelCol = 'charges', featuresCol='features').fit(med_insu_train_scaled_vec).transform(med_insu_test_scaled_vec)
preds_lr.select('charges', 'prediction').show(10)

print('Metricas de avaliacao: ')
print('----------------------')
for metric in metrics:
    reg_eval = RegressionEvaluator(metricName=metric).setLabelCol('charges')
    metric_lr = reg_eval.evaluate(preds_lr)

    if metric == 'rmse':
        rmse_lr = metric_lr

    print(f'{metric.upper()}: {metric_lr:.2f}')

Resultado e comparacao da predicao e do valor real
+--------+------------------+
| charges|        prediction|
+--------+------------------+
|11884.05|2418.9154766685824|
| 1711.03| 1652.352570465162|
|  1708.0| 3182.181450425587|
| 1708.93|3344.5262326395223|
| 1720.35|3465.2026385207682|
| 4559.32|7375.8162922442025|
| 1137.01| 4178.151559504258|
| 1137.47| 4259.323950611226|
|12890.06| 6861.996513941453|
|  1146.8| 6072.174018666836|
+--------+------------------+
only showing top 10 rows

Metricas de avaliacao: 
----------------------
MSE: 34795372.88
RMSE: 5898.76
MAE: 4074.69
R2: 0.68


In [None]:
# Random Forest

print('Resultado e comparacao da predicao e do valor real')
preds_rf = RandomForestRegressor(labelCol='charges', featuresCol='features').fit(med_insu_train_scaled_vec).transform(med_insu_test_scaled_vec)
preds_rf.select('charges', 'prediction').show(10)

print('Metricas de avaliacao: ')
print('----------------------')
for metric in metrics:
    reg_eval = RegressionEvaluator(metricName=metric).setLabelCol('charges')
    metric_rf = reg_eval.evaluate(preds_rf)

    if metric == 'rmse':
        rmse_rf = metric_rf

    print(f'{metric.upper()}: {metric_rf:.2f}')

Resultado e comparacao da predicao e do valor real
+--------+-----------------+
| charges|       prediction|
+--------+-----------------+
|11884.05|6972.909310084637|
| 1711.03|5987.234570209261|
|  1708.0|4405.519970317574|
| 1708.93|4405.519970317574|
| 1720.35|5911.289722662284|
| 4559.32|9699.207635534354|
| 1137.01|5335.792256773658|
| 1137.47|5335.792256773658|
|12890.06|5640.945205917621|
|  1146.8|5656.503766551532|
+--------+-----------------+
only showing top 10 rows

Metricas de avaliacao: 
----------------------
MSE: 23129148.52
RMSE: 4809.28
MAE: 3092.01
R2: 0.79


In [None]:
# Decision Trees

print('Resultado e comparacao da predicao e do valor real')
preds_dt = DecisionTreeRegressor(labelCol='charges', featuresCol='features',).fit(med_insu_train_scaled_vec).transform(med_insu_test_scaled_vec)
preds_dt.select('charges', 'prediction').show(10)

print('Metricas de avaliacao: ')
print('----------------------')
for metric in metrics:
    reg_eval = RegressionEvaluator(metricName=metric).setLabelCol('charges')
    metric_dt = reg_eval.evaluate(preds_dt)

    if metric == 'rmse':
        rmse_dt = metric_dt

    print(f'{metric.upper()}: {metric_dt:.2f}')

Resultado e comparacao da predicao e do valor real
+--------+-----------------+
| charges|       prediction|
+--------+-----------------+
|11884.05| 4683.27196153846|
| 1711.03| 4683.27196153846|
|  1708.0|2859.777885714286|
| 1708.93|2859.777885714286|
| 1720.35| 4683.27196153846|
| 4559.32|5316.776857142856|
| 1137.01|2859.777885714286|
| 1137.47|2859.777885714286|
|12890.06|2859.777885714286|
|  1146.8|2859.777885714286|
+--------+-----------------+
only showing top 10 rows

Metricas de avaliacao: 
----------------------
MSE: 23030799.35
RMSE: 4799.04
MAE: 2612.07
R2: 0.79


In [None]:
# Gradient Boost Trees

print('Resultado e comparacao da predicao e do valor real')
preds_gbt = GBTRegressor(labelCol='charges', featuresCol='features',).fit(med_insu_train_scaled_vec).transform(med_insu_test_scaled_vec)
preds_gbt.select('charges', 'prediction').show(10)

print('Metricas de avaliacao: ')
print('----------------------')
for metric in metrics:
    reg_eval = RegressionEvaluator(metricName=metric).setLabelCol('charges')
    metric_gbt = reg_eval.evaluate(preds_gbt)

    if metric == 'rmse':
        rmse_gbt = metric_gbt

    print(f'{metric.upper()}: {metric_gbt:.2f}')

Resultado e comparacao da predicao e do valor real
+--------+------------------+
| charges|        prediction|
+--------+------------------+
|11884.05|4371.6571120647895|
| 1711.03|1930.8701867618233|
|  1708.0| 1612.930444479009|
| 1708.93|1614.8643372671922|
| 1720.35| 3031.007222059815|
| 4559.32| 4795.466785929078|
| 1137.01|1757.4919805349996|
| 1137.47|1757.4919805349996|
|12890.06| 1729.476927631574|
|  1146.8|1683.7950609634647|
+--------+------------------+
only showing top 10 rows

Metricas de avaliacao: 
----------------------
MSE: 18379918.86
RMSE: 4287.18
MAE: 2252.73
R2: 0.83


### Comparação gráfica dos resultados gerados pelos algoritmos

In [None]:
# Criacao de dataframes contendo os valores preditos por cada modelo para apresentacao grafica dos resultados

preds_lr_pd = preds_lr.toPandas()
preds_rf_pd = preds_rf.toPandas()
preds_dt_pd = preds_dt.toPandas()
preds_gbt_pd = preds_gbt.toPandas()

pred_lr = preds_lr_pd[['age_unscaled', 'charges', 'prediction']]
pred_lr['predictor'] = 'Linear_Regression'

pred_rf = preds_rf_pd[['age_unscaled', 'charges', 'prediction']]
pred_rf['predictor'] = 'Random Forest'

pred_dt = preds_dt_pd[['age_unscaled', 'charges', 'prediction']]
pred_dt['predictor'] = 'Decision Trees'

pred_gbt = preds_gbt_pd[['age_unscaled', 'charges', 'prediction']]
pred_gbt['predictor'] = 'Gradient Boost Trees'

pred_lr.head(5)

Unnamed: 0,age_unscaled,charges,prediction,predictor
0,18.0,11884.05,2418.915477,Linear_Regression
1,18.0,1711.03,1652.35257,Linear_Regression
2,18.0,1708.0,3182.18145,Linear_Regression
3,18.0,1708.93,3344.526233,Linear_Regression
4,18.0,1720.35,3465.202639,Linear_Regression


In [None]:
# Unificacao dos dataframes em um unico

preds = pd.concat([pred_lr, pred_rf, pred_dt, pred_gbt])

In [None]:
# Apresentacao grafica dos resultados

fig = px.scatter(preds, x='charges', y='prediction', color='age_unscaled', facet_col='predictor', 
                 facet_col_wrap=2, trendline='lowess', size='charges', opacity=0.6, color_continuous_scale='Rainbow',
                 width=1100, height=900)
fig.update_layout(title_text='Comparativo entre o valor real e o previsto', title_x=0.5, font_color='black')
fig.update_yaxes(title='Custo predito ($)', col=1)
fig.update_xaxes(title='Custo Real, $', row=1)
fig.update_xaxes(showgrid=True, gridwidth=1, gridcolor='#E5E5EA', zeroline=True, zerolinewidth=2, zerolinecolor='white')
fig.update_yaxes(showgrid=True, gridwidth=1, gridcolor='#E5E5EA', zeroline=True, zerolinewidth=2, zerolinecolor='white')

fig.add_annotation(text=f'RMSE: {rmse_lr:.2f}', x=10e3,y=55e3, row=2,col=1, showarrow=False)
fig.add_annotation(text=f'RMSE: {rmse_rf:.2f}', x=10e3,y=55e3, row=2,col=2, showarrow=False)
fig.add_annotation(text=f'RMSE: {rmse_dt:.2f}', x=10e3,y=55e3, row=1,col=1, showarrow=False)
fig.add_annotation(text=f'RMSE: {rmse_gbt:.2f}', x=10e3,y=55e3, row=1,col=2, showarrow=False)

fig.show()

_______________
## _Conclusão_

O algoritmo com melhor desempenho foi o _Gradient Boost Trees_, em todas as métricas. Porém, para efeito de análise será adotada apenas a métrica RMSE para fins de comparação entre os modelos.
<br><br>
Para manter o notebook limpo e organizado algumas informações percebidas durante a análise do programa foram omitidas. Por exemplo, alguns algoritmos se comportaram melhor para determinadas faixas etárias que outros. Entretanto, como o objetivo neste caso é a generalização para qualquer faixa etária, foi escolhido o GBT com base nas suas métricas.
<br><br>
Outro fato observado é que as pessoas com idades mais baixas, mas com alto valor de despesa real, tiveram um valor predito de despesa longe do valor real, sempre muito abaixo e conforme pode ser visto no gráfico. Isso pode ser explicado pelo fato de que o algoritmo generaliza para despesas dentro do padrão normal do dataset (talvez custos com planos de saúde e tratamentos básicos). Um alto valor de despesa para uma pessoa mais jovem nos leva a imaginar um tratamento extraordinário de saúde, fora do padrão comum (acidente, doenças graves, etc.) e por isso, previsto com erro. Uma alternativa possível seria a detecção e exclusão desses valores do dataset, porém isso foi não realizado aqui pelo fato do dataset já ser pequeno e também por não se conhecer os limiares de corte, o que na prática seria fornecido na Análise de Negócios da empresa ou pelo BI.