## Projeto final Semantix - Jupyter Notebook ##

Utizando o Docker, foi realizado download do arquivo do link informado abaixo:
https://mobileapps.saude.gov.br/esus-vepi/files/unAFkcaNDeXajurGB7LChj8SgQYS2ptm/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar

Os comandos usados foram:
wget https://mobileapps.saude.gov.br/esus-vepi/files/
unAFkcaNDeXajurGB7LChj8SgQYS2ptm/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar

Após o download do arquivo, foi realizado a descompressão através do comando:
unrar e 04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar /home/wbendelak/treinamentos/spark/input/dados_covid/

  **1. Enviar os dados para o hdfs**

In [None]:
### Transferir dados o HDFS

!hdfs dfs -put HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv /user/william/projeto_final/dados_covid
!hdfs dfs -put HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv /user/william/projeto_final/dados_covid
!hdfs dfs -put HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv /user/william/projeto_final/dados_covid
!hdfs dfs -put HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv /user/william/projeto_final/dados_covid


In [None]:
# Validação dos dados no HDFS
!hdfs dfs -ls /user/william/projeto_final/dados_covid

In [21]:
#Análise do conteúdo e estrutura dos arquivos

!hdfs dfs -cat /user/william/projeto_final/dados_covid/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv | head

regiao;estado;municipio;coduf;codmun;codRegiaoSaude;nomeRegiaoSaude;data;semanaEpi;populacaoTCU2019;casosAcumulado;casosNovos;obitosAcumulado;obitosNovos;Recuperadosnovos;emAcompanhamentoNovos;interior/metropolitana
Brasil;;;76;;;;2020-02-25;9;210147125;0;0;0;0;;;
Brasil;;;76;;;;2020-02-26;9;210147125;1;1;0;0;;;
Brasil;;;76;;;;2020-02-27;9;210147125;1;0;0;0;;;
Brasil;;;76;;;;2020-02-28;9;210147125;1;0;0;0;;;
Brasil;;;76;;;;2020-02-29;9;210147125;2;1;0;0;;;
Brasil;;;76;;;;2020-03-01;10;210147125;2;0;0;0;;;
Brasil;;;76;;;;2020-03-02;10;210147125;2;0;0;0;;;
Brasil;;;76;;;;2020-03-03;10;210147125;2;0;0;0;;;
Brasil;;;76;;;;2020-03-04;10;210147125;3;1;0;0;;;
cat: Unable to write to output stream.


In [105]:
# Carregar os dados .csv para o DataFrame dados_covid_csv e posteriormente salvando os dados no HDFS no formato Parquet com compressão SNAPPY
dados_covid_csv = spark.read.csv("/user/william/projeto_final/dados_covid",sep=";",header="True",inferSchema="True")
dados_covid_csv.write.parquet("/user/william/projeto_final/tabelas/dadoscovid.parquet.snappy.partitioned.by.municipio",partitionBy="municipio",compression="SNAPPY",mode="overwrite")

In [109]:
#Analisar o schema dos campos do DataFrame para usar como referência na criação da tabela hive.
#Contar quantidade de registros para comparar posteriormente com a tabela hive.
dados_covid_csv.printSchema()
dados_covid_csv.count()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)



2624943

**2. Otimizar todos os dados do hdfs para uma tabela Hive particionada por município**

In [96]:
# Criação da Base de Dados a ser usada no Projeto caso o mesmo não exista

spark.sql("create database if not exists projeto_final")
spark.sql("show databases").show()


+-------------+
| databaseName|
+-------------+
|      default|
|projeto_final|
+-------------+



In [None]:
#Para este projeto, a tabela não será incrementada, por isso se faz necessário a exclusão da tabela caso a mesma já exista

In [124]:
spark.sql("DROP TABLE IF EXISTS dados_covid_parquet_snappy")

DataFrame[]

In [None]:
#Criação da tabela externa através do próprio spark.sql otimizando a mesma no formato parquet, compressão SNAPPY e particionando por município


