Projeto Final Semantix

Aluno: Pedro Augusto Ramos

Neste projeto iremos baixar as tabelas com os dados do site do Ministério da Saúde brasileiro  https://covid.saude.gov.br/ 
sobre a epidemia de Covid-19. 

A proposta é utilizar Spark com linguagem pyspark sql e reproduzir os resumos das informação mostradas no site, como por exemplo número total de casos confirmados, casos recuperados, novos casos, taxa de letalidade, etc.

Importação dos Pacotes:

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as f

Criado o diretório "projeto-final" no hdfs

In [2]:
!hdfs dfs -mkdir -p /user/pedro/projeto-final

Foi realizada a transferência dos arquivos sobre casos de Covid-19 no Brasil que foram baixados localmente do site do ministério da saúde do governo brasileiro para o HDFS no terminal dentro do container namenode. Podemos conferir abaixo:

In [4]:
!hdfs dfs -ls /user/pedro/projeto-final/dados

Found 7 items
-rw-r--r--   3 root supergroup       6148 2022-08-06 22:29 /user/pedro/projeto-final/dados/.DS_Store
-rw-r--r--   3 root supergroup   62493275 2022-08-06 22:29 /user/pedro/projeto-final/dados/HIST_PAINEL_COVIDBR_2020_Parte1_03ago2022.csv
-rw-r--r--   3 root supergroup   76520606 2022-08-06 22:29 /user/pedro/projeto-final/dados/HIST_PAINEL_COVIDBR_2020_Parte2_03ago2022.csv
-rw-r--r--   3 root supergroup   91120853 2022-08-06 22:29 /user/pedro/projeto-final/dados/HIST_PAINEL_COVIDBR_2021_Parte1_03ago2022.csv
-rw-r--r--   3 root supergroup   93414239 2022-08-06 22:29 /user/pedro/projeto-final/dados/HIST_PAINEL_COVIDBR_2021_Parte2_03ago2022.csv
-rw-r--r--   3 root supergroup   91807916 2022-08-06 22:29 /user/pedro/projeto-final/dados/HIST_PAINEL_COVIDBR_2022_Parte1_03ago2022.csv
-rw-r--r--   3 root supergroup   17330062 2022-08-06 22:29 /user/pedro/projeto-final/dados/HIST_PAINEL_COVIDBR_2022_Parte2_03ago2022.csv


Criamos um dataframe para ler as tabelas e entendermos seu schema e estrutura

In [8]:
df_tabela_covid = spark.read.csv("/user/pedro/projeto-final/dados/*.csv", sep = ";", header = "true", inferSchema = "true")


Podemos agora indentificar as colunas e seus schemas

In [17]:
df_tabela_covid.printSchema()


root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



Número de linhas da tabela

In [18]:
df_tabela_covid.count()

4833208

Otimizar todos os dados do hdfs para uma tabela Hive particionada por município

In [23]:
df_tabela_covid.write.mode("overwrite").partitionBy("municipio").saveAsTable("tabela_covid")

Verificando a tabela particionada no hive por município:

In [25]:
!hdfs dfs -ls /user/hive/warehouse/tabela_covid

Found 5299 items
-rw-r--r--   2 root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/_SUCCESS
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Abadia de Goiás
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Abadia dos Dourados
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Abadiânia
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Abaetetuba
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Abaeté
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Abaiara
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Abaré
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/ta

drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Arabutã
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Aracaju
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Aracati
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Aracatu
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Araci
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Aracitaba
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Aracoiaba
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Aracruz
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio

drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Barra do Ribeiro
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Barra do Rio Azul
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Barra do Rocha
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Barra do Turvo
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Barra dos Coqueiros
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Barracão
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Barras
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Barreira
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13

drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cabrália Paulista
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cacaulândia
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cacequi
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cachoeira
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cachoeira Alta
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cachoeira Dourada
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cachoeira Grande
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cachoeira Paulista
drwxr-xr-x   - root supergroup          0 2022-08-0

drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cerro Largo
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cerro Negro
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cesário Lange
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Cezarina
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Chalé
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Chapada
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Chapada Gaúcha
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Chapada da Natividade
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/

drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Dom Inocêncio
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Dom Joaquim
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Dom Macedo Costa
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Dom Pedrito
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Dom Pedro
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Dom Pedro de Alcântara
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Dom Silvério
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Dom Viçoso
drwxr-xr-x   - root supergroup          0 2022-08-07 03:

drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Iraí de Minas
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Irecê
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Iretama
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Irineópolis
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Irituia
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Irupi
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Isaías Coelho
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Israelândia
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_c

drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Manoel Vitorino
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Mansidão
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Mantena
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Mantenópolis
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Maquiné
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Mar Vermelho
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Mar de Espanha
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Mara Rosa
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/wareho

drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Paragominas
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Paraguaçu
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Paraguaçu Paulista
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Paraibano
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Paraibuna
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Paraipaba
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Paraisópolis
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/warehouse/tabela_covid/municipio=Parambu
drwxr-xr-x   - root supergroup          0 2022-08-07 03:13 /user/hive/wareho

drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=Santa Bárbara do Sul
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=Santa Bárbara do Tugúrio
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=Santa Carmem
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=Santa Cecília
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=Santa Cecília do Pavão
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=Santa Cecília do Sul
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=Santa Clara d%27Oeste
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=Santa Clara do Sul
drwxr-xr-x

drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=São José do Sabugi
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=São José do Seridó
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=São José do Sul
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=São José do Vale do Rio Preto
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=São José do Xingu
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=São José dos Ausentes
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=São José dos Basílios
drwxr-xr-x   - root supergroup          0 2022-08-07 03:14 /user/hive/warehouse/tabela_covid/municipio=São José dos Campos
dr

Abaixo, o campo 'data' contém informação sobre horário que não é relevante. Vamos criar um campo apenas com data (yyyy-MM-dd) usando a função substring

In [49]:
df_tabela_covid.select('data').show(10)

+-------------------+
|               data|
+-------------------+
|2021-07-01 00:00:00|
|2021-07-02 00:00:00|
|2021-07-03 00:00:00|
|2021-07-04 00:00:00|
|2021-07-05 00:00:00|
|2021-07-06 00:00:00|
|2021-07-07 00:00:00|
|2021-07-08 00:00:00|
|2021-07-09 00:00:00|
|2021-07-10 00:00:00|
+-------------------+
only showing top 10 rows



In [101]:
df_tabela = df_tabela_covid.withColumn("data_", substring("data", 1,10))

In [102]:
df_tabela.select('data_').show(5)

+----------+
|     data_|
+----------+
|2021-07-01|
|2021-07-02|
|2021-07-03|
|2021-07-04|
|2021-07-05|
+----------+
only showing top 5 rows



In [111]:
df_tabela = df_tabela.drop('data')

Abaixo, visualizamos as últimas linhas de registros com a função 'monolitically_increasing_id()'

In [112]:
last_row = df_tabela.withColumn("index", monotonically_increasing_id())
last_row.orderBy(desc("index")).drop("index").show(1)

+------------+------+---------+-----+------+--------------+----------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+----------+
|      regiao|estado|municipio|coduf|codmun|codRegiaoSaude| nomeRegiaoSaude|semanaEpi|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|interior/metropolitana|     data_|
+------------+------+---------+-----+------+--------------+----------------+---------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+----------------------+----------+
|Centro-Oeste|    DF| Brasília|   53|530010|         53001|DISTRITO FEDERAL|       31|         3015268|        832270|       494|          11821|          0|            null|                 null|                     1|2022-08-03|
+------------+------+---------+-----+------+--------------+----------------+

Criando as 3 visualizações pelo spark.sql com os dados enviados para o hdfs

A última atualização da pesquisa na tabela foi em 03 de agosto de 2021.
Vale ressaltar que o site é dinâmico e pude observar que são feitas atualizações quase diárias nos dados.

In [153]:
ultima_data = "2021-08-03"
painel = df_tabela.where((df_tabela.regiao == 'Brasil') & (df_tabela.data_ == ultima_data))

Visualização 1: Casos Recuperados e Casos em acompanhamento

