In [1]:
# Cell 1) Instalación de dependencias
# Ejecuta esto UNA VEZ. Si ya las tienes instaladas, salta esta celda.
!pip install mysql-connector-python sqlalchemy pandas



In [3]:
# Cell 2) Imports y configuración de conexión

import os
import pandas as pd
import mysql.connector
from sqlalchemy import create_engine, text
import logging

# Logger
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("ZooETL")

# Parámetros de conexión (puedes ponerlos en variables de entorno)
DB_HOST = os.getenv("DB_HOST", "127.0.0.1")
DB_PORT = int(os.getenv("DB_PORT", 3306))
DB_USER = os.getenv("DB_USER", "root")
DB_PASS = os.getenv("DB_PASS", "2369")
DB_NAME = os.getenv("DB_NAME", "zoologico_etl")

# Cadena para SQLAlchemy (la usaremos después de crear la BD)
SQLA_URL = f"mysql+mysqlconnector://{DB_USER}:{DB_PASS}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

def get_mysql_conn(init=False):
    """
    Si init=True conecta SIN database para crearla.
    Si init=False conecta ya a DB_NAME.
    """
    cfg = dict(host=DB_HOST, port=DB_PORT, user=DB_USER, password=DB_PASS)
    if not init:
        cfg["database"] = DB_NAME
    return mysql.connector.connect(**cfg)

# engine de SQLAlchemy (se activará después de crear la BD)
engine = None

In [4]:
# Cell 3) Creación de la base de datos

# 1) Conectamos sin especificar base de datos
cnx = get_mysql_conn(init=True)
cursor = cnx.cursor()
cursor.execute(f"""
    CREATE DATABASE IF NOT EXISTS `{DB_NAME}`
    DEFAULT CHARACTER SET utf8mb4
    COLLATE utf8mb4_unicode_ci;
""")
cnx.commit()
cursor.close()
cnx.close()
logger.info("Base de datos '%s' creada o ya existente.", DB_NAME)

# 2) Ahora sí creamos el engine apuntando a esa base
from sqlalchemy import create_engine
engine = create_engine(SQLA_URL, echo=False)

2025-04-27 01:01:33,616 INFO Base de datos 'zoologico_etl' creada o ya existente.


In [5]:
# Cell 4) Creación de las tablas zoologicos y animales

