# üó∫Ô∏èüì° Ingesta GTFS / GTFS-RT (LATAM)


## üìö Qu√© son
- **GTFS (General Transit Feed Specification):** datos est√°ticos de rutas, trips, stops, calendar, shapes.  
- **GTFS-Realtime:** posiciones de veh√≠culos, alertas y actualizaciones de viajes.


## üîç Particularidades en LATAM
- Fuentes municipales/estatales con disponibilidad variable y formatos comprimidos (ZIP).  
- Posibles inconsistencias de `stop_times.txt` y `calendar_dates.txt` (feriados locales).  
- Campos opcionales ausentes; necesidad de validaci√≥n y normalizaci√≥n.


## ‚öôÔ∏è Ingesta pr√°ctica (propuesta)
- Leer **GTFS** (`stops.txt`, `routes.txt`, `trips.txt`, `stop_times.txt`, `shapes.txt`) y validar claves (`route_id`, `trip_id`, `stop_id`).  
- Ingerir **RT** v√≠a endpoints protobuf/JSON; persistir posiciones con timestamp y `vehicle_id`.  
- Estandarizar TZ (ej. `America/Santiago`, `America/Mexico_City`) y codificaci√≥n UTF-8.


## üéØ Uso downstream
- Construir horarios programados vs observados para **puntualidad** y **regularidad (headways)**.  
- Geofences en estaciones/paraderos para **dwell time** y cobertura.


## ‚ö†Ô∏è Supuestos
- Disponibilidad de feed oficial y consentimiento para uso; respetar l√≠mites de rate y pol√≠ticas p√∫blicas locales.


In [None]:
# Configuraci√≥n del entorno
import sys
from pathlib import Path

# Agregar el directorio ra√≠z al path para imports
root_dir = Path.cwd().parent.parent
if str(root_dir) not in sys.path:
    sys.path.insert(0, str(root_dir))

print(f"‚úÖ Root directory agregado al path: {root_dir}")

‚úÖ Root directory agregado al path: f:\GitHub\supply-chain-data-notebooks


# Contexto de Negocio y Marco de Trabajo

## Empresa y situaci√≥n
Operaci√≥n de retail nacional con m√∫ltiples WMS exportando CSV diarios de inventario y movimientos. Se requiere ingesta confiable al DWH para anal√≠tica.

## Qu√© / Por qu√© / Para qu√© / Cu√°ndo / C√≥mo
- Qu√©: Ingesta batch de archivos CSV del WMS hacia tablas Parquet en el DWH.
- Por qu√©: Centralizar y estandarizar datos para consultas r√°pidas y consistentes.
- Para qu√©: Habilitar dashboards e informes operativos y de inventario.
- Cu√°ndo: Procesos nocturnos y reintentos ante archivos tard√≠os.
- C√≥mo: Validaci√≥n de esquema, enriquecimiento con fecha de ingesta y escritura en formato columna.

---
id: "DE-01"
title: "Batch ingestion from WMS CSV to Data Warehouse"
specialty: "Data Engineering"
process: "Deliver"
level: "Intermediate"
tags: ["etl", "warehouse", "inventory", "python", "sql"]
estimated_time_min: 45
---

## üéØ Contexto del Notebook

### ¬øQu√©?
Ingesta batch de datos de inventario desde archivos CSV (simulando WMS) hacia un modelo anal√≠tico.

### ¬øPor qu√©?
Los sistemas WMS generan archivos planos diarios. Consolidarlos en un warehouse permite an√°lisis hist√≥rico, tendencias y alertas.

### ¬øPara qu√©?
- Construir tablas de hechos de inventario
- Habilitar reportes de cobertura, rotaci√≥n y obsolescencia
- Base para modelos de optimizaci√≥n de stock

### ¬øCu√°ndo?
Ejecutar diariamente en horarios de baja operaci√≥n (ej: 2 AM) post-cierre de turno WMS.

