In [0]:
import os
import re
import unicodedata
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import lit, input_file_name, when, col, to_timestamp, date_format
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, LongType
from functools import reduce

In [0]:
# Year-month periods (YYYYMM) to load: January through May 2023.
datas = [f"2023{month:02d}" for month in range(1, 6)]
# Name of the S3 bucket that holds the source files.
bucket = "landing-layer-ifood"

In [0]:
def normalize_dataframe_columns(df: DataFrame) -> DataFrame:
    """
    Return a DataFrame with the same data as ``df`` and normalized column names.

    Each column name goes through three steps:
    1. NFKD-decompose it and drop every non-ASCII code point, which strips
       accents and other diacritics.
    2. Remove every character that is not an ASCII letter, digit or underscore.
    3. Convert the result to lowercase.

    Parameters:
        df (pyspark.sql.DataFrame): DataFrame whose column names are normalized.

    Returns:
        pyspark.sql.DataFrame: New DataFrame with the normalized column names,
        in the original column order.

    Note:
        No check is made for two names normalizing to the same value or for a
        name becoming empty; such results are passed to Spark as they are.
    """
    def normalize(name: str) -> str:
        ascii_name = unicodedata.normalize("NFKD", name).encode("ascii", "ignore").decode("ascii")
        return re.sub(r"[^a-zA-Z0-9_]", "", ascii_name).lower()

    # toDF renames every column by position in a single call, instead of
    # chaining one withColumnRenamed per column (which looks columns up by name).
    return df.toDF(*(normalize(name) for name in df.columns))

def transform_data(df: DataFrame) -> DataFrame:
    """
    Add human-readable description columns and split date/time columns to taxi trip data.

    Four coded columns are translated into text labels, and the pickup and
    dropoff timestamps are each formatted into a date string and a time string.
    All input columns, including the two timestamp columns, are kept.

    Parameters:
        df (DataFrame): Spark DataFrame with the columns VendorID, RatecodeID,
                        store_and_fwd_flag, payment_type, tpep_pickup_datetime
                        and tpep_dropoff_datetime.
                        NOTE(review): the loader in this file lowercases column
                        names before calling this function, so the mixed-case
                        references presumably rely on Spark's case-insensitive
                        column resolution - confirm the session setting.

    Returns:
        DataFrame: The input DataFrame plus these columns, appended in order:
                   - vendor_name         (from VendorID)
                   - rate_code_name      (from RatecodeID)
                   - store_and_fwd_desc  (from store_and_fwd_flag)
                   - payment_type_desc   (from payment_type)
                   - pickup_date         (yyyy-MM-dd)
                   - pickup_time         (HH:mm:ss)
                   - dropoff_date        (yyyy-MM-dd)
                   - dropoff_time        (HH:mm:ss)
                   Any code missing from the lookup tables is labelled "Unknown".
    """
    fallback_label = "Unknown"

    def describe(source_column, labels):
        # Fold the code -> label pairs into one when(...).when(...) chain,
        # following the insertion order of the mapping.
        pairs = iter(labels.items())
        code, label = next(pairs)
        expression = when(col(source_column) == code, label)
        for code, label in pairs:
            expression = expression.when(col(source_column) == code, label)
        return expression.otherwise(fallback_label)

    # target column -> (source column, code -> label lookup)
    lookups = {
        "vendor_name": ("VendorID", {
            1: "Creative Mobile Technologies, LLC",
            2: "Curb Mobility, LLC",
            6: "Myle Technologies Inc",
            7: "Helix",
        }),
        "rate_code_name": ("RatecodeID", {
            1: "Standard rate",
            2: "JFK",
            3: "Newark",
            4: "Nassau or Westchester",
            5: "Negotiated fare",
            6: "Group ride",
            99: "Null/unknown",
        }),
        "store_and_fwd_desc": ("store_and_fwd_flag", {
            "Y": "Store and forward trip",
            "N": "Not a store and forward trip",
        }),
        "payment_type_desc": ("payment_type", {
            0: "Flex Fare trip",
            1: "Credit card",
            2: "Cash",
            3: "No charge",
            4: "Dispute",
            5: "Unknown",
            6: "Voided trip",
        }),
    }

    result = df
    for target_column, (source_column, labels) in lookups.items():
        result = result.withColumn(target_column, describe(source_column, labels))

    # Date and time parts are produced as formatted strings, pickup first.
    for event in ("pickup", "dropoff"):
        timestamp_column = f"tpep_{event}_datetime"
        result = result.withColumn(f"{event}_date", date_format(timestamp_column, "yyyy-MM-dd"))
        result = result.withColumn(f"{event}_time", date_format(timestamp_column, "HH:mm:ss"))

    return result


