In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp
from pyspark.sql.types import *
from datetime import datetime
import urllib.request
import json

print("Imports completados.")

Imports completados.


In [4]:
# Notebook configuration, read entirely from environment variables.
#
# Fail fast with one clear message when a required variable is missing or
# empty. Otherwise the failure shows up later as a cryptic TypeError
# (int(None)), an AttributeError (None.split) or a JDBC URL that contains the
# literal text "None".
_REQUIRED_ENV_VARS = (
    "POSTGRES_HOST", "POSTGRES_PORT", "POSTGRES_DB", "POSTGRES_USER",
    "POSTGRES_PASSWORD", "PG_SCHEMA_RAW",
    "SOURCE_BASE", "TAXI_ZONE_URL", "START_YEAR", "END_YEAR",
    "SERVICES", "RUN_ID",
)
_missing_env_vars = [name for name in _REQUIRED_ENV_VARS if not os.getenv(name)]
if _missing_env_vars:
    raise RuntimeError(
        "Variables de ambiente faltantes: " + ", ".join(_missing_env_vars)
    )

# PostgreSQL connection settings
POSTGRES_HOST = os.environ["POSTGRES_HOST"]
POSTGRES_PORT = os.environ["POSTGRES_PORT"]
POSTGRES_DB = os.environ["POSTGRES_DB"]
POSTGRES_USER = os.environ["POSTGRES_USER"]
POSTGRES_PASSWORD = os.environ["POSTGRES_PASSWORD"]
PG_SCHEMA_RAW = os.environ["PG_SCHEMA_RAW"]

# Ingestion settings
SOURCE_BASE = os.environ["SOURCE_BASE"]
TAXI_ZONE_URL = os.environ["TAXI_ZONE_URL"]
START_YEAR = int(os.environ["START_YEAR"])
END_YEAR = int(os.environ["END_YEAR"])
# Tolerate "yellow, green" style values: strip whitespace, drop empty entries.
SERVICES = [name.strip() for name in os.environ["SERVICES"].split(",") if name.strip()]
RUN_ID = os.environ["RUN_ID"]

if START_YEAR > END_YEAR:
    raise ValueError(
        f"START_YEAR ({START_YEAR}) no puede ser mayor que END_YEAR ({END_YEAR})"
    )

# Checkpoint file (JSON) that records the per-file ingestion status
CHECKPOINT_FILE = "/home/jovyan/work/checkpoint_ingesta.json"

# JDBC URL built from the connection settings above
JDBC_URL = f"jdbc:postgresql://{POSTGRES_HOST}:{POSTGRES_PORT}/{POSTGRES_DB}"

# Credentials and connection details are deliberately not printed.
print("Variables de ambiente cargadas:")
print(" - Schema RAW.")
print(f" - Periodo: {START_YEAR}-{END_YEAR}")
print(f" - Servicios: {SERVICES}")
print(f" - Checkpoint: {CHECKPOINT_FILE}")

Variables de ambiente cargadas:
 - Schema RAW.
 - Periodo: 2015-2025
 - Servicios: ['yellow', 'green']
 - Checkpoint: /home/jovyan/work/checkpoint_ingesta.json


In [5]:
# Download the PostgreSQL JDBC driver unless it is already present.
jar_dir = "/home/jovyan/jars"
os.makedirs(jar_dir, exist_ok=True)

postgres_jar_url = "https://jdbc.postgresql.org/download/postgresql-42.7.1.jar"
postgres_jar_path = f"{jar_dir}/postgresql-42.7.1.jar"

if not os.path.exists(postgres_jar_path):
    print("Descargando PostgreSQL JDBC driver...")
    # Download to a sibling ".part" file and rename it into place only once
    # the transfer has finished. An interrupted download then never leaves a
    # truncated jar at the final path, where the exists() check above would
    # accept it on the next run.
    partial_jar_path = f"{postgres_jar_path}.part"
    try:
        urllib.request.urlretrieve(postgres_jar_url, partial_jar_path)
        os.replace(partial_jar_path, postgres_jar_path)
    finally:
        # No-op after a successful rename; removes the leftover on failure.
        if os.path.exists(partial_jar_path):
            os.remove(partial_jar_path)
    print(f"Driver descargado: {postgres_jar_path}")
else:
    print(f"Driver ya existe: {postgres_jar_path}")

Descargando PostgreSQL JDBC driver...
Driver descargado: /home/jovyan/jars/postgresql-42.7.1.jar


In [6]:
# Build the SparkSession. Settings are kept in one mapping and applied in
# order, which is equivalent to chaining .config() calls on the builder.
spark_settings = {
    "spark.jars": postgres_jar_path,
    "spark.driver.memory": "3g",
    "spark.executor.memory": "2g",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.shuffle.partitions": "100",
    "spark.driver.maxResultSize": "2g",
    "spark.sql.session.timeZone": "UTC",
}

