In [None]:
import os
import re
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_date

In [None]:
class SparkPostgres:
    def __init__(self, spark, host, port, database, user, password):
        self.spark = spark
        self.url = f"jdbc:postgresql://{host}:{port}/{database}"
        self.properties = {
            "user": user,
            "password": password,
            "driver": "org.postgresql.Driver"
        }
    
    def load_csv(self, file_path):
        if os.path.exists(file_path):
            df = self.spark.read.csv(file_path, header=True, inferSchema=True)
            return df
        else:
            print(f"Error: O arquivo {file_path} não existe.")
            return None
    
    def drop_and_create_table(self, schema, table, df):
        drop_query = f"DROP TABLE IF EXISTS {schema}.{table}"
        self.spark._jsparkSession \
            .sqlContext() \
            .executeUpdate(drop_query, self.properties)
        
        create_columns = ", ".join([f"{self.clean_column_name(col_name)} STRING" for col_name in df.columns])
        create_query = f"CREATE TABLE {schema}.{table} ({create_columns})"
        self.spark._jsparkSession \
            .sqlContext() \
            .executeUpdate(create_query, self.properties)
    
    def save_to_postgres(self, df, schema, table):
        if df is not None:
            self.drop_and_create_table(schema, table, df)
            df = df.withColumn('dt_pst', current_date())
            df.write.jdbc(
                url=self.url,
                table=f"{schema}.{table}",
                mode="overwrite",
                properties=self.properties
            )
        else:
            print(f"Error: DataFrame vazio para a tabela {schema}.{table}")
    
    def run(self, base_path, mapping, schema):
        for file_path, table_name in mapping.items():
            self.process_file(base_path, file_path, table_name, schema)
    
    def clean_column_name(self, column_name):
        column_name = column_name.lower().strip()
        column_name = re.sub(r"[^a-zA-Z0-9_]", "", column_name)
        column_name = re.sub(r"\s+", "_", column_name)
        return column_name
    
    def process_file(self, base_path, file_path, table_name, schema):
        full_path = os.path.join(base_path, file_path)
        
        df = self.load_csv(full_path)
        if df is not None:
            for col_name in df.columns:
                df = df.withColumnRenamed(col_name, self.clean_column_name(col_name))
            
            for col_name in df.columns:
                df = df.withColumn(col_name, col(col_name).cast("string"))
            
            self.save_to_postgres(df, schema, table_name)
            print(f"Tabela {table_name} processada com sucesso.")
        else:
            print(f"Erro ao processar o arquivo: {file_path}")

