# Listagem dos arquivos CSV que foram importados para o HDFS

In [None]:
!hdfs dfs -ls /user/hive/warehouse/hist_painel_covidbr/

# Criação da tabela particionada por município

In [None]:
from pyspark.sql.types import StructType

schema = (
    StructType()
    .add('regiao', 'string')
    .add('estado', 'string')
    .add('municipio', 'string')
    .add('coduf', 'integer')
    .add('codmun', 'integer')
    .add('codRegiaoSaude', 'integer')
    .add('nomeRegiaoSaude', 'string')
    .add('data', 'timestamp')
    .add('semanaEpi', 'integer')
    .add('populacaoTCU2019', 'integer')
    .add('casosAcumulado', 'integer')
    .add('casosNovos', 'integer')
    .add('obitosAcumulado', 'integer')
    .add('obitosNovos', 'integer')
    .add('Recuperadosnovos', 'integer')
    .add('emAcompanhamentoNovos', 'integer')
    .add('interior/metropolitana', 'integer')
)

df_csv = (
    spark.read
    .schema(schema)
    .option('header', True)
    .option('encoding', 'UTF-8')
    .option('delimiter', ';')
    .format('csv')
    .load('/user/hive/warehouse/hist_painel_covidbr')
)

In [None]:
df_csv.printSchema()

In [None]:
(df_csv.write
 .mode("overwrite")
 .partitionBy('municipio')
 .saveAsTable('painel_covidbr.hist_por_municipio'))

In [None]:
!hdfs dfs -ls /user/hive/warehouse/painel_covidbr.db

# Leitura da tabela particionada

In [None]:
from pyspark.sql.functions import col

In [None]:
df = (
    spark.read
    .table('painel_covidbr.hist_por_municipio')
    .select(
        col('regiao'),
        col('estado'),
        col('municipio'),
        col('populacaoTCU2019').alias('populacao'),
        col('casosAcumulado'),
        col('casosNovos'),
        col('obitosAcumulado'),
        col('obitosNovos'),
        col('Recuperadosnovos').alias('recuperadosNovos'),
        col('emAcompanhamentoNovos'),
        col('data')
    )
)

# Criação da tabela intermediária de síntese de casos por Estados

Tornou-se necessária por limitações de hardware do equipamento do autor

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, desc, lit, row_number

In [None]:
w_regioes_estados = Window.partitionBy('regiao', 'estado').orderBy(desc('data'))

df_estados = (
    df
    .withColumn('row', row_number().over(w_regioes_estados))
    .filter(col('row') == 1)
    .drop('row')
)

In [None]:
df_sintese_casos_estados = (
    df_estados
    .withColumn('incidencia', (col('casosAcumulado') / col('populacao')) * lit(100_000))
    .withColumn('mortalidade', (col('obitosAcumulado') / col('populacao')) * lit(100_000))
    .withColumn('letalidade', col('obitosAcumulado') / col('casosAcumulado'))
    .select(
        col('regiao'),
        col('estado'),
        col('recuperadosNovos'),
        col('emAcompanhamentoNovos'),
        col('casosAcumulado'),
        col('casosNovos'),
        col('incidencia'),
        col('obitosAcumulado'),
        col('obitosNovos'),
        col('letalidade'),
        col('mortalidade'),
        col('populacao'),
        col('data')
    )
)

In [None]:
(df_sintese_casos_estados.write
 .mode('overwrite')
 .partitionBy('estado')
 .saveAsTable('painel_covidbr.sintese_casos_estados'))

# Painel da COVID-19

In [None]:
from pyspark.sql.functions import min, max, col, asc, desc, lit, sum, format_number

In [None]:
df_regioes_estados = spark.read.table('painel_covidbr.sintese_casos_estados')

## Visualização do painel da COVID-19 no Brasil

In [None]:
df_brasil = df_regioes_estados.filter(col('regiao') == 'Brasil')

## (Visão 1) - Casos recuperados e em acompanhamento

In [None]:
df_visao1 = (
    df_brasil
    .select(
        col('recuperadosNovos'),
        col('emAcompanhamentoNovos')
    )
)

In [None]:
df_visao1.show()

In [None]:
(df_visao1.write
 .mode('overwrite')
 .saveAsTable('painel_covidbr.painel_casos_recuperados_acompanhamento'))

In [None]:
spark.read.table('painel_covidbr.painel_casos_recuperados_acompanhamento').count()

## (Visão 2) - Casos confirmados

In [None]:
df_visao2 = (
    df_brasil
    .select(
        col('casosAcumulado'), 
        col('casosNovos'), 
        col('incidencia')
    )
)

In [None]:
df_visao2.show()

In [None]:
(df_visao2.write
 .mode('overwrite')
 .option('compression', 'snappy')
 .parquet('/user/hive/warehouse/casos_confirmados_parquet'))

In [None]:
!hdfs dfs -ls /user/hive/warehouse/casos_confirmados_parquet

## (Visão 3) - Óbitos confirmados

In [None]:
from pyspark.sql.functions import to_json, struct

df_visao3 = (
    df_brasil
    .select(to_json(struct(
        col('obitosAcumulado'),
        col('obitosNovos'),
        col('letalidade'),
        col('mortalidade'))).alias("value")
    )
)

In [None]:
df_visao3.show(1, False)

In [None]:
(df_visao3.write
 .format('kafka')
 .option('kafka.bootstrap.servers', 'kafka:29092')
 .option('topic', 'covid19-obitos')
 .save())

Para testar o resultado do kafka, execute:

```bash
sh scripts/teste-kafkaconsumer.sh
```

## Visualização da síntese de casos de COVID-19 no Brasil e Regiões

## (Visão 1) - Síntese de casos do Brasil e Regiões

In [None]:
df_sintese_regioes = (
    df_regioes_estados
    .groupBy('regiao')
    .agg(
        sum('casosAcumulado').alias('casosAcumulado'),
        sum('obitosAcumulado').alias('obitosAcumulado'),
        sum('populacao').alias('populacao'),
        max('data').alias('data')
    )
    .withColumn('incidencia', (col('casosAcumulado') / col('populacao')) * lit(100_000))
    .withColumn('mortalidade', (col('obitosAcumulado') / col('populacao')) * lit(100_000))
    .select(
        col('regiao'),
        col('casosAcumulado'),
        col('obitosAcumulado'),
        col('incidencia'),
        col('mortalidade'),
        col('data')
    )
    .orderBy(desc('casosAcumulado'))
)

In [None]:
df_sintese_regioes.printSchema()

In [None]:
df_sintese_regioes.show()