# Análisis de Datos - Panadería Salvador

## 1. Carga inicial de datos

En este primer notebook del proyecto, vamos a realizar la **carga de datos brutos** de la Panadería Salvador. El objetivo es importar los archivos originales (ventas, productos, clientes, etc.), realizar una primera inspección básica y dejar los datos listos para el análisis exploratorio y el modelado posterior.

**Pasos de este notebook:**
- Descripción breve de los datos disponibles.
- Carga de los archivos originales al entorno de trabajo.
- Comprobación inicial de la integridad y estructura de los datos.

---

> **Nota:** Todos los análisis, visualizaciones y modelos posteriores del proyecto partirán de esta carga inicial.


### ¿Por qué usamos SQLAlchemy y Polars?

- **SQLAlchemy**: Permite conectar fácilmente a varias bases de datos y es más flexible y profesional que `mysql-connector-python`.
- **Polars**: Es mucho más rápido y eficiente que pandas para el análisis y procesamiento de grandes volúmenes de datos.


In [73]:
from sqlalchemy import create_engine, text, Table, MetaData, Column, Integer, String, Float, Date
from sqlalchemy.exc import SQLAlchemyError
import polars as pl
from dotenv import load_dotenv
import os

load_dotenv()

# --- Credenciales ---
user = os.getenv("DB_USER")
host = os.getenv("DB_HOST")
port = int(os.getenv("DB_PORT", "3306"))
password = os.getenv("DB_PASSWORD")
nombre_base_datos = os.getenv("DB_NAME")

# --- Motor SQL (con timeouts y ping) ---
if 'engine' in globals():
    try:
        engine.dispose()
    except Exception:
        pass

engine = create_engine(
    f"mysql+pymysql://{user}:{password}@{host}:{port}/{nombre_base_datos}",
    pool_pre_ping=True,
    pool_recycle=1800,
    pool_size=5,
    max_overflow=10,
    connect_args={"connect_timeout": 10, "read_timeout": 120, "write_timeout": 120},
)

# Sanity check y “seguridad” (siempre se cae)
with engine.begin() as con:
    con.execute(text("CREATE SCHEMA IF NOT EXISTS sandbox;"))
    con.execute(text("SET time_zone = 'Europe/Madrid';"))
    con.execute(text("SET SESSION sql_mode = CONCAT(@@sql_mode, ',ONLY_FULL_GROUP_BY');"))

with engine.connect() as con:
    print("Conexión OK:", con.execute(text("SELECT 1;")).fetchone())


Conexión OK: (1,)


In [74]:
import os  

DATA_RAW_DIR = r"d:\PersonalProjects\Panadería Datathon\data\raw"

# He usado fastexcel para leer archivos excel sin usar Pandas (por probar)
df_ventas = pl.read_excel(os.path.join(DATA_RAW_DIR, "ArticulosPanaderia.xlsx"))
df_calendario = pl.read_excel(os.path.join(DATA_RAW_DIR, "Calendario.xlsx"))
df_pedidos = pl.read_excel(os.path.join(DATA_RAW_DIR, "CantidadPedida.xlsx"))

In [75]:
# ✔ Asegurar nombres esperados (renombra si tus columnas difieren)
# Ventas: FAMILIA, Tipo, FechaVenta, HoraVenta, Articulo, Cantidad, Precio, Importe
expected_v_cols = {"FAMILIA","Tipo","FechaVenta","HoraVenta","Articulo","Cantidad","Precio","Importe"}
if set(df_ventas.columns) != expected_v_cols:
    # ejemplo de renombres; ajusta si tus headers son distintos
    ren = {}
    for c in df_ventas.columns:
        if c.lower() == "familia": ren[c] = "FAMILIA"
        elif c.lower() == "fecha" or c.lower()=="fechaventa": ren[c] = "FechaVenta"
        elif c.lower() == "hora" or c.lower()=="horaventa": ren[c] = "HoraVenta"
        elif c.lower() == "articulo": ren[c] = "Articulo"
        elif c.lower() == "cantidad": ren[c] = "Cantidad"
        elif c.lower() == "precio": ren[c] = "Precio"
        elif c.lower() == "importe": ren[c] = "Importe"
        elif c.lower() == "tipo": ren[c] = "Tipo"
    df_ventas = df_ventas.rename(ren)

In [76]:
# --- Normalización robusta de tipos (Polars) ---

