In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as sum_col, avg, when, to_date, year, month, dayofweek, lit
from pyspark.sql.types import IntegerType, DoubleType, StringType, DateType
import findspark

findspark.init()

spark = SparkSession.builder \
    .appName("BigData") \
    .config("spark.memory.offHeap.enabled", "true") \
    .config("spark.memory.offHeap.size", "5g") \
    .getOrCreate()

In [2]:
# Caminhos para os arquivos
base_path = "datasets_raw/HIST_PAINEL_COVIDBR_31mai2025/"

df_2021_p1 = spark.read.csv(base_path + "HIST_PAINEL_COVIDBR_2021_Parte1_31mai2025.csv", header=True, inferSchema=True, sep=';', encoding='ISO-8859-1')
df_2021_p2 = spark.read.csv(base_path + "HIST_PAINEL_COVIDBR_2021_Parte2_31mai2025.csv", header=True, inferSchema=True, sep=';', encoding='ISO-8859-1')
df_2022_p1 = spark.read.csv(base_path + "HIST_PAINEL_COVIDBR_2022_Parte1_31mai2025.csv", header=True, inferSchema=True, sep=';', encoding='ISO-8859-1')
df_2022_p2 = spark.read.csv(base_path + "HIST_PAINEL_COVIDBR_2022_Parte2_31mai2025.csv", header=True, inferSchema=True, sep=';', encoding='ISO-8859-1')

In [None]:
# Unir todos os DataFrames
df_raw = df_2022_p1.unionByName(df_2022_p2, allowMissingColumns=True) \
                   .unionByName(df_2021_p1, allowMissingColumns=True) \
                   .unionByName(df_2021_p2, allowMissingColumns=True)

df_raw.cache()

DataFrame[regiao: string, estado: string, municipio: string, coduf: int, codmun: int, codRegiaoSaude: int, nomeRegiaoSaude: string, data: date, semanaEpi: int, populacaoTCU2019: int, casosAcumulado: double, casosNovos: int, obitosAcumulado: int, obitosNovos: int, Recuperadosnovos: int, emAcompanhamentoNovos: int, interior/metropolitana: int]

In [4]:
df_raw.printSchema()
print(f"Número total de linhas no DataFrame combinado: {df_raw.count()}")

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: date (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: double (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)

Número total de linhas no DataFrame combinado: 4101870


In [None]:
df_filtered = df_raw.filter(col("regiao") != "Brasil") \
                    .filter(col("municipio").isNotNull()) # Foca em município/estado

colunas_numericas_fill_zero = ["casosNovos", "obitosNovos", "Recuperadosnovos", "emAcompanhamentoNovos"]
df_cleaned = df_filtered.na.fill(0, subset=colunas_numericas_fill_zero)

# Criar features de tempo
df_features = df_cleaned.withColumn("ano_data", year(col("data"))) \
                        .withColumn("mes_data", month(col("data"))) \
                        .withColumn("dia_semana_num", dayofweek(col("data"))) # 1=Domingo, 7=Sábado
df_features.cache()

# Mapear dia da semana para nomes
df_final_base = df_features.withColumn("dia_semana_nome",
    when(col("dia_semana_num") == 1, "Domingo")
    .when(col("dia_semana_num") == 2, "Segunda")
    .when(col("dia_semana_num") == 3, "Terça")
    .when(col("dia_semana_num") == 4, "Quarta")
    .when(col("dia_semana_num") == 5, "Quinta")
    .when(col("dia_semana_num") == 6, "Sexta")
    .when(col("dia_semana_num") == 7, "Sábado")
    .otherwise("Nao Informado")
)
df_final_base.cache()
df_final_base.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- municipio: string (nullable = true)
 |-- coduf: integer (nullable = true)
 |-- codmun: integer (nullable = true)
 |-- codRegiaoSaude: integer (nullable = true)
 |-- nomeRegiaoSaude: string (nullable = true)
 |-- data: date (nullable = true)
 |-- semanaEpi: integer (nullable = true)
 |-- populacaoTCU2019: integer (nullable = true)
 |-- casosAcumulado: double (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- obitosAcumulado: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- Recuperadosnovos: integer (nullable = true)
 |-- emAcompanhamentoNovos: integer (nullable = true)
 |-- interior/metropolitana: integer (nullable = true)
 |-- ano_data: integer (nullable = true)
 |-- mes_data: integer (nullable = true)
 |-- dia_semana_num: integer (nullable = true)
 |-- dia_semana_nome: string (nullable = false)



In [9]:
caminho_para_salvar_parquet = "datasets_base/df_covid_tratado.parquet"

# Salvar o DataFrame em Parquet
# O modo 'overwrite' é útil durante o desenvolvimento para substituir o arquivo se ele já existir.
df_final_base.write.mode("overwrite").parquet(caminho_para_salvar_parquet)

print(f"DataFrame 'df_final_base' salvo com sucesso em: {caminho_para_salvar_parquet}")

DataFrame 'df_final_base' salvo com sucesso em: datasets_base/df_covid_tratado.parquet
