Imports

In [0]:
from pyspark.sql import functions as F, types as T
from dataclasses import dataclass, field
from typing import FrozenSet, Tuple
from functools import reduce
import re
from helpers import BRONZE, TAXI_TYPES

Functions

### 🧹 Leitura e padronização dos dados brutos (camada RAW → BRONZE)

Funções auxiliares para:

- 🔁 **Renomear colunas** para o padrão *snake_case* e uniformizar nomes como `airport_fee` e timestamps;
- 🧼 **Normalizar colunas de data/hora**, ajustando possíveis variações entre datasets (`lpep_*` → `tpep_*`);
- 📁 **Recursivamente listar e ler arquivos `.parquet`** do volume raw;
- ⚠️ **Validar colunas obrigatórias** e mover para a **quarentena** os arquivos com problemas.

🎯 **Objetivo**: padronizar a estrutura e garantir qualidade mínima antes de promover os dados para a camada Bronze.


In [0]:
def to_snake(col_name:str) -> str:
    name = re.sub(r'(.)([A-Z][a-z]+)',  r'\1_\2', col_name)
    name = re.sub(r'([a-z0-9])([A-Z])', r'\1_\2', name)
    return name.lower()

def list_parquet_files(path):
    out=[]
    for e in dbutils.fs.ls(path):
        if e.isDir():  out.extend(list_parquet_files(e.path))
        elif e.path.endswith(".parquet"): out.append(e.path)
    return out

def normalize_datetime_cols(df):
    mappings = {
        "lpep_pickup_datetime" : "tpep_pickup_datetime",
        "lpep_dropoff_datetime": "tpep_dropoff_datetime",
    }
    for src, dst in mappings.items():
        if src in df.columns and dst not in df.columns:
            df = df.withColumnRenamed(src, dst)

    return df

def load_one(p):
    df = spark.read.parquet(p)

    for old in df.columns:
        new = to_snake(old)
        if new != old:
            df = df.withColumnRenamed(old, new)

    if "airport_fee" not in df.columns and "airportfee" in df.columns:
        df = df.withColumnRenamed("airportfee","airport_fee")

    for c in set(BRONZE.COLS_FORCE_DOUBLE).intersection(df.columns):
        df = df.withColumn(c, F.col(c).cast(T.DoubleType()))

    df = normalize_datetime_cols(df)
    
    if "tpep_pickup_datetime" in df.columns:
        df = (df
               .withColumn("trip_year",  F.year("tpep_pickup_datetime"))
               .withColumn("trip_month", F.month("tpep_pickup_datetime")))
    return df

def safe_read(path:str):
    try:
        df = load_one(path)
        missing = [c for c in BRONZE.REQUIRED_COLS if c not in df.columns]
        if missing:
            raise ValueError(f"faltando cols {missing}")
        return df.withColumn("_source_file", F.input_file_name())
    except Exception as e:
        print("Arquivo em quarentena:", path.split('/')[-1], "—", e)
        dbutils.fs.mv(path, f"{BRONZE.QUARANTINE}/{path.split('/')[-1]}")
        return None

### 🪙 Construção da camada Bronze (dados padronizados e validados)

Este trecho:

- 🔁 Itera sobre os tipos de táxi e carrega os arquivos válidos da camada `raw`;
- 🧪 **Valida e concatena** os dados, aplicando constraints como:
  - `total_amount >= 0`
  - `pickup <= dropoff`
  - `trip_year = 2023`, entre outros;
- 🗃️ Escreve os dados em formato **Delta Lake**, particionando por `trip_year` e `trip_month`;
- ♻️ Arquivos inválidos (schema inconsistente, colunas faltantes) são movidos para **quarentena**.

🎯 **Objetivo**: garantir que apenas dados minimamente confiáveis avancem para análises e transformações mais refinadas.


In [0]:
for tt in TAXI_TYPES:
    src_dir = f"{BRONZE.RAW_ROOT}/taxi_type={tt}"
    files   = list_parquet_files(src_dir)

    if not files:
        print(f"Nenhum arquivo para {tt}")
        continue

    print(f"Bronze {tt} – {len(files)} arquivos")

    dfs = [safe_read(p) for p in files]
    dfs = [d for d in dfs if d is not None]
    if not dfs:
        print(f"todos os arquivos de {tt} caíram na quarentena")
        continue

    bronze_df = reduce(lambda d1, d2:
                       d1.unionByName(d2, allowMissingColumns=True), dfs)

    bronze_df = (bronze_df
                 .withColumn("ingestion_ts", F.current_timestamp())
                 .withColumn("trip_year",    F.year("tpep_pickup_datetime"))
                 .withColumn("trip_month",   F.month("tpep_pickup_datetime")))

    for c in bronze_df.columns:
        snake = to_snake(c)
        if c != snake:
            bronze_df = bronze_df.withColumnRenamed(c, snake)

    invalid_cond = (
        (F.col("total_amount") < 0) |
        (F.col("tpep_pickup_datetime") > F.col("tpep_dropoff_datetime")) |
        (F.col("trip_year")  != 2023) |
        (~F.col("trip_month").between(1, 12))
    )

    invalid_df = bronze_df.filter(invalid_cond)
    if invalid_df.head(1):                                    
        (invalid_df.write
            .mode("append")
            .format("delta")
            .option("mergeSchema", "true")
            .save(f"{BRONZE.QUARANTINE}/{tt}"))
        print(f"{invalid_df.count()} linhas quarentenadas")

    bronze_df = bronze_df.filter(~invalid_cond)               

    tbl = f"{BRONZE.SCHEMA}.{tt}_tripdata_bronze"

    (bronze_df.write
        .format("delta")
        .mode("overwrite")
        .partitionBy("trip_year", "trip_month")
        .option("overwriteSchema", "true")
        .option("delta.feature.checkConstraints", "supported")
        .option("delta.constraints.total_amount_nonnegative",
                "total_amount >= 0")
        .option("delta.constraints.pickup_before_dropoff",
                "tpep_pickup_datetime <= tpep_dropoff_datetime")
        .option("delta.constraints.valid_year",  "trip_year = 2023")
        .option("delta.constraints.valid_month", "trip_month BETWEEN 1 AND 12")
        .saveAsTable(tbl))

    print("Arquivo gravado —", spark.table(tbl).count(), "linhas")

print("Camada Bronze concluída")