# 📊 AWS Glue & PySpark ETL: Análise de Transações Financeiras

# 1️⃣ Introdução e Objetivo

Este notebook implementa a etapa de **ETL (Extração, Transformação e Carga)** para processamento dos dados brutos.

📌 **Objetivo:**  
- Carregar os dados brutos da pasta `data/raw/`
- Aplicar transformações com **PySpark** (limpeza, normalização e regras de negócio)
- Armazenar os dados processados em formato **Parquet** para análise posterior

🔧 **Tecnologias utilizadas:**  
- **PySpark** para processamento eficiente de grandes volumes de dados
- **AWS Glue** (se rodarmos na nuvem) ou Spark local
- **Parquet** como formato de armazenamento otimizado

---


## 2️⃣ Configuração do do PySpark

In [None]:
import os
import pyspark
from pyspark.sql import SparkSession

# Criar uma sessão do PySpark
spark = SparkSession.builder \
    .appName("ETL_Processing") \
    .config("spark.sql.repl.eagerEval.enabled", True) \
    .getOrCreate()

print("✅ PySpark configurado com sucesso!")

## 3️⃣ Carregamento das Configurações (config.yaml)

In [None]:
import os
import yaml

# 📌 **Determinar caminho absoluto para `config.yaml` na raiz do projeto**
base_path = os.path.abspath(os.path.join(os.getcwd(), ".."))  # Sobe um nível para a raiz do projeto
config_path = os.path.join(base_path, "config", "config.yaml")

print(f"📂 Tentando carregar: {config_path}")

