# Optimización de Pesos con Umbral Variable (Oráculo Avanzado)

Este notebook implementa un **Oráculo Avanzado** para refinar la "Zona de Duda" (Promedio entre 3.0 y 3.5).

## Reglas del Oráculo
0.  **Defecto**: Usar reglas básicas (Muerte Súbita, Debilidad Promedio).
1.  **Zona de Duda (3.0 $\le$ Promedio $\le$ 3.5)**:
    *   Si **Examen < 2.0**: Pierde (0).
    *   Si **2.0 $\le$ Examen < 3.0**:
        *   Gana (1) SI Y SOLO SI gana **todos** los items "advanced".
        *   Si no, Pierde (0).
    *   Si **Examen $\ge$ 3.0**:
        *   Gana (1) si gana **todos** los items "advanced" O "filtered".
        *   Pierde (0) si pierde **2 o más** de esos items.
        *   (Gana si pierde exactamente 1).

$$ Loss = MSE + FPR + (5.0 \cdot FNR) $$


In [35]:
import numpy as np
import pandas as pd
import copy
from scipy.optimize import minimize, Bounds, LinearConstraint

np.random.seed(42)
pd.set_option('display.float_format', '{:.2f}'.format)
pd.set_option('display.max_columns', None)


In [36]:
# 1. CONFIGURACIÓN DE EVALUACIÓN
config_evaluacion = {
    "regular_items": [
        # MÓDULO 1
        {"name": "Quiz_1", "weakness": 3.5, "correlated_with": "Tarea_1", "correlation_factor": 0.5, "type": "easy", "threshold": 2.2},
        {"name": "Tarea_1", "weakness": 2.5, "min": 0.05, "max": 0.20, "suggested": 0.15, "type": "advanced", "threshold": 3.0},
        
        # MÓDULO 2
        {"name": "Quiz_2", "correlated_with": "Quiz_1", "correlation_factor": 1.2, "weakness": 3.5, "type": "easy", "threshold": 2.5},
        {"name": "Tarea_2", "correlated_with": "Tarea_1", "correlation_factor": 1.2, "weakness": 2.5, "type": "filtered", "threshold": 2.8},
        
        # MÓDULO 3
        {"name": "Quiz_3", "correlated_with": "Quiz_1", "correlation_factor": 1.0, "weakness": 3.5, "type": "easy", "threshold": 2.0},
        {"name": "Tarea_3", "weakness": 2.5, "correlated_with": "Tarea_1", "correlation_factor": 1.0, "type": "advanced", "threshold": 3.0},
    ],
    "definitory_item": {
        "name": "Examen Final", "sudden_death": 2.50, "max_weight": 0.40, "type": "exam", "threshold": 2.5
    }
}

# Fix Type: 'filtered' vs 'filter' inconsistency
for item in config_evaluacion['regular_items']:
    if item['type'] == 'filtered': item['type'] = 'filter'

# --- AUTO-AJUSTE ---
print("Autoconfigurando items correlacionados...")
reg_items = config_evaluacion['regular_items']
item_map = {item['name']: item for item in reg_items}

# No simple loop property update here because we handle deep chains in optimization


Autoconfigurando items correlacionados...


In [37]:
# 2. GENERACIÓN DE DATOS (Por Tipo)

def generar_distribucion_bimodal(n, low_center=1.5, high_center=4.5, ratio=0.5, sigma=0.6):
    n_high = int(n * ratio)
    n_low = n - n_high
    h = np.random.normal(high_center, sigma, n_high)
    l = np.random.normal(low_center, sigma, n_low)
    return np.clip(np.concatenate([h, l]), 0, 5)

def generar_distribucion_negative_skew(n, mode=4.5, sigma=1.0):
    return np.clip(mode - np.random.exponential(scale=0.8, size=n), 0, 5)

def generar_distribucion_normal(n, mean=3.0, sigma=1.0):
    return np.clip(np.random.normal(mean, sigma, n), 0, 5)

def generar_distribucion_exam(n, mean=2.8, sigma=1.1):
    return np.clip(np.random.normal(mean, sigma, n), 0, 5)

