In [12]:
import os
import sys
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
import snowflake.connector

print(" Imports completados")

 Imports completados


In [13]:
SNOWFLAKE_ACCOUNT = os.getenv('SNOWFLAKE_ACCOUNT')
SNOWFLAKE_USER = os.getenv('SNOWFLAKE_USER')
SNOWFLAKE_PASSWORD = os.getenv('SNOWFLAKE_PASSWORD')
SNOWFLAKE_ROLE = os.getenv('SNOWFLAKE_ROLE', 'ACCOUNTADMIN')
SNOWFLAKE_DATABASE = os.getenv('SNOWFLAKE_DATABASE')
SNOWFLAKE_WAREHOUSE = os.getenv('SNOWFLAKE_WAREHOUSE')
SNOWFLAKE_SCHEMA_RAW = os.getenv('SNOWFLAKE_SCHEMA_RAW', 'RAW')
SNOWFLAKE_SCHEMA_ANALYTICS = os.getenv('SNOWFLAKE_SCHEMA_ANALYTICS', 'ANALYTICS')

RUN_ID = os.getenv('RUN_ID', f"obt_build_{datetime.now().strftime('%Y%m%d_%H%M%S')}")

print("CONFIGURACIÓN CONSTRUCCIÓN OBT")
print(f"RUN_ID: {RUN_ID}")
print(f"Base de datos: {SNOWFLAKE_DATABASE}")
print(f"Schema RAW: {SNOWFLAKE_SCHEMA_RAW}")
print(f"Schema ANALYTICS: {SNOWFLAKE_SCHEMA_ANALYTICS}")


CONFIGURACIÓN CONSTRUCCIÓN OBT
RUN_ID: manual
Base de datos: NYC_TLC_P03
Schema RAW: RAW
Schema ANALYTICS: ANALYTICS


In [14]:
print("\n Inicializando Spark...")

spark = SparkSession.builder \
    .appName("NYC_TLC_OBT_Construction") \
    .config("spark.jars.packages", 
            "net.snowflake:spark-snowflake_2.12:2.11.0-spark_3.3,"
            "net.snowflake:snowflake-jdbc:3.13.30") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "200") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print(f" Spark {spark.version} inicializado")
print(f" Spark UI: http://localhost:4040")


 Inicializando Spark...
 Spark 3.5.0 inicializado
 Spark UI: http://localhost:4040


In [15]:
sfOptions_raw = {
    "sfURL": f"{SNOWFLAKE_ACCOUNT}.snowflakecomputing.com",
    "sfUser": SNOWFLAKE_USER,
    "sfPassword": SNOWFLAKE_PASSWORD,
    "sfDatabase": SNOWFLAKE_DATABASE,
    "sfSchema": SNOWFLAKE_SCHEMA_RAW,
    "sfWarehouse": SNOWFLAKE_WAREHOUSE,
    "sfRole": SNOWFLAKE_ROLE
}

# Configuración para ANALYTICS
sfOptions_analytics = {
    "sfURL": f"{SNOWFLAKE_ACCOUNT}.snowflakecomputing.com",
    "sfUser": SNOWFLAKE_USER,
    "sfPassword": SNOWFLAKE_PASSWORD,
    "sfDatabase": SNOWFLAKE_DATABASE,
    "sfSchema": SNOWFLAKE_SCHEMA_ANALYTICS,
    "sfWarehouse": SNOWFLAKE_WAREHOUSE,
    "sfRole": SNOWFLAKE_ROLE
}

print(" Configuraciones de Snowflake establecidas")


 Configuraciones de Snowflake establecidas


In [16]:
print(" CARGANDO DATOS DE RAW")

# Cargar Yellow Trips
print(" Cargando Yellow Trips...")
yellow_df = spark.read \
    .format("snowflake") \
    .options(**sfOptions_raw) \
    .option("dbtable", "YELLOW_TRIPS") \
    .load()

yellow_count = yellow_df.count()
print(f" Yellow Trips: {yellow_count:,} registros")

# Cargar Green Trips
print(" Cargando Green Trips...")
green_df = spark.read \
    .format("snowflake") \
    .options(**sfOptions_raw) \
    .option("dbtable", "GREEN_TRIPS") \
    .load()

green_count = green_df.count()
print(f" Green Trips: {green_count:,} registros")

print(f"\n Total a procesar: {yellow_count + green_count:,} registros")


 CARGANDO DATOS DE RAW
 Cargando Yellow Trips...
 Yellow Trips: 772,827,410 registros
 Cargando Green Trips...
 Green Trips: 68,045,597 registros

 Total a procesar: 840,873,007 registros


