# Prueba Ingeniería de Datos - Pipeline por Microbatches

Este notebook implementa un pipeline que:

- Ingiera archivos CSV en **microbatches**.
- Inserte los registros en **PostgreSQL**.
- Mantenga estadísticas **incrementales** (`count`, `mean`, `min`, `max`) sin recalcular sobre toda la tabla.
- Ejecute validación con `validation.csv`.

**Entorno usado:**
- Python 3
- PostgreSQL (contenedor Docker local)
- Librerías: pandas, SQLAlchemy, python-dotenv

> Nota: Aunque los archivos CSV de este reto tienen pocas filas (máx. 40 registros),
> el pipeline está diseñado con `chunksize` para simular un escenario de Big Data,
> asegurando que nunca se cargan todos los archivos en memoria de manera simultánea.



In [1]:
#  Librerías principales
import os, glob, re
from dotenv import load_dotenv
import pandas as pd
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, Float, String, text, insert

# Extensión SQL en notebooks
%load_ext sql

# Configuración de ipython-sql
%config SqlMagic.autopandas = False   # False = salida en tabla (SQL puro)
%config SqlMagic.displaycon = False   # no mostrar la URL de conexión


#  Cargar credenciales desde .env
load_dotenv()
DB_USER = os.getenv("DB_USER")
DB_PASS = os.getenv("DB_PASS")
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5432")
DB_NAME = os.getenv("DB_NAME")

#  Construir URL de conexión
DB_URL = f"postgresql+psycopg2://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

#  Conectar %sql a PostgreSQL
%sql $DB_URL

#  Crear motor de conexión SQLAlchemy
engine = create_engine(DB_URL, echo=False)
metadata = MetaData()

#  Verificar conexión
with engine.connect() as conn:
    test = conn.execute(text("SELECT 1")).fetchone()
    print("Test conexión con SQLAlchemy:", test[0])

print(" Conexión configurada correctamente (SQLAlchemy + %sql)")


Test conexión con SQLAlchemy: 1
 Conexión configurada correctamente (SQLAlchemy + %sql)


In [2]:
# CELDA 3: DEFINIR MOTOR Y ESQUEMA EN LA BD
# Definimos dos tablas:
# 1) transactions -> guarda fila por fila los eventos
# 2) pipeline_stats -> una fila con estadísticas acumuladas (total_count, sum_price, min, max, mean)

engine = create_engine(DB_URL, echo=False)  # echo=True si quieres ver SQL generado
metadata = MetaData()

# tabla principal de transacciones
transactions = Table(
    "transactions",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
    Column("timestamp", String, nullable=False),  # guardamos como string ISO para simplicidad
    Column("price", Float, nullable=False),
    Column("user_id", String, nullable=False)
)

# tabla de estadísticas incrementales (una sola fila con id=1)
pipeline_stats = Table(
    "pipeline_stats",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("total_count", Integer, nullable=False, default=0),
    Column("sum_price", Float, nullable=False, default=0.0),
    Column("min_price", Float, nullable=True),
    Column("max_price", Float, nullable=True),
    Column("mean_price", Float, nullable=True),
    Column("last_updated", String, nullable=True)  # guardamos timestamp en texto (ISO)
)

# crear tablas si no existen
metadata.create_all(engine)
print("Tablas 'transactions' y 'pipeline_stats' creadas (si no existían).")


Tablas 'transactions' y 'pipeline_stats' creadas (si no existían).


In [3]:
# CELDA 4: INICIALIZAR FILA DE STATS (id = 1)
# Esto asegura que exista una fila con id=1 que mantendrá los acumulados.

def ensure_stats_row():
    with engine.begin() as conn:
        res = conn.execute(text("SELECT id FROM pipeline_stats WHERE id = 1")).fetchone()
        if not res:
            conn.execute(text("""
                INSERT INTO pipeline_stats (id, total_count, sum_price, min_price, max_price, mean_price, last_updated)
                VALUES (1, 0, 0.0, NULL, NULL, NULL, now())
            """))
            print("Fila inicial en pipeline_stats creada.")
        else:
            print("Fila inicial ya existe.")

ensure_stats_row()


Fila inicial ya existe.


In [4]:
# CELDA 5: FUNCIONES PARA ACTUALIZACIÓN INCREMENTAL DE STATS
# update_stats_db actualizará la fila en pipeline_stats sumando los acumulados del "chunk".
# Importante: NO hacemos SELECT AVG(...) sobre 'transactions'. Solo actualizamos la fila de stats.

