In [None]:
import os

# Make sure the working directory is the project root, so the relative paths
# used later (e.g. "src/sql/...") resolve correctly. The root can be overridden
# through the PROJECT_ROOT environment variable (it must be set in the real
# environment: the .env file is only loaded in a later cell); the default is
# the original author's local checkout.
PROJECT_ROOT = os.getenv("PROJECT_ROOT", r'D:\ITMeet\Operaciones\BP010-data-pipelines-auditoria')
os.chdir(PROJECT_ROOT)

print(f"📍 Current working directory: {os.getcwd()}")

In [None]:
import sqlalchemy
from sqlalchemy import create_engine, text, pool, insert
from dotenv import load_dotenv
from datetime import datetime, timedelta

import logging

# Basic logging setup: INFO level, each record shows timestamp, logger name,
# level and message.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(format=_LOG_FORMAT, level=logging.INFO)
logger = logging.getLogger(__name__)

# Load variables from a .env file into the process environment, so later
# cells can read them with os.getenv / os.environ.
load_dotenv()

In [None]:
def execute_sql(engine, esquema_stage_sql, descripcion="consulta"):
    """
    Execute a SQL script (e.g. a DDL file) on ``engine`` in a single transaction.

    The whole script is wrapped in ``text()`` and sent in one ``execute`` call,
    so multi-statement scripts rely on the DBAPI driver accepting several
    statements at once (psycopg2, used by the Stage URL, does).

    Args:
        engine: SQLAlchemy Engine to run the script on.
        esquema_stage_sql: SQL text to execute. The name is historical; any
            script can be passed (Stage schema, ref schema, sample data, ...).
        descripcion: Short label used in the log messages.

    Raises:
        Exception: any error raised while executing is logged and re-raised;
            the transaction is rolled back.

    NOTE: ``text()`` treats ``:name`` tokens as bind parameters; a script
    containing such a token (other than ``::`` casts) would need escaping.
    """
    try:
        # engine.begin() commits when the block succeeds and rolls back if it raises.
        with engine.begin() as connection:
            connection.execute(text(esquema_stage_sql))
        logger.info("%s ejecutada exitosamente.", descripcion)
    except Exception as e:
        logger.error("Error al ejecutar %s: %s", descripcion, e)
        raise

In [None]:
from urllib.parse import quote

# Credentials come from environment variables (loaded from .env earlier); the
# defaults are only meant for a local development database.
DB_USER = os.getenv("DB_USER", "audit")
DB_PASSWORD = os.getenv("DEV_DB_PASSWORD", "audit")
DB_NAME = os.getenv("DB_NAME", "etl_data")

# Host/port of the Stage database (presumably the local end of an SSH tunnel;
# the defaults point at localhost).
DB_HOST = os.getenv("DB_HOST", "localhost")
DB_PORT = os.getenv("DB_PORT", "5432")

# 1. Build the connection string. User and password are percent-encoded
#    (quote with safe="") so characters such as '@', ':' or '/' cannot
#    corrupt the URL.
STAGE_DATABASE_URL = (
    f"postgresql+psycopg2://{quote(DB_USER, safe='')}:{quote(DB_PASSWORD, safe='')}@"
    f"{DB_HOST}:{DB_PORT}/{DB_NAME}"
)

# Show the URL with the password masked so it never lands in saved output.
print(f"Stage Database URL: postgresql+psycopg2://{DB_USER}:***@{DB_HOST}:{DB_PORT}/{DB_NAME}")

In [None]:
from urllib.parse import quote

# Connection string for the RAW (source) database. Required variables are read
# with os.environ[...] so a missing one fails fast with KeyError; only the
# port has a default. User and password are percent-encoded so characters such
# as '@', ':' or '/' cannot break the URL.
RAW_DATABASE_URL = (
    f'postgresql://{quote(os.environ["RAW_DB_USER"], safe="")}:'
    f'{quote(os.environ["RAW_DB_PASSWORD"], safe="")}'
    f'@{os.environ["RAW_DB_HOST"]}:{os.environ.get("RAW_DB_PORT", 5432)}'
    f'/{os.environ["RAW_DB_NAME"]}'
)

# Print the URL with the password masked so it never ends up in the
# notebook's saved output.
print(
    f'Raw Database URL: postgresql://{os.environ["RAW_DB_USER"]}:***'
    f'@{os.environ["RAW_DB_HOST"]}:{os.environ.get("RAW_DB_PORT", 5432)}'
    f'/{os.environ["RAW_DB_NAME"]}'
)

