In [67]:
import os
import requests
import uuid
from datetime import datetime
from io import BytesIO
import gc
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, current_timestamp
import pyarrow.parquet as pq
import pandas as pd
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

In [68]:
# Build (or reuse) the notebook's SparkSession.
spark_builder = (
    SparkSession.builder
    .appName("NYC_TLC_Ingestion_RAW")
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY")
    .config("spark.driver.memory", "4g")
)
spark = spark_builder.getOrCreate()

# Report the engine version and where the Spark UI is expected to be served.
print(f"Spark Version: {spark.version}")
print("Spark UI: http://localhost:4040")

Spark Version: 3.5.0
Spark UI: http://localhost:4040


In [69]:
# Environment-driven configuration for the ingestion run.

# Snowflake settings; each is None when its variable is unset, except the schema,
# which falls back to "RAW".
SNOWFLAKE_ACCOUNT = os.getenv("SNOWFLAKE_ACCOUNT")
SNOWFLAKE_USER = os.getenv("SNOWFLAKE_USER")
SNOWFLAKE_PASSWORD = os.getenv("SNOWFLAKE_PASSWORD")
SNOWFLAKE_WAREHOUSE = os.getenv("SNOWFLAKE_WAREHOUSE")
SNOWFLAKE_DATABASE = os.getenv("SNOWFLAKE_DATABASE")
SNOWFLAKE_ROLE = os.getenv("SNOWFLAKE_ROLE")
SNOWFLAKE_SCHEMA_RAW = os.getenv("SNOWFLAKE_SCHEMA_RAW", "RAW")

# Inclusive year range to ingest. The defaults keep the original 2017-2018 window,
# but the range can now be overridden from the environment like the other settings.
START_YEAR = int(os.getenv("START_YEAR", "2017"))
END_YEAR = int(os.getenv("END_YEAR", "2018"))

# Normalise the service list once (trim, lower-case, drop empty entries) so that
# "Yellow, green," is read as ["yellow", "green"].
SERVICES = [
    name.strip().lower()
    for name in os.getenv("SERVICES", "yellow,green").split(",")
    if name.strip()
]
BASE_URL = os.getenv("BASE_URL", "https://d37ci6vzurychx.cloudfront.net/trip-data")
# A fresh UUID identifies this run unless RUN_ID is supplied externally.
RUN_ID = os.getenv("RUN_ID", str(uuid.uuid4()))
BATCH_SIZE = int(os.getenv("BATCH_SIZE", "1000000"))

# Fail here, with a clear message, instead of deep inside the load loop
# (a non-positive batch size would otherwise surface as a division error).
if START_YEAR > END_YEAR:
    raise ValueError(
        f"START_YEAR ({START_YEAR}) no puede ser mayor que END_YEAR ({END_YEAR})"
    )
if BATCH_SIZE <= 0:
    raise ValueError(f"BATCH_SIZE debe ser un entero positivo, se recibió {BATCH_SIZE}")

print(f"\nConfiguración de Ingesta:")
print(f"  - Base de Datos: {SNOWFLAKE_DATABASE}")
print(f"  - Schema: {SNOWFLAKE_SCHEMA_RAW}")
print(f"  - Período: {START_YEAR}-{END_YEAR}")
print(f"  - Servicios: {SERVICES}")
print(f"  - RUN_ID: {RUN_ID}")
print(f"  - Batch Size: {BATCH_SIZE:,}")


Configuración de Ingesta:
  - Base de Datos: NYC_TAXI
  - Schema: RAW
  - Período: 2017-2018
  - Servicios: ['yellow', 'green']
  - RUN_ID: 05ca0353-0649-4d90-82a3-44d0fae837d1
  - Batch Size: 1,000,000


