## Instalando o PySpark no Google Colab

In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.1-py2.py3-none-any.whl size=311285398 sha256=aad00ce03bb05bd9861e994716fc2887691242a0494a4b7c76e4c100aed28e86
  Stored in directory: /root/.cache/pip/wheels/0d/77/a3/ff2f74cc9ab41f8f594dabf0579c2a7c6de920d584206e0834
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.1


# ETL com PySpark

Primeiramente, vamos iniciar nossa SparkSession

In [None]:
from pyspark.sql import SparkSession
sc = SparkSession.builder.master('local[*]').getOrCreate()

## Extração

Vamos ler um arquivo .csv disponibilizado pelo Airbnb utilizando o método `read.csv()`

In [None]:
!wget --quiet --show-progress https://rtvad.blob.core.windows.net/misc-data/listings.csv



In [None]:
df_spark = sc.read.csv("./listings.csv", inferSchema=True, header=True)

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)



In [None]:
df_spark.printSchema()

root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- host_id: integer (nullable = true)
 |-- host_name: string (nullable = true)
 |-- neighbourhood_group: string (nullable = true)
 |-- neighbourhood: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- room_type: string (nullable = true)
 |-- price: integer (nullable = true)
 |-- minimum_nights: integer (nullable = true)
 |-- number_of_reviews: integer (nullable = true)
 |-- last_review: date (nullable = true)
 |-- reviews_per_month: double (nullable = true)
 |-- calculated_host_listings_count: integer (nullable = true)
 |-- availability_365: integer (nullable = true)
 |-- number_of_reviews_ltm: integer (nullable = true)
 |-- license: string (nullable = true)



In [None]:
df_spark.show()

+------------------+--------------------+---------+---------------+-------------------+--------------------+------------------+------------------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------+
|                id|                name|  host_id|      host_name|neighbourhood_group|       neighbourhood|          latitude|         longitude|      room_type|price|minimum_nights|number_of_reviews|last_review|reviews_per_month|calculated_host_listings_count|availability_365|number_of_reviews_ltm|license|
+------------------+--------------------+---------+---------------+-------------------+--------------------+------------------+------------------+---------------+-----+--------------+-----------------+-----------+-----------------+------------------------------+----------------+---------------------+-------+
|          29051942|Rental unit in Ip...|  4307081|        Nereu A|   

## Transformação
Vamos fazer um exemplo de transformação utilizando o Pyspark para criar um novo DataFrame agrupado por "neighbourhood_group" e "room_type", calculando a média do preço e a contagem de registros em cada grupo.

In [None]:
from pyspark.sql.functions import avg, count

# Aplicar a transformação
new_df = df_spark.groupBy("neighbourhood", "room_type") \
            .agg(avg("price").alias("avg_price"), count("*").alias("count"))

# Exibir o novo DataFrame
new_df.show()

+-------------------+---------------+------------------+-----+
|      neighbourhood|      room_type|         avg_price|count|
+-------------------+---------------+------------------+-----+
|    Barra da Tijuca|    Shared room|             707.0|   11|
|        Coelho Neto|Entire home/apt|           2587.75|    4|
|    Engenheiro Leal|Entire home/apt|             201.0|    1|
|             Glória|    Shared room|             140.0|    2|
|            Colégio|    Shared room|              50.0|    1|
|Senador Vasconcelos|Entire home/apt|            297.75|    8|
|       Pitangueiras|Entire home/apt|             120.0|    2|
|            Grumari|   Private room|             247.0|    2|
|        Jacarepaguá|Entire home/apt| 768.6031613976705| 1202|
|       Higienópolis|   Private room|            233.75|    4|
|              Bangu|Entire home/apt|  947.047619047619|   21|
|     Jardim Carioca|   Private room|              94.0|    3|
|              Penha|Entire home/apt|261.77777777777777

In [None]:
sorted_df = new_df.sort('avg_price')
sorted_df.show()

+--------------------+------------+-----------------+-----+
|       neighbourhood|   room_type|        avg_price|count|
+--------------------+------------+-----------------+-----+
| Vicente de Carvalho|Private room|             39.0|    1|
|São Francisco Xavier| Shared room|             40.0|    1|
|        Pitangueiras|Private room|             45.0|    1|
|           Madureira| Shared room|             48.0|    1|
|             Colégio| Shared room|             50.0|    1|
|          Praça Seca| Shared room|             52.0|    1|
|   Quintino Bocaiúva|Private room|             55.0|    2|
|         Coelho Neto|Private room|             60.0|    2|
|           Itanhangá| Shared room|             60.0|    1|
|       Bento Ribeiro| Shared room|             60.0|    1|
|             Vidigal| Shared room|61.94117647058823|   17|
|             Sampaio| Shared room|             62.0|    5|
|            Cachambi| Shared room|             62.0|    2|
|              Gamboa| Shared room|     