In [None]:
# Connection-pool settings for the RAW database engine.
_raw_pool_settings = dict(
    poolclass=pool.QueuePool,
    pool_size=10,        # connections kept open permanently
    max_overflow=20,     # extra connections opened on demand
    pool_timeout=30,     # seconds to wait for a free pooled connection
    pool_recycle=3600,   # recycle connections after one hour
    pool_pre_ping=True,  # check a connection is alive before handing it out
)

raw_engine = create_engine(
    RAW_DATABASE_URL,
    echo=False,          # do not log emitted SQL
    connect_args={'connect_timeout': 10, 'application_name': 'hydrog-scada-app'},
    **_raw_pool_settings,
)

In [None]:
# Engine for the Stage database. pool_pre_ping (as on raw_engine) tests each
# pooled connection before use, so a connection dropped while the notebook sat
# idle (e.g. after a network interruption) is replaced instead of failing the
# next cell.
stage_engine = create_engine(STAGE_DATABASE_URL, pool_pre_ping=True)

In [None]:
from pathlib import Path

# Directory holding the SQL scripts, relative to the project root set in the
# first cell.
SQL_DIR = Path("src/sql")

# Read explicitly as UTF-8: without an encoding, open() uses the platform
# default (cp1252 on Windows), which misdecodes accented characters.
# NOTE(review): assumes the scripts are saved as UTF-8 — confirm.
esquema_stage_sql = (SQL_DIR / "esquema_stage.sql").read_text(encoding="utf-8")
esquema_ref_sql = (SQL_DIR / "esquema_ref.sql").read_text(encoding="utf-8")
datos_muestra_sql = (SQL_DIR / "datos_muestra.sql").read_text(encoding="utf-8")

In [None]:
# Create the Stage schema.
# NOTE(review): a failure is logged but not re-raised, so Run All continues
# into the next cells even if the schema was not created — confirm this
# best-effort behavior is intended.
try:
    execute_sql(stage_engine, esquema_stage_sql)
except Exception as e:
    logger.error(f"Error al crear esquema de Stage: {e}")
else:
    logger.info("Esquema de Stage creado exitosamente.")

In [None]:
# Create the reference schema on the Stage database (errors propagate).
execute_sql(engine=stage_engine, esquema_stage_sql=esquema_ref_sql)

In [None]:
# Load the sample-data script into the Stage database (errors propagate).
execute_sql(engine=stage_engine, esquema_stage_sql=datos_muestra_sql)

In [None]:
# Preview the landing table. Rows are fetched inside the `with` block, while
# the connection is still open, instead of after it has been closed.
with stage_engine.connect() as connection:
    response = connection.execute(text("SELECT * FROM landing_scada_data;"))
    filas_landing = response.fetchall()

filas_landing

In [None]:
# Extraction query, executed against the RAW engine in the next cell.
# For each (unit_id, location_id, var_id) it keeps only the most recent row
# whose craetedate equals CURRENT_DATE. Rows are ranked with ROW_NUMBER(),
# ordered by the combined (craetedate + datatime) timestamp, newest first,
# with idn DESC as tie-breaker; only rn = 1 is returned.
# The selected columns follow the same order as `columnas` and the INSERT
# into landing_scada_data further down.
# NOTE(review): "craetedate" appears to be the real (misspelled) column name in
# the RAW `data` table — do not "fix" it without checking the schema.
# NOTE(review): the "Renombramos para STAGE" aliases map columns to their own
# names, so no renaming actually happens.
# NOTE(review): CURRENT_DATE is evaluated by the RAW server (in its timezone) —
# confirm that matches the intended "today".
EXTRACTION_PROCESS_RAW = """/*
 * CONSULTA DE EXTRACCIÓN (PARA EJECUTAR EN EL MOTOR RAW)
 * Obtiene el último valor registrado hoy (CURRENT_DATE) para cada
 * combinación de unit_id, location_id y var_id.
 */
WITH DatosHoy AS (
    -- Paso 1: Combinar la fecha y hora, y filtrar solo por los registros de HOY
    SELECT
        idn,
        unit_id,
        location_id,
        var_id,
        measure,
        (craetedate + datatime)::TIMESTAMP AS datatime, 
        createuser,
        craetedate AS craetedate, -- Renombramos para STAGE
        moduser,
        moddate AS moddate         -- Renombramos para STAGE
    FROM
        data
    WHERE
        craetedate = CURRENT_DATE -- Filtra solo registros de hoy
),
DatosRankeados AS (
    -- Paso 2: Asignar un ranking a cada registro. 
    -- El 'rn = 1' será el más reciente para ese grupo.
    SELECT
        *,
        ROW_NUMBER() OVER(
            -- Particionamos por la clave única del sensor/variable
            PARTITION BY unit_id, location_id, var_id 
            -- Ordenamos por la fecha/hora descendente (el más nuevo primero)
            ORDER BY datatime DESC, idn DESC -- idn como desempate
        ) AS rn
    FROM
        DatosHoy
)
-- Paso 3: Seleccionar solo los registros más recientes (rn = 1)
-- Las columnas ya coinciden con la tabla STAGE 'landing_scada_data'
SELECT
    idn,
    unit_id,
    location_id,
    var_id,
    measure,
    datatime,
    createuser,
    craetedate,
    moduser,
    moddate
FROM
    DatosRankeados
WHERE
    rn = 1;"""

