# ETL usando PL/pgSQL (Procedural Language/PostgreSQL).

---

## Configuración Inicial

Primero, configuraremos el entorno como antes, instalando PostgreSQL y creando las bases de datos `pagila` y `pagila_dw`.

In [None]:
# Instalar los paquetes necesarios
!pip install ipython-sql prettytable==0.7.2 psycopg2-binary

# Actualizar los paquetes e instalar PostgreSQL
!apt-get update -y
!apt-get install postgresql postgresql-contrib -y

# Iniciar el servicio de PostgreSQL
import os
os.system('service postgresql start')

# Cambiar la contraseña del usuario 'postgres'
!sudo -u postgres psql -c "ALTER USER postgres PASSWORD 'postgres';"

# Crear las bases de datos 'pagila' y 'pagila_dw'
!sudo -u postgres createdb pagila
!sudo -u postgres createdb pagila_dw

# Descargar el script de creación del esquema de Pagila
!wget https://raw.githubusercontent.com/devrimgunduz/pagila/master/pagila-schema.sql

# Descargar el script de inserción de datos de Pagila
!wget https://raw.githubusercontent.com/devrimgunduz/pagila/master/pagila-data.sql

# Ejecutar el script de creación del esquema en 'pagila'
!sudo -u postgres psql pagila < pagila-schema.sql

# Ejecutar el script de inserción de datos en 'pagila'
!sudo -u postgres psql pagila < pagila-data.sql

## Configuración de `postgres_fdw` y Esquemas

Configuraremos `postgres_fdw` para acceder a los datos de `pagila` desde `pagila_dw` y crearemos los esquemas necesarios.

In [None]:
import psycopg2

# Conectar a la base de datos pagila_dw
conn_dw = psycopg2.connect(
    dbname="pagila_dw",
    user="postgres",
    password="postgres",
    host="localhost",
    port="5432"
)

cursor_dw = conn_dw.cursor()

# Crear el esquema 'dw' en pagila_dw
cursor_dw.execute("CREATE SCHEMA IF NOT EXISTS dw;")
conn_dw.commit()

# Instalar la extensión postgres_fdw
cursor_dw.execute("CREATE EXTENSION IF NOT EXISTS postgres_fdw;")
conn_dw.commit()

# Crear el servidor FDW para conectarse a pagila
cursor_dw.execute("""
CREATE SERVER IF NOT EXISTS pagila_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host 'localhost', dbname 'pagila', port '5432');
""")
conn_dw.commit()

# Crear el mapeo de usuario
cursor_dw.execute("""
CREATE USER MAPPING IF NOT EXISTS FOR postgres
SERVER pagila_server
OPTIONS (user 'postgres', password 'postgres');
""")
conn_dw.commit()

# Importar el esquema público de pagila como esquema 'pagila' en pagila_dw
cursor_dw.execute("""
DROP SCHEMA IF EXISTS pagila CASCADE;
CREATE SCHEMA pagila;
IMPORT FOREIGN SCHEMA public
FROM SERVER pagila_server
INTO pagila;
""")
conn_dw.commit()

---

## Creación de los Procedimientos Almacenados con Upsert

### Consideraciones Previas

Para utilizar la funcionalidad de upsert (`INSERT ... ON CONFLICT DO UPDATE`), necesitamos definir claves únicas o primarias en las tablas de destino que nos permitan identificar los registros para la operación de actualización. Añadiremos restricciones de clave primaria o única en los campos adecuados de las tablas de dimensiones y hechos.

### Dimensión Película

#### Creación de la Tabla con Clave Primaria

Primero, creamos la tabla `dw.dim_pelicula` con una clave primaria en `film_id`.

In [None]:
procedure_create_dim_pelicula_table = """
CREATE TABLE IF NOT EXISTS dw.dim_pelicula (
    pelicula_key SERIAL PRIMARY KEY,
    film_id INTEGER UNIQUE,
    titulo TEXT,
    descripcion TEXT,
    anio_lanzamiento INTEGER,
    idioma TEXT,
    duracion_renta INTEGER,
    tasa_renta NUMERIC,
    duracion INTEGER,
    costo_reemplazo NUMERIC,
    clasificacion TEXT,
    caracteristicas_especiales TEXT,
    categoria TEXT,
    fecha_ultima_actualizacion TIMESTAMP
);
"""
cursor_dw.execute(procedure_create_dim_pelicula_table)
conn_dw.commit()

