# Preparação do Ambiente

In [1]:
#instalação da biblioteca pyspark
!pip install pyspark==3.3.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark==3.3.1
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 28 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 53.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=0932e833e546ddfa275ece4f41ee21d84b9da9c8df86455866c54e86f773fc0a
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


In [2]:
#importando a função SparkSession da biblioteca pyspark.sql
from pyspark.sql import SparkSession

In [3]:
# Build (or reuse) the Spark session for this notebook, running locally
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName("Transformacao com Spark")
    .getOrCreate()
)

# Show the session summary as the cell output
spark

In [4]:
#fazendo o donwload do arquivo compactado e descompactando
!wget "https://caelum-online-public.s3.amazonaws.com/challenge-spark/semanas-3-e-4.zip" && unzip semanas-3-e-4.zip -d dados/

--2022-12-07 20:16:33--  https://caelum-online-public.s3.amazonaws.com/challenge-spark/semanas-3-e-4.zip
Resolving caelum-online-public.s3.amazonaws.com (caelum-online-public.s3.amazonaws.com)... 52.217.131.241, 52.216.27.12, 54.231.193.73, ...
Connecting to caelum-online-public.s3.amazonaws.com (caelum-online-public.s3.amazonaws.com)|52.217.131.241|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 2588308 (2.5M) [application/zip]
Saving to: ‘semanas-3-e-4.zip’


2022-12-07 20:16:33 (17.0 MB/s) - ‘semanas-3-e-4.zip’ saved [2588308/2588308]

Archive:  semanas-3-e-4.zip
   creating: dados/dataset_ml_parquet/
  inflating: dados/dataset_ml_parquet/_SUCCESS  
  inflating: dados/dataset_ml_parquet/._SUCCESS.crc  
  inflating: dados/dataset_ml_parquet/part-00003-a14b227c-f87e-4893-b5f9-4163ed07cb37-c000.snappy.parquet  
  inflating: dados/dataset_ml_parquet/part-00001-a14b227c-f87e-4893-b5f9-4163ed07cb37-c000.snappy.parquet  
  inflating: dados/dataset_ml_parquet/part-

In [5]:
# Read the DataFrame that was stored in Parquet format.
# The path is relative so it matches the directory the archive was extracted
# into (dados/) instead of assuming the working directory is /content.
dados = spark.read.parquet('dados/dataset_ml_parquet')

In [6]:
# Quick look at the data: print the first 20 rows (truncated cells),
# then return the total number of rows as the cell output
dados.show(n=20, truncate=True)
dados.count()

+--------------------+-----+---------+---------+-------+------+----+--------------------+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|                  id|andar|area_util|banheiros|quartos|suites|vaga|              bairro|condominio|  iptu|    valor|Zona Central|Zona Norte|Zona Oeste|Zona Sul|Academia|Animais permitidos|Churrasqueira|Condomínio fechado|Elevador|Piscina|Playground|Portaria 24h|Portão eletrônico|Salão de festas|
+--------------------+-----+---------+---------+-------+------+----+--------------------+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+
|00002dd9-cc74-480...|    2|       35|        1|      1|   0.0| 0.0|        Santo Cristo|     100.0| 100.0

66551

In [7]:
from pyspark.sql import functions as f

In [8]:
# One-hot encode the neighbourhood: one row per id, one column per distinct
# 'bairro' value, holding 1 where the id has that value and 0 otherwise
bairro = (
    dados
    .groupBy('id')
    .pivot('bairro')
    .agg(f.lit(1))
    .na.fill(0)
)

In [9]:
# Attach the per-id neighbourhood indicator columns to the data and drop the
# original text column 'bairro', which they replace.
# NOTE(review): this rebinds `dados`, so the cell is not idempotent — after it
# runs, 'bairro' no longer exists in `dados`, and re-running the pivot cell or
# this cell will not reproduce the same result without reloading the data.
dados = dados.join(bairro, 'id', how='left').drop('bairro')

In [10]:
dados.show()
dados.count()

+--------------------+-----+---------+---------+-------+------+----+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------+-----------------+--------+-------+----+---------+-----+---------------+------------------+------------+-------+-------------+----------+--------+------------+--------+------+----+-------+--------+------------+-----------------+---------+------+-------+----------+------+-----------+--------------+------+-----------+-------+----------+--------+-----------+------+--------+------------+-------+---------+---------------+------------+-----------------+-----------------+-------+--------+------------------------------+-----------------------+------+------+-------------+------+------+---------+---------+-----+------------+--------------+-------+-------+--------+-------+-----+---------+-----------+------+----------

66551

In [11]:
len(dados.columns)

174

# Fazendo a clusterização

In [12]:
#importando as funções VectorAssembler, StandardScaler, PCA e Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import PCA
from pyspark.ml import Pipeline

In [13]:
# Feature column names: every column of `dados` except the target ('valor')
# and the row identifier ('id')
X = list(dados.columns)
for coluna_excluida in ('valor', 'id'):
    X.remove(coluna_excluida)

