In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, current_date
import os

In [13]:
class SparkPostgres:
    """Load CSV files with Spark and write them into PostgreSQL tables over JDBC.

    Every table written through ``save_to_postgres`` receives an extra
    ``dt_pst`` column holding the processing date (``current_date()``).
    """

    def __init__(self, spark, host, port, database, user, password):
        """Store the Spark session and build the JDBC connection settings.

        Args:
            spark: active ``SparkSession`` used for reading and writing.
            host, port, database: PostgreSQL location, combined into a
                ``jdbc:postgresql://host:port/database`` URL.
            user, password: credentials handed to the JDBC driver.
        """
        self.spark = spark
        self.url = f"jdbc:postgresql://{host}:{port}/{database}"
        self.properties = {
            "user": user,
            "password": password,
            "driver": "org.postgresql.Driver"
        }

    def load_csv(self, file_path):
        """Read a CSV (header row, inferred column types) into a DataFrame.

        Returns ``None`` and prints an error message when ``file_path`` does not
        exist on the local filesystem.
        """
        if not os.path.exists(file_path):
            print(f"Error: O Arquivo {file_path} não existe.")
            return None
        return self.spark.read.csv(file_path, header=True, inferSchema=True)

    def _jdbc_properties(self, truncate=False):
        """Return a copy of the connection properties (``self.properties`` is
        never mutated), optionally asking Spark to TRUNCATE instead of
        DROP/CREATE the target table when writing in overwrite mode."""
        props = dict(self.properties)
        if truncate:
            props["truncate"] = "true"
        return props

    def clear_table(self, schema, table):
        """Delete every row of ``schema.table`` in PostgreSQL, keeping the table.

        ``spark.sql("TRUNCATE ...")`` would act on Spark's own catalog, not on
        PostgreSQL. Instead, an empty frame with the table's schema is written
        back in overwrite mode with the JDBC ``truncate`` option, so Spark issues
        the TRUNCATE against the PostgreSQL table. The table must already exist
        (its schema is read from PostgreSQL).
        """
        target = f"{schema}.{table}"
        empty = self.spark.read.jdbc(
            url=self.url, table=target, properties=self.properties
        ).limit(0)
        empty.write.jdbc(
            url=self.url,
            table=target,
            mode="overwrite",
            properties=self._jdbc_properties(truncate=True),
        )

    def save_to_postgres(self, df, schema, table):
        """Replace the contents of ``schema.table`` with ``df`` plus ``dt_pst``.

        Writes in overwrite mode with the JDBC ``truncate`` option: an existing
        table is truncated and reloaded rather than dropped and recreated, and a
        missing table is created by Spark. NOTE(review): an existing table must
        therefore have columns compatible with ``df``'s columns plus ``dt_pst``.
        """
        # Add the processing-date column 'dt_pst' with the current date.
        df_out = df.withColumn('dt_pst', current_date())
        df_out.write.jdbc(
            url=self.url,
            table=f"{schema}.{table}",
            mode="overwrite",
            properties=self._jdbc_properties(truncate=True),
        )

    def run(self, org_table, schema):
        """Load each configured CSV and write it to its PostgreSQL table.

        Args:
            org_table: iterable of dicts with keys ``"table"`` (target table
                name) and ``"file"`` (path of the CSV to load).
            schema: PostgreSQL schema that holds the target tables.

        Entries whose CSV file is missing are skipped (``load_csv`` prints the
        error) instead of failing on a ``None`` DataFrame.
        """
        for entry in org_table:
            df = self.load_csv(entry["file"])
            if df is None:
                continue
            if entry["table"] == "tb_eia366_pbrent366":
                # This table needs 'valdata' parsed as a date and 'valvalor'
                # cast to float before being written.
                df = df.withColumn("valdata", to_date(col("valdata")))
                df = df.withColumn("valvalor", col("valvalor").cast("float"))
            self.save_to_postgres(df, schema, entry["table"])


In [14]:
# Read the PostgreSQL connection settings from the environment and fail fast
# with a clear message if any is missing (otherwise the JDBC URL would be
# built from None values and fail obscurely later).
REQUIRED_ENV_VARS = ("DB_HOST", "DB_PORT", "DB_DATABASE", "DB_USER", "DB_PASSWORD", "DB_SCHEMA")
missing_env = [name for name in REQUIRED_ENV_VARS if not os.getenv(name)]
if missing_env:
    raise RuntimeError(f"Missing required environment variables: {', '.join(missing_env)}")

host = os.environ["DB_HOST"]
port = os.environ["DB_PORT"]
database = os.environ["DB_DATABASE"]
user = os.environ["DB_USER"]
password = os.environ["DB_PASSWORD"]
schema = os.environ["DB_SCHEMA"]

# Each entry maps a PostgreSQL table name to the CSV file loaded into it.
org_table = [
    {"table": "tb_eia366_pbrent366", "file": "./output/sor/ipeadata_estudo.csv"},
    {"table": "tb_eia366_pbrent366_forecast", "file": "./output/sor/forecast_estudo.csv"},
]

spark = (
    SparkSession.builder
    .appName("CSV to PostgreSQL")
    .getOrCreate()
)

processar = SparkPostgres(spark, host, port, database, user, password)
try:
    processar.run(org_table, schema)
finally:
    # Release the Spark session even if loading or writing fails.
    spark.stop()

                                                                                