In [141]:
recuperados_painel = painel.select(painel['Recuperadosnovos'].alias('Casos_Recuperados'),\
                           painel['emAcompanhamentoNovos'].alias('Em_Acompanhamento'))
recuperados_painel.show()

+-----------------+-----------------+
|Casos_Recuperados|Em_Acompanhamento|
+-----------------+-----------------+
|         18746865|           680520|
+-----------------+-----------------+



Visualizacão 2: Casos Confirmados: Acumulado, Casos Novos, Incidência

De acordo com Wikipedia, o cálculo de incidência epidemiológica é calculado pelos número de novos casos (aqui representado pelo campo 'casosAcumulado') em dado local (Brasil) e período, dividido pela populacão de risco do mesmo local e período (aqui o campo 'PopulacaoTCU2019' = 210.147.125). Observando o site do governo, podemos observar que o campo incidência está descrito como "Incidência/100mil hab." Ou seja, o resultado desta divisão deve ser multiplicado por 100.000 habitantes.



In [144]:
confirmados_painel = painel.select(painel['casosAcumulado'].alias('Acumulados'),\
                     painel['casosNovos'].alias('Casos_Novos'),\
                     (f.round(painel['casosAcumulado']/painel['PopulacaoTCU2019']*100000,1)).alias('Incidência')) 
confirmados_painel.show()

+----------+-----------+----------+
|Acumulados|Casos_Novos|Incidência|
+----------+-----------+----------+
|  19985817|      32316|    9510.4|
+----------+-----------+----------+



Visualização 3: Óbitos Acumulados, Letalidade, Casos Novos e Mortalidade

De acordo com a Wikipedia, a taxa de letalidade (L) ou coeficiente de letalidade é a proporção entre o número de mortes por uma doença e o número total de doentes que sofrem dessa doença, ao longo de um determinado período de tempo. É geralmente expressa em percentagem.

Já a taxa de mortalidade ou coeficiente de mortalidade é um índice demográfico que reflete o número de mortes registradas, aqui representado pelos óbitos acumulados dividido pela população total. No caso desta tabela, o site do governo mostra que essa taxa é calculada por 100.000 habitantes.

In [160]:
obitos_painel = painel.select(painel['obitosAcumulado'].alias('Óbitos_Acumulados'),\
                              painel['obitosNovos'].alias('Casos_Novos'),\
                        (f.round(painel['obitosAcumulado']/painel['casosAcumulado']*100,1)).alias('Letalidade'),\
                        (f.round(painel['obitosAcumulado']/painel['PopulacaoTCU2019']*100000,1)).alias('Mortalidade'))
obitos_painel.show()


+-----------------+-----------+----------+-----------+
|Óbitos_Acumulados|Casos_Novos|Letalidade|Mortalidade|
+-----------------+-----------+----------+-----------+
|           558432|       1209|       2.8|      265.7|
+-----------------+-----------+----------+-----------+



4. Salvar a primeira visualização como tabela Hive

In [161]:
recuperados_painel.write.mode("overwrite").saveAsTable("recuperados_covid")

Vamos checar:

In [162]:
!hdfs dfs -ls /user/hive/warehouse/recuperados_covid

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-08-08 00:59 /user/hive/warehouse/recuperados_covid/_SUCCESS
-rw-r--r--   2 root supergroup        679 2022-08-08 00:59 /user/hive/warehouse/recuperados_covid/part-00000-2ac6f9a4-1751-43ec-aaa9-964c77bb6001-c000.snappy.parquet


5. Salvar a segunda visualização com formato parquet e compressão snappy
   Por padrão, uma tabela Hive é salva já com formato parquet e compressão snappy

In [163]:
confirmados_painel.write.mode("overwrite").saveAsTable("confirmados_covid")

Vamos checar:

In [165]:
!hdfs dfs -ls /user/hive/warehouse/confirmados_covid

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-08-08 01:05 /user/hive/warehouse/confirmados_covid/_SUCCESS
-rw-r--r--   2 root supergroup       1003 2022-08-08 01:05 /user/hive/warehouse/confirmados_covid/part-00000-1fa1c1fb-8269-470b-b67f-620a6736d51d-c000.snappy.parquet
