# Projeto Final Big Data Engineer Semantix Academy

## 01. Enviar os dados para o HDFS

Etapa realizada via terminal
1. Acessando o contêiner namenode: 
`docker exec -it namenode bash`
2. Criando a estrutura de pastas:
`hdfs dfs -mkdir -p /user/raoni`
3. Testando se a criação aconteceu com sucesso:
`hdfs dfs -ls /user/`
4. Enviar os arquivos para o HDFS:
`hdfs dfs -put input/dados_covid/ /user/raoni/`


In [23]:
# Verificando se os arquivos constam no HDFS
!hdfs dfs -ls /user/raoni/dados_covid

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2022-04-25 15:31 /user/raoni/dados_covid/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup  138681606 2022-04-25 15:31 /user/raoni/dados_covid/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup  167521915 2022-04-25 15:31 /user/raoni/dados_covid/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   93906103 2022-04-25 15:31 /user/raoni/dados_covid/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


### Estruturando melhor os dados 

In [53]:
# Importação das bibliotecas necessárias para o projeto
from pyspark.sql.types import StructType
from pyspark.sql import functions as f
from pyspark.sql.functions import unix_timestamp, col, when

In [3]:
# Construindo o schema a ser associado ao banco de dados
dados_schema = StructType()\
    .add("regiao", "string")\
    .add("estado", "string")\
    .add("municipio", "string")\
    .add("coduf", "integer")\
    .add("codmun", "integer")\
    .add("codRegiaoSaude", "integer")\
    .add("nomeRegiaoSaude", "string")\
    .add("data", "timestamp")\
    .add("semanaEpi", "integer")\
    .add("populacaoTCU2019", "integer")\
    .add("casosAcumulado","integer")\
    .add("casosNovos","integer")\
    .add("obitosAcumulado","integer")\
    .add("obitosNovos","integer")\
    .add("Recuperadosnovos","integer")\
    .add("emAcompanhamentoNovos","integer")\
    .add("interior/metropolitana","integer")

In [4]:
# Leitura dos dados
dados = spark.read.csv("/user/raoni/dados_covid/*.csv", header="true", sep=";", schema=dados_schema)