DDL_ZOO = """
CREATE TABLE IF NOT EXISTS zoologicos (
    id INT AUTO_INCREMENT PRIMARY KEY,
    nombre VARCHAR(100)   NOT NULL,
    ciudad VARCHAR(50)    NOT NULL,
    pais VARCHAR(50)      NOT NULL,
    tamanio_m2 FLOAT      NOT NULL CHECK(tamanio_m2>=0),
    especies_total INT    NOT NULL CHECK(especies_total>=0),
    encargado VARCHAR(100) NOT NULL,
    presupuesto_anual DECIMAL(15,2) NOT NULL CHECK(presupuesto_anual>=0),
    UNIQUE KEY ux_zoo (nombre, ciudad, pais)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
DDL_ANIMAL = """
CREATE TABLE IF NOT EXISTS animales (
    id INT AUTO_INCREMENT PRIMARY KEY,
    nombre_comun        VARCHAR(100) NOT NULL,
    nombre_cientifico   VARCHAR(100) NOT NULL,
    familia             VARCHAR(50)  NOT NULL,
    especie             VARCHAR(50)  NOT NULL,
    estatus_conservacion VARCHAR(20) NOT NULL,
    numero_identificacion VARCHAR(20) NOT NULL UNIQUE,
    sexo ENUM('M','H')  NOT NULL,
    anio_nacimiento INT NOT NULL CHECK(anio_nacimiento>1800),
    pais_origen VARCHAR(50) NOT NULL,
    continente_origen VARCHAR(50) NOT NULL,
    zoologico_id INT,
    FOREIGN KEY (zoologico_id)
      REFERENCES zoologicos(id)
      ON DELETE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
"""
with engine.begin() as conn:
    conn.execute(text(DDL_ZOO))
    conn.execute(text(DDL_ANIMAL))
logger.info("Tablas creadas/verificadas correctamente.")

2025-04-27 01:01:46,023 INFO Tablas creadas/verificadas correctamente.


In [6]:
# Cell 5) Preparar datos de ejemplo (CSV) y carga en DataFrames

# Creamos los CSV si no existen
if not os.path.exists("zoos.csv"):
    with open("zoos.csv", "w", encoding="utf8") as f:
        f.write("""nombre,ciudad,pais,tamanio_m2,especies_total,encargado,presupuesto_anual
Zoo Central,Madrid,España,120000,150,María López,2500000.00
Safari Park,San Diego,EEUU,450000,200,John Smith,5000000.00
""")
    logger.info("Archivo zoos.csv creado.")

if not os.path.exists("animales.csv"):
    with open("animales.csv", "w", encoding="utf8") as f:
        f.write("""nombre_comun,nombre_cientifico,familia,especie,estatus_conservacion,numero_identificacion,sexo,anio_nacimiento,pais_origen,continente_origen,zoologico
León,Panthera leo,Felidae,leo,Vulnerable,ID1001,M,2012,Sudáfrica,África,Zoo Central
Tigre,Panthera tigris,Felidae,tigris,En Peligro,ID1002,H,2015,India,Asia,Safari Park
""")
    logger.info("Archivo animales.csv creado.")

# Leemos los CSV
df_zoos     = pd.read_csv("zoos.csv")
df_animales = pd.read_csv("animales.csv")
logger.info("CSV cargados en DataFrames: %d zoos, %d animales.", len(df_zoos), len(df_animales))

df_zoos.head(), df_animales.head()

2025-04-27 01:01:54,696 INFO CSV cargados en DataFrames: 2 zoos, 2 animales.


(        nombre     ciudad    pais  tamanio_m2  especies_total    encargado  \
 0  Zoo Central     Madrid  España      120000             150  María López   
 1  Safari Park  San Diego    EEUU      450000             200   John Smith   
 
    presupuesto_anual  
 0          2500000.0  
 1          5000000.0  ,
   nombre_comun nombre_cientifico  familia especie estatus_conservacion  \
 0         León      Panthera leo  Felidae     leo           Vulnerable   
 1        Tigre   Panthera tigris  Felidae  tigris           En Peligro   
 
   numero_identificacion sexo  anio_nacimiento pais_origen continente_origen  \
 0                ID1001    M             2012   Sudáfrica            África   
 1                ID1002    H             2015       India              Asia   
 
      zoologico  
 0  Zoo Central  
 1  Safari Park  )

In [12]:
# Cell 6) ETL completo: Extract, Transform, Load + pruebas de calidad

# 6.1) Transform Zoos (validaciones básicas)
df_zoos["tamanio_m2"]       = df_zoos["tamanio_m2"].astype(float).clip(lower=0)
df_zoos["especies_total"]    = df_zoos["especies_total"].astype(int).clip(lower=0)
df_zoos["presupuesto_anual"] = df_zoos["presupuesto_anual"].astype(float).clip(lower=0)
logger.info("Zoológicos transformados.")

# 6.2’) Load Zoos filtrando duplicados en pandas
df_exist = pd.read_sql("SELECT nombre, ciudad, pais FROM zoologicos", engine)
df_nuevos = (
    df_zoos
    .merge(df_exist, on=["nombre","ciudad","pais"], how="left", indicator=True)
    .query("_merge=='left_only'")
    .drop(columns="_merge")
)

if not df_nuevos.empty:
    df_nuevos.to_sql(
        name="zoologicos",
        con=engine,
        index=False,
        if_exists="append",
        chunksize=50
    )
    logger.info("Se cargaron %d zoológicos nuevos", len(df_nuevos))
else:
    logger.info("No hay zoológicos nuevos que cargar")

# 6.3) Fetch map nombre→id zoologicos
with engine.connect() as conn:
    rows = conn.execute(text("SELECT id, nombre FROM zoologicos"))
    zoo_map = {r.nombre: r.id for r in rows}
logger.info("Mapa de zoologicos obtenido: %s", zoo_map)

# 6.4) Transform Animales
df_animales["anio_nacimiento"] = df_animales["anio_nacimiento"].astype(int)
df_animales["zoologico_id"]     = df_animales["zoologico"].map(zoo_map)
if df_animales["zoologico_id"].isna().any():
    raise ValueError("Algunos animales no tienen zoológico válido")
df_load = df_animales.drop(columns=["zoologico"])
logger.info("Animales transformados y listos para cargar.")

# 6.5’) Load Animales filtrando duplicados

# 1) Leemos los IDs ya insertados
df_exist_ids = pd.read_sql(
    "SELECT numero_identificacion FROM animales",
    con=engine
)

# 2) Hacemos un anti-join para quedarnos sólo con los nuevos
df_nuevos_anim = df_load[
    ~df_load["numero_identificacion"].isin(
        df_exist_ids["numero_identificacion"]
    )
].copy()

# 3) Insertamos sólo los que faltan
if not df_nuevos_anim.empty:
    df_nuevos_anim.to_sql(
        name="animales",
        con=engine,
        index=False,
        if_exists="append",
        chunksize=50
    )
    logger.info("Cargados %d animales nuevos", len(df_nuevos_anim))
else:
    logger.info("No había animales nuevos que cargar")

# Cell 6.6) Pruebas de calidad – versión corregida

import numpy as np

cnx = get_mysql_conn(init=False)
cur = cnx.cursor()

# 1) Conteos
cur.execute("SELECT COUNT(*) FROM zoologicos;")
assert cur.fetchone()[0] == len(df_zoos), "Error en conteo zoológicos"
cur.execute("SELECT COUNT(*) FROM animales;")
assert cur.fetchone()[0] == len(df_load), "Error en conteo animales"

# 2) Prueba de constraint UNIQUE usando mysql.connector
#    Convertimos la fila 0 a tipos Python puros
row0 = df_zoos.iloc[0]
params = [
    row0["nombre"], 
    row0["ciudad"], 
    row0["pais"],
    float(row0["tamanio_m2"]),       # convierte numpy.float64→float
    int(row0["especies_total"]),      # convierte numpy.int64→int
    row0["encargado"],
    float(row0["presupuesto_anual"])  # convierte numpy.float64→float
]

try:
    cur.execute(
        """
        INSERT INTO zoologicos
          (nombre,ciudad,pais,tamanio_m2,especies_total,encargado,presupuesto_anual)
        VALUES (%s,%s,%s,%s,%s,%s,%s)
        """,
        params
    )
    cnx.commit()
    raise AssertionError("El UNIQUE no ha funcionado para zoológicos")
except mysql.connector.errors.IntegrityError:
    cnx.rollback()
    logger.info("Prueba UNIQUE zoológicos: OK")

cur.close()
cnx.close()
logger.info("Todas las pruebas de calidad pasaron correctamente.")

2025-04-27 01:13:00,502 INFO Zoológicos transformados.
2025-04-27 01:13:00,518 INFO No hay zoológicos nuevos que cargar
2025-04-27 01:13:00,523 INFO Mapa de zoologicos obtenido: {'Safari Park': 2, 'Zoo Central': 1}
2025-04-27 01:13:00,526 INFO Animales transformados y listos para cargar.
2025-04-27 01:13:00,531 INFO No había animales nuevos que cargar
2025-04-27 01:13:00,978 INFO Prueba UNIQUE zoológicos: OK
2025-04-27 01:13:00,981 INFO Todas las pruebas de calidad pasaron correctamente.


In [13]:
# Listar los zoológicos
df = pd.read_sql("SELECT * FROM zoologicos ORDER BY nombre", engine)
print(df)

# Listar animales
df2 = pd.read_sql("SELECT * FROM animales ORDER BY nombre_comun", engine)
print(df2)

   id       nombre     ciudad    pais  tamanio_m2  especies_total  \
0   2  Safari Park  San Diego    EEUU    450000.0             200   
1   1  Zoo Central     Madrid  España    120000.0             150   

     encargado  presupuesto_anual  
0   John Smith          5000000.0  
1  María López          2500000.0  
   id nombre_comun nombre_cientifico  familia especie estatus_conservacion  \
0   1         León      Panthera leo  Felidae     leo           Vulnerable   
1   2        Tigre   Panthera tigris  Felidae  tigris           En Peligro   

  numero_identificacion sexo  anio_nacimiento pais_origen continente_origen  \
0                ID1001    M             2012   Sudáfrica            África   
1                ID1002    H             2015       India              Asia   

   zoologico_id  
0             1  
1             2  


In [14]:
import logging
handler = logging.FileHandler("etl_zoologico.log")
formatter = logging.Formatter("%(asctime)s %(levelname)s %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)