In [1]:
import yaml
import psycopg2
from psycopg2 import sql
from sqlalchemy import create_engine, text
import pandas as pd

In [8]:
def load_config(file_path="config.yaml"):
    with open(file_path, "r") as file: # leer el documento del yaml
        return yaml.safe_load(file) 
        
    

In [12]:
config = load_config() # cargar los datos del yaml
db_config = config["database"]

# Load credentials
db_user = db_config["user"]
db_password = db_config["password"]
db_host = db_config["host"]
db_port = db_config["port"]
db_name = db_config["name"]

# DB connection
conn = psycopg2.connect( # conexion con la base de datos postgres
    dbname="postgres", 
    user=db_user,
    password=db_password,
    host=db_host,
    port=db_port
)
conn.autocommit = True

{'database': {'user': 'santiagoaristizabal', 'password': 'new_password', 'host': 'localhost', 'port': 5432, 'name': 'etl_db'}}


In [13]:
db_name = "etl_db" # nombre de la base de datos
try:
    with conn.cursor() as cur:
        cur.execute(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(db_name))) # creacion de la base de datos
        print(f"Base de datos '{db_name}' creada exitosamente.")
except psycopg2.errors.DuplicateDatabase:
    print(f"La base de datos '{db_name}' ya existe.")
finally:
    conn.close()

Base de datos 'etl_db' creada exitosamente.


In [14]:
engine = create_engine(f"postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}") # carga de la base de datos completa

with engine.connect() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS tabla_etl (
            id SERIAL PRIMARY KEY,
            nombre VARCHAR(100),
            edad INT,
            email VARCHAR(100),
            fecha_registro TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        );
    """))
    conn.commit()  # Asegúrate de confirmar los cambios
    print("Tabla 'tabla_etl' creada exitosamente en PostgreSQL.")

Tabla 'tabla_etl' creada exitosamente en PostgreSQL.


In [15]:
from sqlalchemy import text

# Insertar datos en la tabla
with engine.connect() as conn:
    conn.execute(text("""
        INSERT INTO tabla_etl (nombre, edad, email)
        VALUES 
            ('Juan Pérez', 28, 'juan@example.com'),
            ('María García', 34, 'maria@example.com'),
            ('Carlos López', 25, 'carlos@example.com');
    """))
    conn.commit()  # Confirmar los cambios
    print("Datos insertados exitosamente en 'tabla_etl'.")

Datos insertados exitosamente en 'tabla_etl'.


In [16]:
# Leer datos de la tabla
with engine.connect() as conn:
    result = conn.execute(text("SELECT * FROM tabla_etl;"))
    rows = result.fetchall()

# Mostrar los resultados
print("Datos en 'tabla_etl':")
for row in rows:
    print(row)

Datos en 'tabla_etl':
(1, 'Juan Pérez', 28, 'juan@example.com', datetime.datetime(2025, 2, 20, 19, 33, 8, 790953))
(2, 'María García', 34, 'maria@example.com', datetime.datetime(2025, 2, 20, 19, 33, 8, 790953))
(3, 'Carlos López', 25, 'carlos@example.com', datetime.datetime(2025, 2, 20, 19, 33, 8, 790953))


In [17]:
with engine.connect() as conn:
    df = pd.read_sql("SELECT * FROM tabla_etl", conn)

In [18]:
df


Unnamed: 0,id,nombre,edad,email,fecha_registro
0,1,Juan Pérez,28,juan@example.com,2025-02-20 19:33:08.790953
1,2,María García,34,maria@example.com,2025-02-20 19:33:08.790953
2,3,Carlos López,25,carlos@example.com,2025-02-20 19:33:08.790953


In [19]:
df_transformed = df.copy()
df_transformed["nombre"] = df_transformed["nombre"].str.upper()  # Uppercase names
df_transformed["age_category"] = df_transformed["edad"].apply(
    lambda x: "Young" if x < 30 else "Middle-aged" if x < 50 else "Senior"
)

# Display transformed DataFrame
df_transformed.head()

Unnamed: 0,id,nombre,edad,email,fecha_registro,age_category
0,1,JUAN PÉREZ,28,juan@example.com,2025-02-20 19:33:08.790953,Young
1,2,MARÍA GARCÍA,34,maria@example.com,2025-02-20 19:33:08.790953,Middle-aged
2,3,CARLOS LÓPEZ,25,carlos@example.com,2025-02-20 19:33:08.790953,Young


In [20]:
with engine.connect() as conn:
    conn.execute(text("""
        CREATE TABLE IF NOT EXISTS transformed_data (
            id SERIAL PRIMARY KEY,
            nombre VARCHAR(100),
            edad INT,
            email VARCHAR(100),
            fecha_registro TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            age_category VARCHAR(20)
        );
    """))
    conn.commit()  # Asegúrate de confirmar los cambios
    print("Tabla 'transformed_data' creada exitosamente en PostgreSQL.")

Tabla 'transformed_data' creada exitosamente en PostgreSQL.


In [21]:
with engine.connect() as conn:
    df_transformed.to_sql("transformed_etl", con=engine, if_exists="append", index=False)

print("Transformed data stored successfully in 'transformed_etl'.")

Transformed data stored successfully in 'transformed_etl'.


In [22]:
with engine.connect() as conn:
    db_transformed_df = pd.read_sql("SELECT * FROM transformed_etl", conn)
    
db_transformed_df

Unnamed: 0,id,nombre,edad,email,fecha_registro,age_category
0,1,JUAN PÉREZ,28,juan@example.com,2025-02-20 19:33:08.790953,Young
1,2,MARÍA GARCÍA,34,maria@example.com,2025-02-20 19:33:08.790953,Middle-aged
2,3,CARLOS LÓPEZ,25,carlos@example.com,2025-02-20 19:33:08.790953,Young