In [None]:
def get_snowflake_conn():
    """Open a new Snowflake connection configured from environment variables.

    Reads SNOWFLAKE_USER, SNOWFLAKE_PASSWORD, SNOWFLAKE_ACCOUNT,
    SNOWFLAKE_WAREHOUSE, SNOWFLAKE_DATABASE, SNOWFLAKE_SCHEMA_RAW (falling back
    to "RAW") and SNOWFLAKE_ROLE each time it is called, and asks the connector
    to keep the client session alive.

    Returns:
        The connection object returned by ``snowflake.connector.connect``.
        This function never closes it; that is left to the caller.
    """
    env = os.getenv
    connection_args = {
        "user": env("SNOWFLAKE_USER"),
        "password": env("SNOWFLAKE_PASSWORD"),
        "account": env("SNOWFLAKE_ACCOUNT"),
        "warehouse": env("SNOWFLAKE_WAREHOUSE"),
        "database": env("SNOWFLAKE_DATABASE"),
        "schema": env("SNOWFLAKE_SCHEMA_RAW", "RAW"),
        "role": env("SNOWFLAKE_ROLE"),
        "client_session_keep_alive": True,
    }
    return snowflake.connector.connect(**connection_args)

print("\nProbando conexión a Snowflake...")
# Smoke test: open a connection, read the server version and the session's
# database/schema, and report them. The cursor and connection are released in
# `finally` so a failing query no longer leaves them open.
_conn = None
_cur = None
try:
    _conn = get_snowflake_conn()
    _cur = _conn.cursor()
    _cur.execute("SELECT CURRENT_VERSION()")
    version = _cur.fetchone()[0]
    _cur.execute("SELECT CURRENT_DATABASE(), CURRENT_SCHEMA()")
    db_info = _cur.fetchone()
    print(f"Conectado a Snowflake versión: {version}")
    print(f"Database: {db_info[0]}, Schema: {db_info[1]}")
except Exception as e:
    # Surface the reason, then re-raise so the notebook stops at this cell.
    print(f"Error de conexión: {e}")
    raise
finally:
    if _cur is not None:
        _cur.close()
    if _conn is not None:
        _conn.close()


Probando conexión a Snowflake...
Conectado a Snowflake versión: 9.37.0
Database: NYC_TAXI, Schema: RAW


In [71]:
def build_url(service: str, year: int, month: int, base_url: "str | None" = None) -> str:
    """Build the download URL of one monthly trip-data Parquet file.

    Args:
        service: Dataset prefix used in the file name (e.g. "yellow" or "green").
        year: Year of the file, rendered as-is.
        month: Month number in 1-12, rendered zero-padded to two digits.
        base_url: Optional URL prefix. When omitted, the module-level BASE_URL
            is used.

    Returns:
        ``<base>/<service>_tripdata_<year>-<MM>.parquet``.

    Raises:
        ValueError: If ``month`` is outside 1-12.
    """
    if not 1 <= month <= 12:
        raise ValueError(f"month must be between 1 and 12, got {month!r}")
    root = BASE_URL if base_url is None else base_url
    # Strip trailing slashes so a prefix such as ".../trip-data/" does not produce "//".
    return f"{root.rstrip('/')}/{service}_tripdata_{year}-{month:02d}.parquet"

