#### Configurar o Spark e criar a sessão

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, col, explode
from pyspark.sql.types import DateType
from dotenv import load_dotenv
import glob
import os

# Point PySpark at the local Java and Spark installations before the session is built
os.environ.update(
    {
        "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
        "SPARK_HOME": "/opt/spark/spark-3.5.5-bin-hadoop3",
    }
)

# Local-mode session; the PostgreSQL JDBC driver jar is registered through
# spark.jars so the JDBC reads and writes further down can load the driver
spark = (
    SparkSession.builder
    .appName("Teste Minsait")
    .master("local[*]")
    .config("spark.jars", "jars/postgresql-42.7.5.jar")
    .config("spark.sql.debug.maxToStringFields", "200")
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble
25/03/14 12:15:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


#### Configurar conexão com PostgreSQL

In [2]:
# Load variables from the project's .env file into the process environment
load_dotenv()

# Fail fast with a readable message: without this check a missing variable
# silently produces "jdbc:postgresql://None:None/None" (or None credentials)
# and the failure only shows up later as an opaque connection error.
_required_db_vars = ("DB_HOST", "DB_PORT", "DB_NAME", "DB_USER", "DB_PASSWD")
_missing_db_vars = [name for name in _required_db_vars if os.getenv(name) is None]
if _missing_db_vars:
    raise RuntimeError(
        "Variáveis de ambiente ausentes para a conexão com o PostgreSQL: "
        + ", ".join(_missing_db_vars)
    )

# JDBC URL and connection properties shared by every read/write in the notebook
jdbc_url = f"jdbc:postgresql://{os.environ['DB_HOST']}:{os.environ['DB_PORT']}/{os.environ['DB_NAME']}"
properties = {
    "user": os.environ["DB_USER"],
    "password": os.environ["DB_PASSWD"],
    "driver": "org.postgresql.Driver",
}

#### Testar conexão

In [3]:
# Smoke test: ask the database for its current timestamp through the JDBC reader.
# The parenthesised subquery with an alias is passed where a table name is expected.
query = "(SELECT now()) AS query"
now_df = spark.read.jdbc(url=jdbc_url, table=query, properties=properties)
now_df.show(truncate=False)

+--------------------------+
|now                       |
+--------------------------+
|2025-03-14 12:15:27.845356|
+--------------------------+



#### Listar todos os arquivos JSON na pasta 'data' e configurar tamanho do lote de processamento

In [4]:
# Every JSON file directly under data/, in lexicographic order so that the
# batches are cut the same way on every run
json_files = glob.glob("data/*.json")
json_files.sort()

# Number of files handed to process_batch per iteration
batch_size = 1000

#### Função para processar um lote de arquivos

In [5]:
def process_batch(file_batch, df_existing_patients, df_existing_conditions, df_existing_medications):
    # Ler o lote de arquivos JSON
    df = spark.read.option("multiline", "true").json(file_batch)

    # Explodir a coluna 'entry'
    entries_df = df.select(explode(col("entry")).alias("entry"))

    # Extrair recursos e criar DataFrames
    patients_df = entries_df.filter(col("entry.resource.resourceType") == "Patient").select(
        col("entry.resource.id").alias("id"),
        col("entry.resource.gender").alias("gender"),
        col("entry.resource.birthDate").cast(DateType()).alias("birth_date")
    )

    conditions_df = entries_df.filter(col("entry.resource.resourceType") == "Condition").select(
        col("entry.resource.id").alias("id"),
        regexp_replace(col("entry.resource.subject.reference"), "urn:uuid:", "").alias("patient_id"),
        col("entry.resource.code.text").alias("condition_text"),
        col("entry.resource.recordedDate").cast(DateType()).alias("recorded_date")
    )

    medications_df = entries_df.filter(col("entry.resource.resourceType") == "MedicationRequest").select(
        col("entry.resource.id").alias("id"),
        regexp_replace(col("entry.resource.subject.reference"), "urn:uuid:", "").alias("patient_id"),
        col("entry.resource.medicationCodeableConcept.text").alias("medication_text"),
        col("entry.resource.authoredOn").cast(DateType()).alias("authored_on")
    )

    # Escrever os DataFrames no PostgreSQL retirando os já existentes
    df_to_insert_patients = patients_df.join(df_existing_patients, patients_df["id"] == df_existing_patients["id"], "left_anti")
    df_to_insert_patients.write.jdbc(url=jdbc_url, table="patients", mode="append", properties=properties)

    df_to_insert_conditions = conditions_df.join(df_existing_conditions, conditions_df["id"] == df_existing_conditions["id"], "left_anti")
    df_to_insert_conditions.write.jdbc(url=jdbc_url, table="conditions", mode="append", properties=properties)

    df_to_insert_medications = medications_df.join(df_existing_medications, medications_df["id"] == df_existing_medications["id"], "left_anti")
    df_to_insert_medications.write.jdbc(url=jdbc_url, table="medication_requests", mode="append", properties=properties)

#### Verificar as tabelas e, caso não existam, criá-las

In [6]:
# Directory that holds the SQL scripts
sql_directory = "sql/"

# Collect every .sql file in that directory; sorting keeps the execution sequence
sql_pattern = os.path.join(sql_directory, "*.sql")
sql_files = glob.glob(sql_pattern)
sql_files.sort()

# Função para executar o SQL usando JDBC
def execute_sql_file(sql_file, stmt):
    """Run the full content of one SQL file through a statement object.

    Args:
        sql_file: Path of the SQL script; it is read as UTF-8 text.
        stmt: Object exposing ``execute(sql)`` (the caller passes a JDBC statement).

    Returns:
        None. Progress and the outcome are printed. An exception raised by
        ``stmt.execute`` is reported on stdout and not re-raised, so the
        caller can carry on with the next file.
    """
    with open(sql_file, "r", encoding="utf-8") as handle:
        script_text = handle.read()

    print(f"Executando {sql_file}...")
    # The script is submitted as one single string, not statement by statement
    try:
        stmt.execute(script_text)
    except Exception as sql_err:
        outcome = f"Erro ao executar o comando do arquivo {sql_file}: {sql_err}"
    else:
        outcome = f"SQL executado com sucesso: {sql_file}"
    print(outcome)

# Open a JDBC connection through the Spark JVM gateway and run every SQL script.
# Both handles start as None so the finally block knows what was actually opened.
conn = None
stmt = None
try:
    conn = spark._sc._gateway.jvm.java.sql.DriverManager.getConnection(
        jdbc_url, properties["user"], properties["password"]
    )

    # Enable autocommit to avoid transaction problems between scripts
    conn.setAutoCommit(True)

    stmt = conn.createStatement()

    # Run each SQL file in the (sorted) order collected above
    if not sql_files:
        print("Nenhum arquivo SQL encontrado na pasta 'sql/'.")
    else:
        for sql_file in sql_files:
            execute_sql_file(sql_file, stmt)

    print("Todos os arquivos SQL foram executados.")

except Exception as e:
    print(f"Erro ao executar os arquivos SQL: {e}")

finally:
    # Previously the statement and connection were only closed on the success
    # path, so any exception above leaked them. Close in reverse order of
    # creation, and keep going if one close() itself fails.
    for _jdbc_resource in (stmt, conn):
        if _jdbc_resource is not None:
            try:
                _jdbc_resource.close()
            except Exception as close_err:
                print(f"Erro ao fechar recurso JDBC: {close_err}")

Executando sql/1 - create_table_patients.sql...
SQL executado com sucesso: sql/1 - create_table_patients.sql
Executando sql/2 - create_table_conditions.sql...
SQL executado com sucesso: sql/2 - create_table_conditions.sql
Executando sql/3 - create_table_medication_requests.sql...
SQL executado com sucesso: sql/3 - create_table_medication_requests.sql
Todos os arquivos SQL foram executados.


#### Processar os arquivos em lotes com tamanho pré-definido

In [7]:
# JDBC-backed DataFrames over the three target tables; process_batch uses them
# to leave out rows whose id is already stored
df_existing_patients, df_existing_conditions, df_existing_medications = (
    spark.read.jdbc(jdbc_url, table=table_name, properties=properties)
    for table_name in ("public.patients", "public.conditions", "public.medication_requests")
)

# Walk the file list in slices of batch_size; the last slice may be shorter
total_files = len(json_files)
for start in range(0, total_files, batch_size):
    batch = json_files[start:start + batch_size]
    last = start + len(batch) - 1
    print(f"Processando lote de {start} a {last} (tamanho do lote: {len(batch)})")
    process_batch(batch, df_existing_patients, df_existing_conditions, df_existing_medications)

Processando lote de 0 a 999 (tamanho do lote: 1000)


                                                                                

Processando lote de 1000 a 1179 (tamanho do lote: 180)


                                                                                

#### Encerrar a sessão Spark

In [8]:
# Stop the Spark session created at the top of the notebook
spark.stop()