# Ventas: FAMILIA, Tipo, FechaVenta, HoraVenta, Articulo, Cantidad, Precio, Importe
df_ventas = (
    df_ventas
    .with_columns([
        # pasa a texto y luego parsea -> funciona si venía ya como date o como string
        pl.col("FechaVenta").cast(pl.Utf8, strict=False).str.strptime(pl.Datetime, strict=False).alias("FechaVenta"),
        pl.col("HoraVenta").cast(pl.Int64, strict=False).alias("HoraVenta"),
        pl.col("Articulo").cast(pl.Int64, strict=False).alias("Articulo"),
        pl.col("Cantidad").cast(pl.Float64, strict=False).fill_null(0).alias("Cantidad"),
        pl.col("Precio").cast(pl.Float64, strict=False).fill_null(0).alias("Precio"),
        pl.col("Importe").cast(pl.Float64, strict=False).fill_null(0).alias("Importe"),
        pl.col("FAMILIA").cast(pl.Utf8, strict=False).alias("FAMILIA"),
        pl.col("Tipo").cast(pl.Utf8, strict=False).alias("Tipo"),
    ])
)

In [77]:
# Calendario: Fecha (date), Festivo (texto)
df_calendario = (
    df_calendario
    .with_columns([
        pl.col("Fecha").cast(pl.Utf8, strict=False).str.strptime(pl.Date, strict=False).alias("Fecha"),
        pl.col("Festivo").cast(pl.Utf8, strict=False).alias("Festivo"),
    ])
)

In [78]:
# Pedidos (si lo usas más tarde): Fecha, Articulo, Cantidad, Precio, Importe
df_pedidos = (
    df_pedidos
    .with_columns([
        pl.col("Fecha").cast(pl.Utf8, strict=False).str.strptime(pl.Date, strict=False).alias("Fecha"),
        pl.col("Articulo").cast(pl.Int64, strict=False).alias("Articulo"),
        pl.col("Cantidad").cast(pl.Float64, strict=False).fill_null(0).alias("Cantidad"),
        pl.col("Precio").cast(pl.Float64, strict=False).fill_null(0).alias("Precio"),
        pl.col("Importe").cast(pl.Float64, strict=False).fill_null(0).alias("Importe"),
        pl.when(pl.col("Familia").is_not_null()).then(pl.col("Familia")).otherwise(pl.lit("")).alias("Familia"),
        pl.when(pl.col("Tipo").is_not_null()).then(pl.col("Tipo")).otherwise(pl.lit("VENTA")).alias("Tipo"),
    ])
)

In [79]:
# Compruebo los dtypes para confirmar
print("dtypes ventas:", df_ventas.dtypes)
print("dtypes calendario:", df_calendario.dtypes)
print("dtypes pedidos:", df_pedidos.dtypes)

dtypes ventas: [String, String, Datetime(time_unit='us', time_zone=None), Int64, Int64, Float64, Float64, Float64]
dtypes calendario: [Date, String]
dtypes pedidos: [String, String, Date, Int64, Float64, Float64, Float64]


In [80]:
from sqlalchemy import Table, MetaData, Column, Integer, String, Float, Date

metadata = MetaData()

rem_ventas = Table(
    "rem_ventas", metadata,
    Column("FAMILIA", String(255)),
    Column("Tipo", String(255)),
    Column("FechaVenta", Date),
    Column("HoraVenta", Integer),
    Column("Articulo", String(255)),  # dejamos String para no chocar si ya existe
    Column("Cantidad", Float),
    Column("Precio", Float),
    Column("Importe", Float),
    schema="sandbox"
)

rem_calendario = Table(
    "rem_calendario", metadata,
    Column("Fecha", Date),
    Column("Festivo", String(255)),
    schema="sandbox"
)

rem_pedidos = Table(
    "rem_pedidos", metadata,
    Column("Familia", String(255)),
    Column("Tipo", String(255)),
    Column("Fecha", Date),
    Column("Articulo", String(255)),
    Column("Cantidad", Float),
    Column("Precio", Float),
    Column("Importe", Float),
    schema="sandbox"
)

metadata.create_all(engine)
print("Tablas base listas.")


Tablas base listas.


In [81]:
from sqlalchemy import insert, text

CHUNK_SIZE = 1000

# Función para insertar "partes" más livianas, si no me daba error
def insertar_chunks(conn, tabla, df_pl: pl.DataFrame, nombre_tabla: str, chunk_size=CHUNK_SIZE):
    registros = df_pl.to_dicts()
    total = len(registros)
    for i in range(0, total, chunk_size):
        lote = registros[i:i+chunk_size]
        conn.execute(insert(tabla), lote)
        print(f"Chunk {i//chunk_size+1} de {nombre_tabla} ({len(lote)} filas) insertado")

