In [10]:
# Instalar las librerías necesarias para conectar Spark con Snowflake y manejar archivos Parquet desde fuentes HTTP.

!pip install snowflake-connector-python pyarrow requests pandas
print("=" * 80)
print("Paquetes instalados correctamente.")
print("=" * 80)

Paquetes instalados correctamente.


In [11]:
# Verificar que todas las variables de ambiente necesarias estén configuradas.

import os
import requests
from io import BytesIO
import json
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
import snowflake.connector

print("=" * 80)
print("CONFIGURACIÓN DE AMBIENTE - PROYECTO 3")
print("=" * 80)

# Variables obligatorias de Snowflake
required_vars = [
    'SNOWFLAKE_ACCOUNT',
    'SNOWFLAKE_DATABASE', 
    'SNOWFLAKE_SCHEMA_RAW',
    'SNOWFLAKE_WAREHOUSE',
    'SNOWFLAKE_USER',
    'SNOWFLAKE_PASSWORD',
    'SNOWFLAKE_ROLE'
]

# Verificar que todas las variables existan
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
    print(f"ERROR: Faltan variables de ambiente: {', '.join(missing_vars)}")
    print("Por favor configura tu archivo .env correctamente.")
else:
    print("Todas las variables de ambiente requeridas están configuradas.")
    
print("=" * 80)

CONFIGURACIÓN DE AMBIENTE - PROYECTO 3
Todas las variables de ambiente requeridas están configuradas.


In [13]:
# Configurar Spark para:
# 1. Conectarse a Snowflake mediante JDBC.
# 2. Manejar timestamps en UTC (zona horaria consistente).
# 3. Procesar archivos Parquet eficientemente.

spark = (
    SparkSession.builder
    .appName("P3 - Ingesta RAW - NYC Taxis - Anahi Andrade")
    
    # Configuración de zona horaria para timestamps
    .config("spark.sql.timestampType", "TIMESTAMP_LTZ")
    .config("spark.sql.session.timeZone", "UTC")
    
    # JARs necesarios para conectar Spark con Snowflake
    .config(
        "spark.jars.packages",
        "net.snowflake:snowflake-jdbc:3.13.33,"
        "net.snowflake:spark-snowflake_2.12:2.9.3-spark_3.1"
    )
    
    # Desactivar lectura vectorizada para mayor compatibilidad
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    
    .getOrCreate()
)

print("=" * 80)
print("SPARK SESSION CREADA EXITOSAMENTE")
print("=" * 80)
print(f"Spark Version: {spark.version}")
print(f"Spark UI disponible en: http://localhost:4040")
print(f"Timezone configurado: {spark.conf.get('spark.sql.session.timeZone')}")
print("=" * 80)

SPARK SESSION CREADA EXITOSAMENTE
Spark Version: 3.5.0
Spark UI disponible en: http://localhost:4040
Timezone configurado: UTC


In [14]:
# Definir:
# 1. Opciones de conexión a Snowflake para Spark.
# 2. Sistema de checkpoint para idempotencia (no duplicar cargas).

# Configuración de conexión Snowflake para Spark DataFrame Writer
snowflake_options = {
    "sfURL": f"{os.getenv('SNOWFLAKE_ACCOUNT')}.snowflakecomputing.com",
    "sfUser": os.getenv("SNOWFLAKE_USER"),
    "sfPassword": os.getenv("SNOWFLAKE_PASSWORD"),
    "sfDatabase": os.getenv("SNOWFLAKE_DATABASE"),
    "sfSchema": os.getenv("SNOWFLAKE_SCHEMA_RAW"),
    "sfWarehouse": os.getenv("SNOWFLAKE_WAREHOUSE"),
    "sfRole": os.getenv("SNOWFLAKE_ROLE")
}

# SISTEMA DE CHECKPOINT PARA IDEMPOTENCIA:
# - Archivo JSON que registra qué meses ya fueron procesados exitosamente.
# - Evita duplicar datos si el notebook se ejecuta múltiples veces.
CHECKPOINT_FILE = "/home/jovyan/work/checkpoint_ingesta.json"

def save_checkpoint(service, year, month):
    
    # Guarda un checkpoint indicando que un mes específico fue procesado exitosamente.
    # Args:
    #   - service: Tipo de servicio ('yellow' o 'green')
    #   - year: Año procesado
    #   - month: Mes procesado (1-12)
    checkpoint = {}
    
    # Cargar checkpoints existentes si el archivo existe
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, "r") as f:
            checkpoint = json.load(f)
    
    # Crear clave única para este mes/servicio
    key = f"{service}_{year}_{month:02d}"
    
    # Guardar información del checkpoint
    checkpoint[key] = {
        "service": service,
        "year": year,
        "month": month,
        "processed_at": datetime.now().isoformat()
    }
    
    # Escribir archivo actualizado
    with open(CHECKPOINT_FILE, "w") as f:
        json.dump(checkpoint, f, indent=2)
    
    print(f" Checkpoint guardado: {key}")