In [5]:
# Visualização do Schema
dados.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: integer (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



In [6]:
dados.show(5)

+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|regiao|estado|municipio|coduf|codmun|codRegiaoSaude|nomeRegiaoSaude|               data|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|
+------+------+---------+-----+------+--------------+---------------+-------------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+
|Brasil|  null|     null|   76|  null|          null|           null|2020-08-01 00:00:00|       31|       210147125|       2707877|     45392|          93563|       1088|         1865729|               748585|                  null|
|Brasil|  null|     null|   76|  null|          null|           null

In [7]:
# Ajustando a estrutura da coluna data - Excluir o campo hora (as informações são diárias)
dados = dados.withColumn('data', f.from_unixtime(f.unix_timestamp(dados.data), "yyyy-MM-dd"))

In [8]:
# Apresentando os dados de  forma mais limpa com o auxilio do pandas 
dados.limit(10).toPandas()

Unnamed: 0,regiao,estado,municipio,coduf,codmun,codRegiaoSaude,nomeRegiaoSaude,data,semanaEpi,populacaoTCU2019,casosAcumulado,casosNovos,obitosAcumulado,obitosNovos,Recuperadosnovos,emAcompanhamentoNovos,interior/metropolitana
0,Brasil,,,76,,,,2020-08-01,31,210147125,2707877,45392,93563,1088,1865729,748585,
1,Brasil,,,76,,,,2020-08-02,32,210147125,2733677,25800,94104,541,1883677,755896,
2,Brasil,,,76,,,,2020-08-03,32,210147125,2750318,16641,94665,561,1912319,743334,
3,Brasil,,,76,,,,2020-08-04,32,210147125,2801921,51603,95819,1154,1970767,735335,
4,Brasil,,,76,,,,2020-08-05,32,210147125,2857597,55676,97240,1421,2020637,741180,
5,Brasil,,,76,,,,2020-08-06,32,210147125,2912212,54615,98493,1253,2080916,781954,
6,Brasil,,,76,,,,2020-08-07,32,210147125,2962442,50230,99572,1079,2068394,794476,
7,Brasil,,,76,,,,2020-08-08,32,210147125,3012412,49970,100477,905,2094293,817642,
8,Brasil,,,76,,,,2020-08-09,33,210147125,3035422,23010,101049,572,2118460,815913,
9,Brasil,,,76,,,,2020-08-10,33,210147125,3057470,22048,101752,703,2163812,791906,


## 02. Otimizar todos os dados do HDFS para uma tabela Hive particionada por município

In [14]:
# Salvado como tabela Hive particionada por município  
dados.write.saveAsTable("dados_covid_tabela_hive", format="parquet", partitionBy="municipio")

In [9]:
#Conferindo se a tabela Hive foi salva com sucesso
sqlContext.sql("show partitions dados_covid_tabela_hive").show(5, truncate=False)

+-----------------------------+
|partition                    |
+-----------------------------+
|municipio=Abadia de Goiás    |
|municipio=Abadia dos Dourados|
|municipio=Abadiânia          |
|municipio=Abaetetuba         |
|municipio=Abaeté             |
+-----------------------------+
only showing top 5 rows



## 03. Criar 3 visualizações pelo Spark com os dados enviados para o HDFS

### Construindo 1º visualização 

In [9]:
# Selecionando os dados da cidade de Recife - PE
dados_recife = spark.read.parquet("/user/hive/warehouse/dados_covid_tabela_hive/municipio=Recife")

In [10]:
# Construindo um DataFrame com a soma dos óbitos acumulados
recife_obitos_acumulados = dados_recife.agg({"obitosAcumulado":"sum"})

In [11]:
# Visualizando 
recife_obitos_acumulados.show()

+--------------------+
|sum(obitosAcumulado)|
+--------------------+
|             1159750|
+--------------------+



In [12]:
# Renomeando a coluna, pois tabela Hive não aceita alguns caracteres como o de parênteses 
recife_obitos_acumulados = recife_obitos_acumulados.withColumnRenamed("sum(obitosAcumulado)", "obitosAcumulados")

In [13]:
#Visualizando apôs alteração 
recife_obitos_acumulados.show()

+----------------+
|obitosAcumulados|
+----------------+
|         1159750|
+----------------+



### Construindo 2º visualização 

In [14]:
dados_recife.select("data", "obitosAcumulado", "casosAcumulado").show(10)

+----------+---------------+--------------+
|      data|obitosAcumulado|casosAcumulado|
+----------+---------------+--------------+
|2021-01-01|           2695|         51088|
|2021-01-02|           2696|         51246|
|2021-01-03|           2698|         51324|
|2021-01-04|           2697|         51500|
|2021-01-05|           2699|         51850|
|2021-01-06|           2706|         52064|
|2021-01-07|           2715|         52508|
|2021-01-08|           2726|         52637|
|2021-01-09|           2749|         52778|
|2021-01-10|           2751|         53138|
+----------+---------------+--------------+
only showing top 10 rows



In [21]:
obitos_casos_acumulados_recife = dados_recife.select("data", "obitosAcumulado", "casosAcumulado")

### Construindo 3º visualização 

In [27]:
# Óbitos acumulados e mortalidade por dia na cidade do Recife 
dados_recife_mortalidade = dados_recife.select(
    dados_recife['data'].alias('Data'),\
    dados_recife['obitosAcumulado'].alias('Óbitos_Acumulados'),\
    (f.round(dados_recife['obitosAcumulado']/dados_recife['populacaoTCU2019']*100000,2)).alias('Mortalidade')
)

In [28]:
dados_recife_mortalidade.show(truncate=False)

+----------+-----------------+-----------+
|Data      |Óbitos_Acumulados|Mortalidade|
+----------+-----------------+-----------+
|2021-01-01|2695             |163.76     |
|2021-01-02|2696             |163.82     |
|2021-01-03|2698             |163.94     |
|2021-01-04|2697             |163.88     |
|2021-01-05|2699             |164.0      |
|2021-01-06|2706             |164.43     |
|2021-01-07|2715             |164.97     |
|2021-01-08|2726             |165.64     |
|2021-01-09|2749             |167.04     |
|2021-01-10|2751             |167.16     |
|2021-01-11|2751             |167.16     |
|2021-01-12|2770             |168.31     |
|2021-01-13|2782             |169.04     |
|2021-01-14|2792             |169.65     |
|2021-01-15|2798             |170.02     |
|2021-01-16|2808             |170.62     |
|2021-01-17|2825             |171.66     |
|2021-01-18|2826             |171.72     |
|2021-01-19|2833             |172.14     |
|2021-01-20|2848             |173.05     |
+----------

## 04. Salvar a primeira visualização como tabela Hive

In [18]:
dados.write.saveAsTable("recife_obitos_acumulados", format="parquet")

## 05. Salvar a segunda visualização com formato parquet e compressão snappy

In [25]:
# Salvando em formato parquet e compressão snappy
obitos_casos_acumulados_recife.write.parquet(path="/user/raoni/casosEObitosAcumuladosRecife",compression="snappy")

In [26]:
# Conferindo se o processo anterior aconteceu com sucesso
!hdfs dfs -ls /user/raoni/casosEObitosAcumuladosRecife

Found 5 items
-rw-r--r--   2 root supergroup          0 2022-04-28 22:08 /user/raoni/casosEObitosAcumuladosRecife/_SUCCESS
-rw-r--r--   2 root supergroup       3244 2022-04-28 22:08 /user/raoni/casosEObitosAcumuladosRecife/part-00000-24c1e2f4-7a2c-4e72-8f76-34014a43292b-c000.snappy.parquet
-rw-r--r--   2 root supergroup       2780 2022-04-28 22:08 /user/raoni/casosEObitosAcumuladosRecife/part-00001-24c1e2f4-7a2c-4e72-8f76-34014a43292b-c000.snappy.parquet
-rw-r--r--   2 root supergroup       2540 2022-04-28 22:08 /user/raoni/casosEObitosAcumuladosRecife/part-00002-24c1e2f4-7a2c-4e72-8f76-34014a43292b-c000.snappy.parquet
-rw-r--r--   2 root supergroup        960 2022-04-28 22:08 /user/raoni/casosEObitosAcumuladosRecife/part-00003-24c1e2f4-7a2c-4e72-8f76-34014a43292b-c000.snappy.parquet


## 06. Salvar a terceira visualização em um tópico no Kafka

In [39]:
# Construindo um tópico no Kafka

dados_recife_mortalidade = spark\
    .read\
    .format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('subscribe', 'mortalidade_recife')\
    .load()


In [42]:
# Realizando a leitura para conferir sua criação 

topic_mortalidade_recife = spark.read\
    .format('kafka')\
    .option('kafka.bootstrap.servers', 'kafka:9092')\
    .option('subscribe','mortalidade_recife') \
    .load()

topic_string = topic_mortalidade_recife.select(col('value').cast('string'))
topic_string.show(truncate = False)
    

+-------------------------------------------------------------------+
|value                                                              |
+-------------------------------------------------------------------+
|{"Data":"2020-03-27","Óbitos_Acumulados":0,"Mortalidade":0.0}      |
|{"Data":"2021-01-01","Óbitos_Acumulados":2695,"Mortalidade":163.76}|
|{"Data":"2021-07-01","Óbitos_Acumulados":4684,"Mortalidade":284.62}|
|{"Data":"2020-08-01","Óbitos_Acumulados":2128,"Mortalidade":129.3} |
|{"Data":"2020-03-28","Óbitos_Acumulados":4,"Mortalidade":0.24}     |
|{"Data":"2021-07-02","Óbitos_Acumulados":4695,"Mortalidade":285.28}|
|{"Data":"2020-08-02","Óbitos_Acumulados":2130,"Mortalidade":129.43}|
|{"Data":"2021-07-03","Óbitos_Acumulados":4718,"Mortalidade":286.68}|
|{"Data":"2020-03-29","Óbitos_Acumulados":4,"Mortalidade":0.24}     |
|{"Data":"2021-01-02","Óbitos_Acumulados":2696,"Mortalidade":163.82}|
|{"Data":"2020-03-30","Óbitos_Acumulados":4,"Mortalidade":0.24}     |
|{"Data":"2021-01-03

## 07. Criar uma visualização pelo Spark com os dados enviados ao HDFS

In [66]:
# Construindo visualização por região com os dados: população, casos acumulados, obítos e mortalidade
sintese_região = dados.groupBy('regiao')\
    .agg({'casosAcumulado':'max', 'obitosAcumulado':'max','populacaoTCU2019':'max'})

sintese_região =(sintese_região\
                 .withColumnRenamed('max(populacaoTCU2019)','População')\
                 .withColumnRenamed('max(casosAcumulado)', 'Casos_Acumulados')\
                 .withColumnRenamed('max(obitosAcumulado)','Óbitos_Acumulados')
                )
sintese_região = (sintese_região
                  .withColumn('Mortalidade',\
                              f.round(sintese_região['Óbitos_Acumulados']/sintese_região['População']*100000,2))
                 )
sintese_região = sintese_região.withColumn('New_col', when(sintese_região.regiao != 'null', "True"))\
    .filter("New_col == True").drop("New_col")

sintese_região = sintese_região.orderBy(col('regiao'))

In [69]:
# visualizando com a ajuda do pandas
sintese_região.toPandas()

Unnamed: 0,regiao,População,Casos_Acumulados,Óbitos_Acumulados,Mortalidade
0,Brasil,210147125,18855015,526892,250.73
1,Centro-Oeste,7018354,686433,19485,277.63
2,Nordeste,14873064,1141612,24428,164.24
3,Norte,8602865,557708,15624,181.61
4,Sudeste,45919049,3809222,130389,283.95
5,Sul,11433957,1308643,31867,278.7