In [72]:
def create_tables_if_not_exist(conn, service: str):
    """Ensure the RAW data table and the audit table for a service exist.

    Args:
        conn: Open Snowflake connection; only ``conn.cursor()`` is used.
        service: "yellow" or "green" (surrounding whitespace and letter case
            are ignored).

    Returns:
        Tuple ``(table_name, audit_table)`` with the unqualified table names.
        Both are created inside the SNOWFLAKE_SCHEMA_RAW schema.

    Raises:
        ValueError: If ``service`` is neither "yellow" nor "green". Previously
            any other value silently created the GREEN tables.
    """
    # Validate before touching the connection so a typo cannot create tables.
    normalized = service.strip().lower() if isinstance(service, str) else None
    if normalized not in ("yellow", "green"):
        raise ValueError(
            f"Servicio '{service}' no válido. Solo se permiten 'yellow' o 'green'"
        )

    # NOTE(review): Snowflake documents PRIMARY KEY on standard tables as
    # informational rather than enforced (confirm for this account), so the
    # keys declared below would not reject duplicate trips.
    if normalized == "yellow":
        table_name = "YELLOW_TAXIS"
        audit_table = "AUDIT_YELLOW"

        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {SNOWFLAKE_SCHEMA_RAW}.{table_name} (
            VENDORID NUMBER,
            TPEP_PICKUP_DATETIME TIMESTAMP_NTZ,
            TPEP_DROPOFF_DATETIME TIMESTAMP_NTZ,
            PASSENGER_COUNT NUMBER,
            TRIP_DISTANCE FLOAT,
            RATECODEID NUMBER,
            STORE_AND_FWD_FLAG VARCHAR(1),
            PULOCATIONID NUMBER,
            DOLOCATIONID NUMBER,
            PAYMENT_TYPE NUMBER,
            FARE_AMOUNT FLOAT,
            EXTRA FLOAT,
            MTA_TAX FLOAT,
            TIP_AMOUNT FLOAT,
            TOLLS_AMOUNT FLOAT,
            IMPROVEMENT_SURCHARGE FLOAT,
            TOTAL_AMOUNT FLOAT,
            CONGESTION_SURCHARGE FLOAT,
            AIRPORT_FEE FLOAT,
            CBD_CONGESTION_FEE FLOAT,
            RUN_ID STRING,
            INGESTED_AT_UTC TIMESTAMP_NTZ,
            SOURCE_YEAR NUMBER,
            SOURCE_MONTH NUMBER,
            CHUNK_ID VARCHAR(50),
            PRIMARY KEY (TPEP_PICKUP_DATETIME, TPEP_DROPOFF_DATETIME, PULOCATIONID, DOLOCATIONID)
        );
        """
    else:  # green
        table_name = "GREEN_TAXIS"
        audit_table = "AUDIT_GREEN"

        create_table_sql = f"""
        CREATE TABLE IF NOT EXISTS {SNOWFLAKE_SCHEMA_RAW}.{table_name} (
            VENDORID NUMBER,
            LPEP_PICKUP_DATETIME TIMESTAMP_NTZ,
            LPEP_DROPOFF_DATETIME TIMESTAMP_NTZ,
            STORE_AND_FWD_FLAG VARCHAR(1),
            RATECODEID NUMBER,
            PULOCATIONID NUMBER,
            DOLOCATIONID NUMBER,
            PASSENGER_COUNT NUMBER,
            TRIP_DISTANCE FLOAT,
            FARE_AMOUNT FLOAT,
            EXTRA FLOAT,
            MTA_TAX FLOAT,
            TIP_AMOUNT FLOAT,
            TOLLS_AMOUNT FLOAT,
            EHAIL_FEE FLOAT,
            IMPROVEMENT_SURCHARGE FLOAT,
            TOTAL_AMOUNT FLOAT,
            PAYMENT_TYPE NUMBER,
            TRIP_TYPE NUMBER,
            CONGESTION_SURCHARGE FLOAT,
            CBD_CONGESTION_FEE FLOAT,
            AIRPORT_FEE FLOAT,
            RUN_ID STRING,
            INGESTED_AT_UTC TIMESTAMP_NTZ,
            SOURCE_YEAR NUMBER,
            SOURCE_MONTH NUMBER,
            CHUNK_ID VARCHAR(50),
            PRIMARY KEY (LPEP_PICKUP_DATETIME, LPEP_DROPOFF_DATETIME, PULOCATIONID, DOLOCATIONID)
        );
        """

    # Audit table: one row per attempted (service, year, month) load.
    create_audit_sql = f"""
    CREATE TABLE IF NOT EXISTS {SNOWFLAKE_SCHEMA_RAW}.{audit_table} (
        SERVICE STRING,
        YEAR NUMBER,
        MONTH NUMBER,
        STATUS STRING,
        ROW_COUNT NUMBER,
        ERROR_MESSAGE STRING,
        RUN_ID STRING,
        INGESTED_AT_UTC TIMESTAMP_NTZ
    );
    """

    cur = conn.cursor()
    try:
        cur.execute(create_table_sql)
        print(f"Tabla {table_name} verificada/creada")

        cur.execute(create_audit_sql)
        print(f"Tabla de auditoría {audit_table} verificada/creada")
    finally:
        # Release the cursor even when one of the DDL statements fails.
        cur.close()

    return table_name, audit_table


In [73]:
def month_already_loaded(conn, audit_table: str, service: str, year: int, month: int) -> bool:
    """Tell whether a month was already ingested successfully.

    Looks in ``SNOWFLAKE_SCHEMA_RAW.<audit_table>`` for at least one row that
    matches the service, year and month with STATUS = 'ok'. Rows with any other
    status do not count as loaded.

    Args:
        conn: Open Snowflake connection; only ``conn.cursor()`` is used.
        audit_table: Unqualified audit table name.
        service: Service value stored in the SERVICE column.
        year: Year stored in the YEAR column.
        month: Month stored in the MONTH column.

    Returns:
        True when a matching 'ok' row exists, False otherwise.
    """
    query = f"""
    SELECT 1
    FROM {SNOWFLAKE_SCHEMA_RAW}.{audit_table}
    WHERE SERVICE = %s
      AND YEAR = %s
      AND MONTH = %s
      AND STATUS = 'ok'
    LIMIT 1
    """
    cur = conn.cursor()
    try:
        cur.execute(query, (service, year, month))
        return cur.fetchone() is not None
    finally:
        # Close the cursor even if the query raises.
        cur.close()


In [74]:
def _record_audit(conn, audit_table, service, year, month, status, row_count, error_msg, ingested_at):
    """Insert one audit row describing the outcome of a service/year/month load."""
    cur = conn.cursor()
    try:
        cur.execute(f"""
            INSERT INTO {SNOWFLAKE_SCHEMA_RAW}.{audit_table}
            (SERVICE, YEAR, MONTH, STATUS, ROW_COUNT, ERROR_MESSAGE, RUN_ID, INGESTED_AT_UTC)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (service, year, month, status, row_count, error_msg, RUN_ID, ingested_at))
    finally:
        cur.close()


