In [None]:
%sql
USE CATALOG workspace;
USE SCHEMA case_ifood;

CREATE VOLUME IF NOT EXISTS raw_taxi_data;

In [None]:
import requests
import os

# --- 1. Configura√ß√£o ---
TAXI_TYPE = "yellow"
YEAR = 2023
MONTHS_LIST = list(range(1, 6))
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data/{type}_tripdata_{year}-{month:02d}.parquet"

# --- 2. Defini√ß√£o do Caminho do Volume ---
# Este √© o novo caminho padr√£o para acessar o Volume que criamos.
# √â um caminho de sistema de arquivos simples e direto.
CATALOG_NAME = "workspace"
SCHEMA_NAME = "case_ifood"
VOLUME_NAME = "raw_taxi_data"

VOLUME_PATH = f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/{VOLUME_NAME}/{YEAR}/"

print(f"Diret√≥rio de destino no Volume: {VOLUME_PATH}")

# Cria o subdiret√≥rio do ano dentro do volume se ele n√£o existir
os.makedirs(VOLUME_PATH, exist_ok=True)
print("‚úÖ Diret√≥rio de destino garantido.")

# --- 3. Loop de Download ---
for month in MONTHS_LIST:
    file_name = f"{TAXI_TYPE}_tripdata_{YEAR}-{month:02d}.parquet"
    file_path = os.path.join(VOLUME_PATH, file_name)
    url = BASE_URL.format(type=TAXI_TYPE, year=YEAR, month=month)
    
    # Verifica se o arquivo j√° existe para n√£o baixar novamente
    if os.path.exists(file_path):
        print(f"üü° Arquivo '{file_name}' j√° existe. Pulando.")
        continue
        
    try:
        print(f"üîÑ Baixando '{file_name}'...")
        
        # Faz o download de forma eficiente usando bibliotecas padr√£o
        with requests.get(url, stream=True) as r:
            r.raise_for_status()
            with open(file_path, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192): 
                    f.write(chunk)
        
        print(f"‚úÖ Arquivo '{file_name}' salvo com sucesso em {file_path}")
        
    except Exception as e:
        print(f"‚ùå ERRO ao baixar o m√™s {month}: {e}")

print("\n‚ú® Etapa de download para o Volume do Unity Catalog conclu√≠da.")

In [None]:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StringType
from pyspark.sql import DataFrame

# --- 1. Configura√ß√£o ---
CATALOG_NAME = "workspace"
SCHEMA_NAME = "case_ifood"
VOLUME_NAME = "raw_taxi_data"
YEAR = "2023"

SOURCE_DIR = f"/Volumes/{CATALOG_NAME}/{SCHEMA_NAME}/{VOLUME_NAME}/{YEAR}/"
TABLE_NAME = f"{CATALOG_NAME}.{SCHEMA_NAME}.bronze"

# --- 2. Defini√ß√£o do Esquema Fixo e Normalizado (Contrato de Dados) ---
official_columns_lower = [
    "vendorid", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance", "ratecodeid", "store_and_fwd_flag",
    "pulocationid", "dolocationid", "payment_type", "fare_amount", "extra",
    "mta_tax", "tip_amount", "tolls_amount", "improvement_surcharge",
    "total_amount", "congestion_surcharge", "airport_fee", "cbd_congestion_fee"
]
print("‚úÖ Contrato de dados com a lista oficial de colunas (em min√∫sculas) definido.")

# --- 3. Processamento Robusto: Loop, Conformidade e Uni√£o ---
try:
    files_to_process = [f.path for f in dbutils.fs.ls(SOURCE_DIR) if f.path.endswith('.parquet')]
    list_of_conformed_dfs = []

    print(f"Encontrados {len(files_to_process)} arquivos para processar.")

    for file_path in files_to_process:
        print(f"  - Processando arquivo: {file_path}")
        temp_df = spark.read.parquet(file_path)
        temp_df_lower = temp_df.toDF(*[c.lower() for c in temp_df.columns])
        
        select_exprs = []
        temp_df_columns_set = set(temp_df_lower.columns)
        
        for column_name in official_columns_lower:
            if column_name in temp_df_columns_set:
                select_exprs.append(col(column_name).cast(StringType()).alias(column_name))
            else:
                select_exprs.append(lit(None).cast(StringType()).alias(column_name))
        
        conformed_df = temp_df_lower.select(select_exprs)
        list_of_conformed_dfs.append(conformed_df)

    print("\n‚úÖ Todos os arquivos foram lidos e conformados ao schema oficial.")

    # c. Unir todos os DataFrames (vers√£o compat√≠vel com Spark Connect)
    if not list_of_conformed_dfs:
        raise ValueError("Nenhum DataFrame para unir. Verifique se os arquivos existem.")
    
    # Inicializa o DataFrame final com o primeiro da lista
    bronze_df = list_of_conformed_dfs[0]

    # Faz a uni√£o com os DataFrames restantes em um loop 'for' padr√£o
    for i in range(1, len(list_of_conformed_dfs)):
        bronze_df = bronze_df.unionByName(list_of_conformed_dfs[i])

    print("\n‚úÖ Todos os DataFrames foram unidos com sucesso.")
    
    # --- 4. Escrita na Camada Bronze ---
    (
        bronze_df.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(TABLE_NAME)
    )
    
    print(f"\n‚ú® Sucesso! Dados processados e salvos na tabela '{TABLE_NAME}'.")

except Exception as e:
    print(f"‚ùå ERRO durante o processamento com Spark: {e}")
    raise e