<a href="https://colab.research.google.com/github/kennenvi/Challange-Dados/blob/main/Sistema_de_recomendacao_com_PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Criando um sistema recomendador de imóveis para a Insight Places
----

Nessa etapa será construido um recomendador de imóves utilizando o PySpark e seu módelo de aprendizado de máquina. 
Esse recomendador terá como base um clusterizador para agrupar os dados e apartir desses clusters indicar novos imóveis

# Iniciando ambiente

## Instalando o PySpark no Google Colab

In [160]:
!pip install pyspark==3.3.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## SparkSession

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

In [161]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [162]:
spark

## Baixando conjunto de dados

In [163]:
!wget 'https://caelum-online-public.s3.amazonaws.com/challenge-spark/semanas-3-e-4.zip' && unzip semanas-3-e-4.zip -d dados/

--2022-12-15 00:58:11--  https://caelum-online-public.s3.amazonaws.com/challenge-spark/semanas-3-e-4.zip
Resolving caelum-online-public.s3.amazonaws.com (caelum-online-public.s3.amazonaws.com)... 52.216.77.4, 54.231.128.185, 52.216.212.225, ...
Connecting to caelum-online-public.s3.amazonaws.com (caelum-online-public.s3.amazonaws.com)|52.216.77.4|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2588308 (2.5M) [application/zip]
Saving to: ‘semanas-3-e-4.zip.2’


2022-12-15 00:58:13 (1.92 MB/s) - ‘semanas-3-e-4.zip.2’ saved [2588308/2588308]

Archive:  semanas-3-e-4.zip
replace dados/dataset_ml_parquet/_SUCCESS? [y]es, [n]o, [A]ll, [N]one, [r]ename: A
  inflating: dados/dataset_ml_parquet/_SUCCESS  
  inflating: dados/dataset_ml_parquet/._SUCCESS.crc  
  inflating: dados/dataset_ml_parquet/part-00003-a14b227c-f87e-4893-b5f9-4163ed07cb37-c000.snappy.parquet  
  inflating: dados/dataset_ml_parquet/part-00001-a14b227c-f87e-4893-b5f9-4163ed07cb37-c000.snappy.parquet 

# Criando modelo de recomendação

In [164]:
#Carregando dados
dados = spark.read.parquet(
  '/content/dados/dataset_ml_parquet'  
)

In [165]:
dados.show()

+--------------------+-----+---------+---------+-------+------+----+--------------------+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|              bairro|condominio|  iptu|    valor|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|
+--------------------+-----+---------+---------+-------+------+----+--------------------+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|00002dd9-cc74-480...|    2|       35|        1|      1|   0.0| 0.0|        Santo Cristo|     100.0| 100.0

In [166]:
# Verificando estrutura dos dados
dados.printSchema()

