In [0]:
# MAGIC %md
# MAGIC # NYC Taxi Pipeline - Silver to Gold Aggregation
# MAGIC ### Stack Tecnologias - Desafio Técnico
# MAGIC 
# MAGIC **Objetivo**: Criar agregações analíticas otimizadas para consultas de negócio
# MAGIC 
# MAGIC **Agregações Gold:**
# MAGIC 1. Métricas por hora e localização
# MAGIC 2. Análise de receita por zona
# MAGIC 3. Tendências temporais (diário/semanal/mensal)
# MAGIC 4. KPIs de performance e qualidade

In [0]:
# MAGIC %md
# MAGIC # NYC Taxi Pipeline - Silver to Gold Aggregation
# MAGIC ### Stack Tecnologias - Desafio Técnico

# COMMAND ----------
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Configurações
catalog_name = "nyc_taxi_catalog"
silver_schema = "silver"
gold_schema = "gold"

silver_table = f"{catalog_name}.{silver_schema}.nyc_taxi_trips"
gold_path = "s3://nyc-taxi-gold-lucas/"

# COMMAND ----------
# Usar o catálogo correto
spark.sql(f"USE CATALOG {catalog_name}")
spark.sql(f"USE SCHEMA {gold_schema}")

# COMMAND ----------
# Ler dados da camada Silver
df_silver = spark.table(silver_table)

print(f"Registros Silver: {df_silver.count():,}")
print("Schema Silver:")
df_silver.printSchema()

# COMMAND ----------
def create_hourly_metrics():
    """
    Criar métricas agregadas por hora e coordenadas aproximadas
    """
    
    # Agregar por hora e coordenadas arredondadas
    hourly_metrics = df_silver.select(
        # Campos temporais
        col("pickup_hour"),
        col("pickup_dayofweek"),
        col("pickup_month"),
        date_format("pickup_datetime", "yyyy-MM-dd").alias("pickup_date"),
        
        # Localização aproximada
        round(col("pickup_latitude"), 2).alias("pickup_lat_rounded"),
        round(col("pickup_longitude"), 2).alias("pickup_lon_rounded"),
        
        # Métricas da viagem
        col("trip_duration_minutes"),
        col("calculated_distance_km"),
        col("fare_amount"),
        col("total_amount"),
        col("tip_amount"),
        col("passenger_count"),
        col("payment_type_desc")
    ).groupBy(
        "pickup_hour", "pickup_dayofweek", "pickup_month", "pickup_date",
        "pickup_lat_rounded", "pickup_lon_rounded"
    ).agg(
        # Contadores
        count("*").alias("total_trips"),
        countDistinct("payment_type_desc").alias("payment_types_used"),
        
        # Métricas de duração
        avg("trip_duration_minutes").alias("avg_trip_duration_min"),
        expr("percentile_approx(trip_duration_minutes, 0.5)").alias("median_trip_duration_min"),
        max("trip_duration_minutes").alias("max_trip_duration_min"),
        
        # Métricas de distância
        avg("calculated_distance_km").alias("avg_distance_km"),
        sum("calculated_distance_km").alias("total_distance_km"),
        
        # Métricas financeiras
        sum("fare_amount").alias("total_fare_amount"),
        sum("total_amount").alias("total_revenue"),
        sum("tip_amount").alias("total_tips"),
        avg("fare_amount").alias("avg_fare_amount"),
        avg("total_amount").alias("avg_total_amount"),
        
        # Métricas de passageiros
        sum("passenger_count").alias("total_passengers"),
        avg("passenger_count").alias("avg_passengers_per_trip")
    )
    
    # Adicionar colunas derivadas uma por vez
    hourly_metrics = hourly_metrics.withColumn(
        "revenue_per_km", 
        when(col("total_distance_km") > 0, col("total_revenue") / col("total_distance_km")).otherwise(0)
    )
    
    hourly_metrics = hourly_metrics.withColumn(
        "revenue_per_minute",
        when(col("avg_trip_duration_min") > 0, col("total_revenue") / col("avg_trip_duration_min")).otherwise(0)
    )
    
    hourly_metrics = hourly_metrics.withColumn(
        "time_period",
        when(col("pickup_hour").between(6, 11), "Morning")
        .when(col("pickup_hour").between(12, 17), "Afternoon") 
        .when(col("pickup_hour").between(18, 23), "Evening")
        .otherwise("Night")
    )
    
    hourly_metrics = hourly_metrics.withColumn(
        "day_type",
        when(col("pickup_dayofweek").isin([1, 7]), "Weekend")
        .otherwise("Weekday")
    )
    
    hourly_metrics = hourly_metrics.withColumn(
        "aggregated_timestamp",
        current_timestamp()
    )
    
    return hourly_metrics

