In [1]:
# Cell 1: Imports básicos y configuración

import os
import math
import random
from datetime import datetime, timedelta

import yaml
import numpy as np
import pandas as pd


In [None]:
# Cell 2: Función para cargar la OMS desde YAML

RUTA_OMS = "../data/oms/99_oms_stde_v2.yaml"
OUTPUT_DIR = "../data/synthetic/"   # donde irán los CSV STDE

os.makedirs(OUTPUT_DIR, exist_ok=True)


def cargar_oms(ruta_yaml: str) -> dict:
    with open(ruta_yaml, "r", encoding="utf-8") as f:
        data = yaml.safe_load(f)
    return data

oms = cargar_oms(RUTA_OMS)

# Vista rápida
print("ID OMS:", oms.get("id_oms"))
print("Nombre:", oms.get("nombre"))
print("Fecha inicio:", oms["escenario_operacion"]["fecha_inicio"])
print("Número de semanas:", oms["escenario_operacion"]["numero_semanas"])


ID OMS: OMS_STDE_001
Nombre: OMS K9 Mining Safety - Escenario Sintético Aster 7 semanas
Fecha inicio: 2025-01-20
Número de semanas: 7


In [None]:
# Cell 3: Utilidades varias (calendario, IDs, etc.)

# Para reproducibilidad
np.random.seed(42)
random.seed(42)

# Mapeo días en español (datetime.weekday() -> nombre)
# weekday(): Monday=0, Sunday=6
DIA_SEMANA_ES = {
    0: "Lunes",
    1: "Martes",
    2: "Miércoles",
    3: "Jueves",
    4: "Viernes",
    5: "Sábado",
    6: "Domingo",
}

def generar_calendario(oms: dict) -> pd.DataFrame:
    """
    Genera un calendario según el número de semanas definido en la OMS.
    Columnas:
        - fecha
        - semana (1..N)
        - dia_semana (texto)
    """
    fecha_inicio = datetime.strptime(
        oms["escenario_operacion"]["fecha_inicio"], "%Y-%m-%d"
    )
    n_semanas = oms["escenario_operacion"]["numero_semanas"]
    n_dias_total = n_semanas * 7

    fechas = [fecha_inicio + timedelta(days=i) for i in range(n_dias_total)]
    data = []
    for i, fecha in enumerate(fechas):
        semana = i // 7 + 1
        dia_semana = DIA_SEMANA_ES[fecha.weekday()]
        data.append(
            {
                "fecha": fecha.date(),
                "semana": semana,
                "dia_semana": dia_semana,
            }
        )

    df = pd.DataFrame(data)
    return df

# Crear calendario real
calendario = generar_calendario(oms)
calendario.head()

In [None]:
# Cell 4: Extraer listas de áreas, roles, turnos y distribuciones

escenario = oms["escenario_operacion"]

TURNOS = escenario["turnos"]
AREAS = escenario["areas_operacionales"]
ROLES = escenario["roles_clave"]

DISTRIB = oms["distribuciones"]

print("Turnos:", [t["id_turno"] for t in TURNOS])
print("Áreas:", [a["id_area"] for a in AREAS])
print("Roles:", [r["id_rol"] for r in ROLES])
print("Semanas definidas en distribuciones:", list(DISTRIB.keys()))


Parámetros de simulación globales

Aquí definimos el tamaño de la “población base” (100 trabajadores -> las tasas de la OMS se tratan como conteos por semana; con 200 trabajadores, se duplican, etc.).

In [None]:
# Cell 5: Parámetros de simulación

POBLACION_BASE = 120   # trabajadores sintéticos "equivalentes"
OUTPUT_DIR = "../data/synthetic/"

os.makedirs(OUTPUT_DIR, exist_ok=True)

print("POBLACION_BASE:", POBLACION_BASE)
print("Archivos se guardarán en:", OUTPUT_DIR)


In [None]:
# Cell 6: Helper para convertir "por 100" semanal a lambda diario - tasa semanal → λ diario (Poisson)

def lambda_diario(desde_por_100_semana: float, poblacion: int = POBLACION_BASE) -> float:
    """
    Convierte una tasa semanal "por 100 trabajadores" a una lambda diaria
    para una distribución Poisson, según la población base.
    """
    # Escalamos por población
    tasa_semana_real = desde_por_100_semana * (poblacion / 100.0)
    # Asumimos distribución uniforme en los 7 días para la base
    return tasa_semana_real / 7.0


In [None]:
# Cell 7: Generación de observaciones sintéticas - Función principal

