In [None]:
!pip -q install psycopg2-binary sqlalchemy pyarrow pandas requests
import os
from psycopg2.extras import execute_values
import os, csv
import time
import tempfile
from datetime import datetime
from urllib.request import urlretrieve
import os
import tempfile
from urllib.request import urlretrieve
from pyspark.sql.functions import lit, current_timestamp
import time
import pandas as pd
from sqlalchemy import text
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, when, unix_timestamp, abs as spark_abs
from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType
PG_HOST = os.getenv('PG_HOST')
PG_PORT = int(os.getenv('PG_PORT'))
PG_DB = os.getenv('PG_DB')
PG_USER = os.getenv('PG_USER')
PG_PASSWORD = os.getenv('PG_PASSWORD')
PG_SCHEMA_RAW = os.getenv('PG_SCHEMA_RAW')
PG_SCHEMA_ANALYTICS = os.getenv('PG_SCHEMA_ANALYTICS')
CHUNK_SIZE = int(os.getenv('CHUNK_SIZE', '1000000'))


In [12]:
AUDIT_DIR = "/home/jovyan/auditoria"
os.makedirs(AUDIT_DIR, exist_ok=True)  # crea la carpeta si no existe

AUDIT_CSV = os.path.join(AUDIT_DIR, "audit_ingesta.csv")

# Crear el CSV si no existe
if not os.path.exists(AUDIT_CSV):
    with open(AUDIT_CSV, "w", newline="", encoding="utf-8") as f:
        wr = csv.writer(f)
        wr.writerow(["service", "year", "month", "rows", "status ","run_tag", "ts_utc", "error"])

def _audit_rows_set(status_filter=None):
    """
    Devuelve un set de (service, year, month) ya registrados.
    Si status_filter es una lista (p.ej. ["OK","SKIPPED"]), filtra por esos estados.
    """
    if not os.path.exists(AUDIT_CSV):
        return set()
    df = pd.read_csv(AUDIT_CSV, dtype={"service": str, "year": int, "month": int, "status": str})
    if status_filter:
        df = df[df["status"].isin(status_filter)]
    return set(zip(df["service"], df["year"], df["month"]))

def is_already_loaded_csv(service: str, year: int, month: int) -> bool:
    """True si ya se registró como OK (cargado) en el CSV."""
    loaded = _audit_rows_set(status_filter=["OK"])
    return (service, year, month) in loaded

def mark_audit_csv(service: str, year: int, month: int, status: str,
                   rows: int = 0, run_tag: str = "", error: str = ""):
    """Agrega una fila de auditoría al CSV."""
    with open(AUDIT_CSV, "a", newline="", encoding="utf-8") as f:
        wr = csv.writer(f)
        wr.writerow([service, year, month, status, rows, run_tag,
                     datetime.utcnow().isoformat(), error[:500]])

print(f"Archivo de auditoría listo: {AUDIT_CSV}")

Archivo de auditoría listo: /home/jovyan/auditoria/audit_ingesta.csv


In [13]:
# Cargar variables de ambiente
PG_HOST = os.getenv('PG_HOST', 'postgres')
PG_PORT = os.getenv('PG_PORT', '5432')
PG_DB = os.getenv('PG_DB', 'nyc_tlc')
PG_USER = os.getenv('PG_USER', 'postgres')
PG_PASSWORD = os.getenv('PG_PASSWORD', 'postgres')
PG_SCHEMA_RAW = os.getenv('PG_SCHEMA_RAW', 'raw')

# Parámetros de ingesta
YEARS = list(range(2015, 2026))  
MONTHS = list(range(1, 13))      
SERVICES = ['yellow', 'green']   
RUN_TAG = os.getenv('RUN_TAG', f"ingesta_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}")

# URL base para los archivos Parquet de NYC TLC
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"

print("Configuración cargada:")
print(f"  - Postgres: {PG_USER}@{PG_HOST}:{PG_PORT}/{PG_DB}")
print(f"  - Schema RAW: {PG_SCHEMA_RAW}")
print(f"  - Años: {YEARS[0]}-{YEARS[-1]}")
print(f"  - Servicios: {SERVICES}")
print(f"  - RUN_TAG: {RUN_TAG}")

