# Exploração de Dados – Camada Silver

Este notebook explora os registros de viagens limpos e validados armazenados na **Camada Silver**.  
O objetivo é avaliar a eficácia das transformações aplicadas, incluindo o tratamento de valores ausentes, regras de consistência, verificações de plausibilidade e remoção de duplicatas.  

Enquanto a Camada Bronze contém dados brutos ingeridos, a Camada Silver garante que os registros estejam padronizados, verificados quanto à qualidade e prontos para análises avançadas e agregações na Camada Gold.  

## Etapa 1 – Carregar Dados da Camada Silver

A tabela Silver é carregada em um DataFrame do Spark para validar os resultados das regras de limpeza e transformação aplicadas.  
Os primeiros registros são visualizados para confirmar a consistência.  


In [0]:
df_silver = spark.sql("SELECT * FROM tlc.tripdata.tripdata_silver")
df_silver.limit(10).display()

## Etapa 2 – Validação do Esquema da Camada Silver

Na Camada Silver, o esquema é validado em relação ao **dicionário de dados oficial da TLC** para garantir tipagem e consistência adequadas.  
O objetivo é confirmar que todas as transformações aplicadas aos dados da Bronze preservaram ou corrigiram o esquema esperado.  

- **Campos de timestamp** (ex.: `tpep_pickup_datetime`, `tpep_dropoff_datetime`) são corretamente reconhecidos como `timestamp`.  
- **Colunas numéricas** (ex.: `trip_distance`, `fare_amount`, `total_amount`, `passenger_count`) são convertidas para os tipos numéricos corretos (`int`, `double`).  
- **Códigos categóricos** (ex.: `VendorID`, `RatecodeID`, `payment_type`) são padronizados como `int`.  
- Valores ausentes ou inválidos recebem códigos padrão quando aplicável (ex.: `RatecodeID = 99`, `payment_type = 5`).  

Isso garante que a Camada Silver forneça um **esquema limpo e consistente** para o processamento subsequente na Camada Gold.  


In [0]:
df_silver.printSchema() 

## Etapa 3 – Valores Ausentes na Camada Silver

Na Camada Silver, os valores ausentes são cuidadosamente tratados para melhorar a qualidade dos dados.  

Um resumo dos valores nulos restantes por coluna é gerado para confirmar que apenas **campos não críticos** podem ainda conter dados ausentes na Camada Silver.  


In [0]:
from pyspark.sql.functions import col, sum as spark_sum, to_date, lit
missing_summary = df_silver.select([spark_sum(col(c).isNull().cast("int")).alias(c) for c in df_silver.columns])
missing_summary.display()
print(f"The dataset contains {df_silver.count()} values.")

## Etapa 5 – Verificações de Consistência da Camada Silver

As seguintes regras são aplicadas para garantir consistência lógica e de domínio:  

- **Consistência de início dos dados**: As datas devem começar após jan-2023.  
- **Consistência temporal**: O horário de desembarque deve ser sempre maior que o horário de embarque.  
- **Restrições de passageiros**: A contagem de passageiros deve estar entre 1 e 7.  
- **Distância da viagem**: Deve ser estritamente positiva.  
- **Validade financeira**: Campos como `fare_amount`, `total_amount`, `tip_amount` e outros componentes de custo devem ser não negativos, e `total_amount` deve ser maior que zero.  

Essas verificações refinam o conjunto de dados, removendo registros incompletos, implausíveis ou contraditórios antes de avançar para a Camada Gold.  


