# Projeto de conclusão do treinamento de Big Data Engineer da Semantix
Autor: Juvenal Fonseca

### 1. Enviar os dados para o hdfs

#### Copia os dados do covid da estação de trabalho para o container do jupyter-spark


docker cp /home/nal/treinamentos/spark/input/covid/ jupyter-spark:/home

#### Copia os dados do covid para o hdfs

In [1]:
!hdfs dfs -put /home/covid/ /user/juvenal

#### Lista a pasta de covid do destino para checar se arquivos foram copiados

In [2]:
!hdfs dfs -ls /user/juvenal/covid

Found 4 items
-rw-r--r--   2 root supergroup   62492959 2022-04-19 11:31 /user/juvenal/covid/HIST_PAINEL_COVIDBR_2020_Parte1_06jul2021.csv
-rw-r--r--   2 root supergroup   76520681 2022-04-19 11:31 /user/juvenal/covid/HIST_PAINEL_COVIDBR_2020_Parte2_06jul2021.csv
-rw-r--r--   2 root supergroup   91120916 2022-04-19 11:31 /user/juvenal/covid/HIST_PAINEL_COVIDBR_2021_Parte1_06jul2021.csv
-rw-r--r--   2 root supergroup    3046774 2022-04-19 11:31 /user/juvenal/covid/HIST_PAINEL_COVIDBR_2021_Parte2_06jul2021.csv


### 2. Otimizar todos os dados do hdfs para uma tabela Hive particionada por município

#### Cria o banco de dados saude

In [1]:
spark.sql("CREATE DATABASE IF NOT EXISTS saude")

DataFrame[]

#### Ler dados do hdfs

Realiza importações necessárias

In [146]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import Window

Cria estrutura de schema para ler arquivo

In [147]:
struct_field = [
    StructField("regiao"               , StringType() ), 
    StructField("estado"               , StringType() ),
    StructField("municipio"            , StringType() ),
    StructField("coduf"                , IntegerType()), 
    StructField("codmun"               , IntegerType()),
    StructField("codRegiaoSaude"       , IntegerType()),
    StructField("nomeRegiaoSaude"      , StringType() ), 
    StructField("data"                 , DateType()   ),
    StructField("semanaEpi"            , IntegerType()),
    StructField("populacaoTCU2019"     , DecimalType()), 
    StructField("casosAcumulado"       , IntegerType()),
    StructField("casosNovos"           , IntegerType()),
    StructField("obitosAcumulado"      , StringType() ), 
    StructField("obitosNovos"          , IntegerType()),
    StructField("recuperadosNovos"     , IntegerType()),
    StructField("emAcompanhamentoNovos", IntegerType()), 
    StructField("interiorMetropolitana", IntegerType())
]

struct_type = StructType(struct_field)

Realiza leitura dos arquivos csv no diretório do hdfs e atribui ao DataFrame dados

In [148]:
dados = spark \
.read \
.csv(path     = "/user/juvenal/covid", 
     sep      = ";", 
     header   = "true", 
     mode     = "overwrite", 
     encoding = "UTF-8", 
     schema   = struct_type)

#### Salvar dados no hiver particionado por municipio

In [28]:
dados.write.partitionBy("municipio").mode("overwrite").saveAsTable("saude.covid")

### 3. Criar as 3 vizualizações pelo Spark com os dados enviados para o HDFS:

Exibe o schema do DataFrame dados

