In [131]:
import sys
sys.path.append("/home/jovyan/work/Scripts")


In [132]:
import os
usuario = os.getenv("POSTGRES_USER")
senha = os.getenv("POSTGRES_PASSWORD")
url = os.getenv("POSTGRES_URL")


In [133]:
from spark_session import create_spark_session
spark = create_spark_session()

In [134]:
from pyspark.sql import functions as F
from pyspark.sql.types import DateType, IntegerType

In [135]:
ano = "2025"
silver_path = f"s3a://datalake/silver/sih_sus"      
municipio_path = f"s3a://datalake/Auxiliar/Tabela_municipio.csv"
cids_path = f"s3a://datalake/Auxiliar/cid_descricao_comuns.csv" 




In [174]:
df_municipio = spark.read.csv(municipio_path,header=True,sep=";",encoding="UTF-8")
df_cid = spark.read.csv(cids_path,header=True,sep=";",encoding="UTF-8")

In [181]:
df_cid_sel = df_cid.select('SUBCAT','DESCRICAO')

In [172]:
df_silver = spark.read.format("delta").load(silver_path)

In [175]:
df_municipio = df_municipio.select(
    F.substring(F.col("Código IBGE"),1,6).alias("codigo_ibge"),
    F.col("Município").alias("municipio"),
    
)

In [176]:
df_silver.show(5)

+------+---------------+---------------+------------+----+-----+-------------+--------------+-----+---------------+----------+
|    uf|ano_competencia|mes_competencia|municipio_id|sexo|idade|cid_principal|   valor_total|morte|data_internacao|data_saida|
+------+---------------+---------------+------------+----+-----+-------------+--------------+-----+---------------+----------+
|354340|           2025|             01|      354340|   3|   21|         L989|        190.11|    0|       20241221|  20241224|
|354340|           2025|             01|      354340|   3|   44|         T813|        717.92|    0|       20241220|  20241221|
|354340|           2025|             01|      354340|   1|   29|         K359|        732.66|    0|       20241221|  20241222|
|354340|           2025|             01|      261180|   3|   48|         N201|        683.94|    0|       20241222|  20241223|
|354340|           2025|             01|      354340|   3|   87|         M844|       1054.83|    0|       20250

In [192]:
# Seleção e renomeação de colunas principais
df_gold = df_silver.select(    
    F.col("ano_competencia"),
    F.col("mes_competencia"),
    F.col("uf"),
    F.col("municipio_id").alias("cod_municipio_residencia"),
    F.col("idade").cast(IntegerType()).alias("idade"),
    F.col("sexo"),
    F.to_date("data_internacao", "yyyyMMdd").alias("data_internacao"),
    F.to_date("data_saida", "yyyyMMdd").alias("data_saida"),
    F.col("cid_principal"),
    F.col("valor_total").cast("double").alias("valor_total"),
    F.col("morte")
)

In [193]:
# Criação de colunas derivadas
df_gold = df_gold.withColumn("ano_internacao", F.year("data_internacao")) \
                 .withColumn("mes_internacao", F.month("data_internacao")) \
                 .withColumn("dias_internacao",F.datediff(F.col("data_saida"), F.col("data_internacao"))) \
                 .withColumn(
                     "faixa_etaria",
                     F.when(F.col("idade") < 1, "Menor de 1 ano")
                      .when(F.col("idade") < 5, "1-4 anos")
                      .when(F.col("idade") < 10, "5-9 anos")
                      .when(F.col("idade") < 20, "10-19 anos")
                      .when(F.col("idade") < 60, "20-59 anos")
                      .otherwise("60+ anos")
                 )

In [185]:
df_cid_sel.show(5)

+------+--------------------+
|SUBCAT|           DESCRICAO|
+------+--------------------+
|  I64 |                 AVC|
|  A90 |     Dengue clássica|
|  A000|Cólera devida a V...|
|  A001|Cólera devida a V...|
|  A009|Cólera não especi...|
+------+--------------------+
only showing top 5 rows



In [194]:
df_gold = df_gold.join(
    df_cid_sel,
    df_gold.cid_principal == df_cid.SUBCAT,
    "left"
).withColumnRenamed("DESCRICAO", "descricao_cid")




In [195]:
df_enriquecido = df_gold.join(df_municipio, df_gold.cod_municipio_residencia == df_municipio.codigo_ibge, "left")

In [196]:
df_enriquecido.show(5)


+---------------+---------------+------+------------------------+-----+----+---------------+----------+-------------+-----------+-----+--------------+--------------+---------------+------------+------+--------------------+-----------+---------------+
|ano_competencia|mes_competencia|    uf|cod_municipio_residencia|idade|sexo|data_internacao|data_saida|cid_principal|valor_total|morte|ano_internacao|mes_internacao|dias_internacao|faixa_etaria|SUBCAT|       descricao_cid|codigo_ibge|      municipio|
+---------------+---------------+------+------------------------+-----+----+---------------+----------+-------------+-----------+-----+--------------+--------------+---------------+------------+------+--------------------+-----------+---------------+
|           2025|             08|350000|                  350750|   59|   3|     2025-06-17|2025-06-24|         K561|    3568.65|    0|          2025|             6|              7|  20-59 anos|  K561|      Intussuscepção|     350750|       Botuca

In [197]:
df_enriquecido.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("ano_competencia", "mes_competencia") \
    .option("overwriteSchema", "true") \
    .save("s3a://datalake/gold/sih_sus/")


In [32]:
# Configurações da conexão
url = "jdbc:postgresql://postgres_lab:5432/postgres"
tabela = "sih_sus_gold"
usuario = "admin"
senha = "SenhaForte123!"

In [33]:
#Escrever no PostgreSQL
df_enriquecido.write \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", tabela) \
    .option("user", usuario) \
    .option("password", senha) \
    .option("driver", "org.postgresql.Driver") \
    .mode("overwrite") \
    .save()

In [199]:
import os

postgres_host = os.getenv("POSTGRES_HOST")
postgres_db = os.getenv("POSTGRES_DB")
postgres_user = os.getenv("POSTGRES_USER")
postgres_password = os.getenv("POSTGRES_PASSWORD")

url = f"jdbc:postgresql://{postgres_host}:5432/{postgres_db}"
tabela = "sih_sus_gold"

print("Iniciando carga no PostgreSQL...")

(
    df_enriquecido
    .repartition(4)
    .write
    .format("jdbc")
    .option("url", url)
    .option("dbtable", tabela)
    .option("user", postgres_user)
    .option("password", postgres_password)
    .option("driver", "org.postgresql.Driver")
    .option("batchsize", 10000)
    .mode("overwrite")
    .save()
)

print("Carga no PostgreSQL concluída com sucesso.")


Iniciando carga no PostgreSQL...
Carga no PostgreSQL concluída com sucesso.


In [130]:
spark.stop()