def generate_data(config, N=2000):
    data = {}
    
    def generate_column_by_type(dtype):
        if dtype == 'filter' or dtype == 'filtered': return generar_distribucion_bimodal(N, 1.2, 4.0, 0.4)
        elif dtype == 'easy': return generar_distribucion_negative_skew(N, mode=4.5)
        elif dtype == 'exam': return generar_distribucion_exam(N, mean=2.8, sigma=1.1)
        else: return generar_distribucion_normal(N, mean=3.0, sigma=1.0)

    for item in config['regular_items']:
        data[item['name']] = generate_column_by_type(item.get('type', 'advanced'))
    def_item = config['definitory_item']
    data[def_item['name']] = generate_column_by_type(def_item.get('type', 'advanced'))
    
    df = pd.DataFrame(data).sample(frac=1, random_state=42).reset_index(drop=True)
    return df.round(2)

df_notas = generate_data(config_evaluacion)
print(f"Datos generados: {df_notas.shape}")


Datos generados: (2000, 7)


In [38]:
# 3. FUNCIONES AUXILIARES DE PESOS
def get_independent_indices(config):
    return [i for i, item in enumerate(config['regular_items']) if 'correlated_with' not in item]

def reconstruct_full_weights(x_independent, config):
    items = config['regular_items']
    indep_indices = get_independent_indices(config)
    weight_map = {}
    full_weights = np.zeros(len(items))
    
    for k, idx in enumerate(indep_indices):
        val = x_independent[k]
        weight_map[items[idx]['name']] = val
        full_weights[idx] = val
       
    # Loop 3 times for dependencies (safe for chains <= 3 deep)
    for _ in range(3):
        for i, item in enumerate(items):
            if full_weights[i] == 0.0 and 'correlated_with' in item:
                parent = item['correlated_with']
                if parent in weight_map:
                    val = weight_map[parent] * item['correlation_factor']
                    full_weights[i] = val
                    weight_map[item['name']] = val
    return full_weights


In [39]:
# 4. ORÁCULO AVANZADO (Reglas Especiales 3.0 - 3.5)

def get_oracle_decisions_general(weights_regular, df, config):
    # --- BASIC ORACLE (Fallback) ---
    reg_items = config['regular_items']
    def_item = config['definitory_item']
    
    w_sum_reg = np.sum(weights_regular)
    reg_names = [item['name'] for item in reg_items]
    def_name = def_item['name']
    
    reg_scores = df[reg_names].values
    weights_regular = np.array(weights_regular)
    score_regular = np.sum(reg_scores * weights_regular, axis=1) / (w_sum_reg + 1e-9)
    score_def = df[def_name].values
    
    decisions = np.ones(len(df), dtype=int)
    
    # 1. Sudden Death
    decisions[score_def < def_item['sudden_death']] = 0
    
    # 2. Weakness
    weakness_thresholds = np.array([item['weakness'] for item in reg_items])
    agg_weakness = np.sum(weakness_thresholds * weights_regular) / (w_sum_reg + 1e-9)
    
    mask_weak = (score_def >= def_item['sudden_death']) & (score_def < 3.0) & (score_regular < agg_weakness)
    decisions[mask_weak] = 0
    
    return decisions, score_regular, score_def