def load_checkpoint():
    
    # Carga el archivo de checkpoints existente.
    # Returns:
    #    - dict: Diccionario con todos los checkpoints guardados
    
    if os.path.exists(CHECKPOINT_FILE):
        with open(CHECKPOINT_FILE, "r") as f:
            return json.load(f)
    return {}

def is_processed(checkpoint, service, year, month):
    
    # Verifica si un mes ya fue procesado anteriormente.    
    # Args:
    #    - checkpoint: Diccionario de checkpoints cargado
    #    - service: Tipo de servicio ('yellow' o 'green')
    #    - year: Año a verificar
    #    - month: Mes a verificar (1-12)
    # Returns:
    #    - bool: True si ya fue procesado, False si necesita procesarse
    
    key = f"{service}_{year}_{month:02d}"
    return key in checkpoint
print("=" * 80)
print("Sistema de checkpoint configurado.")
print(f"Archivo checkpoint: {CHECKPOINT_FILE}")
print("Opciones de conexión Snowflake configuradas.")
print("=" * 80)

Sistema de checkpoint configurado.
Archivo checkpoint: /home/jovyan/work/checkpoint_ingesta.json
Opciones de conexión Snowflake configuradas.


In [15]:
# Crear las tablas necesarias en Snowflake:
# 1. RAW_YELLOW_TRIPS: Tabla espejo para datos de taxis amarillos.
# 2. RAW_GREEN_TRIPS: Tabla espejo para datos de taxis verdes.
# 3. INGESTA_AUDIT: Tabla de auditoría para rastrear todas las cargas.

# Establecer conexión directa a Snowflake para ejecutar DDL
conn = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
    database=os.environ["SNOWFLAKE_DATABASE"],
    schema=os.environ["SNOWFLAKE_SCHEMA_RAW"],
    role=os.environ["SNOWFLAKE_ROLE"]
)
cursor = conn.cursor()

print("=" * 80)
print("CREACIÓN DE TABLAS RAW EN SNOWFLAKE")
print("=" * 80)

# TABLA RAW PARA YELLOW TAXIS:
# - Estructura espejo del Parquet origen + metadatos de ingesta
create_yellow_raw = """
CREATE TABLE IF NOT EXISTS RAW_YELLOW_TRIPS (
    -- Campos originales del dataset NYC TLC Yellow
    VENDORID BIGINT,
    TPEP_PICKUP_DATETIME TIMESTAMP_NTZ,    -- Timestamp sin zona horaria
    TPEP_DROPOFF_DATETIME TIMESTAMP_NTZ,
    PASSENGER_COUNT DOUBLE,
    TRIP_DISTANCE DOUBLE,
    RATECODEID DOUBLE,
    STORE_AND_FWD_FLAG VARCHAR(1),          -- Y/N flag
    PULOCATIONID BIGINT,                    -- ID de zona de pickup
    DOLOCATIONID BIGINT,                    -- ID de zona de dropoff
    PAYMENT_TYPE BIGINT,
    FARE_AMOUNT DOUBLE,
    EXTRA DOUBLE,
    MTA_TAX DOUBLE,
    TIP_AMOUNT DOUBLE,
    TOLLS_AMOUNT DOUBLE,
    IMPROVEMENT_SURCHARGE DOUBLE,
    TOTAL_AMOUNT DOUBLE,
    CONGESTION_SURCHARGE DOUBLE,
    AIRPORT_FEE DOUBLE,
    
    -- Metadatos de ingesta
    RUN_ID INTEGER,                         -- Identificador de ejecución
    SERVICE_TYPE VARCHAR(10),               -- 'yellow' o 'green'
    SOURCE_YEAR INTEGER,                    -- Año del archivo origen
    SOURCE_MONTH INTEGER,                   -- Mes del archivo origen
    INGESTED_AT_UTC TIMESTAMP_NTZ,          -- Momento de ingesta
    SOURCE_PATH VARCHAR(500),               -- URL del archivo Parquet
    BATCH_NUMBER INTEGER                    -- Número de batch (para cargas grandes)
)
"""

# TABLA RAW PARA GREEN TAXIS:
# - Similar a Yellow, pero con campos específicos de Green (LPEP, TRIP_TYPE, EHAIL_FEE)
create_green_raw = """
CREATE TABLE IF NOT EXISTS RAW_GREEN_TRIPS (
    -- Campos originales del dataset NYC TLC Green
    VENDORID BIGINT,
    LPEP_PICKUP_DATETIME TIMESTAMP_NTZ,    -- 'L' indica Green taxi
    LPEP_DROPOFF_DATETIME TIMESTAMP_NTZ,
    PASSENGER_COUNT DOUBLE,
    TRIP_DISTANCE DOUBLE,
    RATECODEID DOUBLE,
    STORE_AND_FWD_FLAG VARCHAR(1),
    PULOCATIONID BIGINT,
    DOLOCATIONID BIGINT,
    PAYMENT_TYPE BIGINT,
    FARE_AMOUNT DOUBLE,
    EXTRA DOUBLE,
    MTA_TAX DOUBLE,
    TIP_AMOUNT DOUBLE,
    TOLLS_AMOUNT DOUBLE,
    IMPROVEMENT_SURCHARGE DOUBLE,
    TOTAL_AMOUNT DOUBLE,
    TRIP_TYPE BIGINT,                      -- Específico de Green
    CONGESTION_SURCHARGE DOUBLE,
    EHAIL_FEE DOUBLE,                      -- Específico de Green
    
    -- Metadatos de ingesta
    RUN_ID INTEGER,
    SERVICE_TYPE VARCHAR(10),
    SOURCE_YEAR INTEGER,
    SOURCE_MONTH INTEGER,
    INGESTED_AT_UTC TIMESTAMP_NTZ,
    SOURCE_PATH VARCHAR(500),
    BATCH_NUMBER INTEGER
)
"""