In [0]:
from pyspark.sql.functions import min as spark_min, max as spark_max
df_silver.agg(
    spark_min(col("tpep_pickup_datetime")).alias("initial_pickup_datetime"),
    spark_max(col("tpep_pickup_datetime")).alias("final_pickup_datetime")
).display()
lower_values_in = df_silver.filter(col("tpep_pickup_datetime") < to_date(lit("2023-01-01")))
lower_values_off = df_silver.filter(col("tpep_dropoff_datetime") < to_date(lit("2023-01-01")))
dropin_ = df_silver.filter(col("tpep_dropoff_datetime") < col("tpep_pickup_datetime")) 
passenger_ = df_silver.filter((col("passenger_count") <= 0) | (col("passenger_count") > 7)) 
tistances_ = df_silver.filter(col("trip_distance") <= 0) 
fate_ = df_silver.filter(col("fare_amount") < 0)
extra_ = df_silver.filter(col("extra") < 0)
mta_tax_ = df_silver.filter(col("mta_tax") < 0)
mta_tax_ = df_silver.filter(col("mta_tax") < 0)
tips_ = df_silver.filter(col("tip_amount") < 0)
tolls_amount_ = df_silver.filter(col("tolls_amount") < 0)
improvement_surcharge_ = df_silver.filter(col("improvement_surcharge") < 0)
congestion_surcharge_ = df_silver.filter(col("congestion_surcharge") < 0)
airport_fee_ = df_silver.filter(col("airport_fee") < 0)
total_amount_ = df_silver.filter(col("total_amount") < 0)
print(f"Invalid pickup/dropoff before jan-2023: {lower_values_in.count()}/{lower_values_off.count()}")
print(f"Invalid pickup/dropoff times: {dropin_.count()}")
print(f"Invalid passenger count: {passenger_.count()}")
print(f"Invalid trip distances: {tistances_.count()}")
print(f"Invalid fares: {fate_.count()}")
print(f"Invalid extras: {extra_.count()}")
print(f"Invalid mta_tax: {mta_tax_.count()}")
print(f"Invalid tips: {tips_.count()}")
print(f"Invalid tolls_amount: {tolls_amount_.count()}")
print(f"Invalid improvement_surcharge: {improvement_surcharge_.count()}")
print(f"Invalid congestion_surcharge: {congestion_surcharge_.count()}")
print(f"Invalid airport_fee: {airport_fee_.count()}")
print(f"Invalid total_amount: {total_amount_.count()}")

## Etapa 6 – Consistência Entre Campos (Camada Silver)

Garantir que a integridade financeira de cada viagem seja preservada:  

- **Validação do valor total**: Verificar se `total_amount` é igual à soma de seus componentes:  
  `fare_amount + tolls_amount + tip_amount + extra + mta_tax + improvement_surcharge + congestion_surcharge + airport_fee`.  
- **Tolerância**: Pequenas diferenças de arredondamento podem ser toleradas (ex.: dentro de ±3% de `total_amount`) para considerar pequenas discrepâncias computacionais.  

Viagens que não passarem nesta verificação de consistência devem ser sinalizadas para investigação adicional ou excluídas do conjunto de dados Silver, dependendo das regras de governança de dados.  


In [0]:
from pyspark.sql.functions import abs as spark_abs, lit, when 
columns = ["fare_amount", "tolls_amount", "tip_amount", "extra", "mta_tax", "airport_fee", "improvement_surcharge", "total_amount"]
df_financial = df_silver.select(*columns).fillna(0)
df_financial = df_silver.withColumn("total_amount_calc", col("fare_amount")+col("tolls_amount")+col("tip_amount")+col("extra")+col("mta_tax")+col("airport_fee")+col("improvement_surcharge")+col("congestion_surcharge")+col("airport_fee"))
tolerance_pct = 0.03
df_financial = df_financial.withColumn(
    "financial_consistent",
    when(col("total_amount") == 0, lit(0))
    .when(
        (spark_abs(col("total_amount_calc") - col("total_amount")) / col("total_amount")) <= tolerance_pct,
        lit(1)
    ).otherwise(lit(0))
)
financial_inconsistencies = df_financial.filter(col("financial_consistent") == 1)
financial_consistencies = df_financial.filter(col("financial_consistent") == 0)
print(f"Total financial inconsistencies: {financial_inconsistencies.count()} (tolerance: {100*tolerance_pct}%)")
print(f"Total financial valid values: {financial_consistencies.count()} (tolerance: {100*tolerance_pct}%)")
print(f"Total values: {financial_inconsistencies.count()+financial_consistencies.count()}")

## Etapa 7 – Registros Duplicados (Camada Silver)

Antes de finalizar o conjunto de dados Silver, garantir que registros de viagens duplicados sejam removidos.  

- Duplicatas são identificadas com base em atributos-chave da viagem, como:  
  `tpep_pickup_datetime`, `tpep_dropoff_datetime`, `passenger_count` e `trip_distance`.  
- Apenas registros únicos devem ser mantidos para garantir qualidade e confiabilidade dos dados em análises subsequentes.  


In [0]:
from pyspark.sql.functions import count
duplicates = (
    df_silver.groupBy("tpep_pickup_datetime","tpep_dropoff_datetime","passenger_count","trip_distance")
      .agg(count("*").alias("dup_count"))
      .filter(col("dup_count") > 1)
)
print(f"The silver dataset has {duplicates.count()} duplicated values")