In [17]:
print(" CARGANDO TABLAS DE LOOKUP")

# Taxi Zones
print(" Cargando Taxi Zones...")
taxi_zones_df = spark.read \
    .format("snowflake") \
    .options(**sfOptions_analytics) \
    .option("dbtable", "TAXI_ZONES") \
    .load()
print(f"Taxi Zones: {taxi_zones_df.count()} zonas")

# Payment Type
print(" Cargando Payment Type...")
payment_type_df = spark.read \
    .format("snowflake") \
    .options(**sfOptions_analytics) \
    .option("dbtable", "PAYMENT_TYPE_LOOKUP") \
    .load()
print(f" Payment Types: {payment_type_df.count()} tipos")

# Rate Code
print(" Cargando Rate Code...")
rate_code_df = spark.read \
    .format("snowflake") \
    .options(**sfOptions_analytics) \
    .option("dbtable", "RATE_CODE_LOOKUP") \
    .load()
print(f" Rate Codes: {rate_code_df.count()} códigos")

# Vendor
print(" Cargando Vendor...")
vendor_df = spark.read \
    .format("snowflake") \
    .options(**sfOptions_analytics) \
    .option("dbtable", "VENDOR_LOOKUP") \
    .load()
print(f" Vendors: {vendor_df.count()} vendors")

# Trip Type
print(" Cargando Trip Type...")
trip_type_df = spark.read \
    .format("snowflake") \
    .options(**sfOptions_analytics) \
    .option("dbtable", "TRIP_TYPE_LOOKUP") \
    .load()
print(f" Trip Types: {trip_type_df.count()} tipos")

print("\n Todas las tablas de lookup cargadas")

 CARGANDO TABLAS DE LOOKUP
 Cargando Taxi Zones...
Taxi Zones: 265 zonas
 Cargando Payment Type...
 Payment Types: 6 tipos
 Cargando Rate Code...
 Rate Codes: 6 códigos
 Cargando Vendor...
 Vendors: 2 vendors
 Cargando Trip Type...
 Trip Types: 2 tipos

 Todas las tablas de lookup cargadas


In [18]:
print(" ESTANDARIZANDO YELLOW TRIPS")

yellow_standardized = yellow_df \
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime") \
    .withColumn("trip_type", lit(None).cast("integer"))  # Yellow no tiene trip_type

# Seleccionar columnas comunes
yellow_unified = yellow_standardized.select(
    "pickup_datetime",
    "dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "RatecodeID",
    "store_and_fwd_flag",
    "payment_type",
    "VendorID",
    "trip_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "Airport_fee",
    "service_type",
    "run_id",
    "source_year",
    "source_month",
    "ingested_at_utc"
)

print(f" Yellow estandarizado: {yellow_unified.count():,} registros")

 ESTANDARIZANDO YELLOW TRIPS
 Yellow estandarizado: 772,827,410 registros


In [19]:
print(" ESTANDARIZANDO GREEN TRIPS")

# Renombrar columnas de Green para estandarizar
green_standardized = green_df \
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime") \
    .withColumn("Airport_fee", lit(None).cast("double"))  # Green no tiene airport_fee

# Seleccionar columnas comunes
green_unified = green_standardized.select(
    "pickup_datetime",
    "dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "RatecodeID",
    "store_and_fwd_flag",
    "payment_type",
    "VendorID",
    "trip_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "Airport_fee",
    "service_type",
    "run_id",
    "source_year",
    "source_month",
    "ingested_at_utc"
)

print(f" Green estandarizado: {green_unified.count():,} registros")


 ESTANDARIZANDO GREEN TRIPS
 Green estandarizado: 68,045,597 registros


In [20]:
print(" UNIFICANDO YELLOW Y GREEN")

# Union de ambos datasets
trips_unified = yellow_unified.union(green_unified)

unified_count = trips_unified.count()
print(f" Trips unificados: {unified_count:,} registros")

# Verificar distribución
print("\nDistribución por servicio:")
trips_unified.groupBy("service_type").count().show()


 UNIFICANDO YELLOW Y GREEN
 Trips unificados: 840,873,007 registros

Distribución por servicio:
+------------+---------+
|service_type|    count|
+------------+---------+
|      yellow|772827410|
|       green| 68045597|
+------------+---------+



In [21]:
print(" AGREGANDO COLUMNAS DE TIEMPO")