def _purge_month(conn, table_name, year, month):
    """Delete rows already stored for year/month and return how many were removed."""
    cur = conn.cursor()
    try:
        cur.execute(
            f"DELETE FROM {SNOWFLAKE_SCHEMA_RAW}.{table_name} "
            "WHERE SOURCE_YEAR = %s AND SOURCE_MONTH = %s",
            (year, month),
        )
        return cur.rowcount or 0
    finally:
        cur.close()


def _prepare_batch(batch, service, year, month, chunk_id, ingested_at):
    """Turn one Arrow record batch into the DataFrame written to the RAW table."""
    df = batch.to_pandas()

    # Lineage metadata attached to every row.
    df["RUN_ID"] = RUN_ID
    df["INGESTED_AT_UTC"] = ingested_at
    df["SOURCE_YEAR"] = year
    df["SOURCE_MONTH"] = month
    df["CHUNK_ID"] = chunk_id

    # The target tables use upper-case column names.
    df = df.rename(columns=str.upper)

    # Timestamps are sent as "YYYY-MM-DD HH:MM:SS" strings; values that cannot
    # be parsed are coerced to missing instead of aborting the batch.
    prefix = "TPEP" if service == "yellow" else "LPEP"
    timestamp_cols = [
        f"{prefix}_PICKUP_DATETIME",
        f"{prefix}_DROPOFF_DATETIME",
        "INGESTED_AT_UTC",
    ]
    for ts_col in timestamp_cols:
        if ts_col in df.columns:
            df[ts_col] = pd.to_datetime(df[ts_col], errors="coerce").dt.strftime("%Y-%m-%d %H:%M:%S")
    return df