# TABLA DE AUDITORÍA:
# - Registra TODAS las operaciones de ingesta (exitosas y fallidas).
create_audit = """
CREATE TABLE IF NOT EXISTS INGESTA_AUDIT (
    AUDIT_ID INTEGER AUTOINCREMENT,          -- ID único autoincremental
    RUN_ID INTEGER,                          -- Relaciona con metadatos en RAW
    SERVICE_TYPE VARCHAR(10),                -- 'yellow' o 'green'
    SOURCE_YEAR INTEGER,
    SOURCE_MONTH INTEGER,
    SOURCE_PATH VARCHAR(500),                -- URL completa del Parquet
    BATCH_NUMBER INTEGER,
    ROWS_PROCESSED INTEGER,                  -- Cantidad de filas insertadas
    PROCESSING_TIME_SECONDS FLOAT,           -- Tiempo de ejecución
    STATUS VARCHAR(20),                      -- 'SUCCESS' o 'FAILED'
    ERROR_MESSAGE VARCHAR(1000),             -- Detalle del error (si aplica)
    CREATED_AT TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP(),
    PRIMARY KEY (AUDIT_ID)
)
"""

# Ejecutar DDL
print("Creando tabla RAW_YELLOW_TRIPS...")
cursor.execute(create_yellow_raw)
print("- Tabla RAW_YELLOW_TRIPS creada/verificada.")

print("\nCreando tabla RAW_GREEN_TRIPS...")
cursor.execute(create_green_raw)
print("- Tabla RAW_GREEN_TRIPS creada/verificada.")

print("\nCreando tabla INGESTA_AUDIT...")
cursor.execute(create_audit)
print("- Tabla INGESTA_AUDIT creada/verificada.")

cursor.close()
conn.close()

print("=" * 80)
print("TODAS LAS TABLAS RAW CREADAS EXITOSAMENTE")
print("=" * 80)

CREACIÓN DE TABLAS RAW EN SNOWFLAKE
Creando tabla RAW_YELLOW_TRIPS...
- Tabla RAW_YELLOW_TRIPS creada/verificada.

Creando tabla RAW_GREEN_TRIPS...
- Tabla RAW_GREEN_TRIPS creada/verificada.

Creando tabla INGESTA_AUDIT...
- Tabla INGESTA_AUDIT creada/verificada.
TODAS LAS TABLAS RAW CREADAS EXITOSAMENTE


In [16]:
# Función completa que:
# 1. Descarga Parquet desde NYC TLC.
# 2. Normaliza tipos y timestamps.
# 3. Agrega metadatos de ingesta.
# 4. Escribe a Snowflake RAW.
# 5. Registra auditoría.
# 6. Guarda checkpoint para idempotencia.

import time

# Esquema esperado por servicio (orden exacto para escritura en Snowflake)
EXPECTED_COLUMNS = {
    "yellow": [
        "VENDORID", "TPEP_PICKUP_DATETIME", "TPEP_DROPOFF_DATETIME",
        "PASSENGER_COUNT", "TRIP_DISTANCE", "RATECODEID", "STORE_AND_FWD_FLAG",
        "PULOCATIONID", "DOLOCATIONID", "PAYMENT_TYPE",
        "FARE_AMOUNT", "EXTRA", "MTA_TAX", "TIP_AMOUNT", "TOLLS_AMOUNT",
        "IMPROVEMENT_SURCHARGE", "TOTAL_AMOUNT", "CONGESTION_SURCHARGE", "AIRPORT_FEE",
        # Metadatos
        "RUN_ID", "SERVICE_TYPE", "SOURCE_YEAR", "SOURCE_MONTH",
        "INGESTED_AT_UTC", "SOURCE_PATH", "BATCH_NUMBER",
    ],
    "green": [
        "VENDORID", "LPEP_PICKUP_DATETIME", "LPEP_DROPOFF_DATETIME",
        "PASSENGER_COUNT", "TRIP_DISTANCE", "RATECODEID", "STORE_AND_FWD_FLAG",
        "PULOCATIONID", "DOLOCATIONID", "PAYMENT_TYPE",
        "FARE_AMOUNT", "EXTRA", "MTA_TAX", "TIP_AMOUNT", "TOLLS_AMOUNT",
        "IMPROVEMENT_SURCHARGE", "TOTAL_AMOUNT", "TRIP_TYPE",
        "CONGESTION_SURCHARGE", "EHAIL_FEE",
        # Metadatos
        "RUN_ID", "SERVICE_TYPE", "SOURCE_YEAR", "SOURCE_MONTH",
        "INGESTED_AT_UTC", "SOURCE_PATH", "BATCH_NUMBER",
    ],
}