In [127]:
spark.sql("""CREATE EXTERNAL TABLE dados_covid_parquet_snappy(regiao string,
                         estado string,
                         cod_uf int,
                         codmun int,
                         codRegiaoSaude int,
                         nomeRegiaoSaude string,
                         data timestamp,
                         semanaEpi int,
                         populacaoTCU2019 int,
                         casosAcumulado decimal(10,0),
                         casosNovos int,
                         obitosAcumulado int,
                         obitosNovos int,
                         Recuperadosnovos int,
                         emAcompanhamentoNovos int,
                         interior_metropolitana string)
                         stored as parquet
                         partitioned by (municipio string)
                         location 'hdfs://namenode:8020/user/william/projeto_final/tabelas/dadoscovid.parquet.snappy.partitioned.by.municipio'            
                         tblproperties("skip.header.line.count"="1",'parquet.compress'='SNAPPY')
                         """)

DataFrame[]

In [None]:
#Visando minimizar riscos de falhas devido manipulação das partições via HDFS por exemplo, é realizado a reparação da tabela

In [128]:
spark.sql("msck repair table dados_covid_parquet_snappy")

DataFrame[]

In [None]:
#Validação da tabela criada e comparando se a quantidade de registros está igual ao dos arquivos originais em CSV

In [153]:
spark.sql("use projeto_final")
spark.sql("show tables").show()
spark.sql("select count(*) from dados_covid_parquet_snappy").show()

+-------------+--------------------+-----------+
|     database|           tableName|isTemporary|
+-------------+--------------------+-----------+
|projeto_final|   casos_confirmados|      false|
|projeto_final|   casos_recuperados|      false|
|projeto_final|dados_covid_parqu...|      false|
|projeto_final|  obitos_confirmados|      false|
|projeto_final|visualizacao_caso...|      false|
+-------------+--------------------+-----------+



**3. Criar as 3 vizualizações(descritas no pdf do projeto) pelo Spark com os dados enviados para o HDFS**

In [130]:
#atribui a table dados covid a variavel tabela
tabela = spark.read.table("dados_covid_parquet_snappy")
tabela.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- cod_uf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior_metropolitana: string (nullable = true)
 |-- municipio: string (nullable = true)



In [134]:
#Preparação das querys para obtenção das informações.
#Na apresentação dos resultados, está sendo usado o filtro por data igual a 06/07/2021.
#As fórmulas de Incidência, Letalidade e Mortalidade foram obtidas do próprio site https://covid.saude.gov.br/ na aba "sobre".

#Para o total de casos recuperados, optou-se por buscar o maior registro da coluna Recuperadosnovos.
total_casos_recuperados = spark.sql("select MAX (Recuperadosnovos) as Casos_Recuperados from dados_covid_parquet_snappy").show()

#Para o total de casos em acompanhamento, optou-se por buscar o último registro inserido na tabela, excluindo-se os valores nulos.
casos_em_acompanhamento = spark.sql("select LAST (emAcompanhamentoNovos) as Em_Acompanhamento from dados_covid_parquet_snappy WHERE emAcompanhamentoNovos IS NOT NULL").show()

#Para o total de casos acumulados, optou-se por buscar o maior registro da coluna casosAcumulado.
total_casos_acumulados = spark.sql("select MAX (casosAcumulado) as Acumulado from dados_covid_parquet_snappy ").show()

#Para o total de casos novos, optou-se por buscar o maior registro da coluna casosNovos.
casos_novos = spark.sql("select MAX (casosNovos) as Casos_novos from dados_covid_parquet_snappy where data = ('2021-07-06')").show()

#Incidêcia = (Número de casos confirmados de COVID-19 em residentes X 100.000) / População total residente no período determinado.
incidencia = spark.sql("SELECT ROUND(((MAX(casosAcumulado) / MAX(populacaoTCU2019))*100000),1) as incidencia from dados_covid_parquet_snappy where data = ('2021-07-06')").show()

#Para o total de óbitos acumulados, optou-se por buscar o maior registro da coluna obitosAcumulado.
total_obitos_acumulados = spark.sql("select MAX (obitosAcumulado) as Obito_acumulado from dados_covid_parquet_snappy ").show()

#Para o total de óbitos novos, optou-se por buscar o maior registro da coluna obitosNovos.
obitos_novos = spark.sql("select MAX (obitosNovos) as Obitos_novos from dados_covid_parquet_snappy where data = ('2021-07-06')").show()

#Letalidade = (Número de óbitos confirmados de COVID-19 em determinada área e período X 100) / Número de casos confirmados de COVID-19 em determinada área e período.
letalidade = spark.sql("SELECT ROUND(((MAX(obitosAcumulado) / MAX(casosAcumulado))*100),1) as letalidade from dados_covid_parquet_snappy").show()