# Evita duplicados: vacía y carga UNA vez
with engine.begin() as con:
    con.execute(text("TRUNCATE TABLE sandbox.rem_ventas;"))
    con.execute(text("TRUNCATE TABLE sandbox.rem_calendario;"))
    con.execute(text("TRUNCATE TABLE sandbox.rem_pedidos;"))

# Solo filas únicas en ventas (por si el Excel trae duplicados reales)
subset_cols = ["FAMILIA","Tipo","FechaVenta","HoraVenta","Articulo","Cantidad","Precio","Importe"]
df_ventas_unique = df_ventas.unique(subset=subset_cols, keep="first")

with engine.begin() as con:
    insertar_chunks(con, rem_ventas, df_ventas_unique, "rem_ventas")
    insertar_chunks(con, rem_calendario, df_calendario, "rem_calendario")
    insertar_chunks(con, rem_pedidos, df_pedidos, "rem_pedidos")

print("Inserciones completadas.")


Chunk 1 de rem_ventas (1000 filas) insertado
Chunk 2 de rem_ventas (1000 filas) insertado
Chunk 3 de rem_ventas (1000 filas) insertado
Chunk 4 de rem_ventas (1000 filas) insertado
Chunk 5 de rem_ventas (1000 filas) insertado
Chunk 6 de rem_ventas (1000 filas) insertado
Chunk 7 de rem_ventas (1000 filas) insertado
Chunk 8 de rem_ventas (1000 filas) insertado
Chunk 9 de rem_ventas (1000 filas) insertado
Chunk 10 de rem_ventas (1000 filas) insertado
Chunk 11 de rem_ventas (1000 filas) insertado
Chunk 12 de rem_ventas (1000 filas) insertado
Chunk 13 de rem_ventas (1000 filas) insertado
Chunk 14 de rem_ventas (1000 filas) insertado
Chunk 15 de rem_ventas (1000 filas) insertado
Chunk 16 de rem_ventas (1000 filas) insertado
Chunk 17 de rem_ventas (1000 filas) insertado
Chunk 18 de rem_ventas (1000 filas) insertado
Chunk 19 de rem_ventas (1000 filas) insertado
Chunk 20 de rem_ventas (1000 filas) insertado
Chunk 21 de rem_ventas (1000 filas) insertado
Chunk 22 de rem_ventas (1000 filas) inserta

### Inserción eficiente de datos: aprendiendo y experimentando

En este  proyecto, he probado distintas formas de cargar grandes volúmenes de datos (como la tabla de ventas) en MySQL.  
Finalmente, me he decantado por la inserción en lotes (*chunks*) usando Polars y SQLAlchemy, por varias razones:

- **Simplicidad y limpieza**: El código es mucho más corto y claro, sin necesidad de recorrer filas una a una ni construir registros manualmente.
- **Rendimiento**: La inserción por lotes evita errores de conexión y permite que la carga sea sorprendentemente rápida (la tabla ventas, que es la mayor, se ha cargado en solo 1 minuto y 40 segundos).
- **Aprendizaje**: Creo que he conseguido mejorar el rendimiento al notebook original, trabajando de forma más eficiente con grandes conjuntos de datos y bases SQL en Python, experimentando con distintos tamaños de lote (*chunk size* o muestras más pequeñas).

En resumen, experimentar con Polars y SQLAlchemy, junto con la inserción en *chunks*, me ha permitido cargar los datos de manera profesional, rápida y con un código más fácil de mantener.


#### Nota: las consultas siguientes están preparadas para lanzarse con los datos cargados en la bbdd data y con los nombres iniciales, si se lanzan con usuario1 debe adaptarse

#### Voy a intentar hacer las consultas con SQLAlchemy


In [82]:
from sqlalchemy import text

START_DATE = "2017-01-01"
END_DATE   = "2023-12-31"