def _open_sf_conn(schema=None):
    
    # Abre una conexión directa a Snowflake usando snowflake-connector-python.
    # Args:
    #    - schema: Si no se provee RAW, usa SNOWFLAKE_SCHEMA_RAW
    
    return snowflake.connector.connect(
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        database=os.environ["SNOWFLAKE_DATABASE"],
        schema=schema or os.environ["SNOWFLAKE_SCHEMA_RAW"],
        role=os.environ["SNOWFLAKE_ROLE"],
    )

def ingest_parquet_to_raw(service_type, year, month, run_id, batch_size=1000000):
    
    # Ingesta un archivo Parquet mensual de NYC TLC hacia Snowflake RAW.
    
    # Proceso completo:
    # 1. Descarga el Parquet desde la CDN de NYC TLC.
    # 2. Lee el archivo con Spark.
    # 3. Normaliza timestamps (TimestampNTZType -> TimestampType).
    # 4. Estandariza tipos numéricos.
    # 5. Agrega metadatos de lineage.
    # 6. Alinea columnas al esquema esperado.
    # 7. Escribe a Snowflake en modo APPEND.
    # 8. Registra auditoría (exitosa o fallida).
    # 9. Guarda checkpoint si fue exitoso.
    
    # Args:
    #     - service_type: 'yellow' o 'green'
    #     - year: Año del archivo (2015-2025)
    #     - month: Mes del archivo (1-12)
    #     - run_id: Identificador único de esta ejecución
    #     - batch_size: Tamaño de batch
    
    # Returns:
    #     - bool: True si la ingesta fue exitosa, False si falló
    
    start_time = time.time()
    
    # Construir URL del archivo Parquet
    base_url = os.getenv('SOURCE_BASE', 'https://d37ci6vzurychx.cloudfront.net/trip-data')
    file_name = f'{service_type}_tripdata_{year}-{month:02d}.parquet'
    url = f"{base_url}/{file_name}"
    table_name = f"RAW_{service_type.upper()}_TRIPS"
    
    print(f"\n{'='*80}")
    print(f"PROCESANDO: {file_name}")
    print(f"URL: {url}")
    print(f"Destino: {table_name}")
    print(f"{'='*80}")
    
    try:
        # PASO 1: DESCARGA DEL PARQUET
        print("Descargando archivo desde NYC TLC...")
        headers = {
            "User-Agent": "Mozilla/5.0",
            "Accept": "application/octet-stream"
        }
        resp = requests.get(url, headers=headers, timeout=120)
        resp.raise_for_status()  # Lanza excepción si status != 200
        
        size_mb = len(resp.content) / 1024 / 1024
        print(f"Descarga completada ({size_mb:.2f} MB)")
        
        # PASO 2: LECTURA CON SPARK
        print("Leyendo Parquet con Spark...")
        temp_path = f"/tmp/{file_name}"
        with open(temp_path, 'wb') as f:
            f.write(resp.content)
        
        df = spark.read.parquet(temp_path)
        print(f"Parquet leído: {df.count():,} filas")
        
        drop_if_exists = ["cbd_congestion_fee"]
        for c in drop_if_exists:
            if c in df.columns:
                df = df.drop(c)
                print(f"  - Columna '{c}' eliminada (no necesaria)")
        
        # PASO 3: NORMALIZAR NOMBRES A UPPERCASE
        # - Importante: Snowflake trata nombres como uppercase por defecto
        df = df.toDF(*[c.upper() for c in df.columns])
        
        # PASO 4: NORMALIZAR TIMESTAMPS
        # - TimestampNTZType (sin zona) -> TimestampType (con zona UTC)
        # - Esto asegura compatibilidad con Snowflake y consultas posteriores
        ntz_cols = []
        for field in df.schema.fields:
            # Verificar si es TimestampNTZType
            if isinstance(field.dataType, getattr(T, "TimestampNTZType", type(None))):
                ntz_cols.append(field.name)
        
        for c in ntz_cols:
            df = df.withColumn(c, F.col(c).cast(T.TimestampType()))
            print(f"  - Columna '{c}' convertida: TimestampNTZType -> TimestampType")
        
        # PASO 5: CASTING NUMÉRICO CONSISTENTE
        # - Asegurar que los tipos coincidan con las tablas RAW en Snowflake
        
        # Columnas que deben ser BIGINT (IDs)
        long_cols = ["VENDORID", "PULOCATIONID", "DOLOCATIONID", "PAYMENT_TYPE", "TRIP_TYPE"]
        for c in long_cols:
            if c in df.columns:
                df = df.withColumn(c, F.col(c).cast(T.LongType()))
        
        # Columnas que deben ser DOUBLE (métricas y montos)
        double_cols = [
            "PASSENGER_COUNT", "TRIP_DISTANCE", "RATECODEID",
            "FARE_AMOUNT", "EXTRA", "MTA_TAX", "TIP_AMOUNT", "TOLLS_AMOUNT",
            "IMPROVEMENT_SURCHARGE", "TOTAL_AMOUNT", "CONGESTION_SURCHARGE",
            "AIRPORT_FEE", "EHAIL_FEE"
        ]
        for c in double_cols:
            if c in df.columns:
                df = df.withColumn(c, F.col(c).cast(T.DoubleType()))
        
        print("- Tipos de datos normalizados.")
        
        # PASO 6: AGREGAR METADATOS DE LINEAGE
        # - Crítico para rastreabilidad y auditoría
        df = (df
              .withColumn("RUN_ID", F.lit(run_id))
              .withColumn("SERVICE_TYPE", F.lit(service_type))
              .withColumn("SOURCE_YEAR", F.lit(year))
              .withColumn("SOURCE_MONTH", F.lit(month))
              .withColumn("INGESTED_AT_UTC", F.current_timestamp().cast(T.TimestampType()))
              .withColumn("SOURCE_PATH", F.lit(url))
              .withColumn("BATCH_NUMBER", F.lit(1))  # Reservado para cargas por batch
        )
        
        total_rows = df.count()
        print(f"- Metadatos agregados: {total_rows:,} filas listas para carga.")
        
        # PASO 7: ALINEAR COLUMNAS AL ESQUEMA ESPERADO
        # - Asegurar que todas las columnas esperadas existan (agregar NULLs si faltan)
        expected = EXPECTED_COLUMNS[service_type]
        for c in expected:
            if c not in df.columns:
                df = df.withColumn(c, F.lit(None))
                print(f"  - Columna '{c}' agregada con NULL (no existe en origen).")
        
        # Reordenar columnas en el orden esperado
        df = df.select(*expected)
        
        # PASO 8: ESCRIBIR A SNOWFLAKE
        print(f"Escribiendo a Snowflake ({table_name})...")
        (df.write
           .format("snowflake")
           .options(**snowflake_options)
           .option("dbtable", table_name)
           .option("sessionParams", "TIMEZONE=UTC")  # Asegurar timezone UTC
           .option("onError", "CONTINUE")             # Continuar si hay errores en filas
           .mode("append")                            # APPEND: no sobrescribe datos existentes
           .save()
        )
        
        elapsed = time.time() - start_time
        print(f"- Carga completada en {elapsed:.2f} segundos")
        
        # PASO 9: REGISTRAR AUDITORÍA EXITOSA
        conn = _open_sf_conn(schema=os.environ["SNOWFLAKE_SCHEMA_RAW"])
        cur = conn.cursor()
        cur.execute(f"""
            INSERT INTO INGESTA_AUDIT 
            (RUN_ID, SERVICE_TYPE, SOURCE_YEAR, SOURCE_MONTH, SOURCE_PATH, 
             BATCH_NUMBER, ROWS_PROCESSED, PROCESSING_TIME_SECONDS, STATUS)
            VALUES ({run_id}, '{service_type}', {year}, {month}, '{url}', 
                    1, {total_rows}, {elapsed}, 'SUCCESS')
        """)
        cur.close()
        conn.close()
        print("- Auditoría registrada: SUCCESS.")
        
        # PASO 10: GUARDAR CHECKPOINT Y LIMPIAR
        save_checkpoint(service_type, year, month)
        
        # Limpiar archivo temporal
        try:
            os.remove(temp_path)
        except:
            pass
        
        return True
    
    except requests.exceptions.HTTPError as e:
        # Error HTTP (ej: 403 Forbidden, 404 Not Found)
        status = getattr(e.response, "status_code", None)
        msg = f"HTTP {status}" if status else str(e)
        print(f"Error HTTP: {msg}")
        
        # Registrar auditoría de fallo
        try:
            elapsed = time.time() - start_time
            conn = _open_sf_conn(schema=os.environ["SNOWFLAKE_SCHEMA_RAW"])
            cur = conn.cursor()
            err = msg.replace("'", "")[:1000]  # Sanitizar comillas
            cur.execute(f"""
                INSERT INTO INGESTA_AUDIT 
                (RUN_ID, SERVICE_TYPE, SOURCE_YEAR, SOURCE_MONTH, SOURCE_PATH, 
                 PROCESSING_TIME_SECONDS, STATUS, ERROR_MESSAGE)
                VALUES ({run_id}, '{service_type}', {year}, {month}, '{url}', 
                        {elapsed}, 'FAILED', '{err}')
            """)
            cur.close()
            conn.close()
        except:
            pass
        
        return False
    
    except Exception as e:
        # Cualquier otro error (parsing, Snowflake, etc.)
        elapsed = time.time() - start_time
        print(f"Error: {str(e)}")
        
        # Registrar auditoría de fallo
        try:
            conn = _open_sf_conn(schema=os.environ["SNOWFLAKE_SCHEMA_RAW"])
            cur = conn.cursor()
            err = str(e).replace("'", "")[:1000]
            cur.execute(f"""
                INSERT INTO INGESTA_AUDIT 
                (RUN_ID, SERVICE_TYPE, SOURCE_YEAR, SOURCE_MONTH, SOURCE_PATH, 
                 PROCESSING_TIME_SECONDS, STATUS, ERROR_MESSAGE)
                VALUES ({run_id}, '{service_type}', {year}, {month}, '{url}', 
                        {elapsed}, 'FAILED', '{err}')
            """)
            cur.close()
            conn.close()
        except:
            pass
        
        # Limpiar archivo temporal si existe
        try:
            os.remove(temp_path)
        except:
            pass
        
        return False