def ingest_service(service: str, start_year: int, end_year: int):
    """Download and load every monthly Parquet file of a service into Snowflake.

    For each month of each year in ``start_year..end_year`` (inclusive):

    * the month is skipped when the audit table already has an 'ok' row for it;
    * HTTP 403/404 responses are recorded as 'skip' in the audit table;
    * otherwise the file is read in batches of BATCH_SIZE rows, metadata
      columns are added and each batch is appended with ``write_pandas``;
    * any exception during a month is recorded as 'fail' and the loop moves
      on to the next month.

    Rows left in the data table by an earlier attempt at the same month (one
    that never reached 'ok') are deleted before loading, so a retry does not
    duplicate them. A month that fails midway keeps its partial rows until it
    is retried.

    Args:
        service: "yellow" or "green"; selects tables and timestamp columns.
        start_year: First year to ingest.
        end_year: Last year to ingest (inclusive).

    Returns:
        None. Progress and a final summary are printed.

    Raises:
        Exception: Errors from opening the connection, creating the tables,
            the idempotence check, or writing a 'fail' audit row propagate to
            the caller. The connection is closed in every case.
    """
    print(f"\n{'='*80}")
    print(f"INICIANDO INGESTA: {service.upper()}")
    print(f"{'='*80}")

    conn = get_snowflake_conn()
    try:
        # Inside the try so the connection is closed even if the DDL fails.
        table_name, audit_table = create_tables_if_not_exist(conn, service)

        # One timestamp per call: every row and audit record of this call shares it.
        ingested_at = datetime.utcnow()
        headers = {
            "User-Agent": "Mozilla/5.0",
            "Accept": "application/octet-stream",
        }

        total_months_processed = 0
        total_months_skipped = 0
        total_rows_ingested = 0

        for year in range(start_year, end_year + 1):
            for month in range(1, 13):
                # Idempotence check against the audit table.
                if month_already_loaded(conn, audit_table, service, year, month):
                    print(f"[SKIP] {service} {year}-{month:02d} ya cargado (idempotencia)")
                    total_months_skipped += 1
                    continue

                url = build_url(service, year, month)
                print(f"\n[{year}-{month:02d}] Descargando desde: {url}")

                row_count = 0

                try:
                    resp = requests.get(url, headers=headers, timeout=120)

                    if resp.status_code in (403, 404):
                        error_msg = f"Archivo no disponible (HTTP {resp.status_code})"
                        print(f"[{year}-{month:02d}] {error_msg}")
                        _record_audit(conn, audit_table, service, year, month,
                                      "skip", 0, error_msg, ingested_at)
                        total_months_skipped += 1
                        continue

                    resp.raise_for_status()

                    # The whole file is held in memory and read back batch by batch.
                    parquet_file = pq.ParquetFile(BytesIO(resp.content))
                    total_rows = parquet_file.metadata.num_rows
                    total_batches = (total_rows + BATCH_SIZE - 1) // BATCH_SIZE  # ceiling division

                    print(f"[{year}-{month:02d}] Total filas: {total_rows:,} en {total_batches} batch(es)")

                    # The month is not marked 'ok', so anything already stored
                    # for it comes from an incomplete attempt; remove it first.
                    purged = _purge_month(conn, table_name, year, month)
                    if purged:
                        print(f"[{year}-{month:02d}] Eliminadas {purged:,} filas de una carga previa incompleta")

                    for i, batch in enumerate(parquet_file.iter_batches(batch_size=BATCH_SIZE), start=1):
                        df = _prepare_batch(batch, service, year, month,
                                            f"{i}/{total_batches}", ingested_at)

                        success, _, _, _ = write_pandas(
                            conn,
                            df,
                            table_name=table_name,
                            auto_create_table=False,
                            overwrite=False,
                            quote_identifiers=True,
                        )
                        if not success:
                            # Do not count or audit as 'ok' a batch the connector rejected.
                            raise RuntimeError(f"write_pandas no confirmó la escritura del batch {i}/{total_batches}")

                        row_count += len(df)
                        print(f"Batch {i}/{total_batches}: {len(df):,} filas escritas (acum: {row_count:,})")

                        # Free the batch before reading the next one.
                        del df
                        gc.collect()
                        time.sleep(0.5)

                    _record_audit(conn, audit_table, service, year, month,
                                  "ok", row_count, None, ingested_at)

                    total_months_processed += 1
                    total_rows_ingested += row_count
                    print(f"[{year}-{month:02d}] Completado: {row_count:,} filas ingestadas")

                except Exception as e:
                    error_msg = str(e)
                    print(f"[{year}-{month:02d}] ERROR: {error_msg}")
                    _record_audit(conn, audit_table, service, year, month,
                                  "fail", 0, error_msg, ingested_at)

                time.sleep(1)  # Rate limiting between monthly downloads

        # Summary
        print(f"\n{'='*80}")
        print(f"RESUMEN INGESTA {service.upper()}")
        print(f"{'='*80}")
        print(f"Meses procesados: {total_months_processed}")
        print(f"Meses omitidos: {total_months_skipped}")
        print(f"Total filas ingestadas: {total_rows_ingested:,}")
        print(f"RUN_ID: {RUN_ID}")

    finally:
        conn.close()