def generar_observaciones(oms: dict, calendario: pd.DataFrame) -> pd.DataFrame:
    stde_obs_cfg = oms["stde"]["observaciones"]
    tipos_validos = stde_obs_cfg["tipo_observaciones_validos"]
    distrib = oms["distribuciones"]

    # Map de tipos (ej. {"OPG": "Observación Preventiva Gestión", ...})
    tipos_map = {t["codigo"]: t["nombre"] for t in tipos_validos}

    registros = []
    contador_obs = 1

    for _, row in calendario.iterrows():
        fecha = row["fecha"]
        semana = int(row["semana"])
        dia_semana = row["dia_semana"]

        sem_key = f"semana_{semana}"
        sem_cfg = distrib[sem_key]
        obs_por_100 = sem_cfg["observaciones_por_100"]

        # Factores por tipo (lunes crítico)
        factor_OCC = 1.0
        if semana == 4 and dia_semana == "Lunes":
            lunes_critico_cfg = sem_cfg.get("lunes_critico", {})
            factor_OCC = lunes_critico_cfg.get("factor_multiplicador_OCC", 1.0)

        # Para cada tipo (OPG, OCC) calculamos λ diario (Poisson)
        for tipo, tasa_semana_100 in obs_por_100.items():
            base_lambda = lambda_diario(tasa_semana_100, POBLACION_BASE)

            if tipo == "OCC":
                lam = base_lambda * factor_OCC
            else:
                lam = base_lambda

            # Número de observaciones para este día y tipo
            n_obs = np.random.poisson(lam)

            if n_obs <= 0:
                continue

            # Distribuimos las observaciones entre turnos, áreas y roles
            for _ in range(n_obs):
                turno = random.choice(TURNOS)["id_turno"]
                area = random.choice(AREAS)["id_area"]
                rol = random.choice(ROLES)["id_rol"]

                registros.append(
                    {
                        "id_observacion": f"OBS_{contador_obs:06d}",
                        "fecha": fecha,
                        "semana": semana,
                        "dia_semana": dia_semana,
                        "turno": turno,
                        "id_area": area,
                        "id_rol": rol,
                        "tipo_observacion": tipo,
                        "descripcion_observacion": f"Observación sintética tipo {tipo_map[tipo]}",
                        "accion_inmediata": "Acción sintética registrada.",
                    }
                )
                contador_obs += 1

    df_obs = pd.DataFrame(registros)
    return df_obs

df_observaciones = generar_observaciones(oms, calendario)
print("Total observaciones generadas:", len(df_observaciones))
df_observaciones.head()


In [None]:
# Cell 8: Generación de Incidentes - Helpers para severidad de incidentes

def con_lesion_por_tipo(tipo_incidente: str) -> bool:
    # Assumption simple:
    # IMAY, IMED, ILEV => con lesión; INMI => sin lesión
    if tipo_incidente in ("IMAY", "IMED", "ILEV"):
        return True
    return False

def dias_perdidos_por_tipo(tipo_incidente: str) -> int:
    # Lógica sintética simple
    if tipo_incidente == "IMAY":
        # Incidente mayor: normalmente alto impacto
        return max(0, int(np.random.poisson(15)) + 5)
    if tipo_incidente == "IMED":
        return max(0, int(np.random.poisson(5)) + 1)
    if tipo_incidente == "ILEV":
        return max(0, int(np.random.poisson(1)))
    # INMI: sin lesión => 0 días
    return 0


In [None]:
# Cell 9: Generación de incidentes sintéticos - Función principal

def generar_incidentes(oms: dict, calendario: pd.DataFrame) -> pd.DataFrame:
    stde_inc_cfg = oms["stde"]["incidentes"]
    tipos_validos = stde_inc_cfg["tipos_incidentes_validos"]
    distrib = oms["distribuciones"]

    tipos_map = {t["codigo"]: t["nombre"] for t in tipos_validos}

    registros = []
    contador_inc = 1

    for _, row in calendario.iterrows():
        fecha = row["fecha"]
        semana = int(row["semana"])
        dia_semana = row["dia_semana"]

        sem_key = f"semana_{semana}"
        sem_cfg = distrib[sem_key]
        inc_por_100 = sem_cfg["incidentes_por_100"]

        # Factor de lunes crítico para incidentes
        factor_incidentes = 1.0
        if semana == 4 and dia_semana == "Lunes":
            lunes_critico_cfg = sem_cfg.get("lunes_critico", {})
            factor_incidentes = lunes_critico_cfg.get(
                "factor_multiplicador_incidentes", 1.0
            )

        for tipo, tasa_semana_100 in inc_por_100.items():
            base_lambda = lambda_diario(tasa_semana_100, POBLACION_BASE)
            lam = base_lambda * factor_incidentes

            n_inc = np.random.poisson(lam)
            if n_inc <= 0:
                continue

            for _ in range(n_inc):
                turno = random.choice(TURNOS)["id_turno"]
                area = random.choice(AREAS)["id_area"]
                rol = random.choice(ROLES)["id_rol"]

                con_les = con_lesion_por_tipo(tipo)
                dias_perdidos = dias_perdidos_por_tipo(tipo) if con_les else 0

                registros.append(
                    {
                        "id_incidente": f"INC_{contador_inc:06d}",
                        "fecha": fecha,
                        "semana": semana,
                        "dia_semana": dia_semana,
                        "turno": turno,
                        "id_area": area,
                        "id_rol": rol,
                        "tipo_incidente": tipo,
                        "descripcion_incidente": f"Incidente sintético tipo {tipos_map[tipo]}",
                        "con_lesion": con_les,
                        "dias_perdidos": dias_perdidos,
                    }
                )
                contador_inc += 1

    df_inc = pd.DataFrame(registros)
    return df_inc

