# Ingestión de Datos - Ventas

Este notebook implementa la ingesta batch de datos de ventas con:
- Trazabilidad (_ingest_ts, _source_file, _batch_id)
- Persistencia en Parquet y SQLite
- Cuarentena para filas inválidas

In [None]:
import pandas as pd
import sqlite3
from datetime import datetime
from pathlib import Path
import uuid
import pyarrow as pa
import pyarrow.parquet as pq

# Configuración de rutas
PROJECT_ROOT = Path().resolve().parent
DATA_DIR = PROJECT_ROOT / "data" / "drops"
OUTPUT_DIR = PROJECT_ROOT / "output"
PARQUET_DIR = OUTPUT_DIR / "parquet"
QUALITY_DIR = OUTPUT_DIR / "quality"

# Crear directorios si no existen
for dir_path in [OUTPUT_DIR, PARQUET_DIR, QUALITY_DIR]:
    dir_path.mkdir(parents=True, exist_ok=True)

# Conexión a SQLite
DB_PATH = OUTPUT_DIR / "ventas.db"
conn = sqlite3.connect(DB_PATH)

In [None]:
# Función para procesar un archivo
def process_file(file_path):
    # Generar batch_id único
    batch_id = str(uuid.uuid4())
    ingest_ts = datetime.now().isoformat()
    
    # Leer CSV
    df = pd.read_csv(file_path)
    
    # Añadir campos de trazabilidad
    df['_ingest_ts'] = ingest_ts
    df['_source_file'] = str(file_path.name)
    df['_batch_id'] = batch_id
    
    return df

# Función para validar una fila
def validate_row(row):
    try:
        # Validar tipos de datos
        fecha = pd.to_datetime(row['fecha'])
        unidades = float(row['unidades'])
        precio = float(row['precio_unitario'])
        
        # Validar rangos
        if unidades <= 0:
            return False, "unidades debe ser positivo"
        if precio <= 0:
            return False, "precio debe ser positivo"
            
        return True, None
    except Exception as e:
        return False, str(e)

# Función para separar filas válidas e inválidas
def split_valid_invalid(df):
    valid_rows = []
    invalid_rows = []
    
    for idx, row in df.iterrows():
        is_valid, reason = validate_row(row)
        if is_valid:
            valid_rows.append(row)
        else:
            invalid_row = row.copy()
            invalid_row['_reason'] = reason
            invalid_rows.append(invalid_row)
    
    valid_df = pd.DataFrame(valid_rows) if valid_rows else pd.DataFrame(columns=df.columns)
    invalid_df = pd.DataFrame(invalid_rows) if invalid_rows else pd.DataFrame(columns=list(df.columns) + ['_reason'])
    
    return valid_df, invalid_df

In [None]:
# Procesar todos los archivos CSV en el directorio de datos
for file_path in DATA_DIR.glob("*.csv"):
    print(f"Procesando archivo: {file_path.name}")
    
    # Leer y procesar archivo
    df = process_file(file_path)
    
    # Separar filas válidas e inválidas
    valid_df, invalid_df = split_valid_invalid(df)
    
    # Guardar datos válidos en Parquet
    if not valid_df.empty:
        # Particionar por fecha
        valid_df['year'] = pd.to_datetime(valid_df['fecha']).dt.year
        valid_df['month'] = pd.to_datetime(valid_df['fecha']).dt.month
        
        table = pa.Table.from_pandas(valid_df)
        pq.write_to_dataset(
            table,
            root_path=str(PARQUET_DIR / 'ventas'),
            partition_cols=['year', 'month']
        )
        
        # Guardar en SQLite
        valid_df.to_sql('raw_ventas', conn, if_exists='append', index=False)
    
    # Guardar datos inválidos
    if not invalid_df.empty:
        invalid_df.to_csv(
            QUALITY_DIR / f"ventas_invalidas_{df['_batch_id'].iloc[0]}.csv",
            index=False
        )
        invalid_df.to_sql('quarantine_ventas', conn, if_exists='append', index=False)
    
    print(f"Filas válidas: {len(valid_df)}, Filas inválidas: {len(invalid_df)}")

conn.close()