def update_stats_db(conn, chunk_count, chunk_sum, chunk_min, chunk_max):
    """
    Recibe valores del chunk y actualiza la fila pipeline_stats (id=1) de forma incremental.
    - conn: conexión SQLAlchemy dentro de engine.begin()
    - chunk_count: número de filas del chunk (int)
    - chunk_sum: suma de precios del chunk (float)
    - chunk_min: min del chunk (float)
    - chunk_max: max del chunk (float)
    """
    # Obtener valores actuales
    row = conn.execute(text("SELECT total_count, sum_price, min_price, max_price FROM pipeline_stats WHERE id=1")).fetchone()
    old_count = int(row[0]) if row and row[0] is not None else 0
    old_sum = float(row[1]) if row and row[1] is not None else 0.0
    old_min = float(row[2]) if row and row[2] is not None else float("inf")
    old_max = float(row[3]) if row and row[3] is not None else float("-inf")

    # Combinar acumulados
    new_count = old_count + int(chunk_count)
    new_sum = old_sum + float(chunk_sum)
    new_min = min(old_min, float(chunk_min)) if chunk_min is not None else (None if old_min == float("inf") else old_min)
    new_max = max(old_max, float(chunk_max)) if chunk_max is not None else (None if old_max == float("-inf") else old_max)
    new_mean = new_sum / new_count if new_count > 0 else None

    # Normalizar min/max para guardar NULL si no hay valores válidos
    min_val = None if new_min == float("inf") else new_min
    max_val = None if new_max == float("-inf") else new_max

    # Actualizar la fila
    conn.execute(text("""
        UPDATE pipeline_stats
        SET total_count = :new_count,
            sum_price = :new_sum,
            min_price = :min_val,
            max_price = :max_val,
            mean_price = :new_mean,
            last_updated = now()
        WHERE id = 1
    """), {"new_count": new_count, "new_sum": new_sum, "min_val": min_val, "max_val": max_val, "new_mean": new_mean})

    # Devolver valores para impresión / verificación
    return {"total_count": new_count, "sum_price": new_sum, "min_price": min_val, "max_price": max_val, "mean_price": new_mean}


In [5]:
# CELDA 6: LISTAR Y ORDENAR ARCHIVOS CSV (excluir validation.csv)
# Asume carpeta 'data/' al mismo nivel que 'notebooks/'

data_folder = "../data"  
if not os.path.isdir(data_folder):
    data_folder = "data"  

all_csv = glob.glob(os.path.join(data_folder, "*.csv"))

# ordenar por número en el nombre de archivo (ej: 2012-1, 2012-2, ...)
def sort_key(path):
    name = os.path.basename(path)
    nums = re.findall(r'\d+', name)
    return tuple(map(int, nums)) if nums else (9999,)

all_csv_sorted = sorted(all_csv, key=sort_key)

# separar validation.csv
train_files = [f for f in all_csv_sorted if os.path.basename(f).lower() != "validation.csv"]
validation_file = next((f for f in all_csv_sorted if os.path.basename(f).lower() == "validation.csv"), None)

print("Archivos de entrenamiento (ordenados):")
for f in train_files:
    print(" -", f)
print("Archivo validation:", validation_file)


Archivos de entrenamiento (ordenados):
 - ../data\2012-1.csv
 - ../data\2012-2.csv
 - ../data\2012-3.csv
 - ../data\2012-4.csv
 - ../data\2012-5.csv
Archivo validation: ../data\validation.csv


In [6]:
# CELDA 7: FUNCIÓN PRINCIPAL PARA PROCESAR CSV POR CHUNKS
# - chunk_size: número de filas leídas por pandas en cada iteración
# - update_per: 'chunk' (por chunk) o 'row' (actualizar stats fila por fila; más lento)