def get_oracle_decisions_advanced(weights_regular, df, config):
    # 1. Get Basic Decisions & Scores
    decisions, score_reg, score_exam = get_oracle_decisions_general(weights_regular, df, config)
    
    # 2. Calculate "Promedio Normal" (Weighted Average)
    w_sum_reg = np.sum(weights_regular)
    w_final = 1.0 - w_sum_reg
    
    avg_normal = (score_reg * w_sum_reg) + (score_exam * w_final)
    
    # 3. Zone of Doubt: 3.0 <= Avg <= 3.5
    mask_zone = (avg_normal >= 3.0) & (avg_normal <= 3.5)
    
    if not mask_zone.any():
        return decisions
        
    # Apply Advanced Logic to Masked Rows
    df_zone = df[mask_zone]
    exam_grades = score_exam[mask_zone]
    
    # Identify Advanced/Filter items
    reg_items = config['regular_items']
    adv_names = [i['name'] for i in reg_items if i.get('type') in ['advanced']]
    crit_names = [i['name'] for i in reg_items if i.get('type') in ['advanced', 'filter', 'filtered']]
    
    grad_zone_adv = df_zone[adv_names].values
    grad_zone_crit = df_zone[crit_names].values
    
    # New Decisions for Zone
    zone_decisions = decisions[mask_zone].copy()
    
    # A) Exam < 2.0 -> Lose
    mask_exam_fail = exam_grades < 2.0
    zone_decisions[mask_exam_fail] = 0
    
    # B) 2.0 <= Exam < 3.0
    mask_exam_mid = (exam_grades >= 2.0) & (exam_grades < 3.0)
    # Win if ALL advanced >= 3.0
    all_adv_win = (grad_zone_adv >= 3.0).all(axis=1)
    zone_decisions[mask_exam_mid] = 0 # Default fail
    zone_decisions[mask_exam_mid & all_adv_win] = 1
    
    # C) Exam >= 3.0
    mask_exam_pass = exam_grades >= 3.0
    # Win if ALL (advanced OR filter) >= 3.0
    all_crit_win = (grad_zone_crit >= 3.0).all(axis=1)
    
    # Lose if >= 2 lost
    count_lost = (grad_zone_crit < 3.0).sum(axis=1)
    mask_lost_2 = count_lost >= 2
    
    # Logic:
    # i. All won -> Pass (1)
    # ii. >=2 lost -> Fail (0)
    # iii. Else (1 lost) -> Inherit/Pass?
    # Assuming Pass for 1 lost unless overridden by Basic (but Basic might be 1).
    zone_decisions[mask_exam_pass] = 1 
    zone_decisions[mask_exam_pass & mask_lost_2] = 0
    
    # Enforce Win for "All Win"
    zone_decisions[mask_exam_pass & all_crit_win] = 1
    
    # Apply back
    decisions[mask_zone] = zone_decisions
    
    return decisions


In [40]:
# 5. CÁLCULO DE NOTAS (Strict)
def calculate_grade_strict_general(weights_regular, df, config):
    reg_items = config['regular_items']
    def_item = config['definitory_item']
    
    w_sum_reg = np.sum(weights_regular)
    w_def = 1.0 - w_sum_reg
    all_weights = np.concatenate([weights_regular, [w_def]])
    
    reg_names = [item['name'] for item in reg_items]
    def_name = def_item['name']
    all_names = reg_names + [def_name]
    
    grades_matrix = df[all_names].values
    
    # Extract Thresholds Vector
    thresholds = [item.get('threshold', 2.5) for item in reg_items]
    thresholds.append(def_item.get('threshold', 2.5))
    T_vector = np.array(thresholds)
    
    # U-Factors
    ratios = np.clip(grades_matrix / T_vector, 0, 1)
    n_items = len(all_names)
    u_factors = np.zeros_like(grades_matrix)
    for i in range(n_items):
        mask = np.ones(n_items, dtype=bool)
        mask[i] = False
        u_factors[:, i] = np.prod(ratios[:, mask], axis=1)
        
    final_strict = np.sum(grades_matrix * all_weights * u_factors, axis=1)
    final_raw = np.sum(grades_matrix * all_weights, axis=1)
    
    mask_fail = final_raw < 3.0
    return np.where(mask_fail, 0.8 * final_raw, final_strict)


In [41]:
# 6. OPTIMIZACIÓN (Usando Oráculo Avanzado)

def objective_function(x_independent, df, config):
    weights_regular = reconstruct_full_weights(x_independent, config)
    y_true = get_oracle_decisions_advanced(weights_regular, df, config) 
    y_pred = calculate_grade_strict_general(weights_regular, df, config)
    
    pass_th = 2.95
    N = len(y_true)
    
    mse_fn = np.sum(np.maximum(0, pass_th - y_pred[y_true==1])**2) / N
    mse_fp = np.sum(np.maximum(0, y_pred[y_true==0] - pass_th)**2) / N
    
    fpr = ((y_true == 0) & (y_pred >= 2.95)).sum() / (((y_true == 0).sum()) + 1e-9)
    fnr = ((y_true == 1) & (y_pred < 2.95)).sum() / (((y_true == 1).sum()) + 1e-9)
    return mse_fn + mse_fp + fpr + (5.0 * fnr)