trips_with_time = trips_unified \
    .withColumn("pickup_date", to_date("pickup_datetime")) \
    .withColumn("pickup_hour", hour("pickup_datetime")) \
    .withColumn("dropoff_date", to_date("dropoff_datetime")) \
    .withColumn("dropoff_hour", hour("dropoff_datetime")) \
    .withColumn("day_of_week", date_format("pickup_datetime", "EEEE")) \
    .withColumn("month", month("pickup_datetime")) \
    .withColumn("year", year("pickup_datetime"))

print(" Columnas de tiempo agregadas")

 AGREGANDO COLUMNAS DE TIEMPO
 Columnas de tiempo agregadas


In [22]:



print("CALCULANDO MÉTRICAS DERIVADAS")


trips_with_metrics = trips_with_time \
    .withColumn("trip_duration_min", 
                (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 60.0) \
    .withColumn("avg_speed_mph", 
                when((col("trip_duration_min") > 0) & (col("trip_distance") > 0),
                     (col("trip_distance") / col("trip_duration_min")) * 60)
                .otherwise(None)) \
    .withColumn("tip_pct",
                when((col("fare_amount") > 0) & (col("tip_amount").isNotNull()),
                     (col("tip_amount") / col("fare_amount")) * 100)
                .otherwise(0))

print(" Métricas derivadas calculadas:")
print("  - trip_duration_min: duración del viaje en minutos")
print("  - avg_speed_mph: velocidad promedio en mph")
print("  - tip_pct: porcentaje de propina sobre fare_amount")

CALCULANDO MÉTRICAS DERIVADAS
 Métricas derivadas calculadas:
  - trip_duration_min: duración del viaje en minutos
  - avg_speed_mph: velocidad promedio en mph
  - tip_pct: porcentaje de propina sobre fare_amount


In [23]:

print(" ENRIQUECIENDO CON ZONAS DE PICKUP")

# Preparar zonas de pickup
taxi_zones_pu = taxi_zones_df.select(
    col("LocationID").alias("pu_location_id_join"),
    col("Zone").alias("pu_zone"),
    col("Borough").alias("pu_borough")
)

# Join con pickup zones
trips_with_pu = trips_with_metrics.join(
    taxi_zones_pu,
    trips_with_metrics.PULocationID == taxi_zones_pu.pu_location_id_join,
    "left"
).drop("pu_location_id_join")

print(" Zonas de pickup agregadas")

 ENRIQUECIENDO CON ZONAS DE PICKUP
 Zonas de pickup agregadas


In [24]:


print(" ENRIQUECIENDO CON ZONAS DE DROPOFF")

# Preparar zonas de dropoff
taxi_zones_do = taxi_zones_df.select(
    col("LocationID").alias("do_location_id_join"),
    col("Zone").alias("do_zone"),
    col("Borough").alias("do_borough")
)

# Join con dropoff zones
trips_with_zones = trips_with_pu.join(
    taxi_zones_do,
    trips_with_pu.DOLocationID == taxi_zones_do.do_location_id_join,
    "left"
).drop("do_location_id_join")

print("Zonas de dropoff agregadas")

 ENRIQUECIENDO CON ZONAS DE DROPOFF
Zonas de dropoff agregadas


In [25]:
print(" ENRIQUECIENDO CON CATÁLOGOS")

# Join con Vendor
print(" Agregando vendor_name...")
trips_enriched = trips_with_zones.alias("t") \
    .join(
        vendor_df.select(
            col("vendor_id").alias("vendor_id_join"),
            col("vendor_name")
        ),
        col("t.VendorID") == col("vendor_id_join"),
        "left"
    ) \
    .drop("vendor_id_join")

# Join con Payment Type
print(" Agregando payment_type_desc...")
trips_enriched = trips_enriched.alias("t") \
    .join(
        payment_type_df.select(
            col("payment_type_id").alias("payment_type_id_join"),
            col("payment_type_desc")
        ),
        col("t.payment_type") == col("payment_type_id_join"),
        "left"
    ) \
    .drop("payment_type_id_join")

# Join con Rate Code
print(" Agregando rate_code_desc...")
trips_enriched = trips_enriched.alias("t") \
    .join(
        rate_code_df.select(
            col("rate_code_id").alias("rate_code_id_join"),
            col("rate_code_desc")
        ),
        col("t.RatecodeID") == col("rate_code_id_join"),
        "left"
    ) \
    .drop("rate_code_id_join")

# Join con Trip Type (solo para Green)
print(" Agregando trip_type_desc...")
trips_enriched = trips_enriched.alias("t") \
    .join(
        trip_type_df.select(
            col("trip_type_id").alias("trip_type_id_join"),
            col("trip_type_desc")
        ),
        col("t.trip_type") == col("trip_type_id_join"),
        "left"
    ) \
    .drop("trip_type_id_join")

print("Todos los catálogos agregados")

 ENRIQUECIENDO CON CATÁLOGOS
 Agregando vendor_name...
 Agregando payment_type_desc...
 Agregando rate_code_desc...
 Agregando trip_type_desc...
Todos los catálogos agregados


In [26]:


print(" CREANDO ESTRUCTURA FINAL DE LA OBT")

obt_trips = trips_enriched.select(
    
    md5(concat_ws("_", 
                  col("pickup_datetime").cast("string"),
                  col("PULocationID").cast("string"),
                  col("DOLocationID").cast("string"),
                  col("VendorID").cast("string"))).alias("trip_id"),
    
    # Tiempo
    "pickup_datetime",
    "dropoff_datetime",
    "pickup_date",
    "pickup_hour",
    "dropoff_date",
    "dropoff_hour",
    "day_of_week",
    "month",
    "year",
    
    # Ubicación Pickup
    col("PULocationID").alias("pu_location_id"),
    "pu_zone",
    "pu_borough",
    
    # Ubicación Dropoff
    col("DOLocationID").alias("do_location_id"),
    "do_zone",
    "do_borough",
    
    # Servicio y códigos
    "service_type",
    col("VendorID").alias("vendor_id"),
    "vendor_name",
    col("RatecodeID").alias("rate_code_id"),
    "rate_code_desc",
    "payment_type",
    "payment_type_desc",
    "trip_type",
    "trip_type_desc",
    
    # Viaje
    "passenger_count",
    "trip_distance",
    "store_and_fwd_flag",
    
    # Tarifas
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "congestion_surcharge",
    "Airport_fee",
    "total_amount",
    
    # Métricas derivadas
    "trip_duration_min",
    "avg_speed_mph",
    "tip_pct",
    
    # Lineage y calidad
    "run_id",
    "ingested_at_utc",
    col("service_type").alias("source_service"),
    "source_year",
    "source_month"
)

print(" Estructura OBT definida")
print(f"Total columnas: {len(obt_trips.columns)}")
print(" Saltando .show() para evitar crash de memoria")
print("   (Verificaremos los datos después de escribir a Snowflake)")

 CREANDO ESTRUCTURA FINAL DE LA OBT
 Estructura OBT definida
Total columnas: 45
 Saltando .show() para evitar crash de memoria
   (Verificaremos los datos después de escribir a Snowflake)


In [27]:
print(" APLICANDO FILTROS DE CALIDAD")

initial_count = obt_trips.count()
print(f"Registros iniciales: {initial_count:,}")

# Filtrar registros con datos básicos válidos
obt_cleaned = obt_trips.filter(
    (col("pickup_datetime").isNotNull()) &
    (col("dropoff_datetime").isNotNull()) &
    (col("pu_location_id").isNotNull()) &
    (col("do_location_id").isNotNull()) &
    (col("trip_duration_min") > 0) &
    (col("trip_duration_min") < 1440) &  # Menos de 24 horas
    (col("trip_distance") >= 0) &
    (col("trip_distance") < 500) &  # Menos de 500 millas
    (col("total_amount") >= 0) &
    (col("total_amount") < 10000)  # Menos de $10,000
)

cleaned_count = obt_cleaned.count()
discarded = initial_count - cleaned_count

print(f" Registros después de limpieza: {cleaned_count:,}")
print(f" Registros descartados: {discarded:,} ({(discarded/initial_count*100):.2f}%)")


 APLICANDO FILTROS DE CALIDAD
Registros iniciales: 840,873,007
 Registros después de limpieza: 837,099,213
 Registros descartados: 3,773,794 (0.45%)


In [28]:
print(" APLICANDO IDEMPOTENCIA")

try:
    conn = snowflake.connector.connect(
        account=SNOWFLAKE_ACCOUNT,
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        role=SNOWFLAKE_ROLE,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        schema=SNOWFLAKE_SCHEMA_ANALYTICS
    )
    
    cursor = conn.cursor()
    
    # Truncar tabla OBT_TRIPS si existe
    truncate_sql = "TRUNCATE TABLE IF EXISTS OBT_TRIPS"
    cursor.execute(truncate_sql)
    
    cursor.close()
    conn.close()
    
    print(" Tabla OBT_TRIPS truncada (rebuild completo)")
    
except Exception as e:
    print(f"  No se pudo truncar: {e}")
    print("  (Probablemente es la primera ejecución)")

 APLICANDO IDEMPOTENCIA
 Tabla OBT_TRIPS truncada (rebuild completo)


In [29]:
print(" ESCRIBIENDO OBT A SNOWFLAKE")
print(f" Escribiendo {cleaned_count:,} registros...")
print("   Esto puede tardar varios minutos...")

start_write = datetime.now()

obt_cleaned.write \
    .format("snowflake") \
    .options(**sfOptions_analytics) \
    .option("dbtable", "OBT_TRIPS") \
    .mode("append") \
    .save()

end_write = datetime.now()
duration_write = (end_write - start_write).total_seconds()

print(f" OBT_TRIPS creada exitosamente!")
print(f"  Tiempo de escritura: {duration_write/60:.2f} minutos")


 ESCRIBIENDO OBT A SNOWFLAKE
 Escribiendo 837,099,213 registros...
   Esto puede tardar varios minutos...
 OBT_TRIPS creada exitosamente!
  Tiempo de escritura: 192.95 minutos


In [30]:
print(" VERIFICANDO OBT EN SNOWFLAKE")

obt_count = spark.read \
    .format("snowflake") \
    .options(**sfOptions_analytics) \
    .option("query", "SELECT COUNT(*) as COUNT FROM OBT_TRIPS") \
    .load() \
    .collect()[0]['COUNT']

print(f" Registros en OBT_TRIPS: {obt_count:,}")

print("\n Distribución por servicio:")
service_dist = spark.read \
    .format("snowflake") \
    .options(**sfOptions_analytics) \
    .option("query", """
        SELECT 
            service_type,
            COUNT(*) as REGISTROS,
            ROUND(AVG(total_amount), 2) as AVG_AMOUNT
        FROM OBT_TRIPS
        GROUP BY service_type
    """) \
    .load()

service_dist.show()

# Verificar años cargados
print("\n Distribución por año:")
year_dist = spark.read \
    .format("snowflake") \
    .options(**sfOptions_analytics) \
    .option("query", """
        SELECT 
            year,
            COUNT(*) as REGISTROS
        FROM OBT_TRIPS
        GROUP BY year
        ORDER BY year
    """) \
    .load()

year_dist.show()

print("\n Verificación completada")

 VERIFICANDO OBT EN SNOWFLAKE
 Registros en OBT_TRIPS: 837,099,213

 Distribución por servicio:
+------------+---------+----------+
|SERVICE_TYPE|REGISTROS|AVG_AMOUNT|
+------------+---------+----------+
|       green| 67814741|     15.79|
|      yellow|769284472|      18.8|
+------------+---------+----------+


 Distribución por año:
+----+---------+
|YEAR|REGISTROS|
+----+---------+
|2001|       27|
|2002|      351|
|2003|       49|
|2004|        1|
|2007|        1|
|2008|      872|
|2009|     1594|
|2010|      347|
|2011|        4|
|2012|        4|
|2014|        1|
|2015|153459929|
|2016|147273641|
|2017|125039468|
|2018|111583125|
|2019| 90587494|
|2020| 26242711|
|2021| 31766496|
|2022| 40202520|
|2023| 38699863|
+----+---------+
only showing top 20 rows


 Verificación completada


In [33]:
print(" RESUMEN DE CONSTRUCCIÓN OBT")

print(f"\nOBT_TRIPS creada exitosamente")
print(f"   Total registros: {obt_count:,}")
print(f"   Tiempo total de escritura: {duration_write/60:.2f} minutos")

print(f"\n Estructura de la OBT:")
print(f"   - Grano: 1 fila = 1 viaje")
print(f"   - Servicios: Yellow + Green unificados")
print(f"   - Enriquecida con: zonas, vendors, payment types, rate codes")
print(f"   - Columnas derivadas: duración, velocidad, tip%")
print(f"   - Filtros de calidad aplicados")




 RESUMEN DE CONSTRUCCIÓN OBT

OBT_TRIPS creada exitosamente
   Total registros: 837,099,213
   Tiempo total de escritura: 192.95 minutos

 Estructura de la OBT:
   - Grano: 1 fila = 1 viaje
   - Servicios: Yellow + Green unificados
   - Enriquecida con: zonas, vendors, payment types, rate codes
   - Columnas derivadas: duración, velocidad, tip%
   - Filtros de calidad aplicados