#### Procedimiento Almacenado con Upsert

In [None]:
procedure_dim_pelicula_upsert = """
CREATE OR REPLACE PROCEDURE sp_upsert_dim_pelicula()
LANGUAGE plpgsql
AS $$
BEGIN
    -- Insertar o actualizar registros en dim_pelicula
    WITH film_base AS (
        SELECT
            film_id,
            title AS titulo,
            description AS descripcion,
            release_year AS anio_lanzamiento,
            language_id,
            rental_duration AS duracion_renta,
            rental_rate AS tasa_renta,
            length AS duracion,
            replacement_cost AS costo_reemplazo,
            rating AS clasificacion,
            special_features AS caracteristicas_especiales,
            last_update AS fecha_ultima_actualizacion
        FROM pagila.film
    ),
    idioma AS (
        SELECT
            language_id,
            name AS idioma
        FROM pagila.language
    ),
    categoria AS (
        SELECT
            fc.film_id,
            c.name AS categoria
        FROM pagila.film_category AS fc
        JOIN pagila.category AS c ON fc.category_id = c.category_id
    ),
    data_to_upsert AS (
        SELECT
            fb.film_id,
            fb.titulo,
            fb.descripcion,
            fb.anio_lanzamiento,
            i.idioma,
            fb.duracion_renta,
            fb.tasa_renta,
            fb.duracion,
            fb.costo_reemplazo,
            fb.clasificacion,
            fb.caracteristicas_especiales,
            c.categoria,
            fb.fecha_ultima_actualizacion
        FROM film_base AS fb
        LEFT JOIN idioma AS i ON fb.language_id = i.language_id
        LEFT JOIN categoria AS c ON fb.film_id = c.film_id
    )
    INSERT INTO dw.dim_pelicula (
        film_id,
        titulo,
        descripcion,
        anio_lanzamiento,
        idioma,
        duracion_renta,
        tasa_renta,
        duracion,
        costo_reemplazo,
        clasificacion,
        caracteristicas_especiales,
        categoria,
        fecha_ultima_actualizacion
    )
    SELECT
        film_id,
        titulo,
        descripcion,
        anio_lanzamiento,
        idioma,
        duracion_renta,
        tasa_renta,
        duracion,
        costo_reemplazo,
        clasificacion,
        caracteristicas_especiales,
        categoria,
        fecha_ultima_actualizacion
    FROM data_to_upsert
    ON CONFLICT (film_id) DO UPDATE
    SET
        titulo = EXCLUDED.titulo,
        descripcion = EXCLUDED.descripcion,
        anio_lanzamiento = EXCLUDED.anio_lanzamiento,
        idioma = EXCLUDED.idioma,
        duracion_renta = EXCLUDED.duracion_renta,
        tasa_renta = EXCLUDED.tasa_renta,
        duracion = EXCLUDED.duracion,
        costo_reemplazo = EXCLUDED.costo_reemplazo,
        clasificacion = EXCLUDED.clasificacion,
        caracteristicas_especiales = EXCLUDED.caracteristicas_especiales,
        categoria = EXCLUDED.categoria,
        fecha_ultima_actualizacion = EXCLUDED.fecha_ultima_actualizacion;
END;
$$;
"""
cursor_dw.execute(procedure_dim_pelicula_upsert)
conn_dw.commit()
cursor_dw.execute("CALL sp_upsert_dim_pelicula();")
conn_dw.commit()

### Dimensión Cliente

#### Creación de la Tabla con Clave Primaria

In [None]:
procedure_create_dim_cliente_table = """
CREATE TABLE IF NOT EXISTS dw.dim_cliente (
    cliente_key SERIAL PRIMARY KEY,
    customer_id INTEGER UNIQUE,
    nombre TEXT,
    apellido TEXT,
    email TEXT,
    activo BOOLEAN,
    direccion TEXT,
    ciudad TEXT,
    pais TEXT,
    fecha_creacion TIMESTAMP,
    fecha_ultima_actualizacion TIMESTAMP
);
"""
cursor_dw.execute(procedure_create_dim_cliente_table)
conn_dw.commit()

