<a href="https://colab.research.google.com/github/sarahgruetz/AluraChallenge_DS_2/blob/main/sistema-recomedacao.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Install the PySpark package used throughout this notebook.
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


### Iniciando a SparkSession

In [2]:
from pyspark.sql import SparkSession

# Create (or reuse, if one already exists) a Spark session running in local mode.
# 'local[*]' asks Spark to use as many worker threads as there are logical cores.
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Sistema de Recomendação")
    .getOrCreate()
)

In [3]:
import zipfile

# Extract the challenge archive into the data folder.
# NOTE(review): both paths live under /content/drive, so this assumes Google Drive
# is already mounted — no mount call is visible in this notebook; confirm.
# The context manager guarantees the archive handle is closed after extraction
# (the previous one-liner left the ZipFile object open).
with zipfile.ZipFile('/content/drive/MyDrive/curso-spark/ChallengeAlura/semanas-3-e-4.zip', 'r') as arquivo_zip:
    arquivo_zip.extractall('/content/drive/MyDrive/curso-spark/ChallengeAlura/dados')

In [4]:
# Load the extracted Parquet data into a Spark DataFrame.
dataset = spark.read.parquet('/content/drive/MyDrive/curso-spark/ChallengeAlura/dados/dataset_ml_parquet')

In [5]:
# Report the size of the dataset: row count (triggers a Spark job) and column count.
print(f'O dataset possui {dataset.count()} registros e {len(dataset.columns)} colunas.')

O dataset possui 66551 registros e 25 colunas.


In [6]:
# Preview the first 5 rows.
dataset.show(5)

+--------------------+-----+---------+---------+-------+------+----+------------+----------+-----+--------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|      bairro|condominio| iptu|   valor|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|
+--------------------+-----+---------+---------+-------+------+----+------------+----------+-----+--------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|00002dd9-cc74-480...|    2|       35|        1|      1|   0.0| 0.0|Santo Cristo|     100.0|100.0|245000.0|           1|         0|     

In [7]:
# Inspect column names, types and nullability.
dataset.printSchema()

root
 |-- id: string (nullable = true)
 |-- andar: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: double (nullable = true)
 |-- vaga: double (nullable = true)
 |-- bairro: string (nullable = true)
 |-- condominio: double (nullable = true)
 |-- iptu: double (nullable = true)
 |-- valor: double (nullable = true)
 |-- Zona Central: integer (nullable = true)
 |-- Zona Norte: integer (nullable = true)
 |-- Zona Oeste: integer (nullable = true)
 |-- Zona Sul: integer (nullable = true)
 |-- Academia: integer (nullable = true)
 |-- Animais permitidos: integer (nullable = true)
 |-- Churrasqueira: integer (nullable = true)
 |-- Condomínio fechado: integer (nullable = true)
 |-- Elevador: integer (nullable = true)
 |-- Piscina: integer (nullable = true)
 |-- Playground: integer (nullable = true)
 |-- Portaria 24h: integer (nullable = true)
 |-- Portão eletrônico: integer (nullable 

In [8]:
from pyspark.sql import functions as f

In [9]:
# Count missing values (null or NaN) per column and print them one per line.
# NaN can only occur in float/double columns, so isnan is applied just to those;
# every other column (strings such as id/bairro, integers) is checked for null only.
# This avoids the implicit string -> double cast that isnan would otherwise force
# on text columns, which is wasted work and may raise under ANSI SQL mode.
colunas_float = {nome for nome, tipo in dataset.dtypes if tipo in ('float', 'double')}

(
    dataset
    .select([
        f.count(
            f.when((f.isnan(c) | f.isnull(c)) if c in colunas_float else f.isnull(c), True)
        ).alias(c)
        for c in dataset.columns
    ])
    .show(vertical=True)
)

-RECORD 0-----------------
 id                 | 0   
 andar              | 0   
 area_util          | 0   
 banheiros          | 0   
 quartos            | 0   
 suites             | 0   
 vaga               | 0   
 bairro             | 0   
 condominio         | 0   
 iptu               | 0   
 valor              | 0   
 Zona Central       | 0   
 Zona Norte         | 0   
 Zona Oeste         | 0   
 Zona Sul           | 0   
 Academia           | 0   
 Animais permitidos | 0   
 Churrasqueira      | 0   
 Condomínio fechado | 0   
 Elevador           | 0   
 Piscina            | 0   
 Playground         | 0   
 Portaria 24h       | 0   
 Portão eletrônico  | 0   
 Salão de festas    | 0   



### Renomeando colunas

In [10]:
# Replacement column names in snake_case, without spaces or accents.
# The order matters: entries are paired positionally with the current columns.
novos_nomes = [
    # identifier and physical attributes
    'id', 'andar', 'area_util', 'banheiros', 'quartos', 'suites', 'vaga',
    # neighbourhood and monetary columns
    'bairro', 'condominio', 'iptu', 'valor',
    # zone columns
    'zona_central', 'zona_norte', 'zona_oeste', 'zona_sul',
    # amenity columns
    'academia', 'animais_permitidos', 'churrasqueira', 'condominio_fechado',
    'elevador', 'piscina', 'playground', 'portaria_24h', 'portao_eletronico',
    'salao_festas',
]

In [11]:
# Pair each current column name with its replacement (positional pairing).
# The pairs are materialized into a list because a bare zip object is exhausted
# after a single pass: re-running the renaming loop on its own would otherwise
# silently iterate over nothing.
mapa_renomear = list(zip(dataset.columns, novos_nomes))

In [12]:
# Rename every column from its current name to its snake_case replacement.
# NOTE: this reassigns `dataset`, so the original column names are no longer
# available after this cell runs.
for antigo, novo in mapa_renomear:
  dataset = dataset.withColumnRenamed(antigo, novo)

# Display the resulting column names to confirm the renaming.
dataset.columns

['id',
 'andar',
 'area_util',
 'banheiros',
 'quartos',
 'suites',
 'vaga',
 'bairro',
 'condominio',
 'iptu',
 'valor',
 'zona_central',
 'zona_norte',
 'zona_oeste',
 'zona_sul',
 'academia',
 'animais_permitidos',
 'churrasqueira',
 'condominio_fechado',
 'elevador',
 'piscina',
 'playground',
 'portaria_24h',
 'portao_eletronico',
 'salao_festas']

### Preparando os dados

In [13]:
from pyspark.ml.feature import VectorAssembler

In [14]:
# List the available columns again before choosing the model features.
dataset.columns

['id',
 'andar',
 'area_util',
 'banheiros',
 'quartos',
 'suites',
 'vaga',
 'bairro',
 'condominio',
 'iptu',
 'valor',
 'zona_central',
 'zona_norte',
 'zona_oeste',
 'zona_sul',
 'academia',
 'animais_permitidos',
 'churrasqueira',
 'condominio_fechado',
 'elevador',
 'piscina',
 'playground',
 'portaria_24h',
 'portao_eletronico',
 'salao_festas']

In [15]:
# Feature columns fed to the model: every column of the dataset except the
# two string columns, 'id' and 'bairro'.
X = [
    # physical attributes
    'andar', 'area_util', 'banheiros', 'quartos', 'suites', 'vaga',
    # monetary columns
    'condominio', 'iptu', 'valor',
    # zone columns
    'zona_central', 'zona_norte', 'zona_oeste', 'zona_sul',
    # amenity columns
    'academia', 'animais_permitidos', 'churrasqueira', 'condominio_fechado',
    'elevador', 'piscina', 'playground', 'portaria_24h', 'portao_eletronico',
    'salao_festas',
]

In [16]:
# Combine the feature columns listed in X into a single vector column named 'features'.
assembler = VectorAssembler(inputCols=X, outputCol='features')

In [17]:
# Apply the assembler: keeps the existing columns and appends 'features'.
dataset_modelo = assembler.transform(dataset)

In [18]:
# Preview 5 rows without truncating, so the full feature vector is visible.
dataset_modelo.show(5, truncate=False)

+------------------------------------+-----+---------+---------+-------+------+----+------------+----------+-----+--------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+------------+------------------------------------------------------------------------------------------------------------+
|id                                  |andar|area_util|banheiros|quartos|suites|vaga|bairro      |condominio|iptu |valor   |zona_central|zona_norte|zona_oeste|zona_sul|academia|animais_permitidos|churrasqueira|condominio_fechado|elevador|piscina|playground|portaria_24h|portao_eletronico|salao_festas|features                                                                                                    |
+------------------------------------+-----+---------+---------+-------+------+----+------------+----------+-----+--------+------------+----------+----------+--------+--------+----

### Padronização dos dados

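Para realizar a clusterização dos dados é preciso que as variáveis estejam em escalas próximas umas das outras. Uma forma de se obter isso é utilizando o StandardScaler, que padroniza os dados dividindo cada coluna pelo seu desvio padrão e, opcionalmente (`withMean=True`), subtraindo antes a sua média. Com os parâmetros padrão do PySpark (`withMean=False`, `withStd=True`), que são os usados abaixo, todas as variáveis passam a ter desvio padrão igual a um, mas a média não é centralizada em zero; para que a distribuição de todas as variáveis tenha média zero e desvio padrão igual a um é preciso passar `withMean=True`.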

In [19]:
from pyspark.ml.feature import StandardScaler

In [20]:
# Standardize the assembled feature vector into 'scaled_features'.
# No withMean/withStd arguments are passed, so the library defaults apply
# (in PySpark: scale to unit standard deviation, without centering on the mean).
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
# Estimate the scaling statistics from the data...
scaler_model = scaler.fit(dataset_modelo)
# ...then apply them. Fitting and transforming are two separate steps here:
# fit() returns a model, and that model's transform() produces the scaled column.
dataset_modelo_scaler = scaler_model.transform(dataset_modelo)

In [21]:
# Preview 5 rows, now including the 'scaled_features' column.
dataset_modelo_scaler.show(5)

+--------------------+-----+---------+---------+-------+------+----+------------+----------+-----+--------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+------------+--------------------+--------------------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|      bairro|condominio| iptu|   valor|zona_central|zona_norte|zona_oeste|zona_sul|academia|animais_permitidos|churrasqueira|condominio_fechado|elevador|piscina|playground|portaria_24h|portao_eletronico|salao_festas|            features|     scaled_features|
+--------------------+-----+---------+---------+-------+------+----+------------+----------+-----+--------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+------------+--------------------+--------------------+
|00002dd9-cc74-480.

### Redução de dimensionalidade

Para redução de dimensionalidade foi utilizado o PCA.

In [22]:
from pyspark.ml.feature import PCA

In [23]:
# Start with as many principal components as there are input features, so the
# explained variance of every component can be inspected.
k = len(X)
k

23

In [24]:
# Fit a PCA that keeps all k components; used only to read the explained variance.
pca = PCA(k=k, inputCol='scaled_features', outputCol='pca_features')
model_pca = pca.fit(dataset_modelo_scaler)

In [25]:
# Cumulative explained variance: entry i is the share of variance explained by
# the first i+1 principal components.
# Computed in a single pass over the variance vector (instead of re-summing a
# growing slice for every component) and sized by the fitted model itself, so it
# no longer depends on the current value of the global `k`.
cumulative_sum = model_pca.explainedVariance.toArray().cumsum().tolist()
cumulative_sum

[0.26545749360997034,
 0.4375247949776184,
 0.5287992767783259,
 0.5831729986362391,
 0.6354028103524717,
 0.682023156221161,
 0.7262890569978575,
 0.7678911881686676,
 0.8025730095613413,
 0.8297996535128337,
 0.854194399626073,
 0.8742987112956476,
 0.8935090265260494,
 0.9111213166740558,
 0.9266283824348652,
 0.9405231608927351,
 0.9525305733295567,
 0.9638657310284791,
 0.9739753651565556,
 0.983205487506614,
 0.9921039340092773,
 0.9999999999999953,
 1.0]

In [26]:
import numpy as np

# Choose the number of components to keep: count how many leading components
# have a cumulative explained variance of at most 0.7.
# NOTE(review): with the values printed above this gives 6 components (~68% of
# the variance), i.e. the largest count that stays at or below 70% — not the
# first count that reaches 70% (which would be 7). Confirm this is the intent.
# NOTE: this overwrites the earlier `k` (number of features).
k = sum(np.array(cumulative_sum) <= 0.7)
k

6

In [27]:
# Refit PCA with the reduced number of components and project the scaled
# features onto them, producing the 'pca_features' column.
# NOTE: `pca` and `model_pca` replace the full-rank versions fitted earlier.
pca = PCA(k=k, inputCol='scaled_features', outputCol='pca_features')
model_pca = pca.fit(dataset_modelo_scaler)
dataset_modelo_pca = model_pca.transform(dataset_modelo_scaler)

In [28]:
# Preview the projected vectors for 5 rows (untruncated).
dataset_modelo_pca.select('pca_features').show(5, False)

+--------------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------+
|[-6.165125049533812,1.3380985269405696,-1.7052299823820158,-0.5338289630560162,0.08903815478581815,-0.3134396180129953]   |
|[-3.2529111812184865,-1.1179591836228857,-0.2923895841501507,3.195538820020636,0.1528630698219846,1.302893049066197]      |
|[-1.0611769329629537,-1.6685040058694294,-2.30759482783911,0.10553124125788461,-0.06914386452866851,0.7187181899556143]   |
|[-1.995811900783872,-1.7655796610847843,1.3571962498672347,-0.16336519381168974,-0.021305227846957334,0.12808044918214828]|
|[-0.4181406070145923,-2.131040791141693,-0.1026409926497008,2.809916415039177,-0.042956284698168964,-0.09449068767336455] |


### Criando um pipeline

In [29]:
from pyspark.ml import Pipeline

In [30]:
# Bundle the three preprocessing steps performed manually above into a single
# Pipeline so they are fitted and applied together: assemble -> scale -> PCA.
# The number of principal components reuses `k`, the value derived from the
# cumulative explained variance, instead of repeating the literal 6; the pipeline
# therefore stays in sync if the variance threshold is ever changed.
pca_pipeline = Pipeline(stages=[VectorAssembler(inputCols=X, outputCol='features'),
                                StandardScaler(inputCol='features', outputCol='scaled_features'),
                                PCA(k=k, inputCol='scaled_features', outputCol='pca_features')])

In [31]:
# Fit every stage of the pipeline on the renamed dataset.
pca_pipeline_model = pca_pipeline.fit(dataset)

In [32]:
# Run the fitted pipeline, adding 'features', 'scaled_features' and 'pca_features'.
dados_pipeline_model = pca_pipeline_model.transform(dataset)

In [33]:
# Preview the pipeline's PCA output; it should match the step-by-step result above.
dados_pipeline_model.select('pca_features').show(5, False)

+--------------------------------------------------------------------------------------------------------------------------+
|pca_features                                                                                                              |
+--------------------------------------------------------------------------------------------------------------------------+
|[-6.165125049533812,1.3380985269405696,-1.7052299823820158,-0.5338289630560162,0.08903815478581815,-0.3134396180129953]   |
|[-3.2529111812184865,-1.1179591836228857,-0.2923895841501507,3.195538820020636,0.1528630698219846,1.302893049066197]      |
|[-1.0611769329629537,-1.6685040058694294,-2.30759482783911,0.10553124125788461,-0.06914386452866851,0.7187181899556143]   |
|[-1.995811900783872,-1.7655796610847843,1.3571962498672347,-0.16336519381168974,-0.021305227846957334,0.12808044918214828]|
|[-0.4181406070145923,-2.131040791141693,-0.1026409926497008,2.809916415039177,-0.042956284698168964,-0.09449068767336455] |


### Criando os clusters

In [34]:
from pyspark.ml.clustering import KMeans

In [35]:
# K-means over the PCA-reduced features: 50 clusters, with the assigned cluster
# written to 'cluster_pca'. The fixed seed makes the random initialization repeatable.
kmeans = KMeans(k=50, featuresCol='pca_features', predictionCol='cluster_pca', seed=101)

In [36]:
# Fit the clustering model on the pipeline output.
model_kmeans = kmeans.fit(dados_pipeline_model)

In [37]:
# Assign every row to a cluster (adds the 'cluster_pca' column).
prections_kmeans = model_kmeans.transform(dados_pipeline_model)

In [38]:
# Preview the cluster assigned to the first 5 properties.
prections_kmeans.select('id', 'cluster_pca').show(5,False)

+------------------------------------+-----------+
|id                                  |cluster_pca|
+------------------------------------+-----------+
|00002dd9-cc74-4809-b5a5-850adf0e7526|45         |
|0009ca94-2b37-4381-b8b8-773ce0f92444|33         |
|000e3d28-e3e5-4110-b488-69154931140e|2          |
|000fb707-6cad-496d-8cb7-d8046cb5ef37|32         |
|001b6db0-e88d-4eba-84e5-0ef94b091a64|44         |
+------------------------------------+-----------+
only showing top 5 rows



### Filtrando imóveis do mesmo cluster

In [39]:
# Keep only the property id and its cluster label, to join back onto the dataset.
cluster_imoveis = prections_kmeans.select('id', 'cluster_pca')

In [40]:
# Attach each property's cluster label to the dataset, matching rows on 'id'.
# Any existing 'cluster_pca' column is dropped first so the cell is idempotent:
# drop() is a no-op when the column is absent, and on a re-run it prevents a
# second, duplicated 'cluster_pca' column from being joined in.
dataset = dataset\
              .drop('cluster_pca')\
              .join(cluster_imoveis,'id', how='inner')

In [41]:
# Preview the dataset, now carrying the 'cluster_pca' column.
dataset.show(5)

+--------------------+-----+---------+---------+-------+------+----+------------+----------+-----+--------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+------------+-----------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|      bairro|condominio| iptu|   valor|zona_central|zona_norte|zona_oeste|zona_sul|academia|animais_permitidos|churrasqueira|condominio_fechado|elevador|piscina|playground|portaria_24h|portao_eletronico|salao_festas|cluster_pca|
+--------------------+-----+---------+---------+-------+------+----+------------+----------+-----+--------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+------------+-----------+
|00002dd9-cc74-480...|    2|       35|        1|      1|   0.0| 0.0|Santo Cristo|     100.0|100.0|245000.0|  

In [44]:
# Number of properties per cluster, ordered by cluster id.
# show(50) prints up to 50 rows, matching the 50 clusters requested from K-means.
dataset.groupBy('cluster_pca').count().orderBy('cluster_pca').show(50)

+-----------+-----+
|cluster_pca|count|
+-----------+-----+
|          0| 4932|
|          1|  191|
|          2| 1134|
|          3| 4239|
|          4| 1415|
|          5| 1592|
|          6|  615|
|          7|    1|
|          8| 1234|
|          9|  994|
|         10| 1504|
|         11| 3062|
|         12|   34|
|         13| 1730|
|         14|  480|
|         15|  788|
|         16| 1090|
|         17|  653|
|         18| 2213|
|         19|  563|
|         20| 1042|
|         21| 2959|
|         22|  747|
|         23| 3196|
|         24| 1159|
|         25|  445|
|         26| 1738|
|         27|   10|
|         28| 1473|
|         29|  658|
|         30| 2254|
|         31|   79|
|         32| 1156|
|         33| 1198|
|         34| 2142|
|         35| 2292|
|         36|  912|
|         37|  479|
|         38| 1213|
|         39|   16|
|         40| 1163|
|         41| 1255|
|         42| 1801|
|         43| 2409|
|         44| 2680|
|         45|  588|
|         46|  937|


In [48]:
# Inspect the members of cluster 27, one of the small clusters (10 rows in the
# per-cluster count).
dataset.where('cluster_pca == 27').show(10)

+--------------------+-----+---------+---------+-------+------+----+---------------+----------+---------+----------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+------------+-----------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|         bairro|condominio|     iptu|     valor|zona_central|zona_norte|zona_oeste|zona_sul|academia|animais_permitidos|churrasqueira|condominio_fechado|elevador|piscina|playground|portaria_24h|portao_eletronico|salao_festas|cluster_pca|
+--------------------+-----+---------+---------+-------+------+----+---------------+----------+---------+----------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+------------+-----------+
|11d27498-f9bc-45e...|    0|      152|        2|      3|   1.0| 1.0|        Ipanem

In [43]:
# Per-cluster average of the numeric columns, shown ordered by cluster id.
# NOTE(review): mean() is called without arguments, and the output header suggests
# the grouping key itself is averaged too (a trailing avg(cluster_pca) column) —
# confirm whether that column is wanted downstream.
cluster_means = dataset.groupBy('cluster_pca').mean()
cluster_means.orderBy('cluster_pca').show()

+-----------+-------------------+------------------+------------------+------------------+-------------------+-------------------+------------------+------------------+------------------+--------------------+--------------------+--------------------+-------------------+--------------------+-----------------------+--------------------+-----------------------+-------------------+--------------------+--------------------+--------------------+----------------------+--------------------+----------------+
|cluster_pca|         avg(andar)|    avg(area_util)|    avg(banheiros)|      avg(quartos)|        avg(suites)|          avg(vaga)|   avg(condominio)|         avg(iptu)|        avg(valor)|   avg(zona_central)|     avg(zona_norte)|     avg(zona_oeste)|      avg(zona_sul)|       avg(academia)|avg(animais_permitidos)|  avg(churrasqueira)|avg(condominio_fechado)|      avg(elevador)|        avg(piscina)|     avg(playground)|   avg(portaria_24h)|avg(portao_eletronico)|   avg(salao_festas)|avg(clus