In [1]:
import os
from configparser import ConfigParser
from pyspark.sql.functions import *

In [16]:
config = ConfigParser()
config.read('settings.cfg')

USUARIO_HDFS = config.get('local', 'usuario_hdfs')
TOPICO_KAFKA = config.get('local', 'topico_kafka')

In [3]:
# Filtrando cabeçalhos dos CSVs retornados mesmo com skip headers definido na tabela.
# Bug relacionado: https://issues.apache.org/jira/browse/SPARK-11374
base_df = spark.read.table('hist_painel_covidbr').filter('regiao != "regiao"')

In [5]:
%%time
# Visualização 1: Casos recuperados & acompanhamento
v1_df = (base_df
    .filter("coduf = 76")
    .select("data", "recuperadosnovos", "emacompanhamentonovos")
    .withColumnRenamed("recuperadosnovos", "recuperados")
    .withColumnRenamed("emacompanhamentonovos", "em_acompanhamento")
)

CPU times: user 7.75 ms, sys: 0 ns, total: 7.75 ms
Wall time: 74.9 ms


In [6]:
%%time
# Visualização 2: Casos confirmados
v2_df = (base_df
    .filter("coduf = 76")
    .select("data", "casosacumulado", "casosnovos", "populacaotcu2019")
    .withColumn(
        "incidencia",
        round(col("casosacumulado").cast("double") * 100000 / col("populacaotcu2019").cast("double"), 1)
    )
    .withColumnRenamed("casosacumulado", "acumulado")
    .withColumnRenamed("casosnovos", "casos_novos")
    .drop("populacaotcu2019")
)

CPU times: user 10.1 ms, sys: 100 µs, total: 10.2 ms
Wall time: 404 ms


In [7]:
%%time
# Visualização 3: Óbitos
v3_df = (base_df
    .filter("coduf = 76")
    .select("data", "obitosacumulado", "obitosnovos", "populacaotcu2019", "casosacumulado")
    .withColumn(
        "letalidade",
        round(col("obitosacumulado") / col("casosacumulado") * 100, 1)
    )
    .withColumn(
        "mortalidade",
        round(col("obitosacumulado").cast("double") * 100000 / col("populacaotcu2019").cast("double"), 1)
    )
    .withColumnRenamed("obitosacumulado", "obitos_acumulados")
    .withColumnRenamed("obitosnovos", "obitos_novos")
    .drop("casosacumulado", "populacaotcu2019")
)

CPU times: user 4.04 ms, sys: 6.24 ms, total: 10.3 ms
Wall time: 108 ms


In [8]:
%%time
# Visualização 4: casos/obitos/incidencia/mortalidade/atualização por região/brasil
v4_df = (base_df
 .filter("codmun is null")
 .select("data", "regiao", "estado", "casosacumulado", "obitosacumulado", "populacaotcu2019")
 .groupBy("data", "regiao")
 .agg(
     sum("casosacumulado").alias("casos"),
     sum("obitosacumulado").alias("obitos"),
     sum("populacaotcu2019").alias("populacao")
 )
 .withColumn(
    "incidencia",
    round(col("casos").cast("double") * 100000 / col("populacao").cast("double"), 1)
 )
 .withColumn(
    "mortalidade",
    round(col("obitos").cast("double") * 100000 / col("populacao").cast("double"), 1)
 )
 .drop("populacao")
)

CPU times: user 24.9 ms, sys: 1.25 ms, total: 26.1 ms
Wall time: 248 ms


In [13]:
v1_df.write.format('orc').saveAsTable('covid_casos_recuperados_acompanhamento', mode='overwrite')

In [14]:
v2_df.write.option('compression', 'snappy').save(f'/user/{USUARIO_HDFS}/covid_casos_confirmados', format='parquet', mode='overwrite')

In [26]:
(v3_df
    .selectExpr("to_json(struct(*)) as value")
    .write
    .format("kafka") 
    .option("kafka.bootstrap.servers", "kafka:9092")\
    .option("topic", TOPICO_KAFKA) \
    .save()
)

In [27]:
v4_df.write.save(f'/user/{USUARIO_HDFS}/covid_casos_regiao', mode='overwrite')

In [None]:
display(v1_df.orderBy(col("data").desc()).toPandas())
display(v2_df.orderBy(col("data").desc()).toPandas())
display(v3_df.orderBy(col("data").desc()).toPandas())
display(v4_df.orderBy(col("data").desc(), col("regiao")).toPandas())