# 📌 **Verificar se o arquivo `config.yaml` existe**
if os.path.exists(config_path):
    with open(config_path, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    print("✅ Configuração carregada com sucesso!")
else:
    raise FileNotFoundError(f"❌ Arquivo 'config.yaml' não encontrado! Verifique o caminho: {config_path}")

# 📌 **Ajustar caminhos dos dados**
raw_data_path = os.path.normpath(config.get("raw_data_path", "data/raw/"))
processed_data_path = os.path.normpath(config.get("data_path", "data/processed/"))

# 📌 **Garantir que os caminhos sejam absolutos**
if not os.path.isabs(raw_data_path):
    raw_data_path = os.path.abspath(os.path.join(base_path, raw_data_path))
if not os.path.isabs(processed_data_path):
    processed_data_path = os.path.abspath(os.path.join(base_path, processed_data_path))

# 📌 **Verificar se os diretórios existem**
if not os.path.exists(raw_data_path):
    raise FileNotFoundError(f"❌ ERRO: O diretório de dados brutos '{raw_data_path}' não existe!")
if not os.path.exists(processed_data_path):
    os.makedirs(processed_data_path)  # Criar o diretório se não existir

# 📌 **Exibir as configurações carregadas**
print(f"📂 Caminho dos dados brutos: {raw_data_path}")
print(f"📂 Caminho dos dados processados: {processed_data_path}")

print("✅ Configurações carregadas com sucesso!")


## 4️⃣ Detectar e Carregar os Dados Brutos

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, mean, stddev, when, date_format, unix_timestamp, lag, concat, lit, count
from pyspark.sql.window import Window
import os

# 📌 **Criar sessão Spark (caso ainda não tenha sido criada)**
spark = SparkSession.builder.appName("ETL - Fraude Financeira").getOrCreate()

# 📌 **Definir caminhos a partir da configuração**
raw_file_path = os.path.join(raw_data_path, "dados-brutos.csv")

# 📌 **Definir caminho de saída**
output_path = os.path.normpath(config.get("processed_data_path", "data/processed/"))
if not os.path.isabs(output_path):
    output_path = os.path.abspath(output_path)

# 📌 **Testar separadores comuns**
separators = [",", "|", ";", "\t"]
for sep in separators:
    try:
        df = spark.read.csv(raw_file_path, header=True, inferSchema=True, sep=sep)
        if len(df.columns) > 5:  # Se há mais de 5 colunas, provavelmente o separador está correto
            print(f"✅ Separador correto detectado: '{sep}'")
            break
    except Exception as e:
        print(f"⚠️ Erro com separador '{sep}': {str(e)}")

# 📌 **Verificar se o DataFrame foi carregado corretamente**
if "df" not in locals():
    raise ValueError("❌ ERRO: Falha ao carregar os dados! Verifique o formato do arquivo.")

# 📌 **Verificar colunas antes do processamento**
print(f"📋 Colunas detectadas antes do processamento: {df.columns}")

# 📌 **Remover duplicatas**
df = df.dropDuplicates()

# 📌 **Remover registros onde colunas críticas sejam nulas**
df = df.na.drop(subset=["cc_num", "amt", "is_fraud"])

# 📌 **Preencher valores nulos em colunas opcionais**
df = df.fillna({
    "merchant": "Desconhecido",
    "city": "Não informado",
    "state": "Não informado",
    "lat": 0.0,
    "long": 0.0
})

# 📌 **Criar coluna combinando data e hora**
df = df.withColumn("trans_date_trans_time", concat(col("trans_date"), lit(" "), col("trans_time")))
df = df.withColumn("trans_date_trans_time", col("trans_date_trans_time").cast("timestamp"))

# 📌 **Criar colunas de dia da semana e horário**
df = df.withColumn("day_of_week", date_format(col("trans_date_trans_time"), "E"))
df = df.withColumn("hour_of_day", date_format(col("trans_date_trans_time"), "HH").cast("int"))

# 📌 **Criar coluna categorizando período da transação**
df = df.withColumn(
    "transaction_period",
    when(col("hour_of_day") < 6, "Madrugada")
    .when(col("hour_of_day") < 12, "Manhã")
    .when(col("hour_of_day") < 18, "Tarde")
    .otherwise("Noite")
)

# 📌 **Criar flag para transações acima de 10.000**
df = df.withColumn("possible_fraud_high_value", (col("amt") > 10000).cast("integer"))

# 📌 **Definir janela para detecção de transações rápidas**
window_spec_time = Window.partitionBy("cc_num", "merchant").orderBy("trans_date_trans_time")
df = df.withColumn("time_diff", unix_timestamp("trans_date_trans_time") - lag(unix_timestamp("trans_date_trans_time")).over(window_spec_time))
df = df.withColumn("possible_fraud_fast_transactions", (col("time_diff") < 10).cast("integer"))

# 📌 **Ajuste de tipos**
df = df.withColumn("cc_num", col("cc_num").cast("string"))
df = df.withColumn("amt", col("amt").cast("float"))
df = df.withColumn("zip", col("zip").cast("int"))
df = df.withColumn("lat", col("lat").cast("float"))
df = df.withColumn("long", col("long").cast("float"))
df = df.withColumn("city_pop", col("city_pop").cast("int"))
df = df.withColumn("dob", col("dob").cast("string"))
df = df.withColumn("unix_time", col("unix_time").cast("int"))
df = df.withColumn("merch_lat", col("merch_lat").cast("float"))
df = df.withColumn("merch_long", col("merch_long").cast("float"))
df = df.withColumn("is_fraud", col("is_fraud").cast("int"))
df = df.withColumn("possible_fraud_high_value", col("possible_fraud_high_value").cast("int"))
df = df.withColumn("possible_fraud_fast_transactions", col("possible_fraud_fast_transactions").cast("int"))

# 📌 **Contagem de valores nulos**
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
null_counts.show()

# 📌 **Criar diretório de saída**
if not os.path.exists(output_path):
    os.makedirs(output_path)

# 📌 **Contagem final de registros**
print(f"Total de registros processados: {df.count()}")

# 📌 **Salvar os dados processados em Parquet**
df.write.mode("overwrite").partitionBy("category").parquet(output_path)

print("✅ Dados brutos carregados com Sucesso!")


## 5️⃣ Aplicar Schema e Ajustes Finais

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# 📌 **Definir Schema**
schema = StructType([
    StructField("trans_date_trans_time", TimestampType(), False),
    StructField("cc_num", StringType(), False),
    StructField("merchant", StringType(), True),
    StructField("category", StringType(), True),
    StructField("amt", DoubleType(), False),
    StructField("first", StringType(), True),
    StructField("last", StringType(), True),
    StructField("gender", StringType(), True),
    StructField("street", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("zip", IntegerType(), True),
    StructField("lat", DoubleType(), True),
    StructField("long", DoubleType(), True),
    StructField("city_pop", IntegerType(), True),
    StructField("job", StringType(), True),
    StructField("dob", StringType(), True),
    StructField("trans_num", StringType(), False),
    StructField("unix_time", IntegerType(), False),
    StructField("merch_lat", DoubleType(), True),
    StructField("merch_long", DoubleType(), True),
    StructField("is_fraud", IntegerType(), False)
])

# 📌 **Carregar os dados processados e aplicar schema**
df = spark.read.schema(schema).parquet(output_path)
print(f"✅ Dados carregados com {df.count()} registros após aplicação de schema.")

# 📌 **Mostrar schema final**
df.printSchema()


## 6️⃣ Salvar Dados Processados

In [None]:
import shutil

partition_path = f"{processed_data_path}/category=grocery_net"
if os.path.exists(partition_path):
    shutil.rmtree(partition_path)
    print(f"🗑️ Partição {partition_path} removida para evitar conflito de schema.")