#Mortalidade = (Número de óbitos confirmados de COVID-19 em residentes X 100.000) / População total residente no período determinado.
mortalidade = spark.sql("SELECT ROUND(((MAX(obitosAcumulado) / MAX(populacaoTCU2019))*100000),1) as mortalidade from dados_covid_parquet_snappy").show()



+-----------------+
|Casos_Recuperados|
+-----------------+
|         17262646|
+-----------------+

+-----------------+
|Em_Acompanhamento|
+-----------------+
|          1065477|
+-----------------+

+---------+
|Acumulado|
+---------+
| 18855015|
+---------+

+-----------+
|Casos_novos|
+-----------+
|      62504|
+-----------+

+----------+
|incidencia|
+----------+
|    8972.3|
+----------+

+---------------+
|Obito_acumulado|
+---------------+
|         526892|
+---------------+

+------------+
|Obitos_novos|
+------------+
|        1780|
+------------+

+----------+
|letalidade|
+----------+
|       2.8|
+----------+

+-----------+
|mortalidade|
+-----------+
|      250.7|
+-----------+



In [146]:
# Criação de 3 views de apresentação solicitada no projeto

#View 1: Casos recuperados + Casos em Acompanhamento
total_casos_recuperados = spark.sql("CREATE OR REPLACE VIEW casos_Recuperados AS select 'Casos_Recuperados', MAX (Recuperadosnovos) as Total_Casos_Recuperados from dados_covid_parquet_snappy UNION select 'Em_acompanhamento', MAX (emAcompanhamentoNovos) as Em_Acompanhamento from dados_covid_parquet_snappy WHERE data = ('2021-07-06')").show()

#View 2: Casos Confirmados Acumulado + Casos Confirmados Novos + Incidência de Casos Confirmados
casos_confirmados = spark.sql("CREATE OR REPLACE VIEW casos_confirmados AS select 'Casos_Recuperados', (MAX (casosAcumulado)) as Total_Casos_Confirmados from dados_covid_parquet_snappy UNION select 'Casos_novos', MAX (casosNovos) as Casos_novos from dados_covid_parquet_snappy where data = ('2021-07-06') UNION SELECT 'Incidencia', ROUND(((MAX(casosAcumulado) / MAX(populacaotcu2019))*100000),1) as incidencia from dados_covid_parquet_snappy where data = ('2021-07-06')").show()

#View 3: Óbitos Acumulado + Novos Óbitos + Letalidade + Mortalidade
Obitos_confirmados = spark.sql("CREATE OR REPLACE VIEW obitos_confirmados AS select 'Obitos_Confirmados', MAX (obitosAcumulado) as Total_Obitos_Confirmados from dados_covid_parquet_snappy UNION select 'obitos_novos', MAX (obitosNovos) as Obitos_novos from dados_covid_parquet_snappy where data = ('2021-07-06') UNION SELECT 'obitos_acumulado', ROUND(((MAX(obitosAcumulado) / MAX(casosAcumulado))*100),1) as letalidade from dados_covid_parquet_snappy UNION SELECT 'mortalidade', ROUND(((MAX(obitosAcumulado) / MAX(populacaoTCU2019))*100000),1) as mortalidade from dados_covid_parquet_snappy").show()




++
||
++
++

++
||
++
++

++
||
++
++



In [145]:
#Executar as 3 view para validar o resultado

spark.sql("SELECT * from casos_Recuperados").show()
spark.sql("SELECT * from casos_confirmados").show()
spark.sql("SELECT * from obitos_confirmados").show()

+-----------------+-----------------------+
|Casos_Recuperados|total_casos_recuperados|
+-----------------+-----------------------+
|Em_acompanhamento|                1065477|
|Casos_Recuperados|               17262646|
+-----------------+-----------------------+

+-----------+-----------------+
| Acumulados|casos_confirmados|
+-----------+-----------------+
|Casos_novos|          62504.0|
| Incidencia|           8972.3|
| Acumulados|       18855015.0|
+-----------+-----------------+

+----------------+------------------+
|    Total_obitos|Obitos_confirmados|
+----------------+------------------+
|    Total_obitos|          526892.0|
|    obitos_novos|            1780.0|
|obitos_acumulado|               2.8|
|     mortalidade|             250.7|
+----------------+------------------+