with engine.begin() as con:
    con.execute(text("SET SESSION cte_max_recursion_depth = 5000;"))
    con.execute(text("DROP TABLE IF EXISTS sandbox.calendario_dias;"))
    con.execute(text(f"""
        CREATE TABLE sandbox.calendario_dias AS
        WITH RECURSIVE cte_cal AS (
            SELECT DATE('{START_DATE}') AS d
            UNION ALL
            SELECT DATE_ADD(d, INTERVAL 1 DAY)
            FROM cte_cal
            WHERE d < DATE('{END_DATE}')
        )
        SELECT
            d                                        AS fecha,
            YEAR(d)                                  AS fx_anno,
            MONTH(d)                                 AS fx_mes,
            DAY(d)                                   AS fx_day,
            DATE_FORMAT(d, '%Y%m')                   AS fx_anno_mes,
            DATE_FORMAT(d, '%x-%v')                  AS semana
        FROM cte_cal;
    """))

    con.execute(text("DROP TABLE IF EXISTS sandbox.rem_calendario_uniq;"))
    con.execute(text("""
        CREATE TABLE sandbox.rem_calendario_uniq AS
        SELECT 
            CAST(Fecha AS DATE) AS fecha,
            MAX(Festivo)        AS festivo
        FROM sandbox.rem_calendario
        GROUP BY CAST(Fecha AS DATE);
    """))

    con.execute(text("DROP TABLE IF EXISTS sandbox.calendario_completo;"))
    con.execute(text("""
        CREATE TABLE sandbox.calendario_completo AS
        SELECT
            b.fecha, b.fx_anno, b.fx_mes, b.fx_day, b.fx_anno_mes, b.semana,
            u.festivo
        FROM sandbox.calendario_dias b
        LEFT JOIN sandbox.rem_calendario_uniq u
          ON b.fecha = u.fecha;
    """))

    try:
        con.execute(text("ALTER TABLE sandbox.calendario_completo ADD UNIQUE KEY uq_calendario_fecha (fecha);"))
    except Exception as e:
        print("Aviso índice único:", e)

# Comprobación de unicidad
with engine.connect() as con:
    filas, fechas = con.execute(text("""
        SELECT COUNT(*) AS filas, COUNT(DISTINCT fecha) AS fechas FROM sandbox.calendario_completo;
    """)).fetchone()
    print("calendario_completo -> filas:", filas, "| fechas únicas:", fechas)


calendario_completo -> filas: 2556 | fechas únicas: 2556


In [83]:
from sqlalchemy import text

with engine.begin() as con:
    con.execute(text("DROP TABLE IF EXISTS sandbox.articulos_top;"))
    con.execute(text("""
        CREATE TABLE sandbox.articulos_top AS
        SELECT
            t.Articulo,
            t.FAMILIA,
            t.importe_total,
            ROW_NUMBER() OVER (PARTITION BY t.FAMILIA ORDER BY t.importe_total DESC) AS orden
        FROM (
            SELECT Articulo, FAMILIA, SUM(Importe) AS importe_total
            FROM sandbox.rem_ventas
            WHERE FechaVenta >= '2021-05-01' AND Tipo='VENTA'
            GROUP BY 1,2
        ) t;
    """))
print("articulos_top listo.")


articulos_top listo.


In [84]:
from sqlalchemy import text

with engine.begin() as con:
    # Base agregada diaria SOLO VENTA y SOLO cantidades > 0
    con.execute(text("DROP TABLE IF EXISTS sandbox.tmp_ventas_agg;"))
    con.execute(text("""
        CREATE TABLE sandbox.tmp_ventas_agg AS
        SELECT
            DATE(FechaVenta) AS fecha,
            Articulo,
            MAX(FAMILIA)     AS familia,
            'VENTA'          AS tipo,
            SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) AS cantidad,
            SUM(CASE WHEN Cantidad > 0 THEN Importe  ELSE 0 END) AS importe,
            CASE
              WHEN SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) = 0 THEN NULL
              ELSE SUM(CASE WHEN Cantidad > 0 THEN Precio*Cantidad ELSE 0 END)
                   / SUM(CASE WHEN Cantidad > 0 THEN Cantidad         ELSE 0 END)
            END AS precio
        FROM sandbox.rem_ventas
        WHERE Tipo='VENTA'
        GROUP BY 1,2;
    """))

    # Final con calendario + ranking (1:1)
    con.execute(text("DROP TABLE IF EXISTS sandbox.ventas_diarias;"))
    con.execute(text("""
        CREATE TABLE sandbox.ventas_diarias AS
        SELECT
            v.familia, v.tipo, v.fecha, c.festivo,
            v.Articulo, v.precio,
            a.orden AS orden_articulo_familia,
            CASE WHEN v.fecha >= DATE('2021-05-01') THEN 'S' ELSE 'N' END AS in_fecha_estudio,
            v.cantidad, v.importe
        FROM sandbox.tmp_ventas_agg v
        LEFT JOIN sandbox.calendario_completo c ON v.fecha = c.fecha
        LEFT JOIN sandbox.articulos_top a ON v.familia = a.FAMILIA AND v.Articulo = a.Articulo;
    """))

    try:
        con.execute(text("ALTER TABLE sandbox.ventas_diarias ADD UNIQUE KEY uq_ventas_dia_art (fecha, articulo);"))
    except Exception as e:
        print("Aviso índice único:", e)