#### Procedimiento Almacenado con Upsert

In [None]:
procedure_dim_cliente_upsert = """
CREATE OR REPLACE PROCEDURE sp_upsert_dim_cliente()
LANGUAGE plpgsql
AS $$
BEGIN
    WITH data_to_upsert AS (
        SELECT
            c.customer_id,
            c.first_name AS nombre,
            c.last_name AS apellido,
            c.email,
            c.active AS activo,
            a.address AS direccion,
            ci.city AS ciudad,
            co.country AS pais,
            c.create_date AS fecha_creacion,
            c.last_update AS fecha_ultima_actualizacion
        FROM pagila.customer AS c
        JOIN pagila.address AS a ON c.address_id = a.address_id
        JOIN pagila.city AS ci ON a.city_id = ci.city_id
        JOIN pagila.country AS co ON ci.country_id = co.country_id
    )
    INSERT INTO dw.dim_cliente (
        customer_id,
        nombre,
        apellido,
        email,
        activo,
        direccion,
        ciudad,
        pais,
        fecha_creacion,
        fecha_ultima_actualizacion
    )
    SELECT
        customer_id,
        nombre,
        apellido,
        email,
        activo,
        direccion,
        ciudad,
        pais,
        fecha_creacion,
        fecha_ultima_actualizacion
    FROM data_to_upsert
    ON CONFLICT (customer_id) DO UPDATE
    SET
        nombre = EXCLUDED.nombre,
        apellido = EXCLUDED.apellido,
        email = EXCLUDED.email,
        activo = EXCLUDED.activo,
        direccion = EXCLUDED.direccion,
        ciudad = EXCLUDED.ciudad,
        pais = EXCLUDED.pais,
        fecha_creacion = EXCLUDED.fecha_creacion,
        fecha_ultima_actualizacion = EXCLUDED.fecha_ultima_actualizacion;
END;
$$;
"""
cursor_dw.execute(procedure_dim_cliente_upsert)
conn_dw.commit()
cursor_dw.execute("CALL sp_upsert_dim_cliente();")
conn_dw.commit()

### Dimensión Empleado

#### Creación de la Tabla con Clave Primaria

In [None]:
procedure_create_dim_empleado_table = """
CREATE TABLE IF NOT EXISTS dw.dim_empleado (
    empleado_key SERIAL PRIMARY KEY,
    staff_id INTEGER UNIQUE,
    nombre TEXT,
    apellido TEXT,
    email TEXT,
    tienda_id INTEGER,
    activo BOOLEAN,
    nombre_usuario TEXT,
    fecha_ultima_actualizacion TIMESTAMP
);
"""
cursor_dw.execute(procedure_create_dim_empleado_table)
conn_dw.commit()

#### Procedimiento Almacenado con Upsert

In [None]:
procedure_dim_empleado_upsert = """
CREATE OR REPLACE PROCEDURE sp_upsert_dim_empleado()
LANGUAGE plpgsql
AS $$
BEGIN
    WITH data_to_upsert AS (
        SELECT
            s.staff_id,
            s.first_name AS nombre,
            s.last_name AS apellido,
            s.email,
            s.store_id AS tienda_id,
            s.active AS activo,
            s.username AS nombre_usuario,
            s.last_update AS fecha_ultima_actualizacion
        FROM pagila.staff AS s
    )
    INSERT INTO dw.dim_empleado (
        staff_id,
        nombre,
        apellido,
        email,
        tienda_id,
        activo,
        nombre_usuario,
        fecha_ultima_actualizacion
    )
    SELECT
        staff_id,
        nombre,
        apellido,
        email,
        tienda_id,
        activo,
        nombre_usuario,
        fecha_ultima_actualizacion
    FROM data_to_upsert
    ON CONFLICT (staff_id) DO UPDATE
    SET
        nombre = EXCLUDED.nombre,
        apellido = EXCLUDED.apellido,
        email = EXCLUDED.email,
        tienda_id = EXCLUDED.tienda_id,
        activo = EXCLUDED.activo,
        nombre_usuario = EXCLUDED.nombre_usuario,
        fecha_ultima_actualizacion = EXCLUDED.fecha_ultima_actualizacion;
END;
$$;
"""
cursor_dw.execute(procedure_dim_empleado_upsert)
conn_dw.commit()
cursor_dw.execute("CALL sp_upsert_dim_empleado();")
conn_dw.commit()