Configuración cargada:
  - Postgres: root@postgres:5432/nyc_taxi
  - Schema RAW: raw
  - Años: 2015-2025
  - Servicios: ['yellow', 'green']
  - RUN_TAG: ingesta_20251110_172710


In [14]:
spark = (
    SparkSession.builder
    .appName("NYC_TLC_Ingesta_Raw")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.driver.extraClassPath", "/usr/local/spark/jars/postgresql-42.6.0.jar")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

print(f"Spark {spark.version} inicializado correctamente")

Spark 3.5.0 inicializado correctamente


In [15]:
def compose_url(service: str, year: int, month: int) -> str:
    filename = f"{service}_tripdata_{year}-{month:02d}.parquet"
    return f"{BASE_URL}/{filename}"

def download_to_temp(url: str) -> str:
    temp_dir = tempfile.gettempdir()
    filename = url.split('/')[-1]
    local_path = os.path.join(temp_dir, filename)
    if not os.path.exists(local_path):
        print(f"  Descargando: {filename}...", end=" ")
        urlretrieve(url, local_path)
        print("OK")
    else:
        print(f"  Usando caché local: {filename}")
    return local_path

def get_postgres_jdbc_url() -> str:
    return f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DB}"

def get_postgres_properties() -> dict:
    return {
        "user": PG_USER,
        "password": PG_PASSWORD,
        "driver": "org.postgresql.Driver"
    }

print("Funciones de utilidad cargadas correctamente")

Funciones de utilidad cargadas correctamente


In [16]:
def standardize_columns(df, service: str):
    column_mapping = {
        'VendorID': 'VendorID',
        'RatecodeID': 'RatecodeID',
        'PULocationID': 'PULocationID',
        'DOLocationID': 'DOLocationID',
        'passenger_count': 'passenger_count',
        'trip_distance': 'trip_distance',
        'fare_amount': 'fare_amount',
        'extra': 'extra',
        'mta_tax': 'mta_tax',
        'tip_amount': 'tip_amount',
        'tolls_amount': 'tolls_amount',
        'improvement_surcharge': 'improvement_surcharge',
        'total_amount': 'total_amount',
        'payment_type': 'payment_type',
        'congestion_surcharge': 'congestion_surcharge',
        'airport_fee': 'airport_fee',
        'store_and_fwd_flag': 'store_and_fwd_flag',
        'CBD_CONGESTION_FEE': 'CBD_CONGESTION_FEE',
        'cbd_congestion_fee': 'cbd_congestion_fee',

    }
    for old_name, new_name in column_mapping.items():
        if old_name in df.columns and old_name != new_name:
            df = df.withColumnRenamed(old_name, new_name)
    if service == 'yellow':
        if 'tpep_pickup_datetime' in df.columns:
            df = df.withColumn('tpep_pickup_datetime', col('tpep_pickup_datetime').cast(TimestampType()))
        if 'tpep_dropoff_datetime' in df.columns:
            df = df.withColumn('tpep_dropoff_datetime', col('tpep_dropoff_datetime').cast(TimestampType()))
    elif service == 'green':
        if 'lpep_pickup_datetime' in df.columns:
            df = df.withColumn('lpep_pickup_datetime', col('lpep_pickup_datetime').cast(TimestampType()))
        if 'lpep_dropoff_datetime' in df.columns:
            df = df.withColumn('lpep_dropoff_datetime', col('lpep_dropoff_datetime').cast(TimestampType()))
        if 'trip_type' in df.columns:
            df = df.withColumn('trip_type', col('trip_type').cast(IntegerType()))
    numeric_int_cols = ['VendorID', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'payment_type']
    numeric_double_cols = ['trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
                           'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee', 'CBD_CONGESTION_FEE']
    for col_name in numeric_int_cols:
        if col_name in df.columns:
            df = df.withColumn(col_name, col(col_name).cast(IntegerType()))
    for col_name in numeric_double_cols:
        if col_name in df.columns:
            df = df.withColumn(col_name, col(col_name).cast(DoubleType()))
    if 'store_and_fwd_flag' in df.columns:
        df = df.withColumn('store_and_fwd_flag', col('store_and_fwd_flag').cast(StringType()))
    if 'CBD_CONGESTION_FEE' in df.columns and 'cbd_congestion_fee' not in df.columns:
        df = df.withColumnRenamed('CBD_CONGESTION_FEE', 'cbd_congestion_fee')
    if 'cbd_congestion_fee' not in df.columns:
        df = df.withColumn('cbd_congestion_fee', lit(None).cast(DoubleType()))
    return df