if __name__ == "__main__":
    spark = SparkSession.builder.appName("CSV to PostgreSQL").getOrCreate()

    host = os.getenv("DB_HOST")
    port = os.getenv("DB_PORT")
    database = os.getenv("DB_DATABASE")
    user = os.getenv("DB_USER")
    password = os.getenv("DB_PASSWORD")
    schema = os.getenv("DB_SCHEMA")

    base_path = "source/"

    mapping = {
        "auxiliares/Originais anonimizados/TbSituacaoAlunoDisciplina.csv": "tb_sit_alun_disc",
        "auxiliares/Originais anonimizados/TbTipoSituacaoAlunoDisciplina.csv": "tb_tip_sit_alun_disc",
        "auxiliares/Outras tabelas/TbCentroResultado.csv": "tb_outr_tabel_cent_result",
        "auxiliares/Outras tabelas/TbCursoFases.csv": "tb_outr_tabel_curs_fas",
        "auxiliares/Outras tabelas/TbDisciplina.csv": "tb_outr_tabel_disc",
        "auxiliares/Outras tabelas/TbFormaIngresso.csv": "tb_outr_tabel_form_ingr",
        "auxiliares/Outras tabelas/TbFreqQuadroHorario.csv": "tb_outr_tabel_freq_quad_hor",
        "auxiliares/Outras tabelas/TbGradeCurricular.csv": "tb_outr_tabel_grad_curr",
        "auxiliares/Outras tabelas/TbMotivoInativacao.csv": "tb_outr_tabel_mot_inat",
        "auxiliares/Outras tabelas/TbMunicipio.csv": "tb_outr_tabel_munic",
        "auxiliares/Outras tabelas/TbPais.csv": "tb_outr_tabel_pais",
        "auxiliares/Outras tabelas/TbPeriodo.csv": "tb_outr_tabel_periodo",
        "auxiliares/Outras tabelas/TbTipoOcorrencia.csv": "tb_outr_tabel_tip_ocorr",
        "auxiliares/TbAbatimento/Originais anonimizados/TbAbatimento.csv": "tb_abatim",
        "auxiliares/TbAbatimento/Originais anonimizados/TbAbatimentoTipo.csv": "tb_abatim_tip",
        "auxiliares/TbAluno/Originais anonimizados/TbAluno.csv": "tb_aluno",
        "auxiliares/TbAluno/Originais anonimizados/TbAlunoObs.csv": "tb_aluno_obs",
        "auxiliares/TbAluno/Originais anonimizados/TbAlunoProprioResponsavel.csv": "tb_aluno_prop_resp",
        "auxiliares/TbAluno/Originais anonimizados/TbAlunoRotinaEducacaoInfantil.csv": "tb_aluno_rot_educ_inf",
        "auxiliares/TbAluno/Originais anonimizados/TbAlunoTurma.csv": "tb_aluno_turma",
        "auxiliares/TbAluno/Originais anonimizados/TbAlunoTurmaHistorico.csv": "tb_aluno_turma_hist",
        "auxiliares/TbCampoDinamico/Originais anonimizados/TbCampoDinamico.csv": "tb_camp_dinam",
        "auxiliares/TbCampoDinamico/Originais anonimizados/TbCampoDinamicoConjunto.csv": "tb_camp_dinam_conj",
        "auxiliares/TbCampoDinamico/Originais anonimizados/TbCampoDinamicoConjuntoElemento.csv": "tb_camp_dinam_conj_elem",
        "auxiliares/TbCaptacao/Originais anonimizados/TbCaptacaoCursoInteresse.csv": "tb_capt_curso_int",
        "auxiliares/TbCaptacao/Originais anonimizados/TbCaptacaoMotivoDesistencia.csv": "tb_capt_mot_desist",
        "auxiliares/TbCaptacao/Originais anonimizados/TbCaptacaoOrigemLead.csv": "tb_capt_orig_lead",
        "auxiliares/TbCaptacao/Originais anonimizados/TbCaptacaoSituacaoLead.csv": "tb_capt_sit_lead",
        "auxiliares/TbDiario/Originais anonimizados/TbDiario.csv": "tb_diario",
        "auxiliares/TbDiario/Originais anonimizados/TbDiarioAluno.csv": "tb_diario_aluno",
        "auxiliares/TbDiario/Originais anonimizados/TbDiarioAula.csv": "tb_diario_aula",
        "auxiliares/TbDiario/Originais anonimizados/TbDiarioFrequencia.csv": "tb_diario_freq",
        "auxiliares/TbFase/Originais anonimizados/TbFaseNota.csv": "tb_fase_nota",
        "auxiliares/TbFase/Originais anonimizados/TbFaseNotaAluno.csv": "tb_fase_nota_aluno",
        "auxiliares/TbFase/Originais anonimizados/TbFaseNotaDisciplinaTurma.csv": "tb_fase_nota_disc_turma",
        "auxiliares/TbFase/Originais anonimizados/TbFaseNotaOrigemDestino.csv": "tb_fase_nota_orig_dest",
        "auxiliares/TbHistorico/Originais anonimizados/TbHistorico.csv": "tb_hist",
        "auxiliares/TbHistorico/Originais anonimizados/TbHistoricoNotas.csv": "tb_hist_nota",
        "auxiliares/TbMeta/TbMeta.csv": "tb_meta",
        "auxiliares/TbMeta/TbMetaConceito.csv": "tb_meta_conceito",
        "auxiliares/TbMeta/TbMetaFaseNota.csv": "tb_meta_fase_nota",
        "auxiliares/TbMeta/TbMetaFaseNotaAluno.csv": "tb_meta_fase_nota_aluno",
        "auxiliares/TbMeta/TbMetaSituacaoAlunoDisciplina.csv": "tb_meta_sit_alun_disc",
        "auxiliares/TbMeta/TbTipoMeta.csv": "tb_meta_tipo",
        "auxiliares/TbProfessor/Originais anonimizados/TbProfessor.csv": "tb_professor",
        "auxiliares/TbProfessor/Originais anonimizados/TbProfessorDisciplina.csv": "tb_prof_disc",
        "auxiliares/TbProfessor/Originais anonimizados/TbProfessorHorario.csv": "tb_prof_horario",
        "auxiliares/TbResponsavel/Originais anonimizados/TbResponsavel.csv": "tb_resp",
        "auxiliares/TbResponsavel/Originais anonimizados/TbTipoResponsavel.csv": "tb_tipo_resp",
        "auxiliares/TbResponsavel/Originais anonimizados/TbTipoVinculoAlunoResponsavel.csv": "tb_tipo_vinc_alun_resp"
    }


    processar = SparkPostgres(spark, host, port, database, user, password)
    processar.run(base_path, mapping, schema)

    spark.stop()
