   ### Projeto de conclusão de curso "Projeto Final de Spark"
   
   ### Utilizando dados da "Campanha Nacional de Vacinação contra Covid-19"
   
    
    1.Enviar os dados para o hdfs
    
    2.Otimizar todos os dados do hdfs para uma tabela Hive particionada por município.
    
    3.Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:
    
    4.Salvar a primeira visualização como tabela Hive
    
    5.Salvar a segunda visualização com formato parquet e compressão snappy
    
    6.Salvar a terceira visualização em um tópico no Kafka
    
    7.Criar a visualização pelo Spark com os dados enviados para o HDFS:
    
    8.Salvar a visualização do exercício 6 em um tópico no Elastic
    
    9.Criar um dashboard no Elastic para visualização dos novos dados enviados

In [None]:
# Pre-Requisitos
#a) Fazer donwload do arquivo dentro do cluster - curl -O https://mobileapps.saude.gov.br/esus-vepi/files/unAFkcaNDeXajurGB7LChj8SgQYS2ptm/04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar
#b) extrair arquivo - unrar x 04bd3419b22b9cc5c6efac2c6528100d_HIST_PAINEL_COVIDBR_06jul2021.rar

In [26]:
# type para inserir dados do separador, schema, header
# functions para utilizar a conversão de tiemstamp para date

from pyspark.sql.types import *
from pyspark.sql.functions import *

In [4]:
# 1.Enviar os dados para o HDFS