df_incidentes = generar_incidentes(oms, calendario)
print("Total incidentes generados:", len(df_incidentes))
df_incidentes.head()


In [None]:
# Cell 10: Generación de auditorías sintéticas

def generar_auditorias(oms: dict, calendario: pd.DataFrame) -> pd.DataFrame:
    stde_aud_cfg = oms["stde"]["auditorias"]
    tipos_validos = stde_aud_cfg["tipos_auditoria_validos"]
    distrib = oms["distribuciones"]

    tipos_map = {t["codigo"]: t["nombre"] for t in tipos_validos}
    registros = []
    contador_aud = 1

    for _, row in calendario.iterrows():
        fecha = row["fecha"]
        semana = int(row["semana"])
        dia_semana = row["dia_semana"]

        sem_key = f"semana_{semana}"
        sem_cfg = distrib[sem_key]
        aud_por_100 = sem_cfg["auditorias_por_100"]

        # λ diarios por tipo de auditoría
        lambdas = {
            tipo: lambda_diario(tasa_semana_100, POBLACION_BASE)
            for tipo, tasa_semana_100 in aud_por_100.items()
        }

        for tipo, lam in lambdas.items():
            n_aud = np.random.poisson(lam)
            if n_aud <= 0:
                continue

            for _ in range(n_aud):
                turno = random.choice(TURNOS)["id_turno"]
                area = random.choice(AREAS)["id_area"]

                if tipo == "AUC":
                    resultado = "Cumple requisitos revisados."
                    hallazgos = "Sin hallazgos críticos; observaciones menores."
                else:  # AUF
                    resultado = "No cumple todos los requisitos."
                    hallazgos = "Se identifican desviaciones relevantes."

                registros.append(
                    {
                        "id_auditoria": f"AUD_{contador_aud:06d}",
                        "fecha": fecha,
                        "semana": semana,
                        "dia_semana": dia_semana,
                        "turno": turno,
                        "id_area": area,
                        "tipo_auditoria": tipo,
                        "resultado": resultado,
                        "hallazgos_clave": hallazgos,
                    }
                )
                contador_aud += 1

    df_aud = pd.DataFrame(registros)
    return df_aud

df_auditorias = generar_auditorias(oms, calendario)
print("Total auditorías generadas:", len(df_auditorias))
df_auditorias.head()


In [None]:
# Cell 11: Guardar CSV y hacer un resumen rápido

ruta_obs = os.path.join(OUTPUT_DIR, "stde_observaciones_7s.csv")
ruta_inc = os.path.join(OUTPUT_DIR, "stde_incidentes_7s.csv")
ruta_aud = os.path.join(OUTPUT_DIR, "stde_auditorias_7s.csv")

df_observaciones.to_csv(ruta_obs, index=False, encoding="utf-8")
df_incidentes.to_csv(ruta_inc, index=False, encoding="utf-8")
df_auditorias.to_csv(ruta_aud, index=False, encoding="utf-8")

print("Archivos guardados:")
print(" -", ruta_obs)
print(" -", ruta_inc)
print(" -", ruta_aud)

print("\nResumen por semana - Observaciones:")
print(df_observaciones.groupby("semana")["id_observacion"].count())

print("\nResumen por semana - Incidentes:")
print(df_incidentes.groupby("semana")["id_incidente"].count())

print("\nResumen por semana - Auditorías:")
print(df_auditorias.groupby("semana")["id_auditoria"].count())


In [None]:
# Cell 12: Vista rápida del lunes crítico (semana 4, Lunes)

lunes_critico_obs = df_observaciones[
    (df_observaciones["semana"] == 4) & (df_observaciones["dia_semana"] == "Lunes")
]
lunes_critico_inc = df_incidentes[
    (df_incidentes["semana"] == 4) & (df_incidentes["dia_semana"] == "Lunes")
]

print("Lunes crítico - Observaciones por tipo:")
print(lunes_critico_obs["tipo_observacion"].value_counts())

print("\nLunes crítico - Incidentes por tipo:")
print(lunes_critico_inc["tipo_incidente"].value_counts())


Con este notebook tienes:

Carga de la OMS 99_oms_stde_v1.yaml.

Calendario de 7 semanas.

Generación de:

stde_observaciones_7s.csv

stde_incidentes_7s.csv

stde_auditorias_7s.csv

Patrón de degradación → lunes crítico → intervención → recuperación ya reflejado en las tasas, con impacto explícito en la semana 4 / Lunes.