<a href="https://colab.research.google.com/github/tiagoassun/keyrus_teste_tecnico/blob/main/keyrus_teste_tecnico.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Teste Técnico de Engenharia de Dados

## 1 - Baixando, instalando e configurando o Apache Spark



In [1]:
# Instalar as dependências
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Baixando Apche Spark que será salvo em no diretorio "/content/"
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

In [2]:
# Baixando bibliotecas
!pip install -q findspark
!pip install pyspark[sql]
!pip install seaborn
!pip install geopandas

Collecting pyspark[sql]
  Downloading pyspark-3.2.0.tar.gz (281.3 MB)
[K     |████████████████████████████████| 281.3 MB 39 kB/s 
[?25hCollecting py4j==0.10.9.2
  Downloading py4j-0.10.9.2-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 54.2 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.2.0-py2.py3-none-any.whl size=281805912 sha256=1bb5e3a796b3fe5267eebd0e871e4060d0e804c2e5df203257993b503b6a2c64
  Stored in directory: /root/.cache/pip/wheels/0b/de/d2/9be5d59d7331c6c2a7c1b6d1a4f463ce107332b1ecd4e80718
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.2 pyspark-3.2.0
Collecting geopandas
  Downloading geopandas-0.10.2-py2.py3-none-any.whl (1.0 MB)
[K     |████████████████████████████████| 1.0 MB 4.2 MB/s 
[?25hCollecting pyproj>=2.2.0
  Downloading pyproj-3.2.1-cp37-cp37m-manylinu

In [3]:
# Configurar as variáveis de ambiente do Linux que o Google Colab roda em cima
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

In [4]:
# Tornar o pyspark "importável"
import findspark
findspark.init('spark-3.2.0-bin-hadoop3.2')

In [5]:
# Iniciar uma sessão local
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

spark = SparkSession.builder.master('local[*]').getOrCreate()
sqlContext = SQLContext(spark)



In [112]:
# Import de tipos do Spark
from pyspark.sql.functions import *
from pyspark.sql.window import Window

## 2 - Transformações dos dados

### 2.1 - Importando e lendo .CSV

In [8]:
# Download do CSV para local, que será salvo no diretorio "/content/"
!wget --quiet --show-progress https://raw.githubusercontent.com/tiagoassun/keyrus_teste_tecnico/main/cadastro-individual-MUNICIPIO-fim.csv



In [9]:
# Carregar dados dos municípios
df_municipio_csv = spark.read \
                .format("csv") \
                .option("header", "true") \
                .option("encoding", "ISO-8859-1") \
                .load("./cadastro-individual-MUNICIPIO-fim.csv")

In [21]:
df_municipio_csv.printSchema()

root
 |-- uf: string (nullable = true)
 |-- ibge: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- parametro: string (nullable = true)
 |-- valor: string (nullable = true)
 |-- ano_quadrimestre: string (nullable = true)
 |-- ano: string (nullable = true)
 |-- quadrimestre: string (nullable = true)



### 2.2 - Modificando schema do DF

In [100]:
# Convertando colunas para valores inteiros
# Renomeando colunas
# Retirando colunas
df_municipio_cast = df_municipio_csv \
                        .withColumn('transacao', col("valor").cast('int')) \
                        .withColumnRenamed("uf", "estado") \
                        .withColumnRenamed("ano", "data_atualizacao") \
                        .drop('ibge', 'parametro', 'ano_quadrimestre', 'quadrimestre', 'valor')

In [101]:
df_municipio_cast.printSchema()

root
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- data_atualizacao: string (nullable = true)
 |-- transacao: integer (nullable = true)



In [102]:
df_municipio_cast.show(10, False)

+------+-----------------------+----------------+---------+
|estado|municipio              |data_atualizacao|transacao|
+------+-----------------------+----------------+---------+
|ES    |CACHOEIRO DE ITAPEMIRIM|2018            |130253   |
|ES    |SOORETAMA              |2018            |14226    |
|ES    |VENDA NOVA DO IMIGRANTE|2018            |22816    |
|ES    |ITAPEMIRIM             |2018            |26584    |
|ES    |IÚNA                   |2018            |20007    |
|ES    |MUCURICI               |2018            |4905     |
|ES    |NOVA VENÉCIA           |2018            |39970    |
|ES    |RIO NOVO DO SUL        |2018            |8900     |
|ES    |SANTA LEOPOLDINA       |2018            |6231     |
|ES    |SANTA MARIA DE JETIBÁ  |2018            |24870    |
+------+-----------------------+----------------+---------+
only showing top 10 rows



### 2.3 - Agrupando dados

In [107]:
df_municipio_agrupado = df_municipio_cast \
            .groupBy("estado", "municipio", "data_atualizacao")\
            .agg(sum('transacao').alias("transacao_por_ano")) \
            .orderBy("data_atualizacao")

In [108]:
df_municipio_agrupado.where("municipio = 'VITÓRIA'").show(10, False)

+------+---------+----------------+-----------------+
|estado|municipio|data_atualizacao|transacao_por_ano|
+------+---------+----------------+-----------------+
|ES    |VITÓRIA  |2018            |296246           |
|ES    |VITÓRIA  |2019            |399998           |
|ES    |VITÓRIA  |2020            |531479           |
|ES    |VITÓRIA  |2021            |223296           |
+------+---------+----------------+-----------------+



### 2.4 - Criando ordem da transação

#### 2.4.1 - Em SQL

In [109]:
# Criando tabela temporaria no Spark para poder usar consulta SQL
df_municipio_agrupado.select('estado', 'municipio', 'data_atualizacao', 'transacao_por_ano').createOrReplaceTempView('tb_df_municipio_agrupado_temp')

In [117]:
query_sql = """
SELECT 
    tb_temp.*, 
    RANK() OVER (PARTITION BY municipio ORDER BY municipio, data_atualizacao) AS ordem_transacao
FROM tb_df_municipio_agrupado_temp AS tb_temp
"""

df_municipio_final_sql = spark.sql(query_sql)

In [None]:
# Dropando a tabela temporaria pois não precisamos mais dela
spark.catalog.dropTempView("tb_df_municipio_agrupado_temp")

In [118]:
df_municipio_final_sql.show(10, False)

+------+--------------+----------------+-----------------+---------------+
|estado|municipio     |data_atualizacao|transacao_por_ano|ordem_transacao|
+------+--------------+----------------+-----------------+---------------+
|ES    |AFONSO CLÁUDIO|2018            |89996            |1              |
|ES    |AFONSO CLÁUDIO|2019            |96079            |2              |
|ES    |AFONSO CLÁUDIO|2020            |99651            |3              |
|ES    |AFONSO CLÁUDIO|2021            |33977            |4              |
|ES    |ALEGRE        |2018            |33262            |1              |
|ES    |ALEGRE        |2019            |33679            |2              |
|ES    |ALEGRE        |2020            |45179            |3              |
|ES    |ALEGRE        |2021            |24106            |4              |
|ES    |ALFREDO CHAVES|2018            |34501            |1              |
|ES    |ALFREDO CHAVES|2019            |40936            |2              |
+------+--------------+--

##### 2.4.1.1 - Gravando dados

In [123]:
#Salvandos os dados localmente em "/content/municipio_final_sql"
PATH = "./municipio_final_sql"

df_municipio_final_sql.coalesce(1) \
        .write \
        .mode("overwrite") \
        .save(PATH, format="parquet")

#### 2.4.2 - Em PySpark

In [114]:
janela = Window().partitionBy("municipio").orderBy("municipio", "data_atualizacao")

df_municipio_final_pyspark = df_municipio_agrupado \
                                    .withColumn("ordem_transacao", rank().over(janela))

In [115]:
df_municipio_final_pyspark.show(10, False)

+------+--------------+----------------+-----------------+---------------+
|estado|municipio     |data_atualizacao|transacao_por_ano|ordem_transacao|
+------+--------------+----------------+-----------------+---------------+
|ES    |AFONSO CLÁUDIO|2018            |89996            |1              |
|ES    |AFONSO CLÁUDIO|2019            |96079            |2              |
|ES    |AFONSO CLÁUDIO|2020            |99651            |3              |
|ES    |AFONSO CLÁUDIO|2021            |33977            |4              |
|ES    |ALEGRE        |2018            |33262            |1              |
|ES    |ALEGRE        |2019            |33679            |2              |
|ES    |ALEGRE        |2020            |45179            |3              |
|ES    |ALEGRE        |2021            |24106            |4              |
|ES    |ALFREDO CHAVES|2018            |34501            |1              |
|ES    |ALFREDO CHAVES|2019            |40936            |2              |
+------+--------------+--

##### 2.4.2.1 - Gravando dados

In [124]:
#Salvandos os dados localmente em "/content/municipio_final_pyspark"

PATH = "./municipio_final_pyspark"

df_municipio_final_pyspark.coalesce(1) \
    .write \
    .mode("overwrite") \
    .save(PATH, format="orc")