print("=" * 80)
print("FUNCIÓN DE INGESTA DEFINIDA Y LISTA")
print("=" * 80)


FUNCIÓN DE INGESTA DEFINIDA Y LISTA


In [17]:
# Loop principal que:
# 1. Itera sobre todos los años/meses/servicios configurados.
# 2. Verifica checkpoints (idempotencia).
# 3. Llama a la función de ingesta.
# 4. Genera resumen final.

from datetime import datetime

# PARÁMETROS DE INGESTA (desde variables de ambiente):
START_YEAR = int(os.getenv('START_YEAR', 2015))
END_YEAR = int(os.getenv('END_YEAR', 2025))
SERVICES = [s.strip() for s in os.getenv('SERVICES', 'yellow,green').split(',')]
RUN_ID = int(os.getenv('RUN_ID', 1))

# LÍMITE MÁXIMO: Agosto 2025 (según disponibilidad de datos NYC TLC)
MAX_YEAR = 2025
MAX_MONTH = 8

# Cargar checkpoints existentes
checkpoint = load_checkpoint()

# Contadores para resumen
total_files = 0
success_count = 0
failed_count = 0
skipped_count = 0

# Obtener fecha/hora actual para validación
now = datetime.utcnow()
current_year = now.year
current_month = now.month

print("\n" + "="*80)
print("INICIANDO INGESTA COMPLETA A RAW")
print("="*80)
print(f"Periodo: {START_YEAR}-{END_YEAR}")
print(f"Servicios: {', '.join(SERVICES)}")
print(f"Run ID: {RUN_ID}")
print(f"Checkpoint file: {CHECKPOINT_FILE}")
print("="*80 + "\n")

