## Questão 1

In [None]:
# Criando o diretório no HDFS:

!hdfs dfs -mkdir -p /user/projeto_basico/dados 

In [None]:
# Transferindo os arquivos da pasta local para o HDFS:

!hdfs dfs -put /input/*.csv /user/projeto_basico/dados


## Questão 2

In [2]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import *


spark = SparkSession \
    .builder \
    .appName("Projeto Basico") \
    .getOrCreate()

In [3]:
# Lendo os dados em .csv como DataFrame:

df = spark.read.csv("hdfs:///user/projeto_basico/dados/*.csv", sep=";", header=True, inferSchema=True)

In [None]:
# Criando a database e verificando se existe

spark.sql("CREATE DATABASE IF NOT EXISTS covidbr")
spark.sql("show databases").show()

In [None]:
# Salvando o df no Hive e confirmando que a tabela foi salva:

df_output = df.write.mode("overwrite").partitionBy("municipio").saveAsTable("covidbr.initial_table")
!hdfs dfs -ls -h /user/hive/warehouse/covidbr.db/initial_table

In [4]:
# Verificando novamente a tabela criada

spark.sql("use covidbr")
spark.sql("show tables").show()

+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
| covidbr|initial_table|      false|
| covidbr|visualizacao1|      false|
| covidbr|visualizacao3|      false|
+--------+-------------+-----------+



In [5]:
df.toPandas()

Unnamed: 0,regiao,estado,municipio,coduf,codmun,codRegiaoSaude,nomeRegiaoSaude,data,semanaEpi,populacaoTCU2019,casosAcumulado,casosNovos,obitosAcumulado,obitosNovos,Recuperadosnovos,emAcompanhamentoNovos,interior/metropolitana
0,Brasil,,,76,,,,2021-07-01,26,210147125.0,18622304,65163,520095,2029,16931272.0,1170937.0,
1,Brasil,,,76,,,,2021-07-02,26,210147125.0,18687469,65165,521952,1857,16989351.0,1176166.0,
2,Brasil,,,76,,,,2021-07-03,26,210147125.0,18742025,54556,523587,1635,17033808.0,1184630.0,
3,Brasil,,,76,,,,2021-07-04,27,210147125.0,18769808,27783,524417,830,17082876.0,1162515.0,
4,Brasil,,,76,,,,2021-07-05,27,210147125.0,18792511,22703,525112,695,17151673.0,1115726.0,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
33709,Centro-Oeste,DF,Brasília,53,530010.0,53001.0,DISTRITO FEDERAL,2021-07-02,26,3015268.0,431771,620,9276,12,,,1.0
33710,Centro-Oeste,DF,Brasília,53,530010.0,53001.0,DISTRITO FEDERAL,2021-07-03,26,3015268.0,432492,721,9289,13,,,1.0
33711,Centro-Oeste,DF,Brasília,53,530010.0,53001.0,DISTRITO FEDERAL,2021-07-04,27,3015268.0,433505,1013,9304,15,,,1.0
33712,Centro-Oeste,DF,Brasília,53,530010.0,53001.0,DISTRITO FEDERAL,2021-07-05,27,3015268.0,434114,609,9312,8,,,1.0


## Questão 3

In [14]:
# Criando um dataframe para trabalhar as visualizações:

df_visualizacao = df.select("regiao", "data","municipio", "populacaoTCU2019", "casosAcumulado", "casosNovos",\
                            "obitosAcumulado", "obitosNovos", "Recuperadosnovos", "emAcompanhamentoNovos")


In [8]:
df_visualizacao.show()

+------+-------------------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+
|regiao|               data|populacaoTCU2019|casosAcumulado|casosNovos|obitosAcumulado|obitosNovos|Recuperadosnovos|emAcompanhamentoNovos|
+------+-------------------+----------------+--------------+----------+---------------+-----------+----------------+---------------------+
|Brasil|2020-02-25 00:00:00|       210147125|             0|         0|              0|          0|            null|                 null|
|Brasil|2020-02-26 00:00:00|       210147125|             1|         1|              0|          0|            null|                 null|
|Brasil|2020-02-27 00:00:00|       210147125|             1|         0|              0|          0|            null|                 null|
|Brasil|2020-02-28 00:00:00|       210147125|             1|         0|              0|          0|            null|                 null|
|Brasil|2020-02-29 00:00:00

In [9]:
# Criando uma variável para retornar a data máxima da tabela:

data_max_conv_unix =  df_visualizacao.select(max("data")).withColumn("unix_time", unix_timestamp(col("max(data)"),"yyyy-MM-dd HH:mm:ss"))

data_max_conv_data = data_max_conv_unix.withColumn("data", from_unixtime("unix_time", "yyyy-MM-dd"))

data_max = data_max_conv_data.select("data").collect()[0][0]

print(data_max)

2021-07-06


In [10]:
#Visualização 1 - Casos Recuperados e em Acompanhamento:

df_vis_1 = df_visualizacao.select(
                       format_number(col("Recuperadosnovos"),0).alias("Casos_Recuperados"),
                       format_number(col("emAcompanhamentoNovos"),0).alias("Em_Acompanhamento")
                       ).filter((col("data") == data_max) & (col("regiao") == "Brasil"))

df_vis_1.show()

+-----------------+-----------------+
|Casos_Recuperados|Em_Acompanhamento|
+-----------------+-----------------+
|       17,262,646|        1,065,477|
+-----------------+-----------------+



In [11]:
# Visualização 2 - CASOS CONFIRMADOS (Acumulados, Novos e Incidência/100.000 Habitantes):

df_vis_2 = df_visualizacao.select(
                        format_number(col("casosAcumulado"), 0).alias("Casos_Acumulados"),
                        format_number(col("casosNovos"),0).alias("Casos_Novos"),
                        format_number((col("casosAcumulado")/col("populacaoTCU2019")*100000),1).alias("Incidência")
                        ).filter((col("data") == data_max) & (col("regiao") == "Brasil"))
df_vis_2.show()

+----------------+-----------+----------+
|Casos_Acumulados|Casos_Novos|Incidência|
+----------------+-----------+----------+
|      18,855,015|     62,504|   8,972.3|
+----------------+-----------+----------+



In [20]:
# Visualização 3 - ÓBITOS CONFIRMADOS (Acumulados, Novos, Letalidade e Mortalidade):

df_vis_3 = df_visualizacao.select(
                        format_number(col("obitosAcumulado"), 0).alias("Óbitos_Acumulados"),
                        format_number(col("obitosNovos"),0).alias("Óbitos_Novos"),
                        format_number((col("obitosAcumulado")/col("casosAcumulado")*100),1).alias("Incidência_[%]"),
                        format_number((col("obitosAcumulado")/col("populacaoTCU2019")*100000),1).alias("Mortalidade")
                        ).filter((col("data") == data_max) & (col("regiao") == "Brasil"))
df_vis_3.show()

+-----------------+------------+--------------+-----------+
|Óbitos_Acumulados|Óbitos_Novos|Incidência_[%]|Mortalidade|
+-----------------+------------+--------------+-----------+
|          526,892|       1,780|           2.8|      250.7|
+-----------------+------------+--------------+-----------+



## Questão 4

In [235]:
# Salvando a primeira vizualização como tabela Hive:

output_tabela_hive = df_vis_1.write.mode("overwrite").saveAsTable("covidbr.visualizacao1")

In [51]:
# Verificando a tabela no Hive:

spark.sql("show tables").show()

spark.sql("select * from covidbr.visualizacao1").show()

+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
| covidbr|initial_table|      false|
| covidbr|visualizacao1|      false|
| covidbr|visualizacao3|      false|
+--------+-------------+-----------+

+-----------------+-----------------+
|Casos_Recuperados|Em_Acompanhamento|
+-----------------+-----------------+
|       17,262,646|        1,065,477|
+-----------------+-----------------+



## Questão 5

In [255]:
# Salvando a segunda vizualização em formato parquet e compressão snappy:

output_tabela_casos_confirmados = df_vis_2.write.format("parquet")\
                                .save("/user/projeto_basico/casos_confirmados",compression="snappy")

In [258]:
!hdfs dfs -ls /user/projeto_basico/casos_confirmados

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-25 00:30 /user/projeto_basico/casos_confirmados/_SUCCESS
-rw-r--r--   2 root supergroup       1003 2022-04-25 00:30 /user/projeto_basico/casos_confirmados/part-00000-daa4870d-db50-4bea-b78f-a7f9f4ddb1aa-c000.snappy.parquet


## Questão 6

In [15]:
# Salvando a terceira vizualização em um tópico no kafka em formato json:


output_tabela_obitos= df_vis_3.select(to_json(struct("*")).alias("value")).write\
                    .format("kafka")\
                    .option("kafka.bootstrap.servers", "kafka:9092")\
                    .option("topic","obitos-confirmados")\
                    .save()

In [16]:
# Verificando a gravação no tópico kafka:

input_tabela_obitos = spark.read \
                    .format("kafka") \
                    .option("kafka.bootstrap.servers", "kafka:9092") \
                    .option("subscribe", "obitos-confirmados") \
                    .load()

print(input_tabela_obitos.selectExpr("CAST(last(value) AS string)").collect()[0][0])

{"Óbitos_Acumulados":"526,892","Óbitos_Novos":"1,780","Incidência_[%]":"2.8","Mortalidade":"250.7"}


## Questão 7

In [163]:
# Criando a visualização dos dados enviados ao HDFS:

df_vis_4 =  df_visualizacao.select(
                        col('regiao').alias("Regiões"),\
                        col("populacaoTCU2019"),
                        col('municipio').cast("string").alias("cidade"),\
                        col("obitosAcumulado").alias("Óbitos_Acumulados"),\
                        col("casosAcumulado").alias("Casos_Acumulados"),\
                        col("data").alias("Atualização")\
                                    ).dropna(subset=["populacaoTCU2019"]).\
                                    sort(desc("obitosAcumulado")).\
                                    filter((col("data") == data_max) & (col("cidade").isNull()))
df_vis_4.show(5)

+-------+----------------+------+-----------------+----------------+-------------------+
|Regiões|populacaoTCU2019|cidade|Óbitos_Acumulados|Casos_Acumulados|        Atualização|
+-------+----------------+------+-----------------+----------------+-------------------+
| Brasil|       210147125|  null|           526892|        18855015|2021-07-06 00:00:00|
|Sudeste|        45919049|  null|           130389|         3809222|2021-07-06 00:00:00|
|Sudeste|        17264943|  null|            56192|          970268|2021-07-06 00:00:00|
|Sudeste|        21168791|  null|            47148|         1836198|2021-07-06 00:00:00|
|    Sul|        11377239|  null|            31867|         1235914|2021-07-06 00:00:00|
+-------+----------------+------+-----------------+----------------+-------------------+
only showing top 5 rows



In [165]:
# Após filtrar valores repetidos da tabela do HDFS, montado a tabela da visualização 4:

df_final_vis_4 = df_vis_4.groupBy("Regiões").agg(
                        format_number(sum("Casos_Acumulados"),0).alias("Casos_Acumulados"),    
                        format_number(sum("Óbitos_Acumulados"), 0).alias("Óbitos_Acumulados"),
                        format_number((sum("Óbitos_Acumulados")/sum("Casos_Acumulados")*100),1).alias("Incidência_[%]"),
                        format_number((sum("Óbitos_Acumulados")/sum("populacaoTCU2019")*100000),1).alias("Mortalidade"),
                        date_format(max("Atualização"), "dd/MM/yyyy hh:mm").alias("Atualização")                           
                                           ).sort("Regiões")

df_final_vis_4.show()

+------------+----------------+-----------------+--------------+-----------+----------------+
|     Regiões|Casos_Acumulados|Óbitos_Acumulados|Incidência_[%]|Mortalidade|     Atualização|
+------------+----------------+-----------------+--------------+-----------+----------------+
|      Brasil|      18,855,015|          526,892|           2.8|      250.7|06/07/2021 12:00|
|Centro-Oeste|       1,916,619|           49,207|           2.6|      301.9|06/07/2021 12:00|
|    Nordeste|       4,455,737|          107,824|           2.4|      188.9|06/07/2021 12:00|
|       Norte|       1,732,815|           43,845|           2.5|      237.9|06/07/2021 12:00|
|     Sudeste|       7,138,803|          245,311|           3.4|      277.6|06/07/2021 12:00|
|         Sul|       3,611,041|           80,705|           2.2|      269.2|06/07/2021 12:00|
+------------+----------------+-----------------+--------------+-----------+----------------+