def process_csv_file(file_path, chunk_size=10, update_per='chunk'):
    """
    Procesa un archivo CSV en microbatches (chunks).
    Inserta los registros en 'transactions' y actualiza la fila 'pipeline_stats' incrementalmente.
    """
    print(f"\nProcesando archivo: {file_path}")
    reader = pd.read_csv(file_path, chunksize=chunk_size)

    total_chunks = 0
    for chunk in reader:
        total_chunks += 1
        # --- limpieza / transformaciones básicas ---
        # parse timestamp a formato uniforme (ISO-like), convertir price a float
        chunk['timestamp'] = pd.to_datetime(chunk['timestamp'], errors='coerce')
        chunk = chunk.dropna(subset=['timestamp', 'price', 'user_id'])  # quitar filas con nulls críticos
        if chunk.empty:
            continue

        # convertir timestamp a string legible para la BD (YYYY-MM-DD HH:MM:SS)
        chunk['timestamp'] = chunk['timestamp'].dt.strftime('%Y-%m-%d %H:%M:%S')
        chunk['price'] = pd.to_numeric(chunk['price'], errors='coerce')
        chunk = chunk.dropna(subset=['price'])
        if chunk.empty:
            continue

        # preparar registros para inserción (lista de dicts)
        records = chunk.to_dict(orient='records')

        # calcular agregados del chunk (para actualizar stats)
        chunk_count = len(records)
        chunk_sum = float(chunk['price'].sum())
        chunk_min = float(chunk['price'].min())
        chunk_max = float(chunk['price'].max())

        # --- inserción y actualización de stats en la misma transacción ---
        with engine.begin() as conn:
            # insertar registros (executemany)
            conn.execute(transactions.insert(), records)

            # actualizar stats: por chunk (recomendado) o por fila (opción)
            if update_per == 'row':
                # opción "por fila" (más costoso): actualizar por cada registro
                for r in records:
                    update_stats_db(conn, 1, r['price'], r['price'], r['price'])
            else:
                # opción eficiente: actualizar usando los agregados del chunk
                update_stats_db(conn, chunk_count, chunk_sum, chunk_min, chunk_max)

        # después de la transacción, podemos leer la fila stats para mostrar estado
        with engine.connect() as conn2:
            stats_row = conn2.execute(text("SELECT total_count, mean_price, min_price, max_price FROM pipeline_stats WHERE id=1")).fetchone()
            print(f"  Chunk {total_chunks} procesado. Stats (DB): count={stats_row[0]}, mean={stats_row[1]}, min={stats_row[2]}, max={stats_row[3]}")

    print(f"Finalizado archivo: {file_path}. Total chunks: {total_chunks}")


In [7]:
# CELDA 8: EJECUTAR INGESTA SOBRE LOS 5 ARCHIVOS PRINCIPALES

for f in train_files:
    process_csv_file(f, chunk_size=10, update_per='chunk')

print("\n Carga de archivos principales finalizada.")



Procesando archivo: ../data\2012-1.csv
  Chunk 1 procesado. Stats (DB): count=451, mean=57.13968957871397, min=10.0, max=100.0
  Chunk 2 procesado. Stats (DB): count=460, mean=57.05434782608695, min=10.0, max=100.0
  Chunk 3 procesado. Stats (DB): count=461, mean=57.12147505422993, min=10.0, max=100.0
Finalizado archivo: ../data\2012-1.csv. Total chunks: 3

Procesando archivo: ../data\2012-2.csv
  Chunk 1 procesado. Stats (DB): count=471, mean=57.2208067940552, min=10.0, max=100.0
  Chunk 2 procesado. Stats (DB): count=481, mean=56.91060291060291, min=10.0, max=100.0
  Chunk 3 procesado. Stats (DB): count=490, mean=56.98571428571429, min=10.0, max=100.0
Finalizado archivo: ../data\2012-2.csv. Total chunks: 3

Procesando archivo: ../data\2012-3.csv
  Chunk 1 procesado. Stats (DB): count=500, mean=56.94, min=10.0, max=100.0
  Chunk 2 procesado. Stats (DB): count=510, mean=57.11960784313725, min=10.0, max=100.0
  Chunk 3 procesado. Stats (DB): count=520, mean=57.13076923076923, min=10.0,

In [8]:
# CELDA 9: IMPRIMIR ESTADÍSTICAS ACTUALES (desde pipeline_stats)
with engine.connect() as conn:
    row = conn.execute(text("SELECT total_count, sum_price, mean_price, min_price, max_price, last_updated FROM pipeline_stats WHERE id=1")).fetchone()
    print("ESTADÍSTICAS (desde pipeline_stats)")
    print("Total rows:", row[0])
    print("Suma precios:", row[1])
    print("Media (mean):", row[2])
    print("Min:", row[3])
    print("Max:", row[4])
    print("Última actualización (DB):", row[5])


ESTADÍSTICAS (desde pipeline_stats)
Total rows: 580
Suma precios: 33186.0
Media (mean): 57.217241379310344
Min: 10.0
Max: 100.0
Última actualización (DB): 2025-10-01 06:43:58.929033+00


