### Se√ß√£o 1: Importa√ß√µes de Bibliotecas

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when, to_timestamp, year, month, dayofmonth, hour
from pyspark.sql.types import IntegerType, StringType, TimestampType
import os
import time
import psycopg2

### Se√ß√£o 2: Inicializa√ß√£o da Sess√£o Spark

In [None]:
print("Iniciando a sess√£o Spark...")
spark = SparkSession.builder \
    .appName("Acidentes_Aereos_ETL_Raw_to_Silver") \
    .getOrCreate()
print("Sess√£o Spark iniciada com sucesso!")

### Se√ß√£o 3: Defini√ß√£o de Caminhos

In [None]:
raw_path = "../Data_Layer/raw"
print(f"Lendo arquivos da origem (RAW) de: {raw_path}")

print("Lendo arquivo da camada RAW...")
try:
    df_raw = spark.read.option("header", "true").option("sep", ";").option("encoding", "UTF-8").csv(os.path.join(raw_path, "data_raw.csv"))
    print("Arquivo RAW carregado com sucesso!")
    print(f"Total de registros: {df_raw.count()}")
except Exception as e:
    print(f"Erro ao ler o arquivo CSV. Verifique o caminho e a permiss√£o dos arquivos. Erro: {e}")
    spark.stop()
    raise e

### Se√ß√£o 4: Leitura dos Dados Brutos (RAW)

In [None]:
print("Visualizando estrutura dos dados RAW...")
df_raw.printSchema()
print("\nPrimeiras 5 linhas:")
df_raw.show(5, truncate=False)

### Se√ß√£o 5: Transforma√ß√£o e Unifica√ß√£o dos Dados

In [None]:
print("Iniciando a transforma√ß√£o e limpeza dos dados...")

# 1. Remover espa√ßos em branco das colunas
print("1Ô∏è‚É£  Removendo espa√ßos em branco dos nomes das colunas...")
for old_col in df_raw.columns:
    new_col = old_col.strip()
    df_raw = df_raw.withColumnRenamed(old_col, new_col)
print("   ‚úÖ Colunas padronizadas")

# 2. Converter coluna de data e hora
print("\n2Ô∏è‚É£  Convertendo data e hora da ocorr√™ncia...")
df_silver = df_raw.withColumn(
    "Data e Hora da Ocorrencia",
    to_timestamp(col("Data e Hora da Ocorrencia"), "dd/MM/yyyy HH:mm")
)
print("   ‚úÖ Data convertida para TimestampType")

# 3. Extrair componentes de data
print("\n3Ô∏è‚É£  Extraindo componentes de data (Ano, M√™s, Dia, Hora)...")
df_silver = df_silver.withColumn("Ano", year(col("Data e Hora da Ocorrencia")))
df_silver = df_silver.withColumn("Mes", month(col("Data e Hora da Ocorrencia")))
df_silver = df_silver.withColumn("Dia", dayofmonth(col("Data e Hora da Ocorrencia")))
df_silver = df_silver.withColumn("Hora", hour(col("Data e Hora da Ocorrencia")))
print("   ‚úÖ Componentes de data extra√≠dos")

# 4. Converter colunas num√©ricas
print("\n4Ô∏è‚É£  Convertendo colunas num√©ricas...")
numeric_cols = [
    "Total de Fatalidades no Acidente",
    "Total de Recomendacoes",
    "Total de Aeronaves Envolvidas",
    "Quantidade de Assentos na Aeronave",
    "Ano de Fabricacao da Aeronave"
]

for col_name in numeric_cols:
    if col_name in df_silver.columns:
        df_silver = df_silver.withColumn(col_name, col(col_name).cast(IntegerType()))

print(f"   ‚úÖ {len([c for c in numeric_cols if c in df_silver.columns])} colunas convertidas para IntegerType")