print("Función de estandarización cargada")

Función de estandarización cargada


In [17]:
def add_metadata(df, year: int, month: int, run_tag: str):
    """Agrega columnas de metadatos para trazabilidad de la ingesta."""
    return (
        df
        .withColumn('run_tag', lit(run_tag))
        .withColumn('source_year', lit(year))
        .withColumn('source_month', lit(month))
        .withColumn('ingested_at_utc', current_timestamp())
    )

print("Función de metadatos cargada")


Función de metadatos cargada


In [18]:
def write_batch(df, fact_table: str, mode: str = "append", writers: int = 4, batchsize: int = 5000):
    df_to_write = df.coalesce(writers) 

    (df_to_write.write
        .format("jdbc")
        .option("url", get_postgres_jdbc_url())
        .option("dbtable", fact_table)
        .option("user", PG_USER)
        .option("password", PG_PASSWORD)
        .option("driver", "org.postgresql.Driver")
        .option("batchsize", str(batchsize))   
        .mode(mode)
        .save()
    )

print("Función de escritura a Postgres cargada")

Función de escritura a Postgres cargada


In [19]:
def create_schemas():
    """Crea los esquemas raw y analytics si no existen."""
    conn = psycopg2.connect(
        host=PG_HOST,
        port=PG_PORT,
        database=PG_DB,
        user=PG_USER,
        password=PG_PASSWORD
    )
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cursor = conn.cursor()
    cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {PG_SCHEMA_RAW};")
    cursor.execute("CREATE SCHEMA IF NOT EXISTS analytics;")
    print(f"Esquemas '{PG_SCHEMA_RAW}' y 'analytics' verificados/creados")
    cursor.close()
    conn.close()

# Ejecutar creación de esquemas
create_schemas()

Esquemas 'raw' y 'analytics' verificados/creados


In [20]:
total_rows = 0
total_errors = 0
ingestion_log = []

print("="*80)
print("» Inicio de proceso")
print("="*80)

for service in SERVICES:
    table = f"{PG_SCHEMA_RAW}.{service}_taxi_trip"
    print(f"\n{'='*80}")
    print(f"SERVICIO: {service.upper()} → {table}")
    print(f"{'='*80}\n")

    service_rows = 0

    for y in YEARS:
        for m in MONTHS:

            # 0) Verificar si ya fue cargado (CSV)
            if is_already_loaded_csv(service, y, m):
                print(f"[{y}-{m:02d}] Ya marcado como OK en auditoría CSV → saltando")
                continue

            try:
                t0 = time.time()
                print(f"[{y}-{m:02d}] Procesando {service}...")

                # 1) Descargar y leer Parquet
                tmp_path = download_to_temp(compose_url(service, y, m))
                raw_df = spark.read.parquet(tmp_path)
                raw_count = raw_df.count()
                print(f"    Filas descargadas: {raw_count:,}")

                # 2) Estandarizar columnas
                df = standardize_columns(raw_df, service)

                # 3) (sin limpieza) usar el DF tal cual
                df_final = add_metadata(df, y, m, RUN_TAG).withColumn("service_type", lit(service))
                clean_count = raw_count

                # 4) Escribir a Postgres
                print(f"    Escribiendo a {table}...", end=" ")
                write_batch(df_final, table, mode="append", writers=4, batchsize=5000)
                print("OK")

                # 5) Estadísticas
                service_rows += clean_count
                total_rows += clean_count
                dt = time.time() - t0

                print(f"    [OK] {clean_count:,} filas insertadas en {dt:.1f}s\n")

                # 6) Marcar auditoría como OK
                mark_audit_csv(service, y, m, status="OK", rows=clean_count, run_tag=RUN_TAG)

            except Exception as e:
                total_errors += 1
                print(f"    [ERROR] {e}\n")
                mark_audit_csv(service, y, m, status="ERROR", rows=0, run_tag=RUN_TAG, error=str(e))

    print(f"\n{'='*80}")
    print("» Resumen de ejecución del servicio")
    print(f"  Filas insertadas: {service_rows:,}")
    print(f"{'='*80}\n")

