<a href="https://colab.research.google.com/github/kennenvi/Challange-Data-Science-Alura/blob/main/Analisando_base_de_dados_com_pyspark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Utilizando o Spark no Google Colab

Para facilitar o desenvolvimento de nosso projeto neste curso vamos utilizar o Google Colab como ferramenta e para configurar o PySpark basta executar os comandos abaixo na própria célula do seu *notebook*.

In [1]:
# instalar as dependências
!apt-get update -qq
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.1.2/spark-3.1.2-bin-hadoop2.7.tgz
!tar xf spark-3.1.2-bin-hadoop2.7.tgz
!pip install -q findspark

In [2]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop2.7"

In [3]:
import findspark
findspark.init()

# Carregamento de Dados
---

## [SparkSession](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.SparkSession.html)

O ponto de entrada para programar o Spark com a API Dataset e DataFrame.

Uma SparkSession pode ser utilizada para criar DataFrames, registrar DataFrames como tabelas, executar consultas SQL em tabelas, armazenar em cache e ler arquivos parquet. Para criar uma SparkSession, use o seguinte padrão de construtor:

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Iniciando com Spark") \
    .config('spark.ui.port', '4050') \
    .getOrCreate()

In [5]:
spark

# Montando drive

In [9]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Avaliando a estrutura dos dados

In [10]:
import pyspark.sql.functions as f
from pyspark.sql.types import DoubleType, IntegerType, StringType

In [11]:
dados = spark.read.json('/content/drive/Othercomputers/Fino do Fino 2.0/Python Notebook/Challange_Data_Science/Semana01/dataset_bruto.json')

In [12]:
dados.printSchema()