In [None]:
# Run the extraction on RAW and materialise every result row while the
# connection is still open.
with raw_engine.connect() as raw_conn:
    result = raw_conn.execute(text(EXTRACTION_PROCESS_RAW))
    rows = result.fetchall()

In [None]:
# Column names in the same order as the SELECT list of the extraction query;
# used to turn each result tuple into a dict keyed by column.
columnas = (
    "idn unit_id location_id var_id measure datatime "
    "createuser craetedate moduser moddate"
).split()

In [None]:
# One dict per extracted row, keyed by column name, in the shape expected by
# the executemany-style INSERT below.
# NOTE: zip() silently truncates if a row's length differs from `columnas`.
datos_para_insertar = [dict(zip(columnas, fila)) for fila in rows]

In [None]:
# Empty the landing table before loading today's snapshot.
# NOTE(review): this commits in its own transaction, separate from the INSERT
# further down, so if the insert fails the table stays empty until re-run.
truncate_statement = text("TRUNCATE TABLE landing_scada_data;")

# engine.begin() commits on success and rolls back on error.
with stage_engine.begin() as connection:
    connection.execute(truncate_statement)

In [None]:
# Parameterised INSERT into landing_scada_data, generated from `columnas` so
# the column list and the bind-parameter names (:idn, :unit_id, ...) always
# match the keys of the dicts in `datos_para_insertar`. The column names are
# constants defined in this notebook, not external input.
ingest_to_stage = text(
    "INSERT INTO landing_scada_data ("
    + ", ".join(columnas)
    + ") VALUES ("
    + ", ".join(f":{columna}" for columna in columnas)
    + ")"
)



In [None]:
# Number of rows extracted from RAW that will be inserted into Stage.
n_filas_a_insertar = len(datos_para_insertar)
n_filas_a_insertar

In [None]:
# Bulk-insert the extracted rows (executemany with a list of dicts).
# An empty list must be skipped: execute(stmt, []) would run the INSERT once
# with no parameters and fail on the missing bind values (e.g. when RAW has
# no rows for today yet).
if datos_para_insertar:
    with stage_engine.begin() as connection:
        connection.execute(ingest_to_stage, datos_para_insertar)
else:
    logger.warning("No hay registros de hoy para insertar en landing_scada_data.")

In [None]:
from sqlalchemy import MetaData, Table
from sqlalchemy.engine import Engine # Asegúrate de importar Engine si no lo has hecho

# Reflect landing_scada_data from the Stage database (via stage_engine) and
# print each column's name, type and nullability. Any error (e.g. the table
# does not exist) is printed and not re-raised.
metadata_obj = MetaData()

try:
    mi_tabla = Table('landing_scada_data', metadata_obj, autoload_with=stage_engine)

    print(f"--- Esquema de la tabla: {mi_tabla.name} ---")
    separador = "-" * 20
    for columna in mi_tabla.columns:
        lineas = (
            f"  Columna: {columna.name}",
            f"     Tipo: {columna.type}",
            f"  Nullable: {columna.nullable}",
            separador,
        )
        print(*lineas, sep="\n")
except Exception as e:
    print(f"Error al reflejar la tabla: {e}")

In [None]:
# Read back everything now stored in the Stage landing table.
# NOTE(review): this rebinds `rows`, replacing the RAW extraction result, so
# re-running the dict-building cell after this one would use Stage rows.
with stage_engine.connect() as stage_conn:
    result = stage_conn.execute(text("SELECT * FROM landing_scada_data;"))
    rows = result.fetchall()

In [None]:
# Sanity check: after TRUNCATE + INSERT the landing table should hold exactly
# the rows extracted from RAW.
assert len(rows) == len(datos_para_insertar), (
    f"landing_scada_data tiene {len(rows)} filas, se esperaban {len(datos_para_insertar)}"
)
len(rows)