# Projeto Final Curso Data Engineer - Semantix

### Aluno: Wesley Sousa 

- O objetivo deste projeto é desmontrar as habilidades adquiridas neste curso, realizando um projeto prático utilizando diferentes ferramentas de Big Data

In [1]:
# Importações

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as f

## 1. Enviar os dados para o hdfs

- Dados: https://mobileapps.saude.gov.br/esus-vepi/files/unAFkcaNDeXajurGB7LChj8SgQYS2ptm/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar

In [5]:
# Primeiro passo é criar um diretório chamado covid no hdfs

!hdfs dfs -mkdir /user/wesley/covid

In [2]:
# Verificando se o diretório foi criado

!hdfs dfs -ls /user/wesley/

Found 8 items
drwxr-xr-x   - root supergroup          0 2022-08-06 01:40 /user/wesley/covid
drwxr-xr-x   - root supergroup          0 2022-08-01 13:38 /user/wesley/data
drwxr-xr-x   - root supergroup          0 2022-07-29 11:12 /user/wesley/names_us_parquet
drwxr-xr-x   - root supergroup          0 2022-08-02 11:56 /user/wesley/projeto_python
drwxr-xr-x   - root supergroup          0 2022-08-01 14:31 /user/wesley/relatorio_anual
drwxr-xr-x   - root supergroup          0 2022-08-03 11:50 /user/wesley/stream
drwxr-xr-x   - root supergroup          0 2022-08-01 13:38 /user/wesley/teste_csv
drwxr-xr-x   - root supergroup          0 2022-08-06 19:48 /user/wesley/visual2


- Baixei o arquivo no Linux na seguinte pasta docker-bigdata/input/exercises-data
- sudo curl -O https://mobileapps.saude.gov.br/esus-vepi/files/unAFkcaNDeXajurGB7LChj8SgQYS2ptm/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar
- Após baixar o arquivo, foi necessário baixar um descompactador, para isto utilizei os comandos abaixo.
- sudo apt-get install rar unrar
- e o comando para descompactar o arquivo
- sudo unrar x 04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar

- Para enviar o arquivo para o hdfs, precisamos acessar o container do namenode utilizando o comando
- docker exec -it namenode bash ou docker exec -it namenode /bin/bash
- Depois de acessar, precisamos enviar os arquivos utilizando os comandos abaixo
- root@namenode:/# hdfs dfs -put /input/exercises-data/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv /user/wesley/covid
- root@namenode:/# hdfs dfs -put /input/exercises-data/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv /user/wesley/covid
- root@namenode:/# hdfs dfs -put /input/exercises-data/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv /user/wesley/covid
- root@namenode:/# hdfs dfs -put /input/exercises-data/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv /user/wesley/covid

In [3]:
# Verificando se os dados estão no hdfs

!hdfs dfs -ls /user/wesley/covid

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2022-08-06 01:32 /user/wesley/covid/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2022-08-06 01:39 /user/wesley/covid/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2022-08-06 01:39 /user/wesley/covid/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2022-08-06 01:40 /user/wesley/covid/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


## 2. Otimizar todos os dados do hdfs para uma tabela Hive particionada por município

In [3]:
# Verificando o conteúdo do arquivo

!hdfs dfs -tail /user/wesley/covid/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv

30010;53001;DISTRITO FEDERAL;2020-07-22;30;3015268;87801;1725;1176;18;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-23;30;3015268;90023;2222;1218;42;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-24;30;3015268;92414;2391;1244;26;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-25;30;3015268;94187;1773;1275;31;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-26;31;3015268;96332;2145;1308;33;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-27;31;3015268;98480;2148;1339;31;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-28;31;3015268;100726;2246;1391;52;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-29;31;3015268;102342;1616;1419;28;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-30;31;3015268;104442;2100;1444;25;;;1
Centro-Oeste;DF;Brasília;53;530010;53001;DISTRITO FEDERAL;2020-07-31;31;3015268;1

In [4]:
# Lendo os arquivos para um dataframe

df = spark.read.csv("/user/wesley/covid/*.csv", sep=';', header=True, inferSchema = True)
df.show(5)

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-02-25 00:00:00|        9|       210147125|             0|         0|              0|          0|            null|                 null|                  null|
|Brasil|  null|     null|   76|  null|          null|           null

In [5]:
# Verificando o schema da tabela

df.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



In [6]:
# pegando apenas algumas colunas para entender o dataframe, com isto verifiquei que possui diversos valores nulos

df.select("regiao", "estado", "municipio", 'Recuperadosnovos', 'emAcompanhamentoNovos', "data").show(10)