# COMMAND ----------
# Criar e salvar agregação horária
df_hourly = create_hourly_metrics()

print(f"Registros na agregação horária: {df_hourly.count():,}")

# Salvar tabela Gold - Métricas Horárias
hourly_table = f"{catalog_name}.{gold_schema}.hourly_location_metrics"

df_hourly.write\
    .format("delta")\
    .mode("overwrite")\
    .option("mergeSchema", "true")\
    .option("path", f"{gold_path}hourly_metrics/")\
    .partitionBy("pickup_date", "time_period")\
    .saveAsTable(hourly_table)

print(f"✅ Tabela {hourly_table} criada com sucesso!")

# COMMAND ----------
def create_daily_revenue_metrics():
    """
    Criar métricas diárias de receita e performance
    """
    
    daily_metrics = df_silver.select(
        # Data
        date_format("pickup_datetime", "yyyy-MM-dd").alias("trip_date"),
        dayofweek("pickup_datetime").alias("day_of_week"),
        month("pickup_datetime").alias("month"),
        year("pickup_datetime").alias("year"),
        
        # Métricas
        col("fare_amount"),
        col("total_amount"),
        col("tip_amount"),
        col("trip_duration_minutes"),
        col("calculated_distance_km"),
        col("passenger_count"),
        col("payment_type_desc")
    ).groupBy("trip_date", "day_of_week", "month", "year").agg(
        # Volume
        count("*").alias("daily_trips"),
        sum("passenger_count").alias("daily_passengers"),
        
        # Receita
        sum("total_amount").alias("daily_revenue"),
        sum("fare_amount").alias("daily_fare"),
        sum("tip_amount").alias("daily_tips"),
        avg("total_amount").alias("avg_trip_value"),
        
        # Operacional
        sum("trip_duration_minutes").alias("total_minutes_driven"),
        sum("calculated_distance_km").alias("total_km_driven"),
        avg("trip_duration_minutes").alias("avg_trip_duration"),
        avg("calculated_distance_km").alias("avg_trip_distance"),
        
        # Distribuição de pagamentos
        sum(when(col("payment_type_desc") == "Credit card", 1).otherwise(0)).alias("credit_card_trips"),
        sum(when(col("payment_type_desc") == "Cash", 1).otherwise(0)).alias("cash_trips")
    )
    
    # Adicionar colunas derivadas
    daily_metrics = daily_metrics.withColumn(
        "revenue_per_km",
        when(col("total_km_driven") > 0, col("daily_revenue") / col("total_km_driven")).otherwise(0)
    )
    
    daily_metrics = daily_metrics.withColumn(
        "revenue_per_trip",
        when(col("daily_trips") > 0, col("daily_revenue") / col("daily_trips")).otherwise(0)
    )
    
    daily_metrics = daily_metrics.withColumn(
        "credit_card_percentage",
        when(col("daily_trips") > 0, (col("credit_card_trips") / col("daily_trips") * 100)).otherwise(0)
    )
    
    daily_metrics = daily_metrics.withColumn(
        "day_type",
        when(col("day_of_week").isin([1, 7]), "Weekend").otherwise("Weekday")
    )
    
    daily_metrics = daily_metrics.withColumn(
        "aggregated_timestamp",
        current_timestamp()
    )
    
    return daily_metrics

