In [0]:
# --- 1. Importação de Bibliotecas ---
# Importamos as funções e tipos necessários do PySpark para manipulação de dados.
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType, TimestampType

# --- 2. Definição dos Caminhos ---
# Definimos os caminhos para as zonas de landing (dados brutos) e consumo (tabela final).
# Utilizamos Volumes do Unity Catalog, que é a prática recomendada em ambientes Databricks modernos.
landing_path = "/Volumes/workspace/default/ifood_landing_zone/"

# --- 3. Leitura, Normalização e União dos Dados ---
# Para lidar com as inconsistências de esquema entre os diferentes arquivos mensais,
# lemos cada arquivo individualmente em um loop.

print("Iniciando processo de ingestão...")

files = [f.path for f in dbutils.fs.ls(landing_path) if f.path.endswith(".parquet")]
final_df = None

for file in files:
    print(f"📂 Processando arquivo: {file.split('/')[-1]}")
    
    # Lê o arquivo Parquet individualmente
    df = spark.read.format("parquet").load(file)
    
    # Normaliza todos os nomes de colunas para minúsculas para evitar conflitos
    for old_name in df.columns:
        df = df.withColumnRenamed(old_name, old_name.lower())
    
    # Seleciona e converte os tipos das colunas necessárias
    df = df.select(
        col("vendorid").cast(IntegerType()).alias("vendor_id"),
        col("tpep_pickup_datetime").cast(TimestampType()).alias("pickup_datetime"),
        col("tpep_dropoff_datetime").cast(TimestampType()).alias("dropoff_datetime"),
        col("passenger_count").cast(IntegerType()).alias("passenger_count"),
        col("total_amount").cast(DoubleType()).alias("total_amount")
    )
    
    # Une o DataFrame processado ao DataFrame final
    if final_df is None:
        final_df = df
    else:
        final_df = final_df.unionByName(df)

print("\nLeitura e união de todos os arquivos concluída.")

# --- 4. Limpeza dos Dados ---
# Aplicamos as regras de negócio para garantir a qualidade dos dados,
# removendo registros com valores inconsistentes.
cleaned_df = final_df.filter(
    (col("passenger_count") > 0) &
    (col("total_amount") >= 0) &
    col("vendor_id").isNotNull()
)

# --- 5. Carregamento para a Camada de Consumo ---
# Salvamos o DataFrame limpo como uma tabela Delta gerenciada.
# O comando .saveAsTable() é a forma mais robusta e compatível com o Unity Catalog,
# pois ele gerencia automaticamente a localização física dos dados.

print("⚙️ Criando schema 'ifood_challenge' (se não existir)...")
spark.sql("CREATE SCHEMA IF NOT EXISTS ifood_challenge")

print("💾 Salvando dados como tabela gerenciada...")
cleaned_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("ifood_challenge.yellow_taxi_trips")

print("✅ Tabela 'ifood_challenge.yellow_taxi_trips' criada com sucesso!")

# --- 6. Verificação ---
# Realizamos uma consulta simples para verificar se a tabela foi criada
# corretamente e se os dados estão acessíveis via SQL.
print("\n📊 Verificando os 10 primeiros registros da tabela criada...")
display(spark.sql("SELECT * FROM ifood_challenge.yellow_taxi_trips LIMIT 10"))



Iniciando processo de ingestão...
📂 Processando arquivo: yellow_tripdata_2023-01.parquet
📂 Processando arquivo: yellow_tripdata_2023-02.parquet
📂 Processando arquivo: yellow_tripdata_2023-03.parquet
📂 Processando arquivo: yellow_tripdata_2023-04.parquet
📂 Processando arquivo: yellow_tripdata_2023-05.parquet

Leitura e união de todos os arquivos concluída.
⚙️ Criando schema 'ifood_challenge' (se não existir)...
💾 Salvando dados como tabela gerenciada...
✅ Tabela 'ifood_challenge.yellow_taxi_trips' criada com sucesso!

📊 Verificando os 10 primeiros registros da tabela criada...


vendor_id,pickup_datetime,dropoff_datetime,passenger_count,total_amount
2,2023-01-01T00:32:10.000Z,2023-01-01T00:40:36.000Z,1,14.3
2,2023-01-01T00:55:08.000Z,2023-01-01T01:01:27.000Z,1,16.9
2,2023-01-01T00:25:04.000Z,2023-01-01T00:37:49.000Z,1,34.9
2,2023-01-01T00:10:29.000Z,2023-01-01T00:21:19.000Z,1,19.68
2,2023-01-01T00:50:34.000Z,2023-01-01T01:02:52.000Z,1,27.8
2,2023-01-01T00:09:22.000Z,2023-01-01T00:19:49.000Z,1,20.52
2,2023-01-01T00:27:12.000Z,2023-01-01T00:49:56.000Z,1,64.44
2,2023-01-01T00:21:44.000Z,2023-01-01T00:36:40.000Z,1,28.38
2,2023-01-01T00:39:42.000Z,2023-01-01T00:50:36.000Z,1,19.9
2,2023-01-01T00:53:01.000Z,2023-01-01T01:01:45.000Z,1,19.68