print("✅ ventas_diarias reconstruida (solo cantidades > 0)")


✅ ventas_diarias reconstruida (solo cantidades > 0)


In [85]:
# Comprobaciones, es la tercera vez que rehago el proyecto por errores en la carga de datos
# Ventas muy altas, precios, importes, etc.

from sqlalchemy import text

with engine.connect() as con:
    # Coincidencia exacta con base (>0) por (fecha, articulo)
    diff = con.execute(text("""
        WITH base AS (
          SELECT DATE(FechaVenta) AS fecha, Articulo,
                 SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) AS cant_base
          FROM sandbox.rem_ventas
          WHERE Tipo='VENTA'
          GROUP BY 1,2
        ), comp AS (
          SELECT v.fecha, v.articulo, v.cantidad - b.cant_base AS d
          FROM sandbox.ventas_diarias v
          JOIN base b ON v.fecha=b.fecha AND v.articulo=b.articulo
        )
        SELECT SUM(d<>0) FROM comp;
    """)).scalar() or 0
    assert diff == 0, f"Diferencias base(>0) vs ventas_diarias: {diff}"

    # No negativos
    negs = con.execute(text("SELECT SUM(cantidad<0) FROM sandbox.ventas_diarias")).scalar() or 0
    assert negs == 0, "Cantidades negativas en ventas_diarias"

print("Smoke tests OK ✅")


Smoke tests OK ✅


In [86]:
from sqlalchemy import text

with engine.begin() as con:
    # Base agregada diaria SOLO VENTA y SOLO cantidades > 0
    con.execute(text("DROP TABLE IF EXISTS sandbox.tmp_ventas_agg;"))
    con.execute(text("""
        CREATE TABLE sandbox.tmp_ventas_agg AS
        SELECT
            DATE(FechaVenta) AS fecha,
            Articulo,
            MAX(FAMILIA)     AS familia,
            'VENTA'          AS tipo,
            SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) AS cantidad,
            SUM(CASE WHEN Cantidad > 0 THEN Importe  ELSE 0 END) AS importe,
            CASE
              WHEN SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) = 0 THEN NULL
              ELSE SUM(CASE WHEN Cantidad > 0 THEN Precio*Cantidad ELSE 0 END)
                   / SUM(CASE WHEN Cantidad > 0 THEN Cantidad         ELSE 0 END)
            END AS precio
        FROM sandbox.rem_ventas
        WHERE Tipo='VENTA'
        GROUP BY 1,2;
    """))

    # Final con calendario + ranking (1:1)
    con.execute(text("DROP TABLE IF EXISTS sandbox.ventas_diarias;"))
    con.execute(text("""
        CREATE TABLE sandbox.ventas_diarias AS
        SELECT
            v.familia, v.tipo, v.fecha, c.festivo,
            v.Articulo, v.precio,
            a.orden AS orden_articulo_familia,
            CASE WHEN v.fecha >= DATE('2021-05-01') THEN 'S' ELSE 'N' END AS in_fecha_estudio,
            v.cantidad, v.importe
        FROM sandbox.tmp_ventas_agg v
        LEFT JOIN sandbox.calendario_completo c ON v.fecha = c.fecha
        LEFT JOIN sandbox.articulos_top a ON v.familia = a.FAMILIA AND v.Articulo = a.Articulo;
    """))

    # Unicidad por (fecha, articulo)
    try:
        con.execute(text("ALTER TABLE sandbox.ventas_diarias ADD UNIQUE KEY uq_ventas_dia_art (fecha, articulo);"))
    except Exception as e:
        print("Aviso (índice único):", e)

print("✅ ventas_diarias creada/actualizada (VENTA y >0)")


✅ ventas_diarias creada/actualizada (VENTA y >0)


In [87]:
from sqlalchemy import text
with engine.connect() as con:
    print(con.execute(text("SELECT 1;")).fetchone())


(1,)


In [88]:
from sqlalchemy import text

with engine.begin() as con:
    con.execute(text("DROP TABLE IF EXISTS sandbox.articulos_top;"))
    con.execute(text("""
        CREATE TABLE sandbox.articulos_top AS
        SELECT
            t.Articulo,
            t.FAMILIA,
            t.importe_total,
            ROW_NUMBER() OVER (PARTITION BY t.FAMILIA ORDER BY t.importe_total DESC) AS orden
        FROM (
            SELECT Articulo, FAMILIA, SUM(Importe) AS importe_total
            FROM sandbox.rem_ventas
            WHERE FechaVenta >= '2021-05-01' AND Tipo='VENTA'
            GROUP BY 1,2
        ) t;
    """))