# RESUMEN 
print("\n" + "="*80)
print("» RESUMEN GLOBAL DE EJECUCIÓN")
print("="*80)
print(f"Total filas insertadas: {total_rows:,}")
print(f"Total errores: {total_errors}")
print(f"RUN_TAG: {RUN_TAG}")
print("="*80)

log_df = pd.read_csv(AUDIT_CSV)
print("\nLog de auditoría cargado en variable 'log_df'")

» Inicio de proceso

SERVICIO: YELLOW → raw.yellow_taxi_trip

[2015-01] Ya marcado como OK en auditoría CSV → saltando
[2015-02] Ya marcado como OK en auditoría CSV → saltando
[2015-03] Procesando yellow...
  Usando caché local: yellow_tripdata_2015-03.parquet
    Filas descargadas: 13,342,951
    Escribiendo a raw.yellow_taxi_trip... OK
    [OK] 13,342,951 filas insertadas en 209.6s

[2015-04] Ya marcado como OK en auditoría CSV → saltando
[2015-05] Ya marcado como OK en auditoría CSV → saltando
[2015-06] Ya marcado como OK en auditoría CSV → saltando
[2015-07] Ya marcado como OK en auditoría CSV → saltando
[2015-08] Ya marcado como OK en auditoría CSV → saltando
[2015-09] Ya marcado como OK en auditoría CSV → saltando
[2015-10] Ya marcado como OK en auditoría CSV → saltando
[2015-11] Ya marcado como OK en auditoría CSV → saltando
[2015-12] Ya marcado como OK en auditoría CSV → saltando
[2016-01] Ya marcado como OK en auditoría CSV → saltando
[2016-02] Ya marcado como OK en auditoría 

In [29]:
print(log_df)
import pandas as pd

# --- 1) Copia segura y normaliza tipos ---
df = log_df.copy()

# Asegura que year, month, rows sean numéricos
for c in ["year", "month", "rows"]:
    if c in df.columns:
        df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0).astype(int)

# --- 2) Resumen por servicio/estado ---
summary = (
    df.groupby(["service", "status"])["rows"]
      .agg(N_Meses="count", Total_Filas="sum")
      .reset_index()
      .sort_values(["service", "status"])
)

# Formato amigable (separador de miles)
summary_fmt = summary.copy()
summary_fmt["Total_Filas"] = summary_fmt["Total_Filas"].map(lambda v: f"{v:,}")

print("\n=== RESUMEN POR SERVICIO Y ESTADO ===")
try:
    # bonito si tienes tabulate instalado
    from tabulate import tabulate
    print(tabulate(summary_fmt, headers="keys", tablefmt="github", showindex=False))
except Exception:
    # fallback estándar
    print(summary_fmt.to_string(index=False))

# --- 3) Porcentaje de errores por servicio ---
pivot = (
    summary.pivot(index="service", columns="status", values="N_Meses")
           .fillna(0)
)
pivot["Total_Meses"] = pivot.sum(axis=1)
pivot["% Error"] = (pivot.get("ERROR", 0) / pivot["Total_Meses"] * 100).round(1)


    service  year  month      rows status                  run_tag  \