**4. Salvar a primeira visualização como tabela Hive.**

In [154]:
# Criar um DataFrame para receber os dados da primeira View casos_Recuperados, e salvar em tabela hive

visualizacao_casos_recuperados = spark.sql("SELECT * from casos_recuperados")
visualizacao_casos_recuperados.write.saveAsTable("visualizacao_casos_recuperados")


KeyboardInterrupt: 

In [149]:
# Validar tabela a tabela visualizacao_casos_recuperados

spark.sql("SELECT * from visualizacao_casos_recuperados").show()


+-----------------+-----------------------+
|Casos_Recuperados|Total_Casos_Recuperados|
+-----------------+-----------------------+
|Em_acompanhamento|                1065477|
|Casos_Recuperados|               17262646|
+-----------------+-----------------------+



**5. Salvar a segunda visualização com formato parquet e compressão snappy.**

In [155]:
# Criar um DataFrame para receber os dados da segunda View casos_confirmados, e salvar no formato parquet com compressão SNAPPY

visualizacao_casos_confirmados = spark.sql("SELECT * from casos_confirmados")
visualizacao_casos_confirmados.write.option("compression","snappy").parquet('hdfs://namenode:8020/user/william/projeto_final/tabelas/visualizacao_casos_confirmados')

In [156]:
# Validar a leitura dos dados

spark.read.parquet("hdfs://namenode:8020/user/william/projeto_final/tabelas/visualizacao_casos_confirmados").show()

+-----------------+-----------------------+
|Casos_Recuperados|Total_Casos_Confirmados|
+-----------------+-----------------------+
|Casos_Recuperados|             18855015.0|
|      Casos_novos|                62504.0|
|       Incidencia|                 8972.3|
+-----------------+-----------------------+



In [157]:
# Validar o diretório criado para a segunda view casos_confirmados

!hdfs dfs -ls /user/william/projeto_final/tabelas/visualizacao_casos_confirmados

Found 5 items
-rw-r--r--   2 root supergroup          0 2022-05-01 02:51 /user/william/projeto_final/tabelas/visualizacao_casos_confirmados/_SUCCESS
-rw-r--r--   2 root supergroup        427 2022-05-01 02:51 /user/william/projeto_final/tabelas/visualizacao_casos_confirmados/part-00000-bdb21380-7e83-4f13-8211-15837cd13883-c000.snappy.parquet
-rw-r--r--   2 root supergroup        851 2022-05-01 02:51 /user/william/projeto_final/tabelas/visualizacao_casos_confirmados/part-00132-bdb21380-7e83-4f13-8211-15837cd13883-c000.snappy.parquet
-rw-r--r--   2 root supergroup        905 2022-05-01 02:51 /user/william/projeto_final/tabelas/visualizacao_casos_confirmados/part-00146-bdb21380-7e83-4f13-8211-15837cd13883-c000.snappy.parquet
-rw-r--r--   2 root supergroup        842 2022-05-01 02:51 /user/william/projeto_final/tabelas/visualizacao_casos_confirmados/part-00151-bdb21380-7e83-4f13-8211-15837cd13883-c000.snappy.parquet


**6. Salvar a terceira visualização em um tópico no Kafka.**

In [166]:
#Enviar os dados para o tópico topic-projeto-final

#importar as functions e types  necessárias
from pyspark.sql.functions import *
from pyspark.sql.types import *

#Criação de dataframe para receber os dados da view obitos_confirmados
obitos_confirmados_view = spark.sql("SELECT * from obitos_confirmados")

#Criação de dataframe para receber os valores das colunas Obitos_Confirmados e Total_Obitos_Confirmados
obitos_confirmados_kafka = obitos_confirmados_view.withColumn("value", struct(col("Obitos_Confirmados"),col("Total_Obitos_Confirmados")))

#Transformação do dataframe para remover o cabeçalho das colunas Obitos_Confirmados e Total_Obitos_Confirmados
obitos_confirmados_kafka = obitos_confirmados_kafka.drop("Obitos_Confirmados","Total_Obitos_Confirmados")

#Transformação da coluna value do dataframe para String 
obitos_confirmados_kafka = obitos_confirmados_kafka.withColumn("value",col("value").cast("string"))


