In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, col, to_date, date_format, regexp_replace
import os
import sys
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

In [2]:
# Lista de caminhos para os arquivos CSV
datasets = [f'datasets/part-0000{i}-aca9a996-ed21-4acc-b0e0-8c92f4b8f369.c000.csv' for i in range(5)]

In [4]:
# Cria uma SparkSession configurada para trabalhar com SQL e especifica parâmetros de memória e núcleos de processamento
spark = SparkSession.builder \
    .appName("SQL") \
    .config("spark.jars", "drivers/sqlite-jdbc-3.46.0.0.jar") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()
    

# Ler e combinar todos os arquivos CSV em um DataFrame, inferindo o esquema, definindo o cabeçalho, delimitador e formato de data
df = spark.read.csv(datasets, header=True, inferSchema=True, sep=';', encoding="utf-8", dateFormat='%d/%m/%Y')

# Conta o número de linhas do DataFrame
numero_de_linhas = df.count()
print(f"O DataFrame no inicio possui {numero_de_linhas} registros.")

# Remove linhas duplicadas com base na coluna 'document_id'
df = df.dropDuplicates(['document_id'])

# Preenche valores ausentes em colunas específicas com valores padrão dependendo do tipo de dado
for col_name, dtype in df.dtypes:
    if dtype == "int":
        df = df.fillna({col_name : 0})
    elif dtype == "double":
        df = df.fillna({col_name : 0.0})
    elif dtype == "string":
        df = df.fillna({col_name : 'SEM INFORMACAO'})
    elif dtype == "boolean":
        df = df.fillna({col_name : False})
    elif dtype == "timestamp":
        df = df.fillna({col_name : '1900-01-01'})
    elif dtype == "date":
        df = df.fillna({col_name : '1900-01-01'})

# Converte as colunas de data para o formato adequado
df = df.withColumn("vacina_dataAplicacao", to_date(col("vacina_dataAplicacao"), "dd/MM/yyyy"))
df = df.withColumn("paciente_dataNascimento", to_date(col("paciente_dataNascimento"), "dd/MM/yyyy"))

# Adiciona uma coluna 'ano' extraída da data de aplicação da vacina
df = df.withColumn("ano", year(df["vacina_dataAplicacao"]))

# Ordena o DataFrame pela data de aplicação da vacina
df = df.orderBy("vacina_dataAplicacao")

# Filtra os registros para manter apenas aqueles com a data de aplicação da vacina entre 2021 e 2024
df = df.filter((year(df['vacina_dataAplicacao']) >= 2021) & (year(df['vacina_dataAplicacao']) <= 2024))

# Converte as colunas de data para o formato 'yyyy-MM-dd'
df = df.withColumn("vacina_dataAplicacao", date_format(col("vacina_dataAplicacao"), "yyyy-MM-dd"))
df = df.withColumn("paciente_dataNascimento", date_format(col("paciente_dataNascimento"), "yyyy-MM-dd"))

# Remove a coluna 'ano' que foi usada apenas para filtragem
df = df.drop("ano")

# Conta o número de linhas do DataFrame após o tratamento de dados
numero_de_linhas = df.count()
print(f"O DataFrame apos o tratamento de dados possui {numero_de_linhas} registros.")

O DataFrame no inicio possui 8503892 regristos.
O DataFrame apos o tratamento de dados possui 8503885 regristos.


In [None]:
# Corrigir valores "PFIZER - PEDI?TRICA" para "PFIZER - PEDIÁTRICA"
df = df.withColumn(
    "vacina_fabricante_nome",
    regexp_replace("vacina_fabricante_nome", "PFIZER - PEDI\\?TRICA", "PFIZER - PEDIÁTRICA")
)

# Corrigir valores "Pendente Identifica??o" para "Pendente Identificação"
df = df.withColumn(
    "vacina_fabricante_nome",
    regexp_replace("vacina_fabricante_nome", "Pendente Identifica\\?\\?o", "Pendente Identificação")
)

In [6]:
# Define nome da tabela para o banco de dados
nomeTabela = 'vacinacao_covid_df'

In [7]:
# Reduz o número de partições do DataFrame para 1 antes de escrever no banco de dados
df.coalesce(1).write \
    .format("jdbc") \
    .option("url", f"jdbc:sqlite:datasets/vacinacao_covid_df.db") \
    .option("dbtable", nomeTabela) \
    .option("driver", "org.sqlite.JDBC") \
    .mode("overwrite") \
    .save()