### Dimensión Tienda

#### Creación de la Tabla con Clave Primaria

In [None]:
procedure_create_dim_tienda_table = """
CREATE TABLE IF NOT EXISTS dw.dim_tienda (
    tienda_key SERIAL PRIMARY KEY,
    store_id INTEGER UNIQUE,
    gerente_staff_id INTEGER,
    direccion TEXT,
    ciudad TEXT,
    pais TEXT,
    codigo_postal TEXT,
    telefono TEXT,
    fecha_ultima_actualizacion TIMESTAMP
);
"""
cursor_dw.execute(procedure_create_dim_tienda_table)
conn_dw.commit()

#### Procedimiento Almacenado con Upsert

In [None]:
procedure_dim_tienda_upsert = """
CREATE OR REPLACE PROCEDURE sp_upsert_dim_tienda()
LANGUAGE plpgsql
AS $$
BEGIN
    WITH data_to_upsert AS (
        SELECT
            st.store_id,
            st.manager_staff_id AS gerente_staff_id,
            a.address AS direccion,
            ci.city AS ciudad,
            co.country AS pais,
            a.postal_code AS codigo_postal,
            a.phone AS telefono,
            st.last_update AS fecha_ultima_actualizacion
        FROM pagila.store AS st
        JOIN pagila.address AS a ON st.address_id = a.address_id
        JOIN pagila.city AS ci ON a.city_id = ci.city_id
        JOIN pagila.country AS co ON ci.country_id = co.country_id
    )
    INSERT INTO dw.dim_tienda (
        store_id,
        gerente_staff_id,
        direccion,
        ciudad,
        pais,
        codigo_postal,
        telefono,
        fecha_ultima_actualizacion
    )
    SELECT
        store_id,
        gerente_staff_id,
        direccion,
        ciudad,
        pais,
        codigo_postal,
        telefono,
        fecha_ultima_actualizacion
    FROM data_to_upsert
    ON CONFLICT (store_id) DO UPDATE
    SET
        gerente_staff_id = EXCLUDED.gerente_staff_id,
        direccion = EXCLUDED.direccion,
        ciudad = EXCLUDED.ciudad,
        pais = EXCLUDED.pais,
        codigo_postal = EXCLUDED.codigo_postal,
        telefono = EXCLUDED.telefono,
        fecha_ultima_actualizacion = EXCLUDED.fecha_ultima_actualizacion;
END;
$$;
"""
cursor_dw.execute(procedure_dim_tienda_upsert)
conn_dw.commit()
cursor_dw.execute("CALL sp_upsert_dim_tienda();")
conn_dw.commit()

### Dimensión Tiempo

La dimensión tiempo es un poco diferente, ya que se basa en un rango de fechas generado. Podemos asumir que las fechas no cambiarán, pero para mantener consistencia, también aplicaremos upsert.

#### Creación de la Tabla con Clave Primaria

In [None]:
procedure_create_dim_tiempo_table = """
CREATE TABLE IF NOT EXISTS dw.dim_tiempo (
    tiempo_key SERIAL PRIMARY KEY,
    fecha DATE UNIQUE,
    dia INTEGER,
    mes INTEGER,
    anio INTEGER,
    trimestre INTEGER,
    nombre_dia TEXT,
    nombre_mes TEXT,
    es_fin_de_semana BOOLEAN
);
"""
cursor_dw.execute(procedure_create_dim_tiempo_table)
conn_dw.commit()

#### Procedimiento Almacenado con Upsert

