# Projeto Semantix - Dados Covid

In [1]:
# Listar arquivos no hive/warehouse`

!hdfs dfs -ls /user/hive/warehouse

Found 2 items
drwxrwxr-x   - root supergroup          0 2022-04-24 19:27 /user/hive/warehouse/dados_covid
drwxrwxr-x   - root supergroup          0 2022-04-24 13:45 /user/hive/warehouse/projeto_semantix.db


##  Leitura dos Dados

In [2]:
# Ler dados da tabela Hive

tabela_dados_covid = spark.read.table("dados_covid")
tabela_dados_covid.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codregiaosaude: integer (nullable = true)
 |-- nomeregiaosaude: string (nullable = true)
 |-- data: date (nullable = true)
 |-- semanaepi: integer (nullable = true)
 |-- populacaotcu2019: integer (nullable = true)
 |-- casosacumulado: integer (nullable = true)
 |-- casosnovos: integer (nullable = true)
 |-- obitosacumulado: integer (nullable = true)
 |-- obitosnovos: integer (nullable = true)
 |-- recuperadosnovos: integer (nullable = true)
 |-- emacompanhamentonovos: integer (nullable = true)
 |-- interior_metropolitana: string (nullable = true)



In [3]:
# Queries

total_casos_recuperados = spark.sql("select MAX (recuperadosnovos) as Casos_Recuperados from dados_covid")
casos_em_acompanhamento = spark.sql("select LAST (emacompanhamentonovos) as Em_Acompanhamento from dados_covid WHERE emacompanhamentonovos IS NOT NULL")

### Fórmulas ###


###Calculo incidencia (casos confirmados * 1.000.000) / população.
###Calculo letalidade (mortes totais/casos totais)
###Calculo mortalidade (mortes totais/população)

total_casos_acumulados = spark.sql("select MAX (casosacumulado) as Acumulado from dados_covid ")
casos_novos = spark.sql("select MAX (casosnovos) as Casos_novos from dados_covid where data = ('2021-07-06')")
incidencia = spark.sql("SELECT ROUND(((MAX(casosacumulado) / MAX(populacaotcu2019))*100000),1) as incidencia from dados_covid where data = ('2021-07-06')")

total_obitos_acumulados = spark.sql("select MAX (obitosacumulado) as Obito_acumulado from dados_covid ")
obitos_novos = spark.sql("select MAX (obitosnovos) as Obitos_novos from dados_covid where data = ('2021-07-06')")
letalidade = spark.sql("SELECT ROUND(((MAX(obitosacumulado) / MAX(casosacumulado))*100),1) as letalidade from dados_covid")
mortalidade = spark.sql("SELECT ROUND(((MAX(obitosacumulado) / MAX(populacaotcu2019))*100000),1) as mortalidade from dados_covid")

In [4]:
total_casos_recuperados.show()
casos_em_acompanhamento.show()

total_casos_acumulados.show()
casos_novos.show()
incidencia.show()

total_obitos_acumulados.show()
obitos_novos.show()
letalidade.show()
mortalidade.show()

+-----------------+
|Casos_Recuperados|
+-----------------+
|         17262646|
+-----------------+

+-----------------+
|Em_Acompanhamento|
+-----------------+
|          1065477|
+-----------------+

+---------+
|Acumulado|
+---------+
| 18855015|
+---------+

+-----------+
|Casos_novos|
+-----------+
|      62504|
+-----------+

+----------+
|incidencia|
+----------+
|    8972.3|
+----------+

+---------------+
|Obito_acumulado|
+---------------+
|         526892|
+---------------+

+------------+
|Obitos_novos|
+------------+
|        1780|
+------------+

+----------+
|letalidade|
+----------+
|       2.8|
+----------+

+-----------+
|mortalidade|
+-----------+
|      250.7|
+-----------+



## Criação de Views

In [56]:
# View CASOS RECUPERADOS
spark.sql("CREATE OR REPLACE VIEW Total_Casos_Recuperados AS\
        SELECT MAX (recuperadosnovos) AS Casos_Recuperados, \
        MAX (emacompanhamentonovos) AS Em_Acompanhamento \
        FROM dados_covid \
        WHERE data = ('2021-07-06')")

# View CASOS CONFIRMADOS
spark.sql("CREATE OR REPLACE VIEW Casos_Confirmados AS \
        SELECT MAX (casosacumulado) AS Acumulado, \
        ROUND(((MAX(casosacumulado) / MAX(populacaotcu2019))*100000),1) AS Incidencia, \
        MAX (casosnovos) AS Casos_novos \
        FROM dados_covid \
        WHERE data = ('2021-07-06')")

# View OBITOS CONFIRMADOS
spark.sql("CREATE OR REPLACE VIEW Obitos_Confirmados AS \
        SELECT MAX (obitosacumulado) AS Obitos_acumulados, \
        MAX (casosnovos) AS Casos_novos, \
        ROUND(((MAX(obitosacumulado) / MAX(casosacumulado))*100),1) AS Letalidade, \
        ROUND(((MAX(obitosacumulado) / MAX(populacaotcu2019))*100000),1) AS Mortalidade \
        FROM dados_covid \
        WHERE data = ('2021-07-06')")


DataFrame[]

In [57]:
view1 = spark.sql("SELECT * FROM Total_Casos_Recuperados")
view2 = spark.sql("SELECT * FROM Casos_Confirmados")
view3 = spark.sql("SELECT * FROM Obitos_Confirmados")

view1.show()
view2.show()
view3.show()