session_builder = SparkSession.builder.appName("NYC_TLC_to_Postgres")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

print("SparkSession creada:")
print(f" - Spark Version: {spark.version}")

SparkSession creada:
 - Spark Version: 3.5.0


In [7]:
# Funciones para manejar el sistema de checkpoint

def load_checkpoint():
    """Load the ingestion checkpoint from CHECKPOINT_FILE.

    Returns:
        dict: the stored checkpoint mapping. An empty dict is returned when
        the file does not exist, cannot be read or parsed, or holds valid
        JSON that is not an object (callers index and assign by key, so a
        list or scalar would break them later).
    """
    if not os.path.exists(CHECKPOINT_FILE):
        print("Checkpoint nuevo (sin archivos previos).")
        return {}

    try:
        with open(CHECKPOINT_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception as e:
        print(f"Error leyendo checkpoint: {e}")
        return {}

    # Valid JSON is not necessarily a JSON object; reject anything else here
    # rather than handing callers a value they cannot use as a mapping.
    if not isinstance(data, dict):
        print(f"Error leyendo checkpoint: se esperaba un objeto JSON, "
              f"no {type(data).__name__}")
        return {}

    print(f"Checkpoint cargado: {len(data)} archivos registrados.")
    return data

def save_checkpoint(checkpoint_data):
    """Persist the checkpoint mapping to CHECKPOINT_FILE as indented JSON.

    The data is written to a sibling temporary file and then renamed over the
    real checkpoint. A failed or interrupted write (for example a value that
    cannot be serialized) therefore never truncates the existing checkpoint.

    Args:
        checkpoint_data: JSON-serializable mapping to store.

    Returns:
        bool: True when the checkpoint was written, False on any error (the
        error is printed, not raised).
    """
    tmp_path = f"{CHECKPOINT_FILE}.tmp"
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(checkpoint_data, f, indent=2)
            # Push the bytes to disk before the rename makes them visible.
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, CHECKPOINT_FILE)
        return True
    except Exception as e:
        print(f"Error guardando checkpoint: {e}")
        # Best-effort removal of the partial temp file; the previous
        # checkpoint, if any, is left untouched.
        try:
            if os.path.exists(tmp_path):
                os.remove(tmp_path)
        except OSError:
            pass
        return False

def mark_processed(checkpoint_data, service, year, month, status, rows=None, time_sec=None):
    """Record the outcome of one service/year/month file and persist it.

    The entry is stored in ``checkpoint_data`` (mutated in place) under the
    key ``<service>_<year>_<MM>`` and the whole mapping is then handed to
    ``save_checkpoint``.

    Args:
        checkpoint_data: checkpoint mapping to update.
        service: service name used in the key.
        year: year used in the key.
        month: month number, zero-padded to two digits in the key.
        status: status string stored for the entry.
        rows: optional row count stored for the entry.
        time_sec: optional elapsed seconds stored for the entry.
    """
    entry_key = f"{service}_{year}_{month:02d}"
    entry = dict(
        status=status,
        timestamp=datetime.now().isoformat(),
        run_id=RUN_ID,
        rows=rows,
        time_seconds=time_sec,
    )
    checkpoint_data[entry_key] = entry
    save_checkpoint(checkpoint_data)

def is_already_processed(checkpoint_data, service, year, month):
    """Report whether a service/year/month file is recorded as 'SUCCESS'.

    Returns:
        tuple: ``(True, entry)`` when the checkpoint has an entry for the
        file whose status is 'SUCCESS'; ``(False, None)`` otherwise.
    """
    lookup_key = f"{service}_{year}_{month:02d}"
    if lookup_key not in checkpoint_data:
        return False, None

    record = checkpoint_data[lookup_key]
    if record['status'] != 'SUCCESS':
        return False, None
    return True, record

print("Funciones de checkpoint definidas.")

Funciones de checkpoint definidas.


In [8]:
# Ingest the taxi zone lookup table (the target table is overwritten each run)
print("Ingiriendo taxi_zone_lookup...")

try:
    import pandas as pd

    print(f" - Descargando desde: {TAXI_ZONE_URL}")
    pdf_zones = pd.read_csv(TAXI_ZONE_URL)

    print(f" - Total filas: {len(pdf_zones)}")

    # Convert to a Spark DataFrame and tag every row with load metadata.
    df_zones = (
        spark.createDataFrame(pdf_zones)
        .withColumn("ingested_at_utc", current_timestamp())
        .withColumn("run_id", lit(RUN_ID))
    )

    # JDBC connection options for the target table.
    zone_jdbc_options = {
        "url": JDBC_URL,
        "dbtable": f"{PG_SCHEMA_RAW}.taxi_zone_lookup",
        "user": POSTGRES_USER,
        "password": POSTGRES_PASSWORD,
        "driver": "org.postgresql.Driver",
    }
    zone_writer = df_zones.write.format("jdbc").options(**zone_jdbc_options)
    zone_writer.mode("overwrite").save()

    print("taxi_zone_lookup ingresado correctamente.\n")