# 5. Preencher valores nulos com 0 nas colunas num√©ricas
print("\n5Ô∏è‚É£  Preenchendo valores nulos com 0...")
for col_name in numeric_cols:
    if col_name in df_silver.columns:
        df_silver = df_silver.fillna({col_name: 0})
        
print("   ‚úÖ Valores nulos preenchidos")

# 6. Criar indicador de severidade
print("\n6Ô∏è‚É£  Criando indicador de severidade...")
df_silver = df_silver.withColumn(
    "Nivel_Severidade",
    when(col("Total de Fatalidades no Acidente") > 50, "CR√çTICO")
    .when(col("Total de Fatalidades no Acidente") > 10, "GRAVE")
    .when(col("Total de Fatalidades no Acidente") > 0, "MODERADO")
    .otherwise("LEVE")
)
print("   ‚úÖ Indicador 'Nivel_Severidade' criado")

# 7. Filtrar registros com data v√°lida (opcional)
print("\n7Ô∏è‚É£  Filtrando registros com data v√°lida...")
df_silver = df_silver.filter(col("Data e Hora da Ocorrencia").isNotNull())
print(f"   ‚úÖ Registros ap√≥s filtragem: {df_silver.count()}")

print("\n‚úÖ Transforma√ß√£o de dados conclu√≠da!")
df_silver.printSchema()

### Se√ß√£o 6: Carga dos Dados no Banco de Dados PostgreSQL

In [None]:
print("\nIniciando a carga de dados no banco de dados...")
jdbc_hostname = os.getenv("DB_HOST", "localhost")
jdbc_port     = os.getenv("DB_PORT", "5432")
jdbc_database = os.getenv("DB_NAME", "acidentes_aereos")
db_user       = os.getenv("DB_USER", "postgres")
db_password   = os.getenv("DB_PASSWORD", "postgres")
table_name    = "public.acidentes_silver"
jdbc_url = f"jdbc:postgresql://{jdbc_hostname}:{jdbc_port}/{jdbc_database}"
connection_properties = {"user": db_user, "password": db_password, "driver": "org.postgresql.Driver"}

retries = 10
wait_seconds = 5
for i in range(retries):
    try:
        print("Tentando conectar ao banco de dados...")
        conn = psycopg2.connect(host=jdbc_hostname, dbname=jdbc_database, user=db_user, password=db_password, port=jdbc_port)
        conn.close()
        print("‚úÖ Conex√£o com o banco de dados bem-sucedida!")
        break
    except psycopg2.OperationalError as e:
        print(f"‚è≥ Banco de dados n√£o est√° pronto. Tentando novamente em {wait_seconds} segundos...")
        time.sleep(wait_seconds)
        if i == retries - 1:
            print("‚ùå N√£o foi poss√≠vel conectar ao banco de dados. Abortando.")
            spark.stop()
            raise e

try:
    print(f"Iniciando a escrita de dados na tabela: {table_name}")
    
    df_silver.write \
        .mode("overwrite") \
        .jdbc(url=jdbc_url, table=table_name, properties=connection_properties)
        
    print(f"‚úÖ Tabela '{table_name}' populada com sucesso no banco de dados '{jdbc_database}'!")

except Exception as e:
    print(f"‚ùå Ocorreu um erro ao salvar no banco de dados: {e}")
    spark.stop()
    raise e

### Se√ß√£o 7: Valida√ß√£o e Finaliza√ß√£o

In [None]:
print("\nValidando a tabela SILVER (amostra):")
df_silver.orderBy(col("Ano").desc()).show(10, truncate=False)

print("\n=== ESTAT√çSTICAS FINAIS ===")
print(f"Total de registros processados: {df_silver.count()}")
print(f"Colunas na tabela SILVER: {len(df_silver.columns)}")

print("\nDistribui√ß√£o por N√≠vel de Severidade:")
df_silver.groupBy("Nivel_Severidade").count().orderBy(col("count").desc()).show()

print("\nüöÄ Job ETL (Raw ‚Üí Silver ‚Üí Database) finalizado com sucesso!")
spark.stop()