In [149]:
dados.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: date (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: decimal(10,0) (nullable = true)
 |-- casosAcumulado: integer (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: string (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- recuperadosNovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interiorMetropolitana: integer (nullable = true)



#### Tratando os dados

Seleciona colunas do DataFrame dados utilizado nas operações;

Filtra os dados por região Brasil, já que as informações solicitadas são encontradas nessas regiões de forma acumulada;

Identifica a última data do boletim informativo para recuperar os dados acumulados;

Filtra os registros que correspondem a última data do boletim informativo;

Remove as colunas não mais necessárias do DataFrame;

Calcula o percentual de letalidade e os índices de mortalidade e incidência;

Corrige o separador decimal para o padrão brasileiro;

Renomeia colunas para melhor exibição;

In [150]:
w = Window.partitionBy('regiao').orderBy("regiao")

resultado = dados \
.select( col("regiao"), col("data"), col("recuperadosNovos"), col("emAcompanhamentoNovos"), col("casosAcumulado"), col("casosNovos"), col("obitosAcumulado"), col("obitosNovos"), col("populacaoTCU2019") ) \
.where( (col("regiao") == "Brasil")          ) \
.withColumn( 'max_data', max('data').over(w) ) \
.where( col("data") == col("max_data")       ) \
.withColumn( 'id', row_number().over(w)      ) \
.drop("regiao", "data", "max_data")            \
.withColumn("Letalidade" , format_number(((col("obitosAcumulado")/col("casosAcumulado"  ))*100   ),1).alias("Letalidade" ) ) \
.withColumn("Mortalidade", format_number(((col("obitosAcumulado")/col("populacaoTCU2019"))*100000),1).alias("Mortalidade") ) \
.withColumn("Incidencia" , format_number(((col("casosAcumulado" )/col("populacaoTCU2019"))*100000),1).alias("Incidência" ) ) \
.withColumn('Letalidade' , translate('Letalidade' , '\.,', '\,')) \
.withColumn('Mortalidade', translate('Mortalidade', '\.,', '\,')) \
.withColumn('Incidencia' , translate('Incidencia' , '\.,', '\,')) \
.withColumnRenamed("recuperadosNovos"     , "Casos recuperados" ) \
.withColumnRenamed("emAcompanhamentoNovos", "Em acompanhamento" ) \
.withColumnRenamed("casosAcumulado"       , "Acumulado"         ) \
.withColumnRenamed("casosNovos"           , "Casos novos"       ) \
.withColumnRenamed("obitosAcumulado"      , "Óbitos Acumulados" ) \
.withColumnRenamed("obitosNovos"          , "Óbitos Casos novos") \
.withColumnRenamed("Incidencia"           , "Incidência"        )


#### Visualização 1 do Spark

Total de pessoas recuperadas e em acompanhamento

In [151]:
view_1 = resultado \
.select(col("Casos recuperados"), col("Em acompanhamento"))

view_1.show()


+-----------------+-----------------+
|Casos recuperados|Em acompanhamento|
+-----------------+-----------------+
|         17262646|          1065477|
+-----------------+-----------------+



#### Visualização 2 do Spark

Total de casos acumulados, novos casos (diário) e incidência

In [152]:
view_2 = resultado \
.select(col("Acumulado"), col("Casos novos"), col("Incidência"))

view_2.show()

+---------+-----------+----------+
|Acumulado|Casos novos|Incidência|
+---------+-----------+----------+
| 18855015|      62504|    8972,3|
+---------+-----------+----------+



#### Visualização 3 do Spark

Total de óbitos, novos casos de óbito, letalidade e mortalidade

In [153]:
view_3 = resultado \
.select(col("id"), col("Óbitos Acumulados"), col("Óbitos Casos novos"), col("Letalidade"), col("Mortalidade"))

view_3.drop("id").show()

+-----------------+------------------+----------+-----------+
|Óbitos Acumulados|Óbitos Casos novos|Letalidade|Mortalidade|
+-----------------+------------------+----------+-----------+
|           526892|              1780|       2,8|      250,7|
+-----------------+------------------+----------+-----------+



### 4. Salvar a primeira visualização como tabela Hive

In [8]:
dfvw1 = view_1 \
.withColumnRenamed("Casos recuperados","casosRecuperados") \
.withColumnRenamed("Em acompanhamento","emAcompanhamento")

In [113]:
dfvw1.write.mode("overwrite").saveAsTable("saude.view_1")

### 5. Salvar a segunda visualização com formato parquet e compressão snappy

In [9]:
dfvw2 = view_2 \
.withColumnRenamed("Acumulado"  ,"acumulado") \
.withColumnRenamed("Casos novos","casosNovos") \
.withColumnRenamed("Incidência" ,"incidencia")

In [118]:
dfvw2.write.saveAsTable(name="saude.view_2", format="parquet", compression="snappy")

### 6. Salvar a terceira visualização em um tópico no Kafka

#### Cria o tópico topicCovid a partir da sua estação de trabalho

docker exec -it kafka kafka-topics.sh kafka-topics --bootstrap-server kafka:9092 --topic topicCovid --create --partitions 1 --replication-factor 1

#### Cria o DataFrame df a partir do resultado do DataFrame view_3 renomeando os seus campos

In [154]:
dfvw3 = view_3 \
.withColumnRenamed("id"                ,"value"            ) \
.withColumnRenamed("Óbitos Acumulados" ,"obitosAcumulado"  ) \
.withColumnRenamed("Óbitos Casos novos","obitosCasosNovos" ) \
.withColumnRenamed("Letalidade"        ,"letalidade"       ) \
.withColumnRenamed("Mortalidade"       ,"mortalidade"      )

#### Escreve no tópico topicCovid do Kafka

In [80]:
dfvw3.selectExpr("CAST(value AS STRING)") \
.write \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("topic", "topicCovid") \
.save()

#### Ler do tópico topicCovid do Kafka

In [81]:
dfK = spark \
.read \
.format("kafka") \
.option("kafka.bootstrap.servers", "kafka:9092") \
.option("subscribe", "topicCovid") \
.option("startingOffsets", "earliest")\
.load()

#### Exibe o histórico do tópico

In [82]:
dfK.show()

+----+-----+----------+---------+------+--------------------+-------------+
| key|value|     topic|partition|offset|           timestamp|timestampType|
+----+-----+----------+---------+------+--------------------+-------------+
|null| [31]|topicCovid|        0|     0|2022-04-20 14:16:...|            0|
|null| [31]|topicCovid|        0|     1|2022-04-20 14:21:...|            0|
+----+-----+----------+---------+------+--------------------+-------------+



### 7. Criar a visualização pelo Spark com os dados enviados para o HDFS:

<img src="sintese_de_casos_covid.png" >

Seleciona colunas do DataFrame dados utilizado nas operações;

Filtra os dados que não tenham código de município, já que as informações solicitadas são encontradas nas regiões de forma acumulada;

Identifica a última data do boletim informativo para recuperar os dados acumulados;

Filtra os registros que corresponde a última data do boletim informativo;

Remove as colunas não mais necessárias do DataFrame;

Agrupa os registros por região e data, já que cada região tem mais de um estado e esses registros precisam ser somados;

Calcula os índices de mortalidade e incidência;

Corrige o separador decimal para o padrão brasileiro;

Renomeia colunas para melhor exibição;

In [155]:
w = Window.partitionBy('regiao').orderBy("regiao")

resultado_geral = dados \
.select( col("municipio"), col("regiao"), col("data"), col("casosAcumulado"), col("obitosAcumulado"), col("populacaoTCU2019") ) \
.withColumn( 'max_data', max('data').over(w) ) \
.where( (col("data") == col("max_data")) & (col("codmun").isNull()) ) \
.drop("max_data")            \
.groupBy("regiao", "data") \
.agg(sum("casosAcumulado").alias("casosAcumulado"), sum("obitosAcumulado").alias("obitosAcumulado"), sum("populacaoTCU2019").alias("populacaoTCU2019")) \
.withColumn("mortalidade", format_number(((col("obitosAcumulado")/col("populacaoTCU2019"))*100000),1).alias("Mortalidade") ) \
.withColumn("incidencia" , format_number(((col("casosAcumulado" )/col("populacaoTCU2019"))*100000),1).alias("Incidência" ) ) \
.withColumn('mortalidade'    , translate('mortalidade', '\.,', '\,')) \
.withColumn('incidencia'     , translate('incidencia' , '\.,', '\,')) \
.withColumn('obitosAcumulado', translate('incidencia' , '\.,', '\ ')) \
.drop("populacaoTCU2019")            

In [156]:
resultado_geral.select(col("regiao").alias("Região"), col("casosAcumulado").alias("Casos"), col("obitosAcumulado").alias("Óbitos"), col("incidencia").alias("Incidência/100mil hab."), col("mortalidade").alias("Mortalidade/100mil hab."), col("data").alias("Atualização")) \
.show()

+------------+--------+------+----------------------+-----------------------+-----------+
|      Região|   Casos|Óbitos|Incidência/100mil hab.|Mortalidade/100mil hab.|Atualização|
+------------+--------+------+----------------------+-----------------------+-----------+
|    Nordeste| 4455737| 78073|                7807,3|                  188,9| 2021-07-06|
|         Sul| 3611041|120464|               12046,4|                  269,2| 2021-07-06|
|     Sudeste| 7138803| 80782|                8078,2|                  277,6| 2021-07-06|
|Centro-Oeste| 1916619|117605|               11760,5|                  301,9| 2021-07-06|
|      Brasil|18855015| 89723|                8972,3|                  250,7| 2021-07-06|
|       Norte| 1732815| 94016|                9401,6|                  237,9| 2021-07-06|
+------------+--------+------+----------------------+-----------------------+-----------+



### 8. Salvar a visualização do exercício 6 em um tópico no Elastic

#### Importa a biblioteca requests

In [99]:
import requests

#### Define variáveis para utilização na requisição

In [95]:
server  = "http://elasticsearch:9200"
headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'}
index   = "covid"

#### Recupera id do DataFrame para geração da url

In [135]:
id = dfvw3.toPandas().value[0]

#### Gera url da requisição

In [137]:
url = server + "/" + index + "/" + "_doc" + "/" + str(id)

#### Define o documento da requisição

In [130]:
dfe = dfvw3.select("*").drop("value")

In [131]:
doc = dfe.toJSON().first()

#### Realiza a requisição para o elastic para incluir o documento no índice codiv

In [138]:
res = requests.post(url=url, data=doc, headers=headers)

#### Exibe resposta da requisição

In [139]:
res.json()

{'_index': 'covid',
 '_type': '_doc',
 '_id': '1',
 '_version': 2,
 'result': 'updated',
 '_shards': {'total': 2, 'successful': 1, 'failed': 0},
 '_seq_no': 2,
 '_primary_term': 1}

#### Consulta o documento enviado

In [140]:
res = requests.get(url)

In [141]:
res.json()

{'_index': 'covid',
 '_type': '_doc',
 '_id': '1',
 '_version': 2,
 '_seq_no': 2,
 '_primary_term': 1,
 'found': True,
 '_source': {'obitosAcumulado': '526892',
  'obitosCasosNovos': 1780,
  'letalidade': '2,8',
  'mortalidade': '250,7'}}

### 9. Criar um dashboard no Elastic para visualização dos novos dados enviados

#### Tabela criada com os dados do elastic

<img src="data_table_covid.png">