print("articulos_top listo (VENTA).")


articulos_top listo (VENTA).


In [89]:
from sqlalchemy import text

with engine.begin() as con:
    # Base agregada diaria SOLO VENTA y SOLO cantidades > 0
    con.execute(text("DROP TABLE IF EXISTS sandbox.tmp_ventas_agg;"))
    con.execute(text("""
        CREATE TABLE sandbox.tmp_ventas_agg AS
        SELECT
            DATE(FechaVenta) AS fecha,
            Articulo,
            MAX(FAMILIA)     AS familia,
            'VENTA'          AS tipo,
            SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) AS cantidad,
            SUM(CASE WHEN Cantidad > 0 THEN Importe  ELSE 0 END) AS importe,
            CASE
              WHEN SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) = 0 THEN NULL
              ELSE SUM(CASE WHEN Cantidad > 0 THEN Precio*Cantidad ELSE 0 END)
                   / SUM(CASE WHEN Cantidad > 0 THEN Cantidad         ELSE 0 END)
            END AS precio
        FROM sandbox.rem_ventas
        WHERE Tipo='VENTA'
        GROUP BY 1,2;
    """))

    # Final con calendario + ranking (1:1)
    con.execute(text("DROP TABLE IF EXISTS sandbox.ventas_diarias;"))
    con.execute(text("""
        CREATE TABLE sandbox.ventas_diarias AS
        SELECT
            v.familia, v.tipo, v.fecha, c.festivo,
            v.Articulo, v.precio,
            a.orden AS orden_articulo_familia,
            CASE WHEN v.fecha >= DATE('2021-05-01') THEN 'S' ELSE 'N' END AS in_fecha_estudio,
            v.cantidad, v.importe
        FROM sandbox.tmp_ventas_agg v
        LEFT JOIN sandbox.calendario_completo c ON v.fecha = c.fecha
        LEFT JOIN sandbox.articulos_top a ON v.familia = a.FAMILIA AND v.Articulo = a.Articulo;
    """))

    # Unicidad por (fecha, articulo) (si existe, ignora aviso)
    try:
        con.execute(text("ALTER TABLE sandbox.ventas_diarias ADD UNIQUE KEY uq_ventas_dia_art (fecha, articulo);"))
    except Exception as e:
        print("Aviso índice único:", e)

print("ventas_diarias creada/actualizada (VENTA y >0).")


ventas_diarias creada/actualizada (VENTA y >0).


In [90]:
from sqlalchemy import text

with engine.connect() as con:
    # Coincidencia exacta con base (VENTA y >0) por (fecha, articulo)
    diff = con.execute(text("""
        WITH base AS (
          SELECT DATE(FechaVenta) AS fecha, Articulo,
                 SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) AS cant_base
          FROM sandbox.rem_ventas
          WHERE Tipo='VENTA'
          GROUP BY 1,2
        ), comp AS (
          SELECT v.fecha, v.articulo, v.cantidad - b.cant_base AS d
          FROM sandbox.ventas_diarias v
          JOIN base b ON v.fecha=b.fecha AND v.articulo=b.articulo
        )
        SELECT SUM(d<>0) FROM comp;
    """)).scalar() or 0
    assert diff == 0, f"Diferencias base(>0) vs ventas_diarias: {diff}"

    # Unicidad y no-negativos
    dups = con.execute(text("""
        SELECT COUNT(*) - COUNT(DISTINCT CONCAT_WS('|',fecha,articulo))
        FROM sandbox.ventas_diarias
    """)).scalar() or 0
    assert dups == 0, f"Duplicados en ventas_diarias: {dups}"

    negs = con.execute(text("SELECT SUM(cantidad<0) FROM sandbox.ventas_diarias")).scalar() or 0
    assert negs == 0, "Cantidades negativas en ventas_diarias"

print("Smoke tests OK ✅")


Smoke tests OK ✅


In [91]:
from sqlalchemy import text