+------+------+---------+----------------+---------------------+-------------------+
|regiao|estado|municipio|Recuperadosnovos|emAcompanhamentoNovos|               data|
+------+------+---------+----------------+---------------------+-------------------+
|Brasil|  null|     null|            null|                 null|2020-02-25 00:00:00|
|Brasil|  null|     null|            null|                 null|2020-02-26 00:00:00|
|Brasil|  null|     null|            null|                 null|2020-02-27 00:00:00|
|Brasil|  null|     null|            null|                 null|2020-02-28 00:00:00|
|Brasil|  null|     null|            null|                 null|2020-02-29 00:00:00|
|Brasil|  null|     null|            null|                 null|2020-03-01 00:00:00|
|Brasil|  null|     null|            null|                 null|2020-03-02 00:00:00|
|Brasil|  null|     null|            null|                 null|2020-03-03 00:00:00|
|Brasil|  null|     null|            null|                 null|2

In [7]:
# Alterando o padrão de data para o formato brasileiro

df = df.withColumn('data', f.from_unixtime(f.unix_timestamp(df.data), "yyyy-MM-dd"))

In [8]:
# Verificando se foi alterado a data

df.select("regiao", "estado", "municipio", 'Recuperadosnovos', 'emAcompanhamentoNovos', "data").show(10)

+------+------+---------+----------------+---------------------+----------+
|regiao|estado|municipio|Recuperadosnovos|emAcompanhamentoNovos|      data|
+------+------+---------+----------------+---------------------+----------+
|Brasil|  null|     null|            null|                 null|2020-02-25|
|Brasil|  null|     null|            null|                 null|2020-02-26|
|Brasil|  null|     null|            null|                 null|2020-02-27|
|Brasil|  null|     null|            null|                 null|2020-02-28|
|Brasil|  null|     null|            null|                 null|2020-02-29|
|Brasil|  null|     null|            null|                 null|2020-03-01|
|Brasil|  null|     null|            null|                 null|2020-03-02|
|Brasil|  null|     null|            null|                 null|2020-03-03|
|Brasil|  null|     null|            null|                 null|2020-03-04|
|Brasil|  null|     null|            null|                 null|2020-03-05|
+------+----

### Criando o banco no Hive: 
- Para salvar os dados em uma tabela Hive, primeiro precisamos  acessar o Hive e criar um database
- Acessando o Hive: docker exec -it hive-server bash
- Acessando o Beeline: beeline -u jdbc:hive2://Localhost:10000
- create database projeto_semantix comment "banco de dados do para o projeto final do curso data enginner da semantix"


In [20]:
# Salvando os dados em tabela hive particionado pelo municipio

df.write.mode("overwrite").partitionBy('municipio').saveAsTable('projeto_semantix.wesley')

In [8]:
# Confirmando o particionamento por municipio

!hdfs dfs -ls /user/hive/warehouse/projeto_semantix.db/wesley

Found 5298 items
-rw-r--r--   2 root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/_SUCCESS
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Abadia de Goiás
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Abadia dos Dourados
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Abadiânia
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Abaetetuba
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Abaeté
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Abaiara
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/proje

drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Barão de Monte Alto
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Barão do Triunfo
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Bastos
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Bataguassu
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Batalha
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Batatais
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Batayporã
drwxr-xr-x   - root supergroup          0 2022-08-06 03:15 /user/hive/warehouse/projeto_sema

drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Cruzeiro do Sul
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Cruzeta
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Cruzmaltina
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Cruzália
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Cruzília
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Cubati
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Cubatão
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesle

drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Junco do Maranhão
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Junco do Seridó
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Jundiaí
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Jundiaí do Sul
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Jundiá
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Junqueiro
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Junqueirópolis
drwxr-xr-x   - root supergroup          0 2022-08-06 03:16 /user/hive/warehouse/proje

drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Salvador do Sul
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Salvaterra
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Sambaíba
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Sampaio
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Sananduva
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Sanclerlândia
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Sandolândia
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semant

drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Vila Nova dos Martírios
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Vila Pavão
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Vila Propício
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Vila Rica
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Vila Valério
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Vila Velha
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley/municipio=Vilhena
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/pro

## 3.  Criar 3 vizualizações pelo Spark com os dados enviados para o HDFS:

In [9]:
# A primeira visualzização é o total de recuperados e o total em Acompanhamentos
# Utilizando a última data que possui essas informações atualizadas

visual1 = df.select("Recuperadosnovos", "emAcompanhamentoNovos").where(df.data == '2021-07-06')
visual1.show(1)

+----------------+---------------------+
|Recuperadosnovos|emAcompanhamentoNovos|
+----------------+---------------------+
|        17262646|              1065477|
+----------------+---------------------+
only showing top 1 row



In [10]:
# A segunda visualização é o total de casos confirmados Acumulado, Casos Novos e Incidência

df1 = df.where(df.data == "2021-07-06")
visual2 = df1.select("casosAcumulado", 
                     "casosNovos", 
                     f.round(df1['casosAcumulado']/df1['populacaoTCU2019']*100000,1)
                     .alias("incidência"))
visual2.show(1)

+--------------+----------+----------+
|casosAcumulado|casosNovos|incidência|
+--------------+----------+----------+
|      18855015|     62504|    8972.3|
+--------------+----------+----------+
only showing top 1 row