In [None]:
procedure_dim_tiempo_upsert = """
CREATE OR REPLACE PROCEDURE sp_upsert_dim_tiempo()
LANGUAGE plpgsql
AS $$
BEGIN
    WITH date_range AS (
        SELECT
            MIN(DATE(rental_date)) AS start_date,
            MAX(DATE(rental_date)) AS end_date
        FROM pagila.rental
    ),
    dates AS (
        SELECT
            generate_series(
                (SELECT start_date FROM date_range),
                (SELECT end_date FROM date_range),
                interval '1 day'
            )::date AS fecha
    ),
    data_to_upsert AS (
        SELECT
            fecha,
            EXTRACT(DAY FROM fecha)::INT AS dia,
            EXTRACT(MONTH FROM fecha)::INT AS mes,
            EXTRACT(YEAR FROM fecha)::INT AS anio,
            EXTRACT(QUARTER FROM fecha)::INT AS trimestre,
            TO_CHAR(fecha, 'Day') AS nombre_dia,
            TO_CHAR(fecha, 'Month') AS nombre_mes,
            CASE WHEN EXTRACT(ISODOW FROM fecha) IN (6,7) THEN TRUE ELSE FALSE END AS es_fin_de_semana
        FROM dates
        ORDER BY fecha
    )
    INSERT INTO dw.dim_tiempo (
        fecha,
        dia,
        mes,
        anio,
        trimestre,
        nombre_dia,
        nombre_mes,
        es_fin_de_semana
    )
    SELECT
        fecha,
        dia,
        mes,
        anio,
        trimestre,
        nombre_dia,
        nombre_mes,
        es_fin_de_semana
    FROM data_to_upsert
    ON CONFLICT (fecha) DO UPDATE
    SET
        dia = EXCLUDED.dia,
        mes = EXCLUDED.mes,
        anio = EXCLUDED.anio,
        trimestre = EXCLUDED.trimestre,
        nombre_dia = EXCLUDED.nombre_dia,
        nombre_mes = EXCLUDED.nombre_mes,
        es_fin_de_semana = EXCLUDED.es_fin_de_semana;
END;
$$;
"""
cursor_dw.execute(procedure_dim_tiempo_upsert)
conn_dw.commit()
cursor_dw.execute("CALL sp_upsert_dim_tiempo();")
conn_dw.commit()

### Tabla de Hechos Alquileres

#### Creación de la Tabla con Clave Primaria

In [None]:
procedure_create_hechos_alquileres_table = """
CREATE TABLE IF NOT EXISTS dw.hechos_alquileres (
    alquiler_key SERIAL PRIMARY KEY,
    rental_id INTEGER UNIQUE,
    pelicula_key INTEGER,
    cliente_key INTEGER,
    empleado_key INTEGER,
    tienda_key INTEGER,
    tiempo_key INTEGER,
    duracion_alquiler NUMERIC,
    monto NUMERIC
);
"""
cursor_dw.execute(procedure_create_hechos_alquileres_table)
conn_dw.commit()

#### Procedimiento Almacenado con Upsert

In [None]:
procedure_hechos_alquileres_upsert = """
CREATE OR REPLACE PROCEDURE sp_upsert_hechos_alquileres()
LANGUAGE plpgsql
AS $$
BEGIN
    WITH data_to_upsert AS (
        SELECT
            r.rental_id,
            dp.pelicula_key,
            dc.cliente_key,
            de.empleado_key,
            dt.tienda_key,
            dtm.tiempo_key,
            EXTRACT(EPOCH FROM (r.return_date - r.rental_date))/60 AS duracion_alquiler,
            p.amount AS monto
        FROM pagila.rental AS r
        JOIN pagila.payment AS p ON r.rental_id = p.rental_id
        JOIN pagila.inventory AS i ON r.inventory_id = i.inventory_id
        JOIN dw.dim_pelicula AS dp ON i.film_id = dp.film_id
        JOIN dw.dim_cliente AS dc ON r.customer_id = dc.customer_id
        JOIN dw.dim_empleado AS de ON r.staff_id = de.staff_id
        JOIN dw.dim_tienda AS dt ON i.store_id = dt.store_id
        JOIN dw.dim_tiempo AS dtm ON DATE(r.rental_date) = dtm.fecha
    )
    INSERT INTO dw.hechos_alquileres (
        rental_id,
        pelicula_key,
        cliente_key,
        empleado_key,
        tienda_key,
        tiempo_key,
        duracion_alquiler,
        monto
    )
    SELECT
        rental_id,
        pelicula_key,
        cliente_key,
        empleado_key,
        tienda_key,
        tiempo_key,
        duracion_alquiler,
        monto
    FROM data_to_upsert
    ON CONFLICT (rental_id) DO UPDATE
    SET
        pelicula_key = EXCLUDED.pelicula_key,
        cliente_key = EXCLUDED.cliente_key,
        empleado_key = EXCLUDED.empleado_key,
        tienda_key = EXCLUDED.tienda_key,
        tiempo_key = EXCLUDED.tiempo_key,
        duracion_alquiler = EXCLUDED.duracion_alquiler,
        monto = EXCLUDED.monto;
END;
$$;
"""
cursor_dw.execute(procedure_hechos_alquileres_upsert)
conn_dw.commit()
cursor_dw.execute("CALL sp_upsert_hechos_alquileres();")
conn_dw.commit()