In [14]:
# Build the pipeline: assemble the feature columns into a single vector,
# scale that vector, then apply PCA to the scaled features.
# The purpose of PCA here is to reduce the number of feature columns;
# k=2 produces two components.
# (These notes are comments rather than a trailing string literal, which the
# notebook would echo back as the cell's output.)
pca_pipeline = Pipeline(stages=[
    VectorAssembler(inputCols=X, outputCol='features'),
    StandardScaler(inputCol='features', outputCol='scaled_features'),
    PCA(k=2, inputCol='scaled_features', outputCol='pca_features'),
])

'\nvale destacar que o objetivo do PCA é reduzir o número de colunas das features\nonde k=2 irá gerar duas colunas\n'

In [15]:
# Fit the pipeline (assembler, scaler and PCA stages) on the data
pca_pipeline_model = pca_pipeline.fit(dados)

In [16]:
# Apply the fitted pipeline to the data; the result carries the pipeline's
# output columns, including 'pca_features'
dados_pca = pca_pipeline_model.transform(dados)

In [17]:
# Quick look at the resulting PCA features (values shown without truncation)
dados_pca.select('pca_features').show(truncate=False)

+------------------------------------------+
|pca_features                              |
+------------------------------------------+
|[-0.7711108872400125,-4.7384548147398]    |
|[-1.958112747263765,0.7146041200892066]   |
|[-3.586443255682945,-4.478091404519487]   |
|[0.016336446657830128,-1.1870206674483836]|
|[-0.1561673700021381,-1.7016151478360821] |
|[-3.384631757060741,-0.3999738723901571]  |
|[-6.509497532168536,0.05468606396589609]  |
|[-6.993955255405849,-1.1831459997186735]  |
|[-1.2904732771655627,-1.8735588869008821] |
|[-1.663232381911833,-4.255225036099674]   |
|[-3.8033213511286097,-2.6914906805976746] |
|[0.23729644582907988,-2.15353039694625]   |
|[-7.726263325782962,-4.053115456557817]   |
|[-2.7214166514828886,-3.347523964788623]  |
|[-5.552331096275058,-3.498624525789144]   |
|[-0.6257632323225967,-3.435612452288389]  |
|[-7.200001670302826,-1.8260112514052471]  |
|[0.05008050062729885,-3.1989013661101766] |
|[-1.5647305792262554,0.12976198386258972] |
|[-6.54077

In [18]:
#importando a função KMeans
from pyspark.ml.clustering import KMeans

In [19]:
# Random seed constant; passed to KMeans via setSeed() so runs are repeatable
SEED = 1224

In [20]:
# Create the KMeans estimator that clusters on the PCA features and writes the
# assigned cluster to 'cluster_pca'.
# setK() sets the number of clusters (8 here) and setSeed() fixes the seed.
# (These notes are comments rather than a trailing string literal, which the
# notebook would echo back as the cell's output.)
kmeans = (
    KMeans(featuresCol='pca_features', predictionCol='cluster_pca')
    .setK(8)
    .setSeed(SEED)
)

'\nvale destacar que a função setk() é utilizada para determinar o número de \nclusters onde.\n'

In [21]:
# Fit KMeans on the PCA-transformed data
model_kmeans = kmeans.fit(dados_pca)

In [22]:
# Apply the fitted KMeans model to add the 'cluster_pca' assignment to each row.
# NOTE(review): the variable name is a typo of "predictions"; it is referenced
# by later cells, so renaming it must be done everywhere at once.
prections_kmeans = model_kmeans.transform(dados_pca)

In [23]:
# Quick look at the results: PCA features next to the assigned cluster
prections_kmeans.select('pca_features', 'cluster_pca').show(truncate=False)

+------------------------------------------+-----------+
|pca_features                              |cluster_pca|
+------------------------------------------+-----------+
|[-0.7711108872400125,-4.7384548147398]    |2          |
|[-1.958112747263765,0.7146041200892066]   |7          |
|[-3.586443255682945,-4.478091404519487]   |0          |
|[0.016336446657830128,-1.1870206674483836]|1          |
|[-0.1561673700021381,-1.7016151478360821] |1          |
|[-3.384631757060741,-0.3999738723901571]  |7          |
|[-6.509497532168536,0.05468606396589609]  |3          |
|[-6.993955255405849,-1.1831459997186735]  |6          |
|[-1.2904732771655627,-1.8735588869008821] |1          |
|[-1.663232381911833,-4.255225036099674]   |2          |
|[-3.8033213511286097,-2.6914906805976746] |0          |
|[0.23729644582907988,-2.15353039694625]   |1          |
|[-7.726263325782962,-4.053115456557817]   |5          |
|[-2.7214166514828886,-3.347523964788623]  |0          |
|[-5.552331096275058,-3.4986245

# Plotando o clustering

Com a plotagem é possível avaliar se os grupos estão distribuídos em blocos bem definidos, o que facilita a validação do método.

In [24]:
from pyspark.ml.functions import vector_to_array
import plotly.express as px

In [25]:
# Split the 2-element PCA vector into scalar 'x' and 'y' columns and keep only
# what the plot needs: the coordinates, the cluster and the property value
pca_features_xy = prections_kmeans.select(
    vector_to_array('pca_features')[0].alias('x'),
    vector_to_array('pca_features')[1].alias('y'),
    'cluster_pca',
    'valor',
)

In [26]:
# Preview the plotting table (x, y, cluster_pca, valor)
pca_features_xy.show()

+--------------------+-------------------+-----------+---------+
|                   x|                  y|cluster_pca|    valor|
+--------------------+-------------------+-----------+---------+
| -0.7711108872400125|   -4.7384548147398|          2|4600000.0|
|  -1.958112747263765| 0.7146041200892066|          7| 360000.0|
|  -3.586443255682945| -4.478091404519487|          0|1200000.0|
|0.016336446657830128|-1.1870206674483836|          1| 750000.0|
| -0.1561673700021381|-1.7016151478360821|          1|1025000.0|
|  -3.384631757060741|-0.3999738723901571|          7|1100000.0|
|  -6.509497532168536|0.05468606396589609|          3| 593036.0|
|  -6.993955255405849|-1.1831459997186735|          6|1031576.0|
| -1.2904732771655627|-1.8735588869008821|          1| 800000.0|
|  -1.663232381911833| -4.255225036099674|          2|1150000.0|
| -3.8033213511286097|-2.6914906805976746|          0| 497000.0|
| 0.23729644582907988|  -2.15353039694625|          1|1580000.0|
|  -7.726263325782962| -4

In [27]:
# Scatter plot of the two PCA components, coloured by cluster.
# The cluster label is cast to string first: with an integer column plotly
# applies a continuous colour scale, which blurs the distinction between
# clusters; a string column gives one discrete colour per cluster.
pca_features_pd = pca_features_xy.toPandas()
pca_features_pd['cluster_pca'] = pca_features_pd['cluster_pca'].astype(str)

fig = px.scatter(
    pca_features_pd,
    x='x',
    y='y',
    color='cluster_pca',
    hover_data=['x', 'y', 'valor'],
    # keep the legend in numeric cluster order
    category_orders={'cluster_pca': sorted(pca_features_pd['cluster_pca'].unique(), key=int)},
)
fig.show()

In [28]:
# Explained variance of the two principal components; stage index 2 is the PCA
# stage (third stage) of the fitted pipeline.
# NOTE(review): the recorded output shows roughly 0.038 and 0.022, i.e. the two
# components capture only a small share of the variance — worth keeping in mind
# when interpreting the clusters.
pca_pipeline_model.stages[2].explainedVariance

DenseVector([0.0378, 0.0222])

# Filtrando imóveis do mesmo cluster

In [29]:
# Pick one example property id to drive the same-cluster lookup.
# first() fetches a single row instead of collect()-ing every id to the driver
# just to read element 0.
# NOTE(review): the name `id` shadows Python's built-in id(); it is kept because
# the following cells reference it.
id = prections_kmeans.select('id').first()[0]
id

'00002dd9-cc74-4809-b5a5-850adf0e7526'

In [30]:
# Show the full row(s) of the selected property, without truncating values
prections_kmeans.filter(prections_kmeans.id == id).show(truncate=False)

+------------------------------------+-----+---------+---------+-------+------+----+----------+-----+--------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------+-----------------+--------+-------+----+---------+-----+---------------+------------------+------------+-------+-------------+----------+--------+------------+--------+------+----+-------+--------+------------+-----------------+---------+------+-------+----------+------+-----------+--------------+------+-----------+-------+----------+--------+-----------+------+--------+------------+-------+---------+---------------+------------+-----------------+-----------------+-------+--------+------------------------------+-----------------------+------+------+-------------+------+------+---------+---------+-----+------------+--------------+-------+-------+--------+-------+-----+---------+-----------+---

In [31]:
# Look up the cluster assigned to the selected property
# (first matching row, first column)
cluster = (
    prections_kmeans
    .where(prections_kmeans['id'] == id)
    .select('cluster_pca')
    .collect()[0][0]
)
cluster

3

In [32]:
# Recommended properties: every row assigned to the same cluster as the
# selected property (the filter is on cluster only, so the selected property
# itself is part of the result)
imoveis_recomendados = prections_kmeans.where(
    prections_kmeans['cluster_pca'] == cluster
)
imoveis_recomendados.show()


+--------------------+-----+---------+---------+-------+------+----+----------+------+---------+------------+----------+----------+--------+--------+------------------+-------------+------------------+--------+-------+----------+------------+-----------------+---------------+--------+-----------------+--------+-------+----+---------+-----+---------------+------------------+------------+-------+-------------+----------+--------+------------+--------+------+----+-------+--------+------------+-----------------+---------+------+-------+----------+------+-----------+--------------+------+-----------+-------+----------+--------+-----------+------+--------+------------+-------+---------+---------------+------------+-----------------+-----------------+-------+--------+------------------------------+-----------------------+------+------+-------------+------+------+---------+---------+-----+------------+--------------+-------+-------+--------+-------+-----+---------+-----------+------+----------