def filter_by_pickup_datetime_range(
    df: DataFrame,
    start: str = "2023-01-01",
    end_exclusive: str = "2023-06-01",
) -> DataFrame:
    """
    Keep only rows whose 'tpep_pickup_datetime' falls in [start, end_exclusive).

    With the defaults this selects January 1st, 2023 through May 31st, 2023
    (inclusive). The upper bound is exclusive so that pickups in the final
    second of the window that carry fractional seconds (for example
    23:59:59.500) are kept; a "<= 23:59:59" comparison would drop them.

    Parameters:
    - df: A PySpark DataFrame containing the 'tpep_pickup_datetime' column
    - start: Inclusive lower bound, as a date/timestamp string
    - end_exclusive: Exclusive upper bound, as a date/timestamp string

    Returns:
    - A filtered DataFrame in which 'tpep_pickup_datetime' has been converted
      with to_timestamp and lies within the requested range
    """
    # Convert first so the comparisons run on timestamp values.
    df = df.withColumn("tpep_pickup_datetime", to_timestamp("tpep_pickup_datetime"))

    pickup = col("tpep_pickup_datetime")
    return df.filter((pickup >= start) & (pickup < end_exclusive))

def read_files_by_dates_s3_uc_select_columns(bucket_name: str, dates: list, file_format: str = "parquet", options: dict = None):
    """
    Read the monthly trip files from an S3 bucket, prepare each one, and union them.

    For every entry in ``dates`` the file
    ``s3a://<bucket_name>/yellow_tripdata_<YYYY>-<MM>.<file_format>`` is loaded,
    its column names are normalized, its rows are filtered by pickup datetime,
    and a ``source_file`` column holding the ``YYYY-MM`` period is added.
    The per-file DataFrames are unioned by column name (columns missing from a
    file are allowed) and the result is passed through ``transform_data``.

    A file that fails to load or prepare is reported on stdout and skipped, so
    one bad month does not abort the whole load.

    Parameters:
    - bucket_name: name of the S3 bucket (e.g. 'landing-layer-ifood')
    - dates: list of periods in 'YYYYMM' format (e.g. ['202301', '202302'])
    - file_format: file format passed to spark.read (default 'parquet')
    - options: extra options for spark.read; None means no extra options

    Returns:
    - The unified, transformed DataFrame. If no file could be loaded, an empty
      DataFrame with no columns is returned.
    """
    spark = SparkSession.builder.getOrCreate()
    reader_options = options or {}

    dfs = []
    for date in dates:
        period = f"{date[:4]}-{date[4:]}"
        path = f"s3a://{bucket_name}/yellow_tripdata_{period}.{file_format}"
        try:
            df_temp = spark.read.format(file_format).options(**reader_options).load(path)
            df_temp = normalize_dataframe_columns(df_temp)
            df_temp = filter_by_pickup_datetime_range(df_temp)

            # Label rows with the leading YYYY-MM of the period used in the
            # file name; anything that does not look like one is "unknown".
            match = re.match(r"\d{4}-\d{2}", period)
            prefix_date = match.group(0) if match else "unknown"

            df_temp = df_temp.withColumn("source_file", lit(prefix_date))
            dfs.append(df_temp)
            print(f"✅ Arquivo carregado e normalizado: {path}")
        except Exception as e:
            # Deliberate best effort: report the failure and continue with the
            # remaining files.
            print(f"❌ Erro ao ler o arquivo {path}: {e}")

    if not dfs:
        # createDataFrame cannot infer a schema from an empty list, so an
        # explicit empty schema is required to build the empty result.
        return spark.createDataFrame([], StructType([]))

    df_union = reduce(lambda df1, df2: df1.unionByName(df2, allowMissingColumns=True), dfs)
    return transform_data(df_union)

In [0]:
# Load, filter and transform every configured monthly file into one DataFrame.
df = read_files_by_dates_s3_uc_select_columns(bucket_name=bucket, dates=datas)