---

## Verificación de las Tablas Creadas

Verificaremos que las tablas se hayan creado correctamente en la base de datos `pagila_dw` y que contengan los datos esperados.

In [None]:
import psycopg2

conn = psycopg2.connect(
    dbname="pagila_dw",
    user="postgres",
    password="postgres",
    host="localhost",
    port="5432"
)

cursor = conn.cursor()
cursor.execute("""
    SELECT table_schema, table_name
    FROM information_schema.tables
    WHERE table_schema = 'dw';
""")
tables = cursor.fetchall()
for schema, table in tables:
    print(f"Schema: {schema}, Table: {table}")
    
cursor.close()
conn.close()

### Salida Esperada

Esto debería mostrar todas las tablas creadas en el esquema `dw`:

```
Schema: dw, Table: dim_pelicula
Schema: dw, Table: dim_cliente
Schema: dw, Table: dim_empleado
Schema: dw, Table: dim_tienda
Schema: dw, Table: dim_tiempo
Schema: dw, Table: hechos_alquileres
```

## Consultar los Datos

Usaremos `ipython-sql` para conectarnos a la base de datos y consultar las tablas creadas.

In [None]:
# Cargar la extensión SQL
%load_ext sql
%sql postgresql://postgres:postgres@localhost/pagila_dw

# Consultar la tabla dim_pelicula
%%sql
SELECT * FROM dw.dim_pelicula LIMIT 10;

### Resultado

Al ejecutar el código anterior, deberías ver las primeras 10 filas de la tabla `dim_pelicula` en la base de datos `pagila_dw`.

---

## Explicación de los Cambios Realizados

- **Uso de Claves Primarias y Únicas**: Añadimos claves primarias y únicas en las tablas de destino para poder utilizar la cláusula `ON CONFLICT` en las operaciones de inserción.

- **Implementación de Upsert**: En lugar de eliminar y volver a crear las tablas, los procedimientos almacenados ahora utilizan `INSERT ... ON CONFLICT DO UPDATE` para insertar nuevos registros y actualizar los existentes basándose en las claves únicas definidas.

- **Mantener Datos Históricos**: Al utilizar upsert, podemos mantener los datos existentes y actualizarlos solo si hay cambios, lo que es útil para escenarios donde los datos de origen pueden cambiar con el tiempo.

---

## Ejecución Regular del ETL

Puedes programar la ejecución de estos procedimientos almacenados para que se ejecuten periódicamente y mantengan actualizadas las tablas de tu data warehouse.

In [None]:
# Llamar a los procedimientos para actualizar los datos
cursor_dw = conn_dw.cursor()
cursor_dw.execute("CALL sp_upsert_dim_pelicula();")
cursor_dw.execute("CALL sp_upsert_dim_cliente();")
cursor_dw.execute("CALL sp_upsert_dim_empleado();")
cursor_dw.execute("CALL sp_upsert_dim_tienda();")
cursor_dw.execute("CALL sp_upsert_dim_tiempo();")
cursor_dw.execute("CALL sp_upsert_hechos_alquileres();")
conn_dw.commit()
cursor_dw.close()
conn_dw.close()

---

Este proceso ha actualizado los procedimientos almacenados para que utilicen **insert-update (upsert)**, permitiendo así que el ETL sea más eficiente y que las tablas en `pagila_dw` reflejen los cambios en los datos de origen sin necesidad de recrearlas cada vez.