# LOOP PRINCIPAL DE INGESTA:
for service in SERVICES:
    print(f"\n{'#'*80}")
    print(f"SERVICIO: {service.upper()}")
    print(f"{'#'*80}")
    
    for year in range(START_YEAR, END_YEAR + 1):
        for month in range(1, 13):
            # VALIDACIÓN 1: No procesar meses futuros (según fecha actual UTC)
            if (year > current_year) or (year == current_year and month > current_month):
                break
            
            # VALIDACIÓN 2: No procesar más allá de agosto 2025 (límite de datos disponibles)
            if (year > MAX_YEAR) or (year == MAX_YEAR and month > MAX_MONTH):
                break
            
            total_files += 1
            
            # VALIDACIÓN 3: Verificar checkpoint (idempotencia)
            if is_processed(checkpoint, service, year, month):
                print(f"SALTANDO {service} {year}-{month:02d} (ya procesado).")
                skipped_count += 1
                continue
            
            # Ejecutar ingesta para este mes
            ok = ingest_parquet_to_raw(service, year, month, RUN_ID)
            
            if ok:
                success_count += 1
            else:
                failed_count += 1
            
            # Pausa entre peticiones para no saturar la CDN
            import time as _time
            _time.sleep(2)

# RESUMEN FINAL:
print("\n" + "="*80)
print("RESUMEN DE INGESTA")
print("="*80)
print(f"Total de archivos evaluados: {total_files}")
print(f"Exitosos (nuevos): {success_count}")
print(f"Fallidos: {failed_count}")
print(f"Saltados (ya procesados): {skipped_count}")
print("="*80)

if failed_count > 0:
    print("ATENCIÓN: Hubo archivos fallidos. Revisa la tabla INGESTA_AUDIT para detalles.")
else:
    print("Todos los archivos disponibles fueron procesados exitosamente.")
print("="*80)


INICIANDO INGESTA COMPLETA A RAW
Periodo: 2015-2025
Servicios: yellow, green
Run ID: 1
Checkpoint file: /home/jovyan/work/checkpoint_ingesta.json