In [163]:
#Validação dos dados do dataframe
obitos_confirmados_kafka.show()

+--------------------+
|               value|
+--------------------+
|[obitos_novos, 17...|
|[Obitos_Confirmad...|
|[obitos_acumulado...|
|[mortalidade, 250.7]|
+--------------------+



In [167]:
#Envio dos dados para o tópico topic-projeto-final
obitos_confirmados_kafka.write.format("kafka").option("kafka.bootstrap.servers", "kafka:9092").option("topic","topic-projeto-final").save()

**7. Criar a visualização pelo Spark com os dados enviados para o HDFS.**

In [168]:
#Será utilizado os dados salvos em parquet com compressão SNAPPY no HDFS /user/william/projeto_final/tabelas/dadoscovid.parquet.snappy.partitioned.by.municipio
dados_covid_parquet = spark.read.parquet("hdfs://namenode:8020/user/william/projeto_final/tabelas/dadoscovid.parquet.snappy.partitioned.by.municipio")


In [226]:
#importar funções de col e lit para utilizar nos agrupamentos
from pyspark.sql.functions import col
from pyspark.sql.functions import lit

#Realizado os agrupamentos e filtros
sintese_covid = dados_covid_parquet.select('regiao','populacaoTCU2019','obitosAcumulado','casosAcumulado','estado','data').where(dados_covid_parquet.regiao != 'regiao')

sintese_covid = sintese_covid.groupBy('estado', 'regiao')
sintese_covid = sintese_covid.max('casosAcumulado','obitosAcumulado','populacaoTCU2019')

sintese_covid = sintese_covid.groupBy('regiao')
sintese_covid = sintese_covid.sum('max(casosAcumulado)','max(obitosAcumulado)','max(populacaoTCU2019)')
sintese_covid = sintese_covid.withColumn('cem_mil', lit(100000))

sintese_covid = sintese_covid.withColumn('Incidência/100mil hab', ((col('sum(max(casosAcumulado))') * col('cem_mil'))/col('sum(max(populacaoTCU2019))')))
sintese_covid = sintese_covid.withColumn('Mortalidade /100mil hab', ((col('sum(max(obitosAcumulado))')/col('sum(max(populacaoTCU2019))'))) * col('cem_mil'))


sintese_covid = sintese_covid.drop('cem_mil','sum(max(populacaoTCU2019))')
sintese_covid = sintese_covid.withColumnRenamed('sum(max(casosAcumulado))', 'Casos').withColumnRenamed('sum(max(obitosAcumulado))', 'Óbitos')




sintese_covid = sintese_covid.sort("regiao")

cabecalho_sintese_covid = "Síntese de casos, óbitos, incidência e mortalidade"
print(cabecalho_sintese_covid)
#usado o toPandas para usar na memória
sintese_covid.toPandas()

Síntese de casos, óbitos, incidência e mortalidade


Unnamed: 0,regiao,Casos,Óbitos,Incidência/100mil hab,Mortalidade /100mil hab
0,Brasil,18855015,526892,8972.2926259,250.725295
1,Centro-Oeste,1916619,49207,11760.5098928,301.937636
2,Nordeste,4455737,107824,7807.2680354,188.927414
3,Norte,1732862,43845,9401.8983255,237.887513
4,Sudeste,7138803,245311,8078.1795176,277.590836
5,Sul,3611041,80705,12046.4469156,269.232196


**8. Salvar a visualização do exercício 6 em um tópico no Elastic**

In [227]:
#Criação de dataframe para receber os dados da view obitos_confirmados
obitos_confirmados_elastic = spark.sql("SELECT * from obitos_confirmados")

#Salvar dados como csv para posterior exportação para o elastic
obitos_confirmados_elastic.write.format('csv').option("inferSchema", "true").option("header","true").save("hdfs://namenode:8020/user/william/projeto_final/elastic_view/obitos_confirmados.csv")


In [229]:
#Validação do csv criado
spark.read.csv('hdfs://namenode:8020/user/william/projeto_final/elastic_view/obitos_confirmados.csv', inferSchema = True, header = True).toPandas()

Unnamed: 0,Obitos_Confirmados,Total_Obitos_Confirmados
0,Obitos_Confirmados,526892.0
1,obitos_acumulado,2.8
2,obitos_novos,1780.0
3,mortalidade,250.7