### ¬øC√≥mo?
1. Leer CSV desde `data/raw/inventory.csv`
2. Validar esquema y tipos
3. Transformar (agregar timestamp, calcular m√©tricas)
4. Escribir a `data/processed/` o conectar a DB

In [None]:
# üìö CONCEPTO: Librer√≠as fundamentales de Data Engineering
# - pandas: La librer√≠a est√°ndar de facto para ETL en Python. Ofrece DataFrames
#   (estructuras tabulares con √≠ndices, tipos, operaciones vectorizadas).
# - pathlib.Path: Manejo de rutas de archivos independiente del SO (Windows/Linux).
#   Preferir siempre Path sobre strings para rutas.
# - src.utils.paths: M√≥dulo custom con constantes DATA_RAW, DATA_PROCESSED (evita hardcodear paths).
# - src.utils.logging: Wrapper sobre logging est√°ndar de Python para trazabilidad.

# üí° INTERPRETACI√ìN: ¬øPor qu√© usar logging en ETL?
# En producci√≥n, los procesos ETL se ejecutan desatendidos (cron, Airflow, etc.).
# Logging permite:
# - Auditor√≠a: saber qu√© registros se procesaron, cu√°ndo, errores.
# - Debugging: ante fallas, revisar logs en lugar de ejecutar manualmente.
# - Compliance: en industrias reguladas (finanzas, salud), logs son evidencia.

# üîç T√âCNICA: ensure_dirs()
# Esta funci√≥n crea las carpetas data/processed/ si no existen (equivalente a mkdir -p).
# Evita errores FileNotFoundError al escribir archivos.

import pandas as pd
from pathlib import Path
from src.utils.paths import DATA_RAW, DATA_PROCESSED, ensure_dirs
from src.utils.logging import get_logger

logger = get_logger('DE-01')
ensure_dirs()
logger.info('Iniciando ingesta batch de inventario')
print('‚úÖ Librer√≠as cargadas y rutas preparadas')


2025-12-01 13:12:11,959 - DE-01 - INFO - Iniciando ingesta batch de inventario


‚úÖ Librer√≠as cargadas y rutas preparadas


In [None]:
# üìö CONCEPTO: Lectura de CSV como primera etapa ETL
# pd.read_csv() es el m√©todo m√°s com√∫n de ingesta desde archivos planos.
# Par√°metros clave (para producci√≥n):
# - parse_dates=['date_column']: convierte autom√°ticamente a datetime64
# - dtype={'col': str}: fuerza tipos (evita inferencia err√≥nea)
# - na_values=['NULL', 'N/A']: valores a considerar como NaN
# - encoding='utf-8': evita problemas con caracteres especiales

# üîç T√âCNICA: Validaci√≥n b√°sica con .info()
# Siempre revisar:
# - N√∫mero de registros (len(df))
# - Tipos de datos (int64, float64, object para strings)
# - Valores nulos (non-null count)
# Si los tipos no son los esperados, corregir antes de transformaciones.

# ‚ö†Ô∏è SUPUESTO: CSV bien formado
# Este c√≥digo asume que:
# - El CSV tiene encabezados en la primera fila
# - No hay l√≠neas corruptas o con delimitadores inconsistentes
# - El archivo existe en data/raw/inventory.csv
# En producci√≥n, agregar manejo de excepciones (try/except FileNotFoundError).

# Leer CSV de inventario
df = pd.read_csv(DATA_RAW / 'inventory.csv')
logger.info(f'Registros cargados: {len(df)}')
print(df.head())
print(df.info())


2025-12-01 13:12:22,197 - DE-01 - INFO - Registros cargados: 3000


  location_id        sku  on_hand
0     LOC-001  SKU-00087       86
1     LOC-001  SKU-00145       38
2     LOC-001  SKU-00163       53
3     LOC-001  SKU-00024      108
4     LOC-001  SKU-00056       10
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3000 entries, 0 to 2999
Data columns (total 3 columns):
 #   Column       Non-Null Count  Dtype 