except Exception as e:
    # Any failure is reported and the notebook continues.
    print(f"Error en taxi_zone_lookup: {str(e)}\n")

Ingiriendo taxi_zone_lookup...
 - Descargando desde: https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv
 - Total filas: 265
taxi_zone_lookup ingresado correctamente.



In [9]:
# Función para ingerir un mes específico con idempotencia

def _remove_temp_file(path):
    """Delete a downloaded temp file if it exists; report failures, never raise."""
    try:
        if os.path.exists(path):
            os.remove(path)
    except OSError as e:
        print(f"  Aviso: no se pudo eliminar {path}: {e}\n")


# Prefix of the pickup/dropoff timestamp columns for each known service.
_TIMESTAMP_PREFIX_BY_SERVICE = {'yellow': 'tpep', 'green': 'lpep'}


def ingest_month(service, year, month, checkpoint_data):
    """Ingest one monthly trip file into Postgres, honoring the checkpoint.

    The file ``<service>_tripdata_<year>-<MM>.parquet`` is downloaded from
    SOURCE_BASE into /tmp, read with Spark, tagged with load metadata and
    appended to ``<PG_SCHEMA_RAW>.<service>_taxi_trip``. The outcome is
    recorded through ``mark_processed``.

    The temporary download is removed in a ``finally`` block on a best-effort
    basis, so a cleanup failure after a successful write can no longer turn a
    recorded SUCCESS into ERROR (which would cause the file to be appended
    again on the next run).

    NOTE(review): nothing here rolls back rows written before a failed
    append, so retrying a file whose status is ERROR may duplicate rows.
    Confirm how duplicates are handled downstream.

    Args:
        service: 'yellow' or 'green'.
        year: year of the file.
        month: month of the file (1-12).
        checkpoint_data: checkpoint mapping; updated in place.

    Returns:
        str: 'SUCCESS', 'SKIPPED', 'NOT_FOUND' or 'ERROR'.
    """
    start_time = datetime.now()

    # Skip files the checkpoint already records as successfully loaded.
    already_done, info = is_already_processed(checkpoint_data, service, year, month)
    if already_done:
        print(f"  {service} {year}-{month:02d}: Ya procesado (SKIP) "
              f"[{info.get('rows', 'N/A')} filas, {info.get('time_seconds', 'N/A')}s]")
        return 'SKIPPED'

    filename = f"{service}_tripdata_{year}-{month:02d}.parquet"
    url = f"{SOURCE_BASE}/{filename}"
    local_path = f"/tmp/{filename}"
    table_name = f"{PG_SCHEMA_RAW}.{service}_taxi_trip"

    print(f"Procesando: {service} {year}-{month:02d}")

    try:
        # Download the file
        print(f"  Descargando: {url}")
        urllib.request.urlretrieve(url, local_path)

        # Read the Parquet file
        df = spark.read.parquet(local_path)

        # Cast the service-specific pickup/dropoff columns to timestamp when
        # they are present; unknown services are left untouched.
        prefix = _TIMESTAMP_PREFIX_BY_SERVICE.get(service)
        if prefix is not None:
            for suffix in ('pickup_datetime', 'dropoff_datetime'):
                ts_column = f"{prefix}_{suffix}"
                if ts_column in df.columns:
                    df = df.withColumn(ts_column, col(ts_column).cast('timestamp'))

        # Add load metadata
        df = df.withColumn("run_id", lit(RUN_ID)) \
               .withColumn("source_year", lit(year)) \
               .withColumn("source_month", lit(month)) \
               .withColumn("ingested_at_utc", current_timestamp())

        count = df.count()
        print(f"  Filas leidas: {count:,}")

        # Append to Postgres
        df.write \
            .format("jdbc") \
            .option("url", JDBC_URL) \
            .option("dbtable", table_name) \
            .option("user", POSTGRES_USER) \
            .option("password", POSTGRES_PASSWORD) \
            .option("driver", "org.postgresql.Driver") \
            .mode("append") \
            .save()

        elapsed = (datetime.now() - start_time).total_seconds()

        print(f"  {service} {year}-{month:02d} ingresado correctamente ({elapsed:.2f}s)\n")

        mark_processed(checkpoint_data, service, year, month, 'SUCCESS',
                      rows=count, time_sec=elapsed)

        return 'SUCCESS'

    except urllib.error.HTTPError as e:
        elapsed = (datetime.now() - start_time).total_seconds()

        # 403 is handled like 404: both are recorded as a missing file.
        if e.code == 404 or e.code == 403:
            print(f"  Archivo no encontrado ({e.code}), saltando...\n")
            mark_processed(checkpoint_data, service, year, month, 'NOT_FOUND',
                          time_sec=elapsed)
            return 'NOT_FOUND'

        print(f"  Error HTTP {e.code}: {str(e)}\n")
        mark_processed(checkpoint_data, service, year, month, 'ERROR',
                      time_sec=elapsed)
        return 'ERROR'

    except Exception as e:
        elapsed = (datetime.now() - start_time).total_seconds()

        print(f"  Error: {str(e)}\n")
        mark_processed(checkpoint_data, service, year, month, 'ERROR',
                      time_sec=elapsed)
        return 'ERROR'

    finally:
        # Runs on every path once the status has been decided; it cannot
        # raise, so it never changes the recorded status.
        _remove_temp_file(local_path)