# Planejamento – Camada Gold
A Camada Gold fornecerá **métricas agregadas e prontas para uso empresarial** derivadas da Camada Silver.  
O objetivo é produzir conjuntos de dados adequados para análises, relatórios e dashboards.  


In [0]:
from pyspark.sql import DataFrame
def remove_outliers_iqr(df: DataFrame, numeric_cols: list) -> DataFrame:
    for c in numeric_cols:
        quantiles = df.approxQuantile(c, [0.25, 0.75], 0.01)
        Q1, Q3 = quantiles
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        df = df.filter((col(c) >= lower_bound) & (col(c) <= upper_bound))
    return df

## Agregações Temporais na Camada Gold

Esta função agrega métricas de viagens da Camada Silver por local de embarque e período de tempo (dia, semana ou mês), criando a base da Camada Gold.  

As métricas calculadas incluem:  
- **total_trips**: número total de viagens,  
- **total_revenue**: receita total,  
- **total_fare**: tarifas totais,  
- **total_tips**: gorjetas totais,  
- **total_tolls**: pedágios totais,  
- **avg_distance**: distância média da viagem,  
- **avg_trip_time**: tempo médio da viagem,  
- **tip_percentage**: percentual médio de gorjeta em relação à receita total.  

A função adiciona colunas de ano e mês, além de uma coluna de período (dia ou semana, se aplicável), permitindo análise de padrões temporais detalhados.  
Todos os valores numéricos são arredondados para duas casas decimais para facilitar a leitura e apresentação.  


In [0]:
from pyspark.sql.functions import weekofyear, year, month, dayofweek, hour, avg, round as spark_round

# Paramters
df = df_silver
time_stamp = "day"
location_col = ["PUlocationID"]

# Remove outliers
numeric_cols = ["trip_distance", "total_amount"]
df = remove_outliers_iqr(df, numeric_cols)

# Define timestamp function
if time_stamp == "day":
    stamp_func = dayofweek
elif time_stamp == "week":
    stamp_func = weekofyear
elif time_stamp == "month":
    stamp_func = month
else:
    raise ValueError(f"Invalid stamp: {time_stamp}")

# Add year, month, and timestamp columns
df = df.withColumn("pickup_year", year(col("tpep_pickup_datetime"))) \
       .withColumn("pickup_month", month(col("tpep_pickup_datetime"))) \
       .withColumn(f"pickup_{time_stamp}", stamp_func(col("tpep_pickup_datetime")))

# Perform aggregation
agg_df = df.groupBy(
    *[col(x) for x in location_col], 
    col("pickup_year"), 
    col("pickup_month"), 
    col(f"pickup_{time_stamp}")
).agg(
    count("*").alias("total_trips"),
    spark_sum("total_amount").alias("total_revenue"),
    spark_sum("fare_amount").alias("total_fare"),
    spark_sum("tip_amount").alias("total_tips"),
    spark_sum("tolls_amount").alias("total_tolls"),
    avg("trip_distance").alias("avg_distance"),
    avg("trip_time_minutes").alias("avg_trip_time"),
)

# Calculate tip percentage
agg_df = agg_df.withColumn(
    "tip_percentage",
    when(col("total_revenue") != 0, 100 * col("total_tips") / col("total_revenue"))
    .otherwise(0)
)

# Round numeric columns
agg_df = agg_df.select(
    *location_col,
    "pickup_year",
    "pickup_month",
    f"pickup_{time_stamp}",
    "total_trips",
    spark_round("total_revenue", 2).alias("total_revenue"),
    spark_round("total_fare", 2).alias("total_fare"),
    spark_round("total_tips", 2).alias("total_tips"),
    spark_round("total_tolls", 2).alias("total_tolls"),
    spark_round("avg_distance", 2).alias("avg_distance"),
    spark_round("avg_trip_time", 2).alias("avg_trip_time"),
    spark_round("tip_percentage", 2).alias("tip_percentage")
)

df.display()
agg_df.display()

## Agregação por Faixas de Distância – Camada Gold

Nesta etapa, as viagens são classificadas em **faixas de distância** para identificar padrões de viagens curtas, médias e longas, mantendo a padronização temporal com ano e mês.  