root
 |-- id: string (nullable = true)
 |-- andar: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: double (nullable = true)
 |-- vaga: double (nullable = true)
 |-- bairro: string (nullable = true)
 |-- condominio: double (nullable = true)
 |-- iptu: double (nullable = true)
 |-- valor: double (nullable = true)
 |-- Zona Central: integer (nullable = true)
 |-- Zona Norte: integer (nullable = true)
 |-- Zona Oeste: integer (nullable = true)
 |-- Zona Sul: integer (nullable = true)
 |-- Academia: integer (nullable = true)
 |-- Animais permitidos: integer (nullable = true)
 |-- Churrasqueira: integer (nullable = true)
 |-- Condomínio fechado: integer (nullable = true)
 |-- Elevador: integer (nullable = true)
 |-- Piscina: integer (nullable = true)
 |-- Playground: integer (nullable = true)
 |-- Portaria 24h: integer (nullable = true)
 |-- Portão eletrônico: integer (nullable 

A coluna `bairro` é categória e a primeira vista parece conter muitos valores únicos, por isso será explorada para confirmar se vale a pena utiliza-la no modelo

In [167]:
# Coletando valores únicos da coluna "bairro"
dados\
    .select('bairro')\
    .distinct()\
    .count()

150

In [168]:
# Mostrando a quantidade de amostras em cada categoria de "bairro"
dados\
    .groupby('bairro')\
    .count()\
    .sort('count')\
    .show()

+--------------------+-----+
|              bairro|count|
+--------------------+-----+
|       Vasco da Gama|    1|
|          Manguinhos|    1|
|             Deodoro|    1|
|                Caju|    1|
|        Barros Filho|    1|
|              Galeão|    1|
|   Campo dos Afonsos|    2|
|Ricardo de Albuqu...|    2|
|     Engenheiro Leal|    2|
|     Parque Anchieta|    2|
|              Jacaré|    3|
|    Magalhães Bastos|    3|
|     Parque Colúmbia|    3|
|  Barra de Guaratiba|    4|
|           Mangueira|    4|
|                 Joá|    6|
|       Gardênia Azul|    7|
|            Sepetiba|    7|
|             Paquetá|    7|
|          Cavalcanti|    7|
+--------------------+-----+
only showing top 20 rows



De acordo com a análise acima a coluna `bairro` será retirada, porque esta possui uma quantidade muito grande de valores únicos com poucas amostras para serem transformados de variáveis categóricas para numéricas, além de possuir categorias com poucas amostras, o que traria poucas informações para o modelo ao custo de adicionar 150 novas dimenções (utilizando a técnica de variáveis dummy)

In [169]:
# Retirando coluna "dados"
dados = dados.drop('bairro')

### Preparando os dados

In [170]:
from pyspark.ml.feature import VectorAssembler

In [171]:
# Selecionando colunas que serão utilizadas como features
X = dados.drop('id').columns
X

['andar',
 'area_util',
 'banheiros',
 'quartos',
 'suites',
 'vaga',
 'condominio',
 'iptu',
 'valor',
 'Zona Central',
 'Zona Norte',
 'Zona Oeste',
 'Zona Sul',
 'Academia',
 'Animais permitidos',
 'Churrasqueira',
 'Condomínio fechado',
 'Elevador',
 'Piscina',
 'Playground',
 'Portaria 24h',
 'Portão eletrônico',
 'Salão de festas']

In [172]:
# Vetorizando os dados
dados_prep = VectorAssembler(inputCols=X, outputCol='features').transform(dados)

In [173]:
# Visualizando a coluna features
dados_prep.select('features').show(truncate=False)

+----------------------------------------------------------------------------------------------------------------+
|features                                                                                                        |
+----------------------------------------------------------------------------------------------------------------+
|[2.0,35.0,1.0,1.0,0.0,0.0,100.0,100.0,245000.0,1.0,0.0,0.0,0.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]         |
|(23,[0,1,2,3,5,6,7,8,10,15,17,19,20,22],[1.0,84.0,2.0,2.0,1.0,770.0,105.0,474980.0,1.0,1.0,1.0,1.0,1.0,1.0])    |
|(23,[1,2,3,6,7,8,12,14,17],[85.0,2.0,2.0,460.0,661.0,290000.0,1.0,1.0,1.0])                                     |
|(23,[1,2,3,5,6,7,8,11,18,19],[58.0,1.0,2.0,1.0,550.0,550.0,249000.0,1.0,1.0,1.0])                               |
|(23,[1,2,3,4,5,6,8,10],[64.0,2.0,2.0,1.0,1.0,850.0,530000.0,1.0])                                               |
|[0.0,200.0,6.0,4.0,4.0,2.0,2500.0,420.0,2900000.0,0.0,0.0,1.0,0.0,1.0,1.0,1.0,1

### Normalizando os dados

No decorrer do projeto será utilizado a técnica do PCA que necessita que todas as variáveis sejam normalizadas para funcionar, por isso será utilizado o StandardScaler do PySpark para normalizar as features

In [174]:
from pyspark.ml.feature import StandardScaler

In [175]:
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
model_scaler = scaler.fit(dados_prep)
dados_prep = model_scaler.transform(dados_prep)

In [176]:
dados_prep.select('features', 'scaled_features').show()

+--------------------+--------------------+
|            features|     scaled_features|
+--------------------+--------------------+
|[2.0,35.0,1.0,1.0...|[0.13607726247524...|
|(23,[0,1,2,3,5,6,...|(23,[0,1,2,3,5,6,...|
|(23,[1,2,3,6,7,8,...|(23,[1,2,3,6,7,8,...|
|(23,[1,2,3,5,6,7,...|(23,[1,2,3,5,6,7,...|
|(23,[1,2,3,4,5,6,...|(23,[1,2,3,4,5,6,...|
|[0.0,200.0,6.0,4....|[0.0,2.2447697820...|
|(23,[1,2,3,6,8,10...|(23,[1,2,3,6,8,10...|
|(23,[0,1,2,3,4,5,...|(23,[0,1,2,3,4,5,...|
|(23,[1,2,3,4,5,8,...|(23,[1,2,3,4,5,8,...|
|[0.0,41.0,1.0,1.0...|[0.0,0.4601778053...|
|[5.0,78.0,1.0,2.0...|[0.34019315618810...|
|(23,[1,2,3,4,5,6,...|(23,[1,2,3,4,5,6,...|
|(23,[1,2,3,4,5,6,...|(23,[1,2,3,4,5,6,...|
|(23,[1,2,3,4,5,6,...|(23,[1,2,3,4,5,6,...|
|[9.0,120.0,2.0,2....|[0.61234768113858...|
|[20.0,341.0,2.0,3...|[1.36077262475241...|
|[0.0,194.0,5.0,4....|[0.0,2.1774266885...|
|(23,[1,2,3,8,10,2...|(23,[1,2,3,8,10,2...|
|(23,[0,1,2,3,8,10...|(23,[0,1,2,3,8,10...|
|(23,[0,1,2,3,5,6,...|(23,[0,1,2

### Redução de dimensionalidade

In [177]:
from pyspark.ml.feature import PCA

In [178]:
# Selecionando a quantidade de features presentes nos dados, este número será utilizado para a quantidade de componentes no PCA
pca_k = len(X)
pca_k

23

In [179]:
# Instanciando e treinando o PCA
pca = PCA(k=pca_k, inputCol='scaled_features', outputCol='pca_features')
model_pca = pca.fit(dados_prep)

Verificando a explicabilidade da variancia de cada feature

In [180]:
import numpy as np

In [181]:
# Retorna um vetor denso contendo quanto da variabilidade é explicada por cada componente
model_pca.explainedVariance

DenseVector([0.2655, 0.1721, 0.0913, 0.0544, 0.0522, 0.0466, 0.0443, 0.0416, 0.0347, 0.0272, 0.0244, 0.0201, 0.0192, 0.0176, 0.0155, 0.0139, 0.012, 0.0113, 0.0101, 0.0092, 0.0089, 0.0079, 0.0])

In [182]:
# Transformando esse vetor denso em um numpy array
exp_var = np.array(model_pca.explainedVariance)

Selecionado a quantidade de componente que gerará 80% de explicação da variabilidade

In [183]:
# Selecionando novo número de componentes
pca_k = sum(exp_var.cumsum() <= .8)
pca_k

8

In [184]:
# criando outro modelo_pca com o novo "k"
model_pca = pca.setK(pca_k).fit(dados_prep)
dados_prep = model_pca.transform(dados_prep)

In [185]:
componentes_pca = len(dados_prep.select('pca_features').take(1)[0][0])
print('Quantidades de componentes provenientes do PCA:', componentes_pca)

Quantidades de componentes provenientes do PCA: 8


### Realizando agrupamento (Clusterização)

In [186]:
from pyspark.ml.clustering import KMeans

In [187]:
# Criando variável para armazenar um valor para que o resultado seja reproduzível
SEED = 42

In [188]:
# Instanciando KMeans com 10 cluster, treinando-o e gerando os clasters das amostras
kmeans = KMeans(k=10, featuresCol='pca_features', predictionCol='cluster', seed=SEED)
model_kmeans = kmeans.fit(dados_prep)
dados_cluster = model_kmeans.transform(dados_prep)

In [189]:
# Visualizando o resultado da clusterização
dados_cluster.show()

+--------------------+-----+---------+---------+-------+------+----+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------------------+--------------------+--------------------+-------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|condominio|  iptu|    valor|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|            features|     scaled_features|        pca_features|cluster|
+--------------------+-----+---------+---------+-------+------+----+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------------------+-------

## Otimizando Clusterização

### Verificando performance do cluster

Utilizando o ClusteringEvaluator é possível extrair métricas de quão bem está a separação dos clusters

In [190]:
from pyspark.ml.evaluation import ClusteringEvaluator

avaliador = ClusteringEvaluator(featuresCol='pca_features', predictionCol='cluster')
avaliador.evaluate(dados_cluster)

0.4089818254215943

### Pipeline

Para a otimização será feita uma pipeline para facilitar a manipulação dos dados e o teste de diferentes hiperparâmetros

In [None]:
from pyspark.ml import Pipeline

In [None]:
# Instanciando os algortimos que serão utilizados na pipeline
va = VectorAssembler(inputCols=X, outputCol='features')
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
pca = PCA(k=pca_k, inputCol='scaled_features', outputCol='pca_features')
kmeans = KMeans(k=10, featuresCol='pca_features', predictionCol='cluster', seed=SEED)

# Instanciando a pipeline, treinando-a e transformando os dados
pipe = Pipeline(stages=[va, scaler, pca, kmeans])
model_pipe = pipe.fit(dados)
dados_prep_pipe = model_pipe.transform(dados)

In [None]:
# Visualizando os dados após as etapas da pipeline
dados_prep_pipe.show()

+--------------------+-----+---------+---------+-------+------+----+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------------------+--------------------+--------------------+-------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|condominio|  iptu|    valor|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|            features|     scaled_features|        pca_features|cluster|
+--------------------+-----+---------+---------+-------+------+----+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------------------+-------

### Testando diferentes valores de k

Testando vários valores de `k` para o KMeans, através do ParamGridBuilder

In [None]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [None]:
# Criando um grid com os valores a serem testados e a qual atributo eles pertencem
grid = ParamGridBuilder()\
    .addGrid(model_pipe.stages[-1].k, range(10, 100, 10))\
    .build()

In [None]:
# Criando um TrainValidationSplit para testar os valores que estão no grid
tvs_pipe = TrainValidationSplit(
    estimator=pipe,
    estimatorParamMaps=grid,
    evaluator=avaliador,
    seed=SEED
)

In [None]:
# Treinando o modelo e gerando previsões
model_tvs_pipe = tvs_pipe.fit(dados)
previsoes_tvs_pipe = model_tvs_pipe.transform(dados)

+--------------------+-----+---------+---------+-------+------+----+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------------------+--------------------+--------------------+-------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|condominio|  iptu|    valor|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|            features|     scaled_features|        pca_features|cluster|
+--------------------+-----+---------+---------+-------+------+----+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------------------+-------

In [None]:
previsoes_tvs_pipe.show()

In [191]:
# Extraindo informações das previsões
print('Quantidade de clusters utilizados:', previsoes_tvs_pipe.select(f.max('cluster')).collect()[0][0] + 1)

# Performance do modelo
score = avaliador.evaluate(previsoes_tvs_pipe)
print(f'O modelo obteve {score:.2f} na métrica Silhouette')

Quantidade de clusters utilizados: 30
O modelo obteve 0.53 na métrica Silhouette


# Criando recomendador

A partir do modelo elaborado, será feito um recomendador, cuja função consistirá em utilizar o id de um móvel para encontrar os 10 outros móveis mais parecidos, incluindo ele mesmo.

Para isso será utilizado o cluster onde está o imóvel base, além da distância euclidiana que será calculada entre o móvel base e os membros de seu cluster.

In [30]:
# Selecionando apenas as colunas necessárias para a recomendação
dados_recomendacao = dados_cluster\
    .select('id', 'pca_features', 'cluster')

In [31]:
# Extraindo o id de um móvel que será utilizado como a base da recomendação
id_movel_base = dados_recomendacao.take(1)[0][0]

In [32]:
from pyspark.sql import functions as f
from pyspark.sql.functions import udf
from pyspark.sql.types import FloatType
from scipy.spatial.distance import euclidean

In [34]:
def recomendador(id_movel):
    # Extraindo o cluster e os componentes do móvel base
    cluster, componentes_movel = dados_cluster\
        .where(f.col('id') == id_movel_base)\
        .select('cluster', 'pca_features')\
        .collect()[0]
    
    # Extraindo os móveis do cluster
    moveis_recomendados = dados_cluster\
        .where(f.col('cluster') == cluster)
    
    # Função utilizada para calcular a distância euclidiana
    @udf(returnType=FloatType()) # Decorator para modificar a função de forma que o pyspark consiga utiliza-la
    def calcula_distancia(valor):
        return euclidean(componentes_movel, valor)
    
    # Calculando as distâncias e selecioando os 10 mais próximos
    moveis_recomendados_dist = moveis_recomendados\
        .withColumn('dist', calcula_distancia('pca_features'))\
        .sort('dist')\
        .limit(10)

    # Mostrando a recomendação
    moveis_recomendados_dist.show()

In [35]:
recomendador(id_movel_base)

+--------------------+-----+---------+---------+-------+------+----+----------+-----+--------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------------------+--------------------+--------------------+-------+------------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|condominio| iptu|   valor|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|            features|     scaled_features|        pca_features|cluster|        dist|
+--------------------+-----+---------+---------+-------+------+----+----------+-----+--------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------

# Extraindo informações dos clusters

In [40]:
import plotly.express as px

In [110]:
# Visualizando a média de quartos por cluster
previsoes_tvs_pipe\
    .select('quartos', 'cluster')\
    .groupby('cluster')\
    .agg(
        f.mean('quartos').alias('quartos')
    )\
    .sort('quartos')\
    .show()

+-------+------------------+
|cluster|           quartos|
+-------+------------------+
|      3|1.3562753036437247|
|     19|1.5978428351309708|
|     16| 1.741620962827544|
|      0|2.0494957041464326|
|     29| 2.057787561915245|
|     11|2.0830156713257093|
|     24|  2.08778840742825|
|      1| 2.278717720391808|
|     23|2.2802101576182134|
|      7|2.2894656488549616|
|      5|2.3771803816950543|
|     21|2.4242788461538463|
|     27|2.5222570532915363|
|      8| 2.591715976331361|
|     22|2.7752808988764044|
|     25|3.1321783766442093|
|     12|3.1599788806758182|
|      4| 3.165309956167815|
|     20| 3.189664502164502|
|     18| 3.291213389121339|
+-------+------------------+
only showing top 20 rows



In [121]:
# Transformando o código acima em um DataFrame pandas e atribuindo-o a uma variável
quartos_cluster = previsoes_tvs_pipe\
    .select('quartos', 'cluster')\
    .groupby('cluster')\
    .agg(
        f.mean('quartos').alias('quartos')
    )\
    .sort('quartos')\
    .toPandas()

In [122]:
# Visualizando a quantidade de quartos por cluster
fig = px.bar(quartos_cluster, y='quartos', color='quartos', hover_data=['quartos', 'cluster'], text='cluster')
fig.show()

Na imagem acima é vísivel que há uma relação entre a quantidade de quartos e o cluster escolhido

In [44]:
import plotly.graph_objects as go

In [140]:
# Função para pegar dados agrupador por cluster com base na coluna
def get_dados_agrupados_cluster(coluna):
    dados_coluna = previsoes_tvs_pipe\
        .select(coluna, 'cluster')\
        .groupby('cluster')\
        .agg(
            f.mean(coluna).alias(coluna)
        )\
        .sort(coluna)
    
    return dados_coluna.toPandas()

In [145]:
# Criando figura
fig = go.Figure()

lista_colunas = ['banheiros', 'quartos']

# Criando a visualização
for coluna in lista_colunas:
    temp = get_dados_agrupados_cluster(coluna)
    fig.add_trace(go.Bar(x=temp['cluster'].apply(lambda x: str(x)), y=temp[coluna], name=coluna))

fig.show()

Na figura acima é possível notar que há uma relação entre a quantidade de quartos e o cluster, onde foram colocados

# Finalizando sessão PySpark

In [46]:
# spark.stop()