In [75]:
print(f"\nINICIANDO PROCESO DE INGESTA COMPLETA")
print(f"Servicios a procesar: {SERVICES}")

# Single source of truth for the supported services and their data tables,
# shared by the ingestion loop and the verification loop below.
table_by_service = {"yellow": "YELLOW_TAXIS", "green": "GREEN_TAXIS"}
requested_services = [name.strip().lower() for name in SERVICES]

for service in requested_services:
    if service not in table_by_service:
        print(f"Servicio '{service}' no válido. Solo se permiten 'yellow' o 'green'")
        continue

    try:
        ingest_service(service, START_YEAR, END_YEAR)
    except Exception as e:
        # One service failing must not prevent the remaining ones from running.
        print(f"Error crítico en servicio {service}: {e}")
        continue

print(f"\n{'='*80}")
print(f"PROCESO DE INGESTA COMPLETADO")
print(f"{'='*80}")

# %% Final verification: row counts per table, grouped by source year/month
print("\n VERIFICACIÓN FINAL: Conteo de registros")
conn = get_snowflake_conn()
try:
    cur = conn.cursor()
    try:
        for service in requested_services:
            table = table_by_service.get(service)
            if table is None:
                continue

            try:
                cur.execute(f"""
                    SELECT 
                        SOURCE_YEAR,
                        SOURCE_MONTH,
                        COUNT(*) as row_count
                    FROM {SNOWFLAKE_SCHEMA_RAW}.{table}
                    GROUP BY SOURCE_YEAR, SOURCE_MONTH
                    ORDER BY SOURCE_YEAR, SOURCE_MONTH
                """)

                results = cur.fetchall()
                print(f"\n{service.upper()} - Distribución por año/mes:")
                for row in results:
                    # int() keeps the zero-padded format valid if the driver
                    # hands the month back as a non-int numeric type.
                    print(f"  {row[0]}-{int(row[1]):02d}: {row[2]:,} filas")
            except Exception as e:
                print(f" Error verificando {service}: {e}")
    finally:
        cur.close()
finally:
    # Always release the verification connection, even on unexpected errors.
    conn.close()



INICIANDO PROCESO DE INGESTA COMPLETA
Servicios a procesar: ['yellow', 'green']

INICIANDO INGESTA: YELLOW
Tabla YELLOW_TAXIS verificada/creada
Tabla de auditoría AUDIT_YELLOW verificada/creada
[SKIP] yellow 2017-01 ya cargado (idempotencia)
[SKIP] yellow 2017-02 ya cargado (idempotencia)
[SKIP] yellow 2017-03 ya cargado (idempotencia)
[SKIP] yellow 2017-04 ya cargado (idempotencia)
[SKIP] yellow 2017-05 ya cargado (idempotencia)
[SKIP] yellow 2017-06 ya cargado (idempotencia)
[SKIP] yellow 2017-07 ya cargado (idempotencia)
[SKIP] yellow 2017-08 ya cargado (idempotencia)
[SKIP] yellow 2017-09 ya cargado (idempotencia)
[SKIP] yellow 2017-10 ya cargado (idempotencia)
[SKIP] yellow 2017-11 ya cargado (idempotencia)
[SKIP] yellow 2017-12 ya cargado (idempotencia)
[SKIP] yellow 2018-01 ya cargado (idempotencia)
[SKIP] yellow 2018-02 ya cargado (idempotencia)

[2018-03] Descargando desde: https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2018-03.parquet
[2018-03] Total filas: 