In [None]:
from pyspark.sql.functions import desc
sorted_df = new_df.sort(desc('avg_price'))
sorted_df.show()

+--------------------+---------------+------------------+-----+
|       neighbourhood|      room_type|         avg_price|count|
+--------------------+---------------+------------------+-----+
|       São Cristóvão|    Shared room|          273925.0|    2|
|             Estácio|Entire home/apt|24106.190476190477|   21|
|          Bonsucesso|Entire home/apt|14592.857142857143|    7|
|                 Joá|Entire home/apt| 6602.509259259259|  108|
|              Cacuia|Entire home/apt| 6058.166666666667|    6|
|        Santa Teresa|     Hotel room|            5487.2|   10|
|         São Conrado|Entire home/apt|3703.7021276595747|  141|
|            Cachambi|   Private room|            3694.0|    6|
|               Ramos|Entire home/apt|            2789.5|    2|
|          Santa Cruz|    Shared room|           2727.25|    4|
|           Pechincha|    Shared room|2597.3333333333335|    3|
|         Coelho Neto|Entire home/apt|           2587.75|    4|
|              Centro|    Shared room|25

## Carregamento

Vamos salvar o resultado em um arquivo .parquet!

*Formato Parquet*

O formato de arquivo Parquet é um formato de armazenamento colunar otimizado para processamento de big data. Ele é amplamente utilizado no ecossistema do Apache Hadoop e é especialmente recomendado ao trabalhar com o PySpark por diversas razões.

O Parquet oferece várias vantagens ao ser usado com o PySpark:

- Eficiência de armazenamento: O Parquet usa compressão colunar e codificação de dados eficientes, o que resulta em tamanhos de arquivo menores em comparação com outros formatos, como CSV ou JSON. Isso reduz os custos de armazenamento e melhora o desempenho de leitura e gravação de dados.

- Eficiência de leitura e gravação: O Parquet é projetado para ser processado em paralelo, o que permite leitura e gravação eficientes em sistemas distribuídos, como o PySpark. Ele suporta a leitura e gravação de dados em paralelo, permitindo que o processamento ocorra de forma mais rápida e escalável.

- Esquema embutido: O Parquet armazena metadados sobre o esquema dos dados diretamente no arquivo, permitindo a leitura eficiente de colunas específicas sem precisar processar o conjunto de dados completo. Isso é particularmente útil ao trabalhar com grandes volumes de dados, pois evita a necessidade de ler dados desnecessários.

- Suporte a tipos de dados complexos: O Parquet oferece suporte a uma ampla gama de tipos de dados complexos, como arrays e estruturas aninhadas. Isso permite que o PySpark trabalhe com estruturas de dados complexas de forma eficiente e nativa.

- Integração com ecossistema Hadoop: O Parquet é compatível com o ecossistema Hadoop e pode ser facilmente usado em conjunto com outras ferramentas e frameworks, como Hive, Impala e Spark. Isso proporciona uma experiência perfeita ao processar e analisar dados em um ambiente distribuído.

Em resumo, ao utilizar o PySpark, o uso do formato de arquivo Parquet traz benefícios significativos em termos de eficiência de armazenamento, desempenho de leitura/gravação e compatibilidade com o ecossistema Hadoop. Ele é especialmente adequado para lidar com grandes volumes de dados e permite aproveitar ao máximo os recursos de processamento distribuído do PySpark.

In [None]:
new_df.write.parquet("bairros.parquet")

In [None]:
parquet_salvo = sc.read.parquet('bairros.parquet')

In [None]:
parquet_salvo.show()

+-------------------+---------------+------------------+-----+
|      neighbourhood|      room_type|         avg_price|count|
+-------------------+---------------+------------------+-----+
|    Barra da Tijuca|    Shared room|             707.0|   11|
|        Coelho Neto|Entire home/apt|           2587.75|    4|
|    Engenheiro Leal|Entire home/apt|             201.0|    1|
|             Glória|    Shared room|             140.0|    2|
|            Colégio|    Shared room|              50.0|    1|
|Senador Vasconcelos|Entire home/apt|            297.75|    8|
|       Pitangueiras|Entire home/apt|             120.0|    2|
|            Grumari|   Private room|             247.0|    2|
|        Jacarepaguá|Entire home/apt| 768.6031613976705| 1202|
|       Higienópolis|   Private room|            233.75|    4|
|              Bangu|Entire home/apt|  947.047619047619|   21|
|     Jardim Carioca|   Private room|              94.0|    3|
|              Penha|Entire home/apt|261.77777777777777