In [0]:
from pyspark.sql.functions import col, round, year, month
from pyspark.sql import SparkSession

### Função processar_df_taxi

A função utilitária `processar_df_taxi` é responsável por limpar, filtrar e padronizar os dados dos táxis Yellow/Green para a camada Silver:

- Remove registros nulos em colunas obrigatórias
- Filtra viagens inválidas (menos de 1 passageiro, distância ou valor total menor ou igual a zero, desembarque não posterior ao embarque)
- Faz cast dos tipos e arredondamento dos valores
- Renomeia `VendorID` => `vendor_id`, assim como as demais colunas que não estavam em snake_case (`RatecodeID`, `PULocationID`, `DOLocationID`), para manter a nomenclatura padronizada
- Mantém apenas as viagens com embarque em 2023, entre os meses de janeiro e maio (o dataset contém alguns registros de outros anos)

In [0]:
def processar_df_taxi(df, ano=2023, mes_inicio=1, mes_fim=5):
    """Clean, filter and standardise a taxi trips DataFrame for the Silver layer.

    The pickup/dropoff columns are detected by name (``tpep_*`` or ``lpep_*``)
    and renamed to ``pickup_datetime`` / ``dropoff_datetime`` so both datasets
    share the same output column names.

    Processing steps, in order:
      1. Drop rows with nulls in the required columns.
      2. Keep only trips whose pickup falls in ``ano`` and in the inclusive
         month range ``mes_inicio``..``mes_fim``.
      3. Drop invalid trips: fewer than 1 passenger, non-positive distance or
         total amount, dropoff not strictly after pickup.
      4. Cast the key columns and round monetary/distance values to 2 decimals.
      5. Rename the remaining non-snake_case columns.

    Args:
        df: Spark DataFrame holding the raw (Bronze) trips.
        ano: Pickup year to keep. Defaults to 2023.
        mes_inicio: First pickup month to keep (1-12, inclusive). Defaults to 1.
        mes_fim: Last pickup month to keep (1-12, inclusive). Defaults to 5.

    Returns:
        A new Spark DataFrame with the cleaned and renamed columns.

    Raises:
        ValueError: If neither the ``tpep_*`` nor the ``lpep_*`` datetime
            columns are present, or if the month range is not
            ``1 <= mes_inicio <= mes_fim <= 12``.
    """
    if 'tpep_pickup_datetime' in df.columns:
        pickup_col = 'tpep_pickup_datetime'
        dropoff_col = 'tpep_dropoff_datetime'
    elif 'lpep_pickup_datetime' in df.columns:
        pickup_col = 'lpep_pickup_datetime'
        dropoff_col = 'lpep_dropoff_datetime'
    else:
        raise ValueError("Colunas de data/hora não encontradas no DataFrame.")

    # Fail fast on an impossible window instead of silently returning zero rows.
    if not 1 <= mes_inicio <= mes_fim <= 12:
        raise ValueError(
            f"Intervalo de meses inválido: {mes_inicio} a {mes_fim} (esperado 1 <= mes_inicio <= mes_fim <= 12)."
        )

    required_cols = [
        "VendorID",
        "passenger_count",
        "total_amount",
        "pickup_datetime",
        "dropoff_datetime",
        "trip_distance",
    ]

    # NOTE: `round` below is pyspark.sql.functions.round (imported at the top
    # of the notebook), not the Python builtin.
    df_proc = (
        df
        # Standardise the datetime column names to make later analysis easier
        .withColumnRenamed(pickup_col, "pickup_datetime")
        .withColumnRenamed(dropoff_col, "dropoff_datetime")
        .dropna(subset=required_cols)
        # Restrict to the requested pickup window
        .filter(
            (year("pickup_datetime") == ano)
            & (month("pickup_datetime").between(mes_inicio, mes_fim))
        )
        # Discard trips that cannot be valid
        .filter(col("passenger_count") >= 1)
        .filter(col("trip_distance") > 0)
        .filter(col("total_amount") > 0)
        .filter(col("dropoff_datetime") > col("pickup_datetime"))
        # Enforce types; monetary and distance values are kept with 2 decimals
        .withColumn("VendorID", col("VendorID").cast("integer"))
        .withColumn("passenger_count", col("passenger_count").cast("integer"))
        .withColumn("total_amount", round(col("total_amount").cast("double"), 2).cast("double"))
        .withColumn("trip_distance", round(col("trip_distance").cast("double"), 2).cast("double"))
        .withColumn("pickup_datetime", col("pickup_datetime").cast("timestamp"))
        .withColumn("dropoff_datetime", col("dropoff_datetime").cast("timestamp"))
        # snake_case names; withColumnRenamed is a no-op when the column is absent
        .withColumnRenamed("VendorID", "vendor_id")
        .withColumnRenamed("RatecodeID", "ratecode_id")
        .withColumnRenamed("PULocationID", "pu_location_id")
        .withColumnRenamed("DOLocationID", "do_location_id")
    )
    return df_proc