In [9]:
# CELDA 10: EJECUTAR validation.csv por el mismo pipeline y mostrar cambios
if validation_file:
    print("Ejecutando validation.csv ahora (se aplicará la misma lógica)...")
    process_csv_file(validation_file, chunk_size=10, update_per='chunk')
    print("Validación finalizada.")
else:
    print("No se encontró validation.csv en data/.")


Ejecutando validation.csv ahora (se aplicará la misma lógica)...

Procesando archivo: ../data\validation.csv
  Chunk 1 procesado. Stats (DB): count=588, mean=57.006802721088434, min=10.0, max=100.0
Finalizado archivo: ../data\validation.csv. Total chunks: 1
Validación finalizada.


In [10]:
# CELDA 11: CONSULTAS FINALES: comparar total en transactions vs pipeline_stats
with engine.connect() as conn:
    res1 = conn.execute(text("SELECT COUNT(*) FROM transactions")).fetchone()
    stats = conn.execute(text("SELECT total_count, mean_price, min_price, max_price FROM pipeline_stats WHERE id=1")).fetchone()
    print("COUNT en transactions:", res1[0])
    print("pipeline_stats.total_count:", stats[0])
    print("pipeline_stats.mean_price:", stats[1])
    print("pipeline_stats.min_price:", stats[2])
    print("pipeline_stats.max_price:", stats[3])

    # mostrar algunas filas ejemplo
    sample = conn.execute(text("SELECT id, timestamp, price, user_id FROM transactions ORDER BY id DESC LIMIT 5")).fetchall()
    print("\nÚltimas 5 filas insertadas (ejemplo):")
    for r in sample:
        print(r)


COUNT en transactions: 588
pipeline_stats.total_count: 588
pipeline_stats.mean_price: 57.006802721088434
pipeline_stats.min_price: 10.0
pipeline_stats.max_price: 100.0

Últimas 5 filas insertadas (ejemplo):
(588, '2012-06-08 00:00:00', 86.0, '7')
(587, '2012-06-07 00:00:00', 13.0, '4')
(586, '2012-06-06 00:00:00', 62.0, '3')
(585, '2012-06-05 00:00:00', 31.0, '10')
(584, '2012-06-04 00:00:00', 92.0, '7')


Para mayor calidad de visualización, traemos los resultados tabulados desde la base de datos creada

In [11]:
%%sql
SELECT COUNT(*) as total_rows,
       AVG(price) as avg_price,
       MIN(price) as min_price,
       MAX(price) as max_price
FROM transactions;


1 rows affected.


total_rows,avg_price,min_price,max_price
588,57.006802721088434,10.0,100.0


In [12]:

# Ejemplo 1: traer primeras 10 filas de transactions
df_sample = pd.read_sql(text("SELECT * FROM transactions LIMIT 10;"), con=engine)
print("Primeras 10 filas en la tabla 'transactions':")
display(df_sample)

# Ejemplo 2: traer estadísticas calculadas directamente con SQL
df_stats = pd.read_sql(text("""
    SELECT COUNT(*) AS total_rows,
           AVG(price) AS avg_price,
           MIN(price) AS min_price,
           MAX(price) AS max_price
    FROM transactions;
"""), con=engine)
print("\n📊 Estadísticas calculadas en SQL directamente:")
display(df_stats)


Primeras 10 filas en la tabla 'transactions':


Unnamed: 0,id,timestamp,price,user_id
0,1,2012-01-10 00:00:00,50.0,9
1,2,2012-01-11 00:00:00,87.0,10
2,3,2012-01-12 00:00:00,64.0,7
3,4,2012-01-13 00:00:00,20.0,10
4,5,2012-01-14 00:00:00,14.0,10
5,6,2012-01-15 00:00:00,95.0,8
6,7,2012-01-16 00:00:00,95.0,2
7,8,2012-01-17 00:00:00,62.0,4
8,9,2012-01-18 00:00:00,46.0,8
9,10,2012-01-19 00:00:00,97.0,2



📊 Estadísticas calculadas en SQL directamente:


Unnamed: 0,total_rows,avg_price,min_price,max_price
0,588,57.006803,10.0,100.0


## Conclusiones

- El pipeline cargó los CSV por microbatches (chunks).
- Los datos se insertaron en la tabla `transactions`.
- Las estadísticas se actualizaron de forma incremental en la tabla `pipeline_stats`.
- La validación con `validation.csv` mostró cómo cambian los valores globales.

