In [0]:
import os
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from dotenv import load_dotenv
import re

# Configuração de logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def configurar_ambiente():
    if not load_dotenv('.env'):
        logger.warning("Arquivo .env não encontrado.")

    # Verifica e solicita configurações faltantes
    verificar_configuracao('AWS_ACCESS_KEY_ID', "AWS Access Key ID")
    verificar_configuracao('AWS_SECRET_ACCESS_KEY', "AWS Secret Access Key")
    verificar_configuracao('S3_BUCKET', "Nome do Bucket S3")

def verificar_configuracao(variavel: str, descricao: str):
    if not os.getenv(variavel):
        print(f"\nConfiguração não encontrada: {descricao}")
        valor = input(f"Por favor, informe o valor para {descricao}: ").strip()
        
        os.environ[variavel] =  valor
        logger.info(f"Configuração {variavel} salva na seção atual")

def criar_spark_session():
    """Cria e configura uma sessão Spark com acesso ao S3."""
    # Configura as credenciais AWS a partir de variáveis de ambiente
    logger.info(f"Criando Seção Spark...")
    try:
        spark = SparkSession.builder \
            .appName("ETL-Yellow-Taxis") \
            .config("fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID")) \
            .config("fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY")) \
            .config("fs.s3a.endpoint", "s3.sa-east-1.amazonaws.com") \
            .config("fs.s3a.region", "sa-east-1") \
            .getOrCreate()
        logger.info(f"Seção Spark criada com Sucesso!")
        return spark
    except Exception as e:
        logger.error(f"Falha ao criar a sessão Spark: {e}")
        raise

def carregar_dados(spark: SparkSession, s3_bucket: str):
    logger.info(f"Lendo dados do bucket {s3_bucket}")
    caminhos = [
        f"s3a://{s3_bucket}/raw/yellow/yellow_tripdata_2023-01.parquet",
        f"s3a://{s3_bucket}/raw/yellow/yellow_tripdata_2023-02.parquet",
        f"s3a://{s3_bucket}/raw/yellow/yellow_tripdata_2023-03.parquet",
        f"s3a://{s3_bucket}/raw/yellow/yellow_tripdata_2023-04.parquet",
        f"s3a://{s3_bucket}/raw/yellow/yellow_tripdata_2023-05.parquet",
        f"s3a://{s3_bucket}/raw/green/green_tripdata_2023-01.parquet",
        f"s3a://{s3_bucket}/raw/green/green_tripdata_2023-02.parquet",
        f"s3a://{s3_bucket}/raw/green/green_tripdata_2023-03.parquet",
        f"s3a://{s3_bucket}/raw/green/green_tripdata_2023-04.parquet",
        f"s3a://{s3_bucket}/raw/green/green_tripdata_2023-05.parquet"
    ]   
    try:
        dfs = [ler_e_padronizar(caminho, spark) for caminho in caminhos]
    except Exception as e:
        logger.error(f"Falha ao ler os dados do S3: {e}")
        raise

    # Combinar todos os DataFrames
    df = dfs[0]
    for temp_df in dfs[1:]:
        df = df.union(temp_df)

    logger.info(f"Dados lidos com Sucesso")
    return df

def ler_e_padronizar(caminho: str, spark: SparkSession):
    df = spark.read.option("mergeSchema", "true").parquet(caminho)
    colunas_necessarias = [
        "VendorID", 
        "passenger_count", 
        "total_amount",
        "pickup_datetime", 
        "dropoff_datetime",
        "taxi_type"
    ]
    #extrai tipo do taxi
    tipo_taxi = re.search(r'/(green|yellow)/', caminho).group(1)
    
    # Passenger_count é double em janeiro e long nos demais, por isso é necessário converter cada um separadamente.
    # Demais colunas ele consegue inferir o schema e os tipos normalmente.
    if "passenger_count" in df.columns:
        df = df.withColumn("passenger_count", df["passenger_count"].cast("long"))

    df = df.withColumn("taxi_type", lit(tipo_taxi))

    #renomeia as colunas que são diferentes de acardo com o tipo de taxi
    if tipo_taxi == "yellow":
        df = df.withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
               .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
    else:
        df = df.withColumnRenamed("lpep_pickup_datetime", "pickup_datetime") \
               .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")

    return df.select(colunas_necessarias)

def corrigindo_nulo(df):
    print("\n=== Valores Nulos ===")
    for c in df.columns:
        null_count = df.where(col(c).isNull()).count()
        print(f"{c}: {null_count} valores ausentes ({null_count/df.count()*100:.2f}%)")


    # Calcular mediana APENAS para passenger_count
    mediana_passenger = df.approxQuantile("passenger_count", [0.5], 0.01)[0]

    logger.info(f"Corrigindo valores nulos da coluna passenger_count com a mediana {mediana_passenger}")
    # Preencher NULOS apenas nessa coluna
    df_preenchido = df.fillna({"passenger_count": mediana_passenger})

    return df_preenchido

def salva_s3_clean(bucket, df):
    logger.info(f"Salvando dados transformados...")

    #Salva a versão limpa
    clean_path = f"s3a://{bucket}/clean/"

    try:
        df.write \
          .mode("overwrite") \
          .parquet(clean_path)
        logger.info(f"Dados salvos em: {clean_path} com Sucesso!")
    except Exception as e:
        logger.error(f"Falha ao escrever dados transformados: {e}")
        raise


def salva_s3_consumo(bucket, df):
    logger.info(f"Salvando tabela particionada para consumo...")

    #coluna criada para particionar a tabela de consumo
    df = df.withColumn("mes_pickup", month("pickup_datetime"))

    # Cria tabela particionada para consumo
    try:
        df.write \
            .partitionBy("mes_pickup") \
            .mode("overwrite") \
            .option("path", f"s3a://{bucket}/tables/taxi_consumo") \
            .saveAsTable("taxi_consumo")

        logger.info(f"Tabela taxi_consumo salva  com Sucesso!")
    except Exception as e:
        logger.error(f"Falha ao escrever tabela para consumo: {e}")
        raise


def principal():
    # Configura ambiente
    configurar_ambiente()

    # Cria seção Spark
    spark  = criar_spark_session()
    
    df = carregar_dados(spark, os.getenv("S3_BUCKET"))
    df_clean = corrigindo_nulo(df)
    salva_s3_clean(os.getenv("S3_BUCKET"), df_clean)
    salva_s3_consumo(os.getenv("S3_BUCKET"), df_clean)

if __name__ == "__main__":
    principal()