################################################################################
SERVICIO: YELLOW
################################################################################
SALTANDO yellow 2015-01 (ya procesado).
SALTANDO yellow 2015-02 (ya procesado).
SALTANDO yellow 2015-03 (ya procesado).
SALTANDO yellow 2015-04 (ya procesado).
SALTANDO yellow 2015-05 (ya procesado).
SALTANDO yellow 2015-06 (ya procesado).
SALTANDO yellow 2015-07 (ya procesado).
SALTANDO yellow 2015-08 (ya procesado).
SALTANDO yellow 2015-09 (ya procesado).
SALTANDO yellow 2015-10 (ya procesado).
SALTANDO yellow 2015-11 (ya procesado).
SALTANDO yellow 2015-12 (ya procesado).
SALTANDO yellow 2016-01 (ya procesado).
SALTANDO yellow 2016-02 (ya procesado).
SALTANDO yellow 2016-03 (ya procesado).
SALTANDO yellow 2016-04 (ya procesado).
SALTANDO yellow 2016-05 (ya proc

In [18]:
# Genera reportes completos sobre:
# 1. Conteos totales en tablas RAW.
# 2. Distribución por año/mes/servicio.
# 3. Resumen de auditoría (solo estado actual).
# 4. Últimos intentos de carga.

import os
import snowflake.connector

# Conectar a Snowflake
conn = snowflake.connector.connect(
    user=os.environ["SNOWFLAKE_USER"],
    password=os.environ["SNOWFLAKE_PASSWORD"],
    account=os.environ["SNOWFLAKE_ACCOUNT"],
    warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
    database=os.environ["SNOWFLAKE_DATABASE"],
    schema=os.environ["SNOWFLAKE_SCHEMA_RAW"],
    role=os.environ["SNOWFLAKE_ROLE"],
)
cursor = conn.cursor()

print("\n" + "="*80)
print("VERIFICACIÓN DE DATOS EN SNOWFLAKE RAW")
print("="*80 + "\n")

# CONTEOS TOTALES POR TABLA
print("CONTEOS TOTALES")
print("-"*80)

cursor.execute("SELECT COUNT(*) FROM RAW_YELLOW_TRIPS")
yellow_count = cursor.fetchone()[0]
print(f"RAW_YELLOW_TRIPS: {yellow_count:,} filas")

cursor.execute("SELECT COUNT(*) FROM RAW_GREEN_TRIPS")
green_count = cursor.fetchone()[0]
print(f"RAW_GREEN_TRIPS:  {green_count:,} filas")

print(f"TOTAL GENERAL:    {yellow_count + green_count:,} filas")

# DISTRIBUCIÓN YELLOW POR AÑO/MES
print("\n" + "-"*80)
print("DISTRIBUCIÓN YELLOW (año-mes)")
print("-"*80)
cursor.execute("""
    SELECT SOURCE_YEAR, SOURCE_MONTH, COUNT(*) AS cnt
    FROM RAW_YELLOW_TRIPS
    GROUP BY SOURCE_YEAR, SOURCE_MONTH
    ORDER BY SOURCE_YEAR, SOURCE_MONTH
""")
rows = cursor.fetchall()
if not rows:
    print("  (sin datos)")
else:
    for row in rows:
        print(f"  {int(row[0])}-{int(row[1]):02d}: {int(row[2]):,} viajes")

# DISTRIBUCIÓN GREEN POR AÑO/MES
print("\n" + "-"*80)
print("DISTRIBUCIÓN GREEN (año-mes)")
print("-"*80)
cursor.execute("""
    SELECT SOURCE_YEAR, SOURCE_MONTH, COUNT(*) AS cnt
    FROM RAW_GREEN_TRIPS
    GROUP BY SOURCE_YEAR, SOURCE_MONTH
    ORDER BY SOURCE_YEAR, SOURCE_MONTH
""")
rows = cursor.fetchall()
if not rows:
    print("  (sin datos)")
else:
    for row in rows:
        print(f"  {int(row[0])}-{int(row[1]):02d}: {int(row[2]):,} viajes")

# RESUMEN DE AUDITORÍA - SOLO ÚLTIMO ESTADO POR ARCHIVO
# - Usa ROW_NUMBER para obtener solo el intento más reciente de cada archivo
print("\n" + "-"*80)
print("RESUMEN DE AUDITORÍA (ÚLTIMO ESTADO POR ARCHIVO)")
print("-"*80)
cursor.execute("""
    WITH LastAttempt AS (
        SELECT 
            SERVICE_TYPE,
            SOURCE_YEAR,
            SOURCE_MONTH,
            STATUS,
            ROWS_PROCESSED,
            PROCESSING_TIME_SECONDS,
            ROW_NUMBER() OVER (
                PARTITION BY SERVICE_TYPE, SOURCE_YEAR, SOURCE_MONTH 
                ORDER BY CREATED_AT DESC
            ) AS rn
        FROM INGESTA_AUDIT
    )
    SELECT 
        STATUS,
        COUNT(*) AS cnt,
        COALESCE(SUM(ROWS_PROCESSED), 0) AS total_rows,
        COALESCE(AVG(PROCESSING_TIME_SECONDS), 0) AS avg_time
    FROM LastAttempt
    WHERE rn = 1
    GROUP BY STATUS
    ORDER BY STATUS
""")

audit_summary = cursor.fetchall()
if not audit_summary:
    print("  (sin registros de auditoría)")
else:
    for status, cnt, total_rows, avg_time in audit_summary:
        print(f"  {status}: {cnt} archivos | {int(total_rows):,} filas | {float(avg_time):.2f}s promedio")

# ESTADO ACTUAL POR SERVICIO
print("\n" + "-"*80)
print("ESTADO ACTUAL POR SERVICIO")
print("-"*80)
cursor.execute("""
    WITH LastAttempt AS (
        SELECT 
            SERVICE_TYPE,
            SOURCE_YEAR,
            SOURCE_MONTH,
            STATUS,
            ROWS_PROCESSED,
            ROW_NUMBER() OVER (
                PARTITION BY SERVICE_TYPE, SOURCE_YEAR, SOURCE_MONTH 
                ORDER BY CREATED_AT DESC
            ) AS rn
        FROM INGESTA_AUDIT
    )
    SELECT 
        SERVICE_TYPE,
        STATUS,
        COUNT(*) AS archivos,
        SUM(ROWS_PROCESSED) AS total_rows
    FROM LastAttempt
    WHERE rn = 1
    GROUP BY SERVICE_TYPE, STATUS
    ORDER BY SERVICE_TYPE, STATUS
""")

service_status = cursor.fetchall()
if not service_status:
    print("  (sin datos)")
else:
    for svc, status, cnt, total_rows in service_status:
        total_rows = total_rows or 0
        print(f"  {svc.upper()} - {status}: {cnt} archivos | {int(total_rows):,} filas")

# MATRIZ DE COBERTURA
# - Muestra qué meses están disponibles por año y servicio
print("\n" + "-"*80)
print("MATRIZ DE COBERTURA 2015-2025")
print("-"*80)
print("Leyenda: ✓ = Cargado | ✗ = Fallido | - = No disponible\n")

# Obtener datos de cobertura
cursor.execute("""
    WITH LastAttempt AS (
        SELECT 
            SERVICE_TYPE,
            SOURCE_YEAR,
            SOURCE_MONTH,
            STATUS,
            ROW_NUMBER() OVER (
                PARTITION BY SERVICE_TYPE, SOURCE_YEAR, SOURCE_MONTH 
                ORDER BY CREATED_AT DESC
            ) AS rn
        FROM INGESTA_AUDIT
    )
    SELECT 
        SERVICE_TYPE,
        SOURCE_YEAR,
        SOURCE_MONTH,
        STATUS
    FROM LastAttempt
    WHERE rn = 1
    ORDER BY SERVICE_TYPE, SOURCE_YEAR, SOURCE_MONTH
""")

coverage_data = {}
for svc, yr, mn, st in cursor.fetchall():
    key = f"{svc}_{yr}"
    if key not in coverage_data:
        coverage_data[key] = {}
    coverage_data[key][mn] = st

# Imprimir matriz
for service in ['yellow', 'green']:
    print(f"\n{service.upper()}:")
    print("Año  | Ene Feb Mar Abr May Jun Jul Ago Sep Oct Nov Dic")
    print("-----|" + "-" * 48)
    
    for year in range(START_YEAR, END_YEAR + 1):
        key = f"{service}_{year}"
        months_str = []
        
        for month in range(1, 13):
            # No mostrar meses futuros
            if year == MAX_YEAR and month > MAX_MONTH:
                months_str.append(" - ")
            elif key in coverage_data and month in coverage_data[key]:
                status = coverage_data[key][month]
                symbol = " ✓ " if status == "SUCCESS" else " ✗ "
                months_str.append(symbol)
            else:
                months_str.append(" - ")
        
        print(f"{year} | {''.join(months_str)}")

cursor.close()
conn.close()

print("\n" + "="*80)
print("NOTEBOOK 01_INGESTA_PARQUET_RAW.IPYNB COMPLETADO")
print("="*80)


VERIFICACIÓN DE DATOS EN SNOWFLAKE RAW

CONTEOS TOTALES
--------------------------------------------------------------------------------
RAW_YELLOW_TRIPS: 821,925,430 filas
RAW_GREEN_TRIPS:  68,045,597 filas
TOTAL GENERAL:    889,971,027 filas

--------------------------------------------------------------------------------
DISTRIBUCIÓN YELLOW (año-mes)
--------------------------------------------------------------------------------
  2015-01: 12,741,035 viajes
  2015-02: 12,442,394 viajes
  2015-03: 26,685,902 viajes
  2015-04: 26,127,516 viajes
  2015-05: 13,157,677 viajes
  2015-06: 12,324,936 viajes
  2015-07: 11,559,666 viajes
  2015-08: 11,123,123 viajes
  2015-09: 11,218,122 viajes
  2015-10: 12,307,333 viajes
  2015-11: 11,305,240 viajes
  2015-12: 11,452,996 viajes
  2016-01: 10,905,067 viajes
  2016-02: 11,375,412 viajes
  2016-03: 12,203,824 viajes
  2016-04: 11,927,996 viajes
  2016-05: 11,832,049 viajes
  2016-06: 22,263,290 viajes
  2016-07: 10,294,080 viajes
  2016-08: 