### Criação do Schema Silver

Garante que o schema (database) silver está disponível para persistência das tabelas tratadas.

In [0]:
# Create the `silver` schema (database) only if it does not exist yet,
# so re-running this cell is harmless.
spark.sql("CREATE DATABASE IF NOT EXISTS silver")

Out[99]: DataFrame[]

### Leitura dos Dados Bronze e Processamento

Nesta etapa, são lidos os dados da camada bronze e aplicadas as regras de limpeza, padronização e filtragem dos registros, preparando os dados para análises e consumo pela camada Silver.

In [0]:
# Load both Bronze tables (yellow first, then green)
df_bronze_yellow, df_bronze_green = (
    spark.read.table(nome_tabela)
    for nome_tabela in ("bronze.yellow_tripdata", "bronze.green_tripdata")
)

In [0]:
# Run the shared cleaning/standardisation routine over each Bronze dataset
df_silver_yellow, df_silver_green = map(
    processar_df_taxi,
    (df_bronze_yellow, df_bronze_green),
)

### Persistência e Registro das Tabelas Silver

Os dados tratados são salvos no formato Delta Lake e registrados como tabelas no schema Silver, facilitando o consumo posterior por SQL ou ferramentas de BI.


In [0]:
# Storage locations for the Silver layer
silver_base = "/FileStore/tables/silver"
silver_yellow = f"{silver_base}/yellow_tripdata"
silver_green = f"{silver_base}/green_tripdata"

# Write each cleaned dataset as Delta and (re)register it in the silver schema.
for _df_silver, _silver_path, _silver_table in (
    (df_silver_yellow, silver_yellow, "silver.yellow_tripdata"),
    (df_silver_green, silver_green, "silver.green_tripdata"),
):
    # overwriteSchema lets a full overwrite replace the stored schema as well.
    # Without it, Delta schema enforcement rejects the write whenever the
    # columns produced upstream differ from what is already at the path
    # (e.g. after a column rename), which breaks re-running the notebook.
    (
        _df_silver.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .save(_silver_path)
    )
    # Drop and recreate so the table always points at the current location.
    spark.sql(f"DROP TABLE IF EXISTS {_silver_table}")
    spark.sql(f"CREATE TABLE {_silver_table} USING DELTA LOCATION '{_silver_path}'")

Out[103]: DataFrame[]

## Validação Final

Aqui validamos o registro das tabelas e mostramos um exemplo de consulta SQL nos dados finais da camada Silver.

In [0]:
# Final check: row counts of the registered Silver tables (yellow, then green)
total_yellow, total_green = (
    spark.read.table(nome_tabela).count()
    for nome_tabela in ("silver.yellow_tripdata", "silver.green_tripdata")
)
print(
    f"Linhas Yellow Silver: {total_yellow}",
    f"Linhas Green Silver: {total_green}",
    sep="\n",
)

# Sample SQL query against the final data
amostra_yellow = spark.sql("SELECT * FROM silver.yellow_tripdata LIMIT 10")
display(amostra_yellow)

Linhas Yellow Silver: 15167894
Linhas Green Silver: 298406


vendor_id,pickup_datetime,dropoff_datetime,passenger_count,trip_distance,ratecode_id,store_and_fwd_flag,pu_location_id,do_location_id,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
2,2023-01-01T00:32:10.000+0000,2023-01-01T00:40:36.000+0000,1,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
2,2023-01-01T00:55:08.000+0000,2023-01-01T01:01:27.000+0000,1,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2023-01-01T00:25:04.000+0000,2023-01-01T00:37:49.000+0000,1,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
2,2023-01-01T00:10:29.000+0000,2023-01-01T00:21:19.000+0000,1,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0
2,2023-01-01T00:50:34.000+0000,2023-01-01T01:02:52.000+0000,1,1.84,1.0,N,161,137,1,12.8,1.0,0.5,10.0,0.0,1.0,27.8,2.5,0.0
2,2023-01-01T00:09:22.000+0000,2023-01-01T00:19:49.000+0000,1,1.66,1.0,N,239,143,1,12.1,1.0,0.5,3.42,0.0,1.0,20.52,2.5,0.0
2,2023-01-01T00:27:12.000+0000,2023-01-01T00:49:56.000+0000,1,11.7,1.0,N,142,200,1,45.7,1.0,0.5,10.74,3.0,1.0,64.44,2.5,0.0
2,2023-01-01T00:21:44.000+0000,2023-01-01T00:36:40.000+0000,1,2.95,1.0,N,164,236,1,17.7,1.0,0.5,5.68,0.0,1.0,28.38,2.5,0.0
2,2023-01-01T00:39:42.000+0000,2023-01-01T00:50:36.000+0000,1,3.01,1.0,N,141,107,2,14.9,1.0,0.5,0.0,0.0,1.0,19.9,2.5,0.0
2,2023-01-01T00:53:01.000+0000,2023-01-01T01:01:45.000+0000,1,1.8,1.0,N,234,68,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0