0    yellow  2015      1  12741035     OK  ingesta_20251110_043934   
1    yellow  2015      2  12442394     OK  ingesta_20251110_043934   
2    yellow  2015      4  13063758     OK  ingesta_20251110_043934   
3    yellow  2015      5  13157677     OK  ingesta_20251110_043934   
4    yellow  2015      6  12324936     OK  ingesta_20251110_043934   
..      ...   ...    ...       ...    ...                      ...   
285   green  2025      8        OK  46306  ingesta_20251110_172710   
286   green  2025      9        OK  48893  ingesta_20251110_172710   
287   green  2025     10     ERROR      0  ingesta_20251110_172710   
288   green  2025     11     ERROR      0  ingesta_20251110_172710   
289   green  2025     12     ERROR      0  ingesta_20251110_172710   

                         ts_utc                      error  
0          2025-11-10T04:39:37Z                        NaN  
1          2025-11-10T04:42:59Z      

In [None]:
# URL del archivo taxi_zone_lookup
ZONE_LOOKUP_URL = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"

print("Descargando Taxi Zone Lookup...")
try:
    # Descargar CSV
    temp_dir = tempfile.gettempdir()
    zone_path = os.path.join(temp_dir, "taxi_zone_lookup.csv")
    urlretrieve(ZONE_LOOKUP_URL, zone_path)

    # Leer con Spark
    zones_df = spark.read.csv(zone_path, header=True, inferSchema=True)

    # Estandarizar nombres de columnas
    zones_df = zones_df.toDF(*[c.lower().replace(' ', '_') for c in zones_df.columns])

    # Agregar metadatos
    zones_df = zones_df.withColumn('run_tag', lit(RUN_TAG)) \
                       .withColumn('ingested_at_utc', current_timestamp())

    # Contar filas
    zone_count = zones_df.count()
    print(f"Zonas cargadas: {zone_count}")

    # Escribir a Postgres (overwrite para mantener actualizado)
    zones_table = f"{PG_SCHEMA_RAW}.taxi_zone_lookup"
    write_batch(zones_df, zones_table, mode="overwrite")

    print(f"Taxi Zone Lookup insertado en {zones_table}")

    # Mostrar preview
    zones_df.show(10, truncate=False)

except Exception as e:
    print("[error] revisar detalle")


In [22]:
def get_table_count(fact_table: str) -> int:
    """Obtiene el conteo de filas de una tabla en Postgres."""
    jdbc_url = get_postgres_jdbc_url()
    properties = get_postgres_properties()
    query = f"(SELECT COUNT(*) as count FROM {fact_table}) as subquery"
    df = spark.read.jdbc(url=jdbc_url, table=query, properties=properties)
    return df.collect()[0]['count']

print("\n" + "="*80)
print("VALIDACIÓN FINAL: Conteos en Postgres")
print("="*80)

try:
    yellow_count = get_table_count(f"{PG_SCHEMA_RAW}.yellow_taxi_trip")
    print(f"\n{PG_SCHEMA_RAW}.yellow_taxi_trip: {yellow_count:,} filas")
except Exception as e:
    print(f"Error leyendo yellow_taxi_trip: {e}")
    yellow_count = 0

try:
    green_count = get_table_count(f"{PG_SCHEMA_RAW}.green_taxi_trip")
    print(f"{PG_SCHEMA_RAW}.green_taxi_trip: {green_count:,} filas")
except Exception as e:
    print(f"Error leyendo green_taxi_trip: {e}")
    green_count = 0

try:
    zones_count = get_table_count(f"{PG_SCHEMA_RAW}.taxi_zone_lookup")
    print(f"{PG_SCHEMA_RAW}.taxi_zone_lookup: {zones_count:,} filas")
except Exception as e:
    print(f"Error leyendo taxi_zone_lookup: {e}")
    zones_count = 0

print(f"\nTOTAL TRIPS: {yellow_count + green_count:,} filas")
print("="*80)


VALIDACIÓN FINAL: Conteos en Postgres

raw.yellow_taxi_trip: 788,638,091 filas
raw.green_taxi_trip: 68,094,490 filas
raw.taxi_zone_lookup: 265 filas

TOTAL TRIPS: 856,732,581 filas