---  ------       --------------  ----- 
 0   location_id  3000 non-null   object
 1   sku          3000 non-null   object
 2   on_hand      3000 non-null   int64 
dtypes: int64(1), object(2)
memory usage: 70.4+ KB
None


In [None]:
# üìö CONCEPTO: Transformaciones en ETL - Enriquecimiento de datos
# Fase "T" (Transform) del ETL: agregar columnas calculadas y metadatos.

# üí° INTERPRETACI√ìN: ¬øPor qu√© agregar ingestion_date?
# - Trazabilidad: saber cu√°ndo se carg√≥ cada registro (√∫til para auditor√≠as).
# - Idempotencia: si reprocesas el mismo d√≠a, puedes identificar duplicados.
# - Incremental loads: en pipelines avanzados, esta columna permite cargas diferenciales.

# üîç T√âCNICA: pd.Timestamp.now()
# Captura la fecha/hora actual en formato datetime64[ns] (nanosegundos).
# En producci√≥n distribuida (ej: Spark), usar una constante fija por ejecuci√≥n
# para que todos los registros tengan el mismo timestamp.

# ‚ö†Ô∏è SUPUESTO: Precio unitario constante (mock)
# En este ejemplo, multiplicamos on_hand * 10 para simular valor.
# En producci√≥n:
# - JOIN con tabla maestra de productos para obtener precio real.
# - Considerar precios hist√≥ricos (SCD Type 2) si el precio cambia.

# Transformaciones b√°sicas
df['ingestion_date'] = pd.Timestamp.now()
df['on_hand_value'] = df['on_hand'] * 10  # mock: precio unitario
print(df[['location_id','sku','on_hand','on_hand_value']].head())


  location_id        sku  on_hand  on_hand_value
0     LOC-001  SKU-00087       86            860
1     LOC-001  SKU-00145       38            380
2     LOC-001  SKU-00163       53            530
3     LOC-001  SKU-00024      108           1080
4     LOC-001  SKU-00056       10            100


In [None]:
# üìö CONCEPTO: Escritura en formato columnar Parquet
# Parquet es el est√°ndar en data warehouses modernos (vs CSV/JSON):
# - Compresi√≥n: reduce tama√±o 5-10x (usa snappy o gzip)
# - Velocidad: queries 10-100x m√°s r√°pidos (columnar, predicate pushdown)
# - Tipos: preserva datetime, int64, float64 (CSV convierte todo a string)
# - Particionamiento: soporta partitions por fecha (ej: year=2025/month=01/)

# üí° INTERPRETACI√ìN: ¬øPor qu√© Parquet en la capa "processed"?
# - Capa raw: CSV (tal cual viene del source)
# - Capa processed: Parquet (limpio, tipado, enriquecido)
# - Capa consumption: Modelos dimensionales, agregados

# üîç T√âCNICA: index=False
# Evita escribir el √≠ndice num√©rico de pandas como columna.
# En general, los √≠ndices de pandas no se deben persistir (usar IDs de negocio).

# üéØ APLICACI√ìN: Pr√≥ximos pasos en producci√≥n
# - Particionamiento por fecha: df.to_parquet(path, partition_cols=['date'])
# - Conexi√≥n a DB: usar sqlalchemy + pd.to_sql() para escribir a PostgreSQL/SQL Server
# - Orquestaci√≥n: envolver este script en task de Airflow/Prefect con retries y alertas

# Escribir a processed
output = DATA_PROCESSED / 'inventory_fact.parquet'
df.to_parquet(output, index=False)
logger.info(f'Datos escritos en {output}')
print('‚úÖ Ingesta completada')


2025-12-01 13:12:42,190 - DE-01 - INFO - Datos escritos en F:\GitHub\supply-chain-data-notebooks\data\processed\inventory_fact.parquet


‚úÖ Ingesta completada