In [11]:
# A terceira visualização é óbitos acumulados, casos novos, letalidade e mortalidade
# A letalidade seria o valor de obitos acumulado dividido pelo número de casos acumulado

visual3 = df1.select("obitosAcumulado", 
                     "obitosNovos",
                     f.round(df1['obitosAcumulado']/df1['casosAcumulado']*100,1)
                     .alias("Letalidade"),
                     f.round(df1['obitosAcumulado']/df1['populacaoTCU2019']*100000,1)
                     .alias("Mortalidade"))
visual3.show(1)

+---------------+-----------+----------+-----------+
|obitosAcumulado|obitosNovos|Letalidade|Mortalidade|
+---------------+-----------+----------+-----------+
|         526892|       1780|       2.8|      250.7|
+---------------+-----------+----------+-----------+
only showing top 1 row



## 4. Salvar a primeira visualização como tabela Hive

In [84]:
visual1.write.mode("overwrite").saveAsTable("projeto_semantix.visual1")

In [92]:
# Verificando se foi salvo no Hive

!hdfs dfs -ls /user/hive/warehouse/projeto_semantix.db

Found 3 items
drwxr-xr-x   - root supergroup          0 2022-08-06 02:55 /user/hive/warehouse/projeto_semantix.db/covid
drwxr-xr-x   - root supergroup          0 2022-08-06 19:42 /user/hive/warehouse/projeto_semantix.db/visual1
drwxr-xr-x   - root supergroup          0 2022-08-06 03:17 /user/hive/warehouse/projeto_semantix.db/wesley


## 5. Salvar a segunda visualização com formato parquet e compressão snappy

In [88]:
visual2.write.parquet("/user/wesley/visual2", compression='snappy')

In [90]:
# Verificando se foi salvo

!hdfs dfs -ls /user/wesley/visual2

Found 3 items
-rw-r--r--   2 root supergroup          0 2022-08-06 19:48 /user/wesley/visual2/_SUCCESS
-rw-r--r--   2 root supergroup        496 2022-08-06 19:48 /user/wesley/visual2/part-00000-f2cf59e9-9535-4e93-9534-041b80c4a120-c000.snappy.parquet
-rw-r--r--   2 root supergroup      56169 2022-08-06 19:48 /user/wesley/visual2/part-00003-f2cf59e9-9535-4e93-9534-041b80c4a120-c000.snappy.parquet


## 6. Salvar a terceira visualização em um tópico no Kafka

In [96]:
visual3.selectExpr("to_json(struct(*)) AS value")\
    .write\
    .format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('topic', 'obitos_brasil')\
    .save()

+-------------------------------------------------------------------------------------------------------+
|value                                                                                                  |
+-------------------------------------------------------------------------------------------------------+
|{"Região":"Brasil","Óbitos_Acumulados":526892,"Óbitos_Novos":1780,"Letalidade":2.8,"Mortalidade":250.7}|
|{"Região":"Brasil","Óbitos_Acumulados":526892,"Óbitos_Novos":1780,"Letalidade":2.8,"Mortalidade":250.7}|
+-------------------------------------------------------------------------------------------------------+


In [101]:
# Lendo o arquivo que foi salvo no tópico do kafka

obitos = spark.read.format('kafka').option('kafka.bootstrap.servers', 'kafka:9092').option('subscribe', 'visual3').load()

## 7. Criar a visualização pelo Spark com os dados enviados para o HDFS:


In [42]:
# Visual 4 mostra o total de Casos, Óbitos, Incidência/100mil hab, Mortalidade/100mil hab

visual4 = df1.groupBy('regiao')\
            .agg(sum('casosAcumulado')\
            .alias("casos"),sum('obitosAcumulado')\
            .alias('Óbitos'),max('populacaoTCU2019')\
            .alias("Populacao"))

visual4 = (visual4.withColumn('Incidência/100mil hab.', f.round(visual4['casos']/visual4['Populacao']*100000,1))
          .withColumn('Mortalidade/100mil hab.', f.round(visual4['Óbitos']/visual4['Populacao']*100000,1)))

visual4 = visual4.drop('Populacao')

visual4.sort(col('regiao').asc()).show()

+------------+--------+------+----------------------+-----------------------+
|      regiao|   casos|Óbitos|Incidência/100mil hab.|Mortalidade/100mil hab.|
+------------+--------+------+----------------------+-----------------------+
|      Brasil|18855015|526892|                8972.3|                  250.7|
|Centro-Oeste| 3833238| 98414|               54617.3|                 1402.2|
|    Nordeste| 8911474|215648|               59916.9|                 1449.9|
|       Norte| 3465630| 87690|               40284.6|                 1019.3|
|     Sudeste|14277606|490622|               31093.0|                 1068.4|
|         Sul| 7222082|161410|               63163.5|                 1411.7|
+------------+--------+------+----------------------+-----------------------+