# Vars
indep_indices = get_independent_indices(config_evaluacion)
indep_items = [config_evaluacion['regular_items'][i] for i in indep_indices]
x0 = np.array([i['suggested'] for i in indep_items])
bounds = Bounds([i['min'] for i in indep_items], [i['max'] for i in indep_items])

# --- BUG FIX: Recursive Dependency Resolver ---
reg_items = config_evaluacion['regular_items']
item_map = {i['name']: i for i in reg_items}
name_to_idx = {item['name']: i for i, item in enumerate(indep_items)}

def resolve_root(name, current_factor=1.0):
    item = item_map[name]
    if 'correlated_with' not in item:
        # It's an independent root (or at least supposed to be)
        return name, current_factor
    else:
        # Go deeper
        parent = item['correlated_with']
        factor = item['correlation_factor']
        return resolve_root(parent, current_factor * factor)

coeffs = np.zeros(len(indep_items))

print("Configurando restricciones (resolviendo cadenas de dependencia)...")
for item in reg_items:
    root_name, total_factor = resolve_root(item['name'])
    
    if root_name in name_to_idx:
        idx = name_to_idx[root_name]
        coeffs[idx] += total_factor
        print(f"  + {item['name']} -> {root_name} (x{total_factor:.2f})")
    else:
        print(f"  [!] Warning: Raíz {root_name} no es variable independiente conocida.")

linear_constraint = LinearConstraint(coeffs, 1.0 - config_evaluacion['definitory_item']['max_weight'], 0.99)

print("Optimizando modelo...")
res = minimize(
    objective_function, x0, args=(df_notas, config_evaluacion),
    method='trust-constr', bounds=bounds, constraints=[linear_constraint]
)

print(f"Success: {res.success}")
opt_weights_full = reconstruct_full_weights(res.x, config_evaluacion)
opt_final_weight = 1.0 - np.sum(opt_weights_full)


Configurando restricciones (resolviendo cadenas de dependencia)...
  + Quiz_1 -> Tarea_1 (x0.50)
  + Tarea_1 -> Tarea_1 (x1.00)
  + Quiz_2 -> Tarea_1 (x0.60)
  + Tarea_2 -> Tarea_1 (x1.20)
  + Quiz_3 -> Tarea_1 (x0.50)
  + Tarea_3 -> Tarea_1 (x1.00)
Optimizando modelo...


  self.H.update(self.x - self.x_prev, self.g - self.g_prev)


Success: True


In [42]:
# 7. COMPARATIVA FINAL
import copy
config_opt = copy.deepcopy(config_evaluacion)
for i, item in enumerate(config_opt['regular_items']): item['weight'] = float(opt_weights_full[i])
config_opt['definitory_item']['weight'] = float(opt_final_weight)

def report(conf, df, tit):
    # Quick helper to get weights since 'weight' key is now in items
    mp = {}
    items=conf['regular_items']
    for i in items:
        if 'weight' in i: mp[i['name']]=i['weight']
        else: mp[i['name']]=0.1
    for _ in range(3):
        for i in items:
            if 'weight' not in i and 'correlated_with' in i:
                mp[i['name']] = mp[i['correlated_with']]*i['correlation_factor']
    wr = np.array([mp[i['name']] for i in items])
    
    y_t = get_oracle_decisions_advanced(wr, df, conf)
    y_p = calculate_grade_strict_general(wr, df, conf)
    
    fpr = ((y_t==0)&(y_p>=2.95)).sum() / ((y_t==0).sum()+1e-9)
    fnr = ((y_t==1)&(y_p<2.95)).sum() / ((y_t==1).sum()+1e-9)
    loss = fpr + 5*fnr 
    print(f"[{tit}] FPR: {fpr:.2%} | FNR: {fnr:.2%} | LossProx: {loss:.4f}")

report(config_evaluacion, df_notas, "ORIGINAL")
report(config_opt, df_notas, "OPTIMIZADO")


[ORIGINAL] FPR: 1.13% | FNR: 73.86% | LossProx: 3.7041
[OPTIMIZADO] FPR: 1.63% | FNR: 73.39% | LossProx: 3.6856