!hdfs dfs -mkdir /user/ronnan/data/data_covid
!hdfs dfs -put input/data_covid/*csv /user/ronnan/data/data_covid

mkdir: `/user/ronnan/data/data_covid': File exists
put: `input/data_covid/*csv': No such file or directory


In [5]:
# verificando se os dados já estão no HDFS

!hdfs dfs -ls /user/ronnan/data/data_covid

Found 4 items
-rw-r--r--   3 root supergroup   62492959 2022-04-19 02:29 /user/ronnan/data/data_covid/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup   76520681 2022-04-19 02:29 /user/ronnan/data/data_covid/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   3 root supergroup   91120916 2022-04-19 02:29 /user/ronnan/data/data_covid/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   3 root supergroup    3046774 2022-04-19 02:29 /user/ronnan/data/data_covid/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


In [4]:
# criando data_frame data_covid para tratamento dos dados, realizando leitura em csv

data_covid = spark.read.csv("/user/ronnan/data/data_covid/*.csv"
                            ,sep = ";",inferSchema=True, header=True,ignoreLeadingWhiteSpace=True)

In [5]:
# verificando se o esquema foi atribuido

print(data_covid.printSchema())

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: decimal(10,0) (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)

None


In [9]:
# criando variavel apenas com as colunas que serão utilizadas nas visualizações

dados_filtrados = data_covid.select("data","casosAcumulado","casosNovos","municipio","obitosNovos","Recuperadosnovos","obitosAcumulado","emAcompanhamentoNovos")

In [10]:
# criando variavel para ajustar o campo para novo formato de data

dados_data_formatada = dados_filtrados.withColumn("data",to_timestamp(col("data"))).withColumn("data",to_date(col("data")))


In [11]:
# Criando variavel com todos os dados nulls transformados em "0"

dados_filtrados_limpos = dados_data_formatada.na.fill({'municipio': '0', 'Recuperadosnovos':0, 'emAcompanhamentoNovos':0 })

In [22]:
# 2.Otimizar todos os dados do hdfs para uma tabela Hive particionada por município.

dados_filtrados_limpos.write.saveAsTable("dados_covid", partitionBy="municipio", mode="OverWrite")

In [48]:
# exibindo a tabela para verificar se dados estão transformados

spark.read.table("dados_covid").show(20)


+----------+--------------+----------+-----------+----------------+---------------+---------------------+---------+
|      data|casosAcumulado|casosNovos|obitosNovos|Recuperadosnovos|obitosAcumulado|emAcompanhamentoNovos|municipio|
+----------+--------------+----------+-----------+----------------+---------------+---------------------+---------+
|2021-01-01|       7700578|     24605|        462|         6756284|         195411|               748883|        0|
|2021-01-02|       7716405|     15827|        314|         6769420|         195725|               751260|        0|
|2021-01-03|       7733746|     17341|        293|         6813008|         196018|               724720|        0|
|2021-01-04|       7753752|     20006|        543|         6875230|         196561|               681961|        0|
|2021-01-05|       7810400|     56648|       1171|         6963407|         197732|               649261|        0|
|2021-01-06|       7873830|     63430|       1242|         7036530|     

In [24]:
# verificando se a tabela foi salva dentro do Hive e os diretorios foram criados por municipio

!hdfs dfs -ls /user/hive/warehouse/dados_covid



Found 5299 items
-rw-r--r--   2 root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/_SUCCESS
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=0
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Abadia de Goiás
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Abadia dos Dourados
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Abadiânia
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Abaetetuba
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Abaeté
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Abaiara
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/mu

drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Alvorada do Norte
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Alvorada do Sul
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Além Paraíba
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Amajari
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Amambai
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Amaporã
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Amapá
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Amapá do Maranhão
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/d

drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Barra do Quaraí
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Barra do Ribeiro
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Barra do Rio Azul
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Barra do Rocha
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Barra do Turvo
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Barra dos Coqueiros
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Barracão
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Barras
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 

drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Campos Novos
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Campos Novos Paulista
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Campos Sales
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Campos Verdes
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Campos de Júlio
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Campos do Jordão
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Campos dos Goytacazes
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Camutanga
drwxr-xr-x   - root supergroup          0 2022-04

drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Dom Pedro
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Dom Pedro de Alcântara
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Dom Silvério
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Dom Viçoso
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Domingos Martins
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Domingos Mourão
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Dona Emma
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Dona Euzébia
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/w

drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Jardim
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Jardim Alegre
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Jardim Olinda
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Jardim de Angicos
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Jardim de Piranhas
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Jardim do Mulato
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Jardim do Seridó
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Jardinópolis
drwxr-xr-x   - root supergroup          0 2022-04-21 01:1

drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Morro Reuter
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Morro da Fumaça
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Morro da Garça
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Morro do Chapéu
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Morro do Chapéu do Piauí
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Morro do Pilar
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Morros
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Mortugaba
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 

drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Pouso Novo
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Pouso Redondo
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Poxoréu
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Poá
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Poço Branco
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Poço Dantas
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Poço Fundo
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Poço Redondo
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid

drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Solidão
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Solonópole
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Solânea
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Sombrio
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Sonora
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Sooretama
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Sorocaba
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Sorriso
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Sossê

drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Ventania
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Venturosa
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Venâncio Aires
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Vera
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Vera Cruz
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Vera Cruz do Oeste
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Vera Mendes
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dados_covid/municipio=Veranópolis
drwxr-xr-x   - root supergroup          0 2022-04-21 01:12 /user/hive/warehouse/dado

In [12]:
# 3.Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS

#3.1ª visualizacao

dados_casos_recuperados = dados_filtrados_limpos.select(max('Recuperadosnovos').alias("Casos_Recuperados"), \
                                                max('emAcompanhamentoNovos').alias("Em_Acompanhamento"))

dados_casos_recuperados_visualização = dados_casos_recuperados.show()

+-----------------+-----------------+
|Casos_Recuperados|Em_Acompanhamento|
+-----------------+-----------------+
|         17262646|          1317658|
+-----------------+-----------------+



In [46]:
#3.2ª visualizacao

dados_casos_confirmados = dados_filtrados_limpos.select(max('casosAcumulado').alias("Casos_Confirmados"), \
                                               max('casosNovos').alias("Casos_Novos"))

dados_casos_confirmados_visualização = dados_casos_confirmados.show()


+-----------------+-----------+
|Casos_Confirmados|Casos_Novos|
+-----------------+-----------+
|         18855015|     115228|
+-----------------+-----------+



In [45]:

#taxa mortalidade = n de obitos x 1000 / n abitantes
# 526892 * 1000 / 210147125 = 250725

total_populacao = data_covid.select(max('populacaoTCU2019'))

total_populacao_visualização = total_populacao.show()

+---------------------+
|max(populacaoTCU2019)|
+---------------------+
|            210147125|
+---------------------+



In [13]:
# 3.3ª visualizacao

dados_obitos_confirmados = dados_filtrados_limpos.select(max('obitosAcumulado').alias("Obitos_Confirmados"), \
                                                max('obitosNovos').alias("Obitos_Novos"))
dados_obitos_confirmados_taxa = dados_obitos_confirmados.withColumn("Taxa_Mortalidade", lit(526892*1000/210147125))

dados_obitos_confirmados_taxa_visualização = dados_obitos_confirmados_taxa.show()


+------------------+------------+------------------+
|Obitos_Confirmados|Obitos_Novos|  Taxa_Mortalidade|
+------------------+------------+------------------+
|            526892|        4249|2.5072529543290205|
+------------------+------------+------------------+



In [49]:
# 4.Salvar a primeira visualização como tabela Hive

dados_casos_recuperados.write.saveAsTable("casos_recuperados")


In [50]:
# visualizar a tabela casos_recuperados salva no Hive

spark.read.table('casos_recuperados').show()

+-----------------+-----------------+
|Casos_Recuperados|Em_Acompanhamento|
+-----------------+-----------------+
|         17262646|          1317658|
+-----------------+-----------------+



In [51]:
# 5.Salvar a segunda visualização com formato parquet e compressão snappy

dados_casos_confirmados.write.saveAsTable("casos_confirmados_snappy", format="parquet", compression="snappy")


In [1]:
# visualizar a tabela salva em parquet e compressa em snappy

spark.sql('''describe formatted casos_confirmados_snappy''').show()


+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|   Casos_Confirmados|       decimal(10,0)|   null|
|         Casos_Novos|                 int|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|             default|       |
|               Table|casos_confirmados...|       |
|               Owner|                root|       |
|        Created Time|Thu Apr 21 01:31:...|       |
|         Last Access|Thu Jan 01 00:00:...|       |
|          Created By|         Spark 2.4.1|       |
|                Type|             MANAGED|       |
|            Provider|             parquet|       |
|    Table Properties|[transient_lastDd...|       |
|          Statistics|           713 bytes|       |
|            Location|hdfs://namenode:8...|       |
|       Serde Library|org.apache.hadoop...|       |
|         In

In [14]:
# 6.Salvar a terceira visualização em um tópico no Kafka

#importando bibliotecas streaming e StreamingContext
from pyspark.sql.types import StructType


In [34]:
dados_visualizacao3 = dados_obitos_confirmados_taxa


In [49]:
dados_visualizacao3.printSchema()

root
 |-- Obitos_Confirmados: integer (nullable = true)
 |-- Obitos_Novos: integer (nullable = true)
 |-- Taxa_Mortalidade: double (nullable = false)



In [64]:
dados_visualizacao3_stream = dados_obitos_confirmados_taxa.select(col("Obitos_Confirmados")
                                                        .cast("string"),col("Obitos_Novos")
                                                        .cast("string"),col("Taxa_Mortalidade").cast("string"))

In [55]:
dados_visualizacao3_stream.printSchema()

root
 |-- Obitos_Confirmados: string (nullable = true)
 |-- Obitos_Novos: string (nullable = true)
 |-- Taxa_Mortalidade: string (nullable = false)



In [67]:
#dados_visualizacao3_saida = dados_visualizacao3_stream.writeStream\
#    .format("kafka")\
#    .option("kafka.bootstrap.servers", "kafka:9092")\
#    .option("topic","topic-dados-visualizacao")\
#    .option("checkpointLocation", "/user/ronnan/dados_visualizacao11")\
#    .start() """"

In [66]:
#dados_visualizacao3_output.status

In [None]:
# 7.Criar a visualização pelo Spark com os dados enviados para o HDFS:
# 8.Salvar a visualização do exercício 6 em um tópico no Elastic
# 9.Criar um dashboard no Elastic para visualização dos novos dados enviados