with engine.begin() as con:
    # 2.1) Rehacer ranking de artículos con solo VENTA
    con.execute(text("DROP TABLE IF EXISTS sandbox.articulos_top;"))
    con.execute(text("""
        CREATE TABLE sandbox.articulos_top AS
        SELECT
            t.Articulo,
            t.FAMILIA,
            t.importe_total,
            ROW_NUMBER() OVER (PARTITION BY t.FAMILIA ORDER BY t.importe_total DESC) AS orden
        FROM (
            SELECT Articulo, FAMILIA, SUM(Importe) AS importe_total
            FROM sandbox.rem_ventas
            WHERE FechaVenta >= '2021-05-01' AND Tipo='VENTA'
            GROUP BY 1,2
        ) t;
    """))

    # 2.2) Agregación diaria por artículo SOLO VENTA y SOLO cantidades > 0
    con.execute(text("DROP TABLE IF EXISTS sandbox.tmp_ventas_agg;"))
    con.execute(text("""
        CREATE TABLE sandbox.tmp_ventas_agg AS
        SELECT
            DATE(FechaVenta) AS fecha,
            Articulo,
            MAX(FAMILIA)     AS familia,
            'VENTA'          AS tipo,
            SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) AS cantidad,
            SUM(CASE WHEN Cantidad > 0 THEN Importe  ELSE 0 END) AS importe,
            CASE
              WHEN SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) = 0 THEN NULL
              ELSE SUM(CASE WHEN Cantidad > 0 THEN Precio*Cantidad ELSE 0 END)
                   / SUM(CASE WHEN Cantidad > 0 THEN Cantidad         ELSE 0 END)
            END AS precio
        FROM sandbox.rem_ventas
        WHERE Tipo='VENTA'
        GROUP BY 1,2;
    """))

    # 2.3) ventas_diarias (join 1:1 con calendario y ranking)
    con.execute(text("DROP TABLE IF EXISTS sandbox.ventas_diarias;"))
    con.execute(text("""
        CREATE TABLE sandbox.ventas_diarias AS
        SELECT
            v.familia,
            v.tipo,
            v.fecha,
            c.festivo,
            v.Articulo,
            v.precio,
            a.orden AS orden_articulo_familia,
            CASE WHEN v.fecha >= DATE('2021-05-01') THEN 'S' ELSE 'N' END AS in_fecha_estudio,
            v.cantidad,
            v.importe
        FROM sandbox.tmp_ventas_agg v
        LEFT JOIN sandbox.calendario_completo c
          ON v.fecha = c.fecha
        LEFT JOIN sandbox.articulos_top a
          ON v.familia = a.FAMILIA AND v.Articulo = a.Articulo;
    """))

    # 2.4) Blindaje (no crítico si falla)
    try:
        con.execute(text("""
            ALTER TABLE sandbox.ventas_diarias
            ADD UNIQUE KEY uq_ventas_dia_art (fecha, articulo);
        """))
    except Exception as e:
        print("Aviso índice único (no crítico):", e)


In [92]:
# CREATE
with engine.begin() as con:
    con.execute(text("""
        CREATE OR REPLACE VIEW sandbox.ventas_diarias_estudio_completo AS
        SELECT *
        FROM sandbox.ventas_diarias
        WHERE tipo='VENTA'
          AND in_fecha_estudio='S'
          AND orden_articulo_familia<=5;
    """))
    con.execute(text("""
        CREATE OR REPLACE VIEW sandbox.ventas_diarias_estudio AS
        SELECT *
        FROM sandbox.ventas_diarias
        WHERE tipo='VENTA'
          AND in_fecha_estudio='S'
          AND fecha < DATE('2023-05-01')
          AND orden_articulo_familia<=5;
    """))

print("Vistas creadas/actualizadas ✅")


Vistas creadas/actualizadas ✅


In [93]:
from sqlalchemy import text

tablas = [
    "sandbox.articulos_top",
    "sandbox.calendario_dias",
    "sandbox.calendario_completo",
    "sandbox.ventas_diarias",
]
vistas = [
    "sandbox.ventas_diarias_estudio_completo",
    "sandbox.ventas_diarias_estudio",
]