+-----------------+-----------------+
|Casos_Recuperados|Em_Acompanhamento|
+-----------------+-----------------+
|         17262646|          1065477|
+-----------------+-----------------+

+---------+----------+-----------+
|Acumulado|Incidencia|Casos_novos|
+---------+----------+-----------+
| 18855015|    8972.3|      62504|
+---------+----------+-----------+

+-----------------+-----------+----------+-----------+
|Obitos_acumulados|Casos_novos|Letalidade|Mortalidade|
+-----------------+-----------+----------+-----------+
|           526892|      62504|       2.8|      250.7|
+-----------------+-----------+----------+-----------+



## Salvar views

In [54]:
# Salvar view 1 como tabela Hive

view1.write.saveAsTable("view1")

# Salvar view 2 como formato parquet e compressão snappy

view2.write.option("compression", "snappy").parquet("/user/hive/warehouse/view2")

In [163]:
# Verificar arquivo salvo em /user/hive/warehouse

!hdfs dfs -ls /user/hive/warehouse/view1

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-24 23:25 /user/hive/warehouse/view1/_SUCCESS
-rw-r--r--   2 root supergroup        679 2022-04-24 23:25 /user/hive/warehouse/view1/part-00000-6e6b2967-e529-427d-9850-332238bf7e37-c000.snappy.parquet


In [164]:
# Verificar arquivo salvo em /user/hive/warehouse

!hdfs dfs -ls /user/hive/warehouse/view2

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-24 23:29 /user/hive/warehouse/view2/_SUCCESS
-rw-r--r--   2 root supergroup        894 2022-04-24 23:29 /user/hive/warehouse/view2/part-00000-391af4a2-357c-4237-8a86-f4317b93ec4d-c000.snappy.parquet


In [107]:
# Salvar view 3 em um tópico no Kafka

from pyspark.sql.functions import *
from pyspark.sql.types import *

view3_convertido = view3.withColumn(
            "value", struct(
            col("Obitos_acumulados"), \
            col("Casos_novos"), \
            col("Letalidade"), \
            col("Mortalidade")))
view3_convertido = view3_convertido.withColumn("value",col("value").cast("string"))

view3_convertido.show()

view3_convertido.write.format("kafka")
        \.option("kafka.bootstrap.servers", "kafka:9092")
        \.option("topic","view3").save()

+-----------------+-----------+----------+-----------+--------------------+
|Obitos_acumulados|Casos_novos|Letalidade|Mortalidade|               value|
+-----------------+-----------+----------+-----------+--------------------+
|           526892|      62504|       2.8|      250.7|[526892, 62504, 2...|
+-----------------+-----------+----------+-----------+--------------------+



## Criar visualização com os dados enviados para o HDFS

In [154]:
from pyspark.sql.functions import col
from pyspark.sql.functions import lit

view_spark = tabela_dados_covid.select(
    'regiao',\
    'populacaotcu2019',\
    'obitosacumulado',\
    'casosacumulado',\
    'estado').where(tabela_dados_covid.regiao != 'regiao')

view_spark = view_spark.groupBy('estado', 'regiao')
view_spark = view_spark.max('casosacumulado','obitosacumulado','populacaotcu2019')

view_spark = view_spark.groupBy('regiao')
view_spark = view_spark.sum('max(casosacumulado)','max(obitosacumulado)','max(populacaotcu2019)')
view_spark = view_spark.withColumn('mult', lit(100000))

view_spark = view_spark.withColumn('Incidência/mil hab.', \
        format_number(((col('sum(max(casosacumulado))') * col('mult'))/col('sum(max(populacaotcu2019))')),1))
view_spark = view_spark.withColumn('Mortalidade/mil hab.', \
        format_number(((col('sum(max(obitosacumulado))')/col('sum(max(populacaotcu2019))'))) * col('mult'),1))

view_spark = view_spark.drop('mult','sum(max(populacaotcu2019))')
view_spark = view_spark.withColumnRenamed('sum(max(casosacumulado))', 'Casos').\
        withColumnRenamed('sum(max(obitosacumulado))', 'Óbitos').\
        withColumnRenamed('regiao', 'Região')

view_spark.toPandas()

Unnamed: 0,Região,Casos,Óbitos,Incidência/mil hab.,Mortalidade/mil hab.
0,Nordeste,4455737,107824,7807.3,188.9
1,Sul,3611041,80705,12046.4,269.2
2,Sudeste,7138803,245311,8078.2,277.6
3,Centro-Oeste,1916619,49207,11760.5,301.9
4,Brasil,18855015,526892,8972.3,250.7
5,Norte,1732862,43845,9401.9,237.9


## Salvar view3 em um tópico no Elastic

In [161]:
view3_el = view3.write.format('csv')\
    .option("inferSchema", "true")\
    .option("header","true")\
    .save("/user/hive/warehouse/elastic_search/view3_elastic.csv")

!hdfs dfs -ls /user/hive/warehouse/elastic_search/view3_elastic.csv

spark.read.csv('/user/hive/warehouse/elastic_search/view3_elastic.csv', inferSchema = True, header = True).toPandas()

Found 2 items
-rw-r--r--   2 root supergroup          0 2022-04-25 02:00 /user/hive/warehouse/elastic_search/view3_elastic.csv/_SUCCESS
-rw-r--r--   2 root supergroup         76 2022-04-25 02:00 /user/hive/warehouse/elastic_search/view3_elastic.csv/part-00000-4ceed0e5-d0b7-4ee9-ab11-b377f0a97bd1-c000.csv


Unnamed: 0,Obitos_acumulados,Casos_novos,Letalidade,Mortalidade
0,526892,62504,2.8,250.7