A agregação é realizada por local de embarque, faixa de distância, ano, mês e período de tempo (dia, semana ou mês), calculando as seguintes métricas:  
- **total_trips**: número total de viagens,  
- **total_revenue**: receita total,  
- **avg_trip_time**: tempo médio de viagem,  
- **avg_tips**: gorjeta média.  

Todos os valores numéricos são arredondados para duas casas decimais para consistência e fácil interpretação, garantindo o mesmo padrão das outras tabelas da Camada Gold.  


In [0]:
df = df_silver
time_stamp = "day"
location_col = []

# Define faixas de distância
df = df.withColumn(
    "distance_bucket",
    when(col("trip_distance") <= 2, "0-2 miles")
    .when(col("trip_distance") <= 5, "2-5 miles")
    .when(col("trip_distance") <= 10, "5-10 miles")
    .when(col("trip_distance") <= 25, "10-25 miles")
    .when(col("trip_distance") <= 50, "25-50 miles")
    .when(col("trip_distance") <= 100, "50-100 miles")
    .otherwise(">100 miles")
)

# Define função de timestamp
if time_stamp == "day":
    stamp_func = dayofweek
elif time_stamp == "week":
    stamp_func = weekofyear
elif time_stamp == "month":
    stamp_func = month
else:
    raise ValueError(f"Invalid stamp: {time_stamp}")

# Adiciona colunas de ano e mês
df = df.withColumn("pickup_year", year(col("tpep_pickup_datetime"))) \
        .withColumn("pickup_month", month(col("tpep_pickup_datetime")))

# Adiciona coluna de período se não for mês
if time_stamp != "month":
    df = df.withColumn(f"pickup_{time_stamp}", stamp_func(col("tpep_pickup_datetime")))

# Define colunas de agrupamento
group_cols = [*location_col, "distance_bucket", "pickup_year", "pickup_month"]
if time_stamp != "month":
    group_cols.append(f"pickup_{time_stamp}")

# Agregação
agg_df = df.groupBy(*[col(c) for c in group_cols]).agg(
    count("*").alias("total_trips"),
    spark_sum("total_amount").alias("total_revenue"),
    avg("trip_time_minutes").alias("avg_trip_time"),
    avg("tip_amount").alias("avg_tips")
)

# Arredonda colunas numéricas
numeric_cols = ["total_revenue", "avg_trip_time", "avg_tips"]
for c in numeric_cols:
    agg_df = agg_df.withColumn(c, spark_round(col(c), 2))

agg_df.display()

## Agregação por Contagem de Passageiros – Camada Gold

Esta função agrega as métricas de viagens da Camada Silver por **contagem de passageiros** e período de tempo (semana ou mês), mantendo a padronização com a coluna **ano**.  

As métricas calculadas incluem:  
- **total_trips**: número total de viagens,  
- **total_revenue**: receita total,  
- **avg_tips**: gorjeta média,  
- **avg_trip_time**: tempo médio de viagem.  

Todos os valores numéricos são arredondados para duas casas decimais, seguindo o padrão das tabelas Gold, garantindo consistência e legibilidade para dashboards e relatórios.  
A função permite identificar padrões de viagens e receita conforme o tamanho do grupo de passageiros.  


In [0]:
df = df_silver
time_stamp = "day"
location_col = []

# Define função de timestamp
if time_stamp == "day":
    stamp_func = dayofweek
elif time_stamp == "week":
    stamp_func = weekofyear
elif time_stamp == "month":
    stamp_func = month
else:
    raise ValueError(f"Invalid stamp: {time_stamp}")

# Adiciona colunas de ano e mês
df = df.withColumn("pickup_year", year(col("tpep_pickup_datetime"))) \
        .withColumn("pickup_month", month(col("tpep_pickup_datetime"))) \
        .withColumn(f"pickup_{time_stamp}", stamp_func(col("tpep_pickup_datetime")))

# Define colunas de agrupamento
group_cols = [*location_col, "passenger_count", "pickup_year", "pickup_month", f"pickup_{time_stamp}"]

# Agregação
agg_df = df.groupBy(*[col(c) for c in group_cols]).agg(
    count("*").alias("total_trips"),
    spark_sum("total_amount").alias("total_revenue"),
    avg("tip_amount").alias("avg_tips"),
    avg("trip_time_minutes").alias("avg_trip_time")
)

# Arredonda colunas numéricas
numeric_cols = ["total_revenue", "avg_tips", "avg_trip_time"]
for c in numeric_cols:
    agg_df = agg_df.withColumn(c, spark_round(col(c), 2))

agg_df.display()