## Taller 4
## Cargar CSV a MySQL

In [9]:
import os
import pandas as pd
from sqlalchemy import create_engine, text
from dotenv import load_dotenv

load_dotenv()

DB_HOST = os.getenv("DB_HOST")
DB_PORT = os.getenv("DB_PORT")
DB_NAME = os.getenv("DB_NAME")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")

engine = create_engine(f"mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}")

ddl="""
CREATE TABLE IF NOT EXISTS ventas (
    venta_id INT AUTO_INCREMENT PRIMARY KEY,
    cliente VARCHAR(100),
    producto VARCHAR(100),
    categoria VARCHAR(100),
    precio_unitario DECIMAL(10, 2),
    unidades INT,
    fecha DATE
);
"""

with engine.connect() as conn:
    conn.execute(text(ddl))
    conn.commit()

print("Tabla creada exitosamente.")

Tabla creada exitosamente.


In [10]:
# Lee CSV
df = pd.read_csv("ventas.csv")

# Casts / limpieza
df["precio_unitario"] = pd.to_numeric(df["precio_unitario"], errors="coerce")
df["unidades"] = pd.to_numeric(df["unidades"], errors="coerce")
df["fecha"] = pd.to_datetime(df["fecha"], errors="coerce").dt.date

In [11]:
# Carga inicial: reemplazar contenido de la tabla
with engine.begin() as conn:
    conn.execute(text("TRUNCATE TABLE ventas"))
df.to_sql("ventas", con=engine, if_exists="append", index=False, method="multi", chunksize=1000)

print(f"✅ Carga inicial completada. Registros: {len(df)}")

✅ Carga inicial completada. Registros: 20


In [13]:
# Carga incremental 
from sqlalchemy import text

df_inc = pd.read_csv("ventas_incrementales.csv")
df_inc["precio_unitario"] = pd.to_numeric(df_inc["precio_unitario"], errors="coerce")
df_inc["unidades"] = pd.to_numeric(df_inc["unidades"], errors="coerce")
df_inc["fecha"] = pd.to_datetime(df_inc["fecha"], errors="coerce").dt.date

# Convertimos a lista de dicts para ejecuciones masivas
records = df_inc.to_dict(orient="records")

upsert_sql = text("""
INSERT INTO ventas (venta_id, cliente, producto, categoria, precio_unitario, unidades, fecha)
VALUES (:venta_id, :cliente, :producto, :categoria, :precio_unitario, :unidades, :fecha)
ON DUPLICATE KEY UPDATE
  cliente=VALUES(cliente),
  producto=VALUES(producto),
  categoria=VALUES(categoria),
  precio_unitario=VALUES(precio_unitario),
  unidades=VALUES(unidades),
  fecha=VALUES(fecha);
""" )
# Inserción por lotes (chunks) para eficiencia
batch = 1000
with engine.begin() as conn:
    for i in range(0, len(records), batch):
        conn.execute(upsert_sql, records[i:i+batch])

print(f"🔁 UPSERT aplicado. Filas procesadas: {len(df_inc)}")


🔁 UPSERT aplicado. Filas procesadas: 4


In [15]:
# Muestra primeras y últimas filas
all_rows = pd.read_sql("SELECT * FROM ventas ORDER BY venta_id", con=engine)
print(all_rows.head(10))
print(all_rows.tail(10))
print("Total registros:", len(all_rows))

# Top 5 productos por ventas (precio * unidades)
top5 = pd.read_sql("""
SELECT producto, SUM(precio_unitario * unidades) AS total
FROM ventas
GROUP BY producto
ORDER BY total DESC
LIMIT 5;
""", con=engine)
print("Top 5 productos por total vendido:")
print(top5)

   venta_id cliente     producto    categoria  precio_unitario  unidades  \
0         1     Ana       Laptop  Electrónica           2500.0         1   
1         2    Luis     Teléfono  Electrónica           1200.0         2   
2         3   Cesar       Tablet  Electrónica            800.0         1   
3         4   María    Audífonos   Accesorios            150.0         3   
4         5   Pedro      Monitor  Electrónica            650.0         1   
5         6   Laura      Teclado   Accesorios            100.0         1   
6         7   Diego        Mouse   Accesorios             50.0         2   
7         8     Ana  Base Laptop   Accesorios             45.0         1   
8         9    Luis    Disco SSD  Electrónica            300.0         1   
9        10   Cesar       Webcam   Accesorios             80.0         1   

        fecha  
0  2023-01-01  
1  2023-01-02  
2  2023-01-02  
3  2023-01-03  
4  2023-01-15  
5  2023-01-05  
6  2023-01-06  
7  2023-01-06  
8  2023-01-07  
9  