root
 |-- anuncio: struct (nullable = true)
 |    |-- andar: long (nullable = true)
 |    |-- area_total: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- area_util: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- banheiros: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- caracteristicas: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- endereco: struct (nullable = true)
 |    |    |-- bairro: string (nullable = true)
 |    |    |-- cep: string (nullable = true)
 |    |    |-- cidade: string (nullable = true)
 |    |    |-- estado: string (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |    |-- pais: string (nullable = true)
 |    |    |-- rua: string (nullable = true)
 |    |    |-- zona: string (nullable = true)
 |    |-- id: string (nullable = true)
 |    |-

In [13]:
print(f'Quantidade de linhas = {dados.count()}, Quantidade de colunas {len(dados.columns)}')

Quantidade de linhas = 89083, Quantidade de colunas 3


### Separando os campos da coluna anúncio

In [14]:
dados = dados.select('anuncio.*')

In [15]:
dados.show()

+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|andar|area_total|area_util|banheiros|     caracteristicas|            endereco|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|             valores|
+-----+----------+---------+---------+--------------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|    0|        []|     [16]|      [0]|                  []|{Centro, 20061003...|47d553e0-79f2-4a4...|    [0]|   [0]|       Usado|      Outros|  Comercial| [1]|[{260, 107, Venda...|
|    0|        []|     [14]|      [0]|                  []|{Centro, 20051040...|b6ffbae1-17f6-487...|    [0]|    []|       Usado|      Outros|  Comercial| [0]|[{260, 107, Venda...|
|    0|    [1026]|   [1026]|      [0]|                  []|{Maria da Graça, ...|1fb030a5-9e3e-4

## Convertendo colunas para seus devidos tipos

### Retirando colunas de arrays e mudando o tipo para Integer

In [16]:
colunas_retirar_array = ['area_total', 'area_util', 'banheiros', 'quartos', 'suites', 'vaga']

In [18]:
for colunas in colunas_retirar_array:
    dados = dados.withColumn(colunas, dados[colunas][0].cast(IntegerType()))

In [24]:
dados = dados.withColumn('caracteristicas', dados['caracteristicas'][0])

In [25]:
dados.show()

+-----+----------+---------+---------+-----------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|andar|area_total|area_util|banheiros|  caracteristicas|            endereco|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|             valores|
+-----+----------+---------+---------+-----------------+--------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+
|    0|      null|       16|        0|             null|{Centro, 20061003...|47d553e0-79f2-4a4...|      0|     0|       Usado|      Outros|  Comercial|   1|[{260, 107, Venda...|
|    0|      null|       14|        0|             null|{Centro, 20051040...|b6ffbae1-17f6-487...|      0|  null|       Usado|      Outros|  Comercial|   0|[{260, 107, Venda...|
|    0|      1026|     1026|        0|             null|{Maria da Graça, ...|1fb030a5-9e3e-4a1...|      0|  nu

## Filtrando a base de dados

In [31]:
dados\
    .groupBy('tipo_anuncio')\
    .count()\
    .show()

+------------+-----+
|tipo_anuncio|count|
+------------+-----+
|       Usado|88827|
|  Lançamento|  256|
+------------+-----+



In [32]:
dados\
    .groupBy('tipo_unidade')\
    .count()\
    .show()

+------------+-----+
|tipo_unidade|count|
+------------+-----+
|      Outros|11963|
| Apartamento|66801|
|        Casa|10319|
+------------+-----+



In [33]:
dados\
    .groupBy('tipo_uso')\
    .count()\
    .show()

+-----------+-----+
|   tipo_uso|count|
+-----------+-----+
|  Comercial| 4542|
|Residencial|84541|
+-----------+-----+



In [44]:
dados = dados.where("tipo_uso = 'Residencial' and tipo_unidade = 'Apartamento' and tipo_anuncio = 'Usado'")

### Capturando apenas o bairro e zona da coluna endereço

In [53]:
dados = dados.withColumn('bairro', dados['endereco.bairro'])
dados = dados.withColumn('zona', dados['endereco.zona'])

In [133]:
dados = dados.drop('endereco')

In [134]:
dados.show()

+-----+----------+---------+---------+------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+--------------------+----------+----------+----+-----+-----+
|andar|area_total|area_util|banheiros|   caracteristicas|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|             valores|              bairro|      zona|condominio|iptu| tipo|valor|
+-----+----------+---------+---------+------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+--------------------+----------+----------+----+-----+-----+
|    3|        43|       43|        1|          Academia|d2e3a3aa-09b5-45a...|      2|  null|       Usado| Apartamento|Residencial|   1|{245, null, Venda...|           Paciência|Zona Oeste|       245|null|Venda|15000|
|    2|        42|       42|        1|     Churrasqueira|085bab2c-87ad-452...|      2|  null|       Usado| Apartamento|Residenci

### Transformando os dados da coluna valores em novas colunas separadas

In [60]:
dados.printSchema()

root
 |-- andar: long (nullable = true)
 |-- area_total: integer (nullable = true)
 |-- area_util: integer (nullable = true)
 |-- banheiros: integer (nullable = true)
 |-- caracteristicas: string (nullable = true)
 |-- endereco: struct (nullable = true)
 |    |-- bairro: string (nullable = true)
 |    |-- cep: string (nullable = true)
 |    |-- cidade: string (nullable = true)
 |    |-- estado: string (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- pais: string (nullable = true)
 |    |-- rua: string (nullable = true)
 |    |-- zona: string (nullable = true)
 |-- id: string (nullable = true)
 |-- quartos: integer (nullable = true)
 |-- suites: integer (nullable = true)
 |-- tipo_anuncio: string (nullable = true)
 |-- tipo_unidade: string (nullable = true)
 |-- tipo_uso: string (nullable = true)
 |-- vaga: integer (nullable = true)
 |-- valores: array (nullable = true)
 |    |-- element: struct (containsNull = true)


In [122]:
dados = dados\
    .withColumn('valores', f.explode('valores'))

In [124]:
colunas_valores = dados.select('valores.*').columns

In [128]:
for col in colunas_valores:
    dados = dados.withColumn(col, dados[f'valores.{col}'])

In [135]:
dados = dados.drop('valores')

In [136]:
dados.show()

+-----+----------+---------+---------+------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+----------+----------+----+-----+-----+
|andar|area_total|area_util|banheiros|   caracteristicas|                  id|quartos|suites|tipo_anuncio|tipo_unidade|   tipo_uso|vaga|              bairro|      zona|condominio|iptu| tipo|valor|
+-----+----------+---------+---------+------------------+--------------------+-------+------+------------+------------+-----------+----+--------------------+----------+----------+----+-----+-----+
|    3|        43|       43|        1|          Academia|d2e3a3aa-09b5-45a...|      2|  null|       Usado| Apartamento|Residencial|   1|           Paciência|Zona Oeste|       245|null|Venda|15000|
|    2|        42|       42|        1|     Churrasqueira|085bab2c-87ad-452...|      2|  null|       Usado| Apartamento|Residencial|   1|           Paciência|Zona Oeste|         0|   0|Venda|15000|
|    1|        

### Selecionando apenas amostras cujo "tipo" seja Venda

In [140]:
dados = dados\
    .where('tipo = "Venda"')

### Salvando dados em formato parquet

In [150]:
dados.write\
        .parquet(
            '/content/drive/Othercomputers/Fino do Fino 2.0/Python Notebook/Challange_Data_Science/Semana01/dados_processados_parquet',
            mode='overwrite'
            )

### Salvando dados em formato csv

In [157]:
dados.write\
        .csv(
            '/content/drive/Othercomputers/Fino do Fino 2.0/Python Notebook/Challange_Data_Science/Semana01/dados_processados_csv',
            mode='overwrite',
            header=True
            )

In [160]:
dados.write.orc(
    '/content/drive/Othercomputers/Fino do Fino 2.0/Python Notebook/Challange_Data_Science/Semana01/dados_processados_orc',
    mode='overwrite',    
)

### Comparando desempenho de leitura

In [153]:
%%time
spark.read.parquet(
    '/content/drive/Othercomputers/Fino do Fino 2.0/Python Notebook/Challange_Data_Science/Semana01/dados_processados_parquet'
)

CPU times: user 2.67 ms, sys: 55 µs, total: 2.72 ms
Wall time: 108 ms


DataFrame[andar: bigint, area_total: int, area_util: int, banheiros: int, caracteristicas: string, id: string, quartos: int, suites: int, tipo_anuncio: string, tipo_unidade: string, tipo_uso: string, vaga: int, bairro: string, zona: string, condominio: string, iptu: string, tipo: string, valor: string]

In [159]:
%%time
spark.read.csv(
    '/content/drive/Othercomputers/Fino do Fino 2.0/Python Notebook/Challange_Data_Science/Semana01/dados_processados_csv',
    inferSchema=True,
    header=True
)

CPU times: user 3.73 ms, sys: 1.23 ms, total: 4.96 ms
Wall time: 553 ms


DataFrame[andar: int, area_total: int, area_util: int, banheiros: int, caracteristicas: string, id: string, quartos: int, suites: int, tipo_anuncio: string, tipo_unidade: string, tipo_uso: string, vaga: int, bairro: string, zona: string, condominio: int, iptu: int, tipo: string, valor: int]

In [161]:
%%time
spark.read.orc(
    '/content/drive/Othercomputers/Fino do Fino 2.0/Python Notebook/Challange_Data_Science/Semana01/dados_processados_orc'
)

CPU times: user 4.04 ms, sys: 0 ns, total: 4.04 ms
Wall time: 173 ms


DataFrame[andar: bigint, area_total: int, area_util: int, banheiros: int, caracteristicas: string, id: string, quartos: int, suites: int, tipo_anuncio: string, tipo_unidade: string, tipo_uso: string, vaga: int, bairro: string, zona: string, condominio: string, iptu: string, tipo: string, valor: string]

O desempenho de leitura de formatos parquet se mostrou mais rápido que a leitura de csv e de ORC