with engine.connect() as con:
    print("== Recuento de filas ==")
    for t in tablas:
        n = con.execute(text(f"SELECT COUNT(*) FROM {t};")).scalar()
        print(f"✔ {t}: {n} filas")

    print("\n== Esquema ventas_diarias ==")
    cols = [r[0] for r in con.execute(text("SHOW COLUMNS FROM sandbox.ventas_diarias;")).fetchall()]
    print(cols)

    print("\n== Resumen ventas_diarias ==")
    resumen = con.execute(text("""
        SELECT 
            MIN(fecha) AS min_fecha,
            MAX(fecha) AS max_fecha,
            COUNT(*)   AS filas,
            COUNT(DISTINCT articulo) AS n_articulos,
            SUM(cantidad) AS total_cantidad
        FROM sandbox.ventas_diarias;
    """)).mappings().one()
    print(dict(resumen))

    # --- Smoke tests ---
    print("\n== Smoke tests ==")
    # A) Consistencia exacta con base (VENTA y >0)
    diff = con.execute(text("""
        WITH base AS (
          SELECT DATE(FechaVenta) AS fecha, Articulo,
                 SUM(CASE WHEN Cantidad > 0 THEN Cantidad ELSE 0 END) AS cant_base
          FROM sandbox.rem_ventas
          WHERE Tipo='VENTA'
          GROUP BY 1,2
        ), comp AS (
          SELECT v.fecha, v.articulo, v.cantidad - b.cant_base AS d
          FROM sandbox.ventas_diarias v
          JOIN base b ON v.fecha=b.fecha AND v.articulo=b.articulo
        )
        SELECT COALESCE(SUM(d<>0),0) FROM comp;
    """)).scalar()
    assert diff == 0, f"Diferencias base(>0) vs ventas_diarias: {diff}"
    print("✔ Consistencia con base (VENTA>0)")

    # B) Unicidad (fecha, articulo)
    dups = con.execute(text("""
        SELECT COALESCE(COUNT(*) - COUNT(DISTINCT CONCAT_WS('|',fecha,articulo)),0)
        FROM sandbox.ventas_diarias;
    """)).scalar()
    assert dups == 0, f"Duplicados en ventas_diarias: {dups}"
    print("✔ Unicidad (fecha, articulo)")

    # C) No negativos
    negs = con.execute(text("""
        SELECT COALESCE(SUM(cantidad<0),0) FROM sandbox.ventas_diarias;
    """)).scalar()
    assert negs == 0, "Cantidades negativas en ventas_diarias"
    print("✔ Sin negativos")

    # Vistas: muestra 5 filas
    print("\n== Primeras filas de vistas ==")
    for v in vistas:
        try:
            rows = con.execute(text(f"SELECT * FROM {v} LIMIT 5;")).fetchall()
            print(f"\n{v}:")
            for r in rows:
                print(r)
        except Exception as e:
            print(f"⚠ No se pudo leer {v}: {e}")

print("\n✅ Verificación final completada")


== Recuento de filas ==
✔ sandbox.articulos_top: 170 filas
✔ sandbox.calendario_dias: 2556 filas
✔ sandbox.calendario_completo: 2556 filas
✔ sandbox.ventas_diarias: 140021 filas

== Esquema ventas_diarias ==
['familia', 'tipo', 'fecha', 'festivo', 'Articulo', 'precio', 'orden_articulo_familia', 'in_fecha_estudio', 'cantidad', 'importe']

== Resumen ventas_diarias ==
{'min_fecha': datetime.date(2017, 1, 1), 'max_fecha': datetime.date(2023, 5, 18), 'filas': 140021, 'n_articulos': 172, 'total_cantidad': 10366859.982097466}

== Smoke tests ==
✔ Consistencia con base (VENTA>0)
✔ Unicidad (fecha, articulo)
✔ Sin negativos

== Primeras filas de vistas ==

sandbox.ventas_diarias_estudio_completo:
('PANADERIA', 'VENTA', datetime.date(2021, 5, 1), None, '1043', 2.5959999561309814, 2, 'S', 327.0, 848.8920021057129)
('PANADERIA', 'VENTA', datetime.date(2021, 5, 1), None, '1084', 0.8650000095367432, 4, 'S', 423.0, 365.89500522613525)
('BOLLERIA', 'VENTA', datetime.date(2021, 5, 1), None, '3880', 2.

## Nota sobre la ejecución de consultas SQL y el uso de SQLAlchemy

Durante la preparación y carga de datos, he tenido que **dividir las consultas SQL en varias ejecuciones individuales** en vez de ejecutar todo el bloque de una vez. Esto se debe a que el driver `pymysql` de SQLAlchemy (que conecta con MySQL) no permite ejecutar varias sentencias (por ejemplo, DROP y CREATE) juntas en una sola llamada. Además, algunas sentencias complejas requieren ejecutarse de forma separada para evitar errores de sintaxis y problemas con el formateo de cadenas (especialmente cuando se usan funciones como `date_format` con `%`).

**Sobre el uso de SQLAlchemy:**  
He optado por usar SQLAlchemy por su flexibilidad y porque permite una gestión más profesional y portable de la conexión y las transacciones a la base de datos. Es cierto que, en algunos casos, el conector `mysql-connector-python` podría haber simplificado la ejecución de bloques largos de SQL, pero trabajar con SQLAlchemy me ha permitido aprender y aplicar buenas prácticas de desarrollo en proyectos de análisis de datos con Python.

En resumen, aunque ejecutar las consultas una a una puede parecer menos eficiente al principio, garantiza que cada paso del proceso se controla y documenta correctamente, y me ha servido para entender mejor cómo interactúan Python y SQL en proyectos reales.