print("Funcion ingest_month definida.")

Funcion ingest_month definida.


In [10]:
# Run the bulk ingestion; files already marked SUCCESS in the checkpoint are skipped.

separator = "=" * 80

print(separator)
print("INICIANDO INGESTA MASIVA NYC TLC -> POSTGRES")
print(separator)
print(f"Inicio: {datetime.now()}")
print(f"Servicios: {SERVICES}")
print(f"Rango: {START_YEAR}-{END_YEAR}")
print(f"Run ID: {RUN_ID}")
print(separator)
print()

# Load the checkpoint
checkpoint_data = load_checkpoint()
print()

# One counter per status that ingest_month can return
stats = dict.fromkeys(('SUCCESS', 'SKIPPED', 'NOT_FOUND', 'ERROR'), 0)

ingestion_start = datetime.now()

# Iterate over service, then year, then month
for service in SERVICES:
    print(f"\n{separator}")
    print(f"Servicio: {service.upper()}")
    print(f"{separator}\n")

    for year in range(START_YEAR, END_YEAR + 1):
        # For 2025 only months up to September are attempted.
        max_month = 9 if year == 2025 else 12

        for month in range(1, max_month + 1):
            result = ingest_month(service, year, month, checkpoint_data)
            stats[result] = stats.get(result, 0) + 1

total_duration = (datetime.now() - ingestion_start).total_seconds()

print("\n" + separator)
print("RESUMEN FINAL DE INGESTA")
print(separator)
summary_lines = (
    f"Exitosos:       {stats['SUCCESS']}",
    f"Omitidos:       {stats['SKIPPED']}",
    f"No encontrados: {stats['NOT_FOUND']}",
    f"Fallidos:       {stats['ERROR']}",
    f"Rango:          {START_YEAR}-{END_YEAR}",
    f"Servicios:      {', '.join(SERVICES)}",
    f"Run ID:         {RUN_ID}",
    f"Checkpoint:     {CHECKPOINT_FILE}",
    f"Duracion total: {total_duration:.2f}s ({total_duration/60:.2f} min)",
)
for summary_line in summary_lines:
    print(summary_line)
print(f"Fin: {datetime.now()}")
print(separator)

INICIANDO INGESTA MASIVA NYC TLC -> POSTGRES
Inicio: 2025-11-11 13:19:00.976454
Servicios: ['yellow', 'green']
Rango: 2015-2025
Run ID: run_001

Checkpoint cargado: 258 archivos registrados.


Servicio: YELLOW

  yellow 2015-01: Ya procesado (SKIP) [12741035 filas, 494.63848s]
  yellow 2015-02: Ya procesado (SKIP) [12442394 filas, 339.948858s]
  yellow 2015-03: Ya procesado (SKIP) [13342951 filas, 364.81764s]
  yellow 2015-04: Ya procesado (SKIP) [13063758 filas, 350.321475s]
  yellow 2015-05: Ya procesado (SKIP) [13157677 filas, 359.700977s]
  yellow 2015-06: Ya procesado (SKIP) [12324936 filas, 334.870962s]
  yellow 2015-07: Ya procesado (SKIP) [11559666 filas, 312.419438s]
  yellow 2015-08: Ya procesado (SKIP) [11123123 filas, 308.906263s]
  yellow 2015-09: Ya procesado (SKIP) [11218122 filas, 303.17454s]
  yellow 2015-10: Ya procesado (SKIP) [12307333 filas, 336.394519s]
  yellow 2015-11: Ya procesado (SKIP) [11305240 filas, 307.264311s]
  yellow 2015-12: Ya procesado (SKIP) [11452