# COMMAND ----------
# Criar e salvar agregação diária
df_daily = create_daily_revenue_metrics()

print(f"Registros na agregação diária: {df_daily.count():,}")

# Salvar tabela Gold - Métricas Diárias
daily_table = f"{catalog_name}.{gold_schema}.daily_revenue_metrics"

df_daily.write\
    .format("delta")\
    .mode("overwrite")\
    .option("mergeSchema", "true")\
    .option("path", f"{gold_path}daily_metrics/")\
    .partitionBy("year", "month")\
    .saveAsTable(daily_table)

print(f"✅ Tabela {daily_table} criada com sucesso!")

# COMMAND ----------
def create_executive_kpis():
    """
    Criar KPIs executivos consolidados
    """
    
    kpis = df_silver.agg(
        # Volume
        count("*").alias("total_trips_processed"),
        countDistinct(date_format("pickup_datetime", "yyyy-MM-dd")).alias("days_of_data"),
        
        # Receita
        sum("total_amount").alias("total_revenue"),
        avg("total_amount").alias("avg_trip_value"),
        sum("tip_amount").alias("total_tips"),
        
        # Operacional
        sum("trip_duration_minutes").alias("total_minutes"),
        sum("calculated_distance_km").alias("total_kilometers"),
        avg("trip_duration_minutes").alias("avg_trip_duration"),
        avg("calculated_distance_km").alias("avg_trip_distance"),
        
        # Metadados
        min("pickup_datetime").alias("data_start_date"),
        max("pickup_datetime").alias("data_end_date")
    )
    
    # Adicionar métricas derivadas
    kpis = kpis.withColumn(
        "avg_daily_revenue",
        when(col("days_of_data") > 0, col("total_revenue") / col("days_of_data")).otherwise(0)
    )
    
    kpis = kpis.withColumn(
        "avg_daily_trips",
        when(col("days_of_data") > 0, col("total_trips_processed") / col("days_of_data")).otherwise(0)
    )
    
    kpis = kpis.withColumn(
        "kpi_calculated_at",
        current_timestamp()
    )
    
    return kpis

# COMMAND ----------
# Criar KPIs executivos
df_kpis = create_executive_kpis()

# Mostrar KPIs
print("=== KPIs EXECUTIVOS ===")
df_kpis.show(vertical=True)

# Salvar tabela Gold - KPIs Executivos
kpis_table = f"{catalog_name}.{gold_schema}.executive_kpis"

df_kpis.write\
    .format("delta")\
    .mode("overwrite")\
    .option("mergeSchema", "true")\
    .option("path", f"{gold_path}kpis/")\
    .saveAsTable(kpis_table)

print(f"✅ Tabela {kpis_table} criada com sucesso!")

# COMMAND ----------
# Validar todas as tabelas criadas
gold_tables = [
    f"{catalog_name}.{gold_schema}.hourly_location_metrics",
    f"{catalog_name}.{gold_schema}.daily_revenue_metrics", 
    f"{catalog_name}.{gold_schema}.executive_kpis"
]

print("=== VALIDAÇÃO TABELAS GOLD ===")
for table in gold_tables:
    count = spark.table(table).count()
    print(f"{table}: {count:,} registros")

# COMMAND ----------
# Exemplo de consultas analíticas
print("=== EXEMPLOS DE CONSULTAS ANALÍTICAS ===")

# Top 5 horários com maior receita
spark.sql(f"""
SELECT 
    pickup_hour,
    time_period,
    SUM(total_revenue) as revenue,
    SUM(total_trips) as trips,
    AVG(avg_total_amount) as avg_trip_value
FROM {catalog_name}.{gold_schema}.hourly_location_metrics
GROUP BY pickup_hour, time_period
ORDER BY revenue DESC
LIMIT 5
""").show()

Registros Silver: 46,385,374
Schema Silver:
root
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- rate_code_id: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_duration_minutes: integer (nullable = true)
 |-- pickup_hour: integer (