In [1]:
import os
import time
import math
import itertools
import yaml
import numpy as np
import torch
import torch.optim as optim
import matplotlib.pyplot as plt
from scipy.optimize import minimize, Bounds
from hypernet_MLP import Hypernet_MLP
from hypernet_trans import Hypernet_trans

In [2]:
def set_seed(seed=702):
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)

set_seed(702)

In [3]:
case = case = "_Ex_7_2"

# Const

In [4]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device = torch.device("cpu")
print(f"Using device: {device}")

Using device: cpu


In [5]:
GROUND_TRUTH_FILE = f"../4_Pareto_front/test/{case}/pf_dynamic_true.npy" 

if os.path.exists(GROUND_TRUTH_FILE):
    pf_true = np.load(GROUND_TRUTH_FILE)
    print(f"‚úÖ ƒê√£ t·∫£i ground truth Pareto front t·ª´: {GROUND_TRUTH_FILE}, Shape: {pf_true.shape}")
else:
    print(f"‚ùå KH√îNG t√¨m th·∫•y file ground truth t·∫°i: {GROUND_TRUTH_FILE}")

‚úÖ ƒê√£ t·∫£i ground truth Pareto front t·ª´: ../4_Pareto_front/test/_Ex_7_2/pf_dynamic_true.npy, Shape: (20, 2)


# Def

In [6]:
import autograd.numpy as np
from scipy.optimize import minimize            
        
class Projection:
    def __init__(self, cons, bounds, dim, proj_type='euclid'):
        self.cons = cons
        self.bounds = bounds
        self.dim = dim
        self.proj_type = proj_type
        
        if self.proj_type == 'qplus':
            self.objective_func = self._obj_positive_diff
        elif self.proj_type == 'euclid':
            self.objective_func = self._obj_l2_norm
        else:
            print(f"Ph√©p chi·∫øu {self.objective_func} kh√¥ng c√†i ƒë·∫∑t, ch·ªçn 'qplus' ho·∫∑c 'euclid'")

    def _obj_l2_norm(self, x, y):
        return np.sqrt(np.sum((x - y)**2))
    
    def _obj_positive_diff(self, x, y):
        v = np.maximum(y - x, 0) 
        return np.sum(v**2)

    def project(self, target_point):
        init_point = np.random.rand(1, self.dim).tolist()[0]
        
        res = minimize(
            self.objective_func,
            init_point,
            args=(target_point, ),
            constraints=self.cons,
            bounds=self.bounds,
            options={'disp': False}
        )
        
        optim_point = res.x
        
        if self.proj_type == 'qplus':
            return target_point - np.maximum(target_point - optim_point, 0)
        else:
            return optim_point

class Problem():
    def __init__(self, f, dim_x, dim_y, proj_C, proj_Qplus):
        self.f = f
        self.dim_x = dim_x
        self.dim_y = dim_y
        self.proj_C = proj_C
        self.proj_Qplus = proj_Qplus
    
    def objective_func(self, x):
        vals = [func(x) for func in self.f]
        return np.concatenate(vals)  

In [7]:
def f1(x):    return (x[0]**2 + x[1]**2)/50
def f2(x):    return ((x[0] - 5)**2 + (x[1] - 5)**2)/50
def f(x):    return np.array([
    (x[0]**2 + x[1]**2)/50,
    ((x[0] - 5)**2 + (x[1] - 5)**2)/50])
#--------------- C --------------------#
bounds_x = Bounds([0,0],[5, 5])

#--------------- Q --------------------#
def q1(y):    return 0.2**2 - (y[0] - 0.4)**2 - (y[1] - 0.4)**2

def q_plus(y):
    center = 0.4
    radius_sq = 0.2**2  
    dx = np.maximum(0, y[0] - center)
    dy = np.maximum(0, y[1] - center)
    return radius_sq - (dx**2 + dy**2)
# H√†m d√πng cho Projection 
cons_C = ()
dim_x = 2
cons_Q = ({'type': 'ineq', 'fun' : q1,},)
cons_Qplus = ({'type': 'ineq', 'fun': q_plus},)
dim_y = 2
# Setup Projections
proj_C_handler = Projection(cons=cons_C, bounds=bounds_x, dim=dim_x, proj_type='euclid')
proj_Q_handler = Projection(cons=cons_Q, bounds=None, dim=dim_y, proj_type='qplus')
# Setup Problem
prob = Problem(
    f=[f1, f2], 
    dim_x=dim_x, dim_y=dim_y,
    proj_C=proj_C_handler.project,
    proj_Qplus=proj_Q_handler.project
)
z_star = np.array([0.0, 0.0])
x_init = np.array([-10.0, -10.0])

In [8]:
def evaluate_objectives_single(functions, x_tensor):
    vals = []
    for func in functions:
        val = func(x_tensor)
        if not torch.is_tensor(val): val = torch.tensor(val, dtype=torch.float32, device=x_tensor.device)
        vals.append(val)
    return torch.stack(vals).reshape(-1)

def calculate_mse_igd(pf_pred, pf_true):
    if len(pf_pred) == 0: return np.inf
    total_dist_sq = 0
    # V·ªõi m·ªói ƒëi·ªÉm ground truth, t√¨m ƒëi·ªÉm d·ª± ƒëo√°n g·∫ßn nh·∫•t
    for p_true in pf_true:
        dists_sq = np.sum((pf_pred - p_true)**2, axis=1)
        total_dist_sq += np.min(dists_sq)
    return total_dist_sq / len(pf_true)

def calculate_mse(pf_pred, pf_true):
    pf_pred_ = np.array(pf_pred)
    pf_true_ = np.array(pf_true)
    
    if pf_pred_.shape != pf_true_.shape:
        print(f"‚ö†Ô∏è Warning: Shape mismatch {pf_pred_.shape} vs {pf_true_.shape}. MSE c√≥ th·ªÉ kh√¥ng ch√≠nh x√°c.")
        return np.inf

    return np.mean((pf_pred_ - pf_true_)**2)

In [9]:
def train_hypernet(hypernet, prob, z_star, 
                   num_epochs=1000, lr=1e-3, num_partitions=50, 
                   lr_step_size=300, lr_gamma=0.5,
                   init_pen_C=1.0, max_pen_C=1000.0, pen_C_growth=1.01,
                   init_pen_Q=50.0, min_pen_Q=1.0, pen_Q_decay=0.995,
                   verbose=False):
    
    hypernet = hypernet.to(device)
    optimizer = optim.Adam(hypernet.parameters(), lr=lr)
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=lr_step_size, gamma=lr_gamma)
    z_star_tensor = torch.tensor(z_star, dtype=torch.float32, device=device).view(1, -1)
    
    current_pen_C = init_pen_C
    current_pen_Q = init_pen_Q
    angle_step = (math.pi / 2) / num_partitions
    
    for epoch in range(num_epochs):
        hypernet.train()
        optimizer.zero_grad()
        
        # 1. Stratified Sampling
        starts = torch.arange(num_partitions) * angle_step
        noise = torch.rand(num_partitions) * angle_step
        thetas = starts + noise
        r_batch_np = np.stack([np.cos(thetas.numpy()), np.sin(thetas.numpy())], axis=1)
        r_tensor_batch = torch.tensor(r_batch_np, dtype=torch.float32, device=device)
        
        # 2. Forward & Loss Calculation (Loop safe logic)
        loss_C_list = []
        loss_Q_list = []
        y_pred_list = []
        
        # Forward pass batch (Nhanh h∆°n g·ªçi t·ª´ng c√°i)
        x_vec_batch = hypernet(r_tensor_batch) # (Batch, 2)
        x_np_batch = x_vec_batch.detach().cpu().numpy()
        
        for i in range(num_partitions):
            x_i_tensor = x_vec_batch[i]
            x_i_np = x_np_batch[i]
            
            # Loss C
            x_proj_i_np = prob.proj_C(x_i_np)
            x_proj_i_tensor = torch.tensor(x_proj_i_np, dtype=torch.float32, device=device)
            loss_C_list.append(torch.sum((x_i_tensor - x_proj_i_tensor)**2))
            
            # F(x)
            y_pred_i = evaluate_objectives_single(prob.f, x_i_tensor)
            y_pred_list.append(y_pred_i)
            
            # Loss Q
            y_i_np = y_pred_i.detach().cpu().numpy()
            y_proj_i_np = prob.proj_Qplus(y_i_np)
            y_proj_i_tensor = torch.tensor(y_proj_i_np, dtype=torch.float32, device=device)
            loss_Q_list.append(torch.sum((y_pred_i - y_proj_i_tensor)**2))
            
        y_pred_batch = torch.stack(y_pred_list)
        loss_C = torch.mean(torch.stack(loss_C_list))
        loss_Q = torch.mean(torch.stack(loss_Q_list))
        
        # Chebyshev Loss
        diff = y_pred_batch - z_star_tensor
        weighted_diff = r_tensor_batch * diff
        max_vals, _ = torch.max(weighted_diff, dim=1)
        loss_obj = torch.mean(max_vals)
        
        total_loss = loss_obj + (current_pen_C * loss_C) + (current_pen_Q * loss_Q)
        
        total_loss.backward()
        optimizer.step()
        scheduler.step()
        
        # Update Penalties
        current_pen_C = min(max_pen_C, current_pen_C * pen_C_growth)
        current_pen_Q = max(min_pen_Q, current_pen_Q * pen_Q_decay)
        
        if verbose and epoch % (num_epochs // 5) == 0:
            print(f"  Ep {epoch}: Loss={total_loss.item():.2f} (Obj={loss_obj.item():.2f}, C={loss_C.item():.4f}, Q={loss_C.item():.4f})")
            
    return hypernet

In [10]:
def evaluate_model(hypernet, prob, test_rays, pf_true, calculate_metric):
    hypernet.eval()
    pf_pred = []
    
    # Chuy·ªÉn test_rays sang tensor batch ƒë·ªÉ infer nhanh
    rays_tensor = torch.tensor(test_rays, dtype=torch.float32, device=device)
    
    with torch.no_grad():
        x_raw = hypernet(rays_tensor) # (N, 2)
        x_raw_np = x_raw.cpu().numpy()
        
        # Loop project and eval
        for i in range(len(x_raw_np)):
            # Chi·∫øu l√™n C (Quan tr·ªçng ƒë·ªÉ feasible)
            x_proj = prob.proj_C(x_raw_np[i])
            # T√≠nh f
            val = [func(x_proj) for func in prob.f]
            pf_pred.append(val)
            
    pf_pred = np.array(pf_pred)
    igd_score = calculate_metric(pf_pred, pf_true)
    return igd_score, pf_pred

# Config

In [11]:
param_grid = {
    'lr': [1e-3],
    'num_epochs': [500, 800],
    'pen_C_growth': [1.01],      # TƒÉng ph·∫°t C nhanh hay ch·∫≠m
    'pen_Q_decay': [0.99, 0.999],      
    'init_pen_C': [10.0],
    'init_pen_Q': [20.0, 50.0, 100.0],
    'max_pen_C': [500.0],
    'min_pen_Q': [10.0, 20.0, 50.0]
}

config_path='../4_Pareto_front/config.yaml'
with open(config_path, 'r') as f:
    cfg = yaml.safe_load(f)
test_rays = np.array(cfg['data']['test_ray'])

# Run

## Ch·∫°y n·ªët MLP

In [12]:
models = ["trans", "MLP"]
models = ["MLP"]
mode_tests = ["MSE"]

In [13]:
results = []
best_scores_tracker = {}
save_dir = f"model/{case}"

In [14]:
for mode_test in mode_tests:
    if mode_test == "MSE":
        calculate_metric = calculate_mse
        metric_label = "MSE"
    else:
        calculate_metric = calculate_mse_igd
        metric_label = "IGD"
        
    print(f"\n{'='*40}")
    print(f"üöÄ B·∫ÆT ƒê·∫¶U TEST V·ªöI METRIC: {mode_test}")
    print(f"{'='*40}")

    for model_name in models:
        print(f"\nüîπ ƒêang train Model: {model_name} | Metric t·ªëi ∆∞u: {metric_label}")
        
        # Reset tracker cho c·∫∑p (model, metric) n√†y
        current_best_score = float('inf')
        current_best_config = None
        
        # T·∫°o l∆∞·ªõi tham s·ªë
        keys, values = zip(*param_grid.items())
        param_combinations = [dict(zip(keys, v)) for v in itertools.product(*values)]
        
        for idx, params in enumerate(param_combinations):
            print(f"\n   >>> Config {idx+1}/{len(param_combinations)}: {params}")
            
            # 1. Init Model
            if model_name == "MLP":
                # ƒê·∫£m b·∫£o dim_x ƒë√£ ƒë∆∞·ª£c ƒë·ªãnh nghƒ©a t·ª´ tr∆∞·ªõc
                model = Hypernet_MLP(ray_hidden_dim=32, out_dim=dim_x, n_tasks=2)
            else:
                model = Hypernet_trans(ray_hidden_dim=32, out_dim=dim_x, n_tasks=2)
            
            # ƒê∆∞a model l√™n device (n·∫øu ch∆∞a x·ª≠ l√Ω trong init/train)
            # model = model.to(device) 

            # 2. Train
            start_time = time.time()
            # L∆∞u √Ω: H√†m train_hypernet c·∫ßn ƒë∆∞·ª£c ƒë·ªãnh nghƒ©a ƒë√∫ng c√°c tham s·ªë truy·ªÅn v√†o
            trained_model = train_hypernet(
                model, prob, z_star, 
                num_epochs=params['num_epochs'],
                lr=params['lr'],
                num_partitions=20, 
                init_pen_C=params['init_pen_C'],
                pen_C_growth=params['pen_C_growth'],
                init_pen_Q=params['init_pen_Q'],
                pen_Q_decay=params['pen_Q_decay'],
                min_pen_Q=params['min_pen_Q'],
                verbose=False # T·∫Øt verbose chi ti·∫øt ƒë·ªÉ log g·ªçn h∆°n
            )
            train_time = time.time() - start_time

            # 3. Evaluate
            # L∆∞u √Ω: pf_true c·∫ßn ƒë∆∞·ª£c ƒë·ªãnh nghƒ©a t·ª´ tr∆∞·ªõc
            score, pf_pred = evaluate_model(trained_model, prob, test_rays, pf_true, calculate_metric)

            print(f"      ‚è±Ô∏è Time: {train_time:.2f}s | üìâ {metric_label}: {score:.4f}")

            # 4. Save result to List
            res = {
                'model_type': model_name,
                'metric_type': mode_test,
                'config_id': idx,
                'params': params,
                'score': score,
                'time': train_time
            }
            results.append(res)

            # 5. Update Best Model cho C·∫∑p (Model, Metric) hi·ªán t·∫°i
            if score < current_best_score:
                current_best_score = score
                current_best_config = params
                
                save_path = f"{save_dir}/best_hnet_{model_name}_{mode_test}.pth"
                torch.save(trained_model.state_dict(), save_path)
                print(f"      üèÜ New Best Found! Saved to: {save_path}")

        print(f"\n‚úÖ Ho√†n th√†nh {model_name} - {mode_test}. Best Score: {current_best_score:.4f}")
        print(f"   Best Config: {current_best_config}")



üöÄ B·∫ÆT ƒê·∫¶U TEST V·ªöI METRIC: MSE

üîπ ƒêang train Model: MLP | Metric t·ªëi ∆∞u: MSE

   >>> Config 1/36: {'lr': 0.001, 'num_epochs': 500, 'pen_C_growth': 1.01, 'pen_Q_decay': 0.99, 'init_pen_C': 10.0, 'init_pen_Q': 20.0, 'max_pen_C': 500.0, 'min_pen_Q': 10.0}


  res = minimize(


      ‚è±Ô∏è Time: 58.09s | üìâ MSE: 0.0005
      üèÜ New Best Found! Saved to: model/_Ex_7_2/best_hnet_MLP_MSE.pth

   >>> Config 2/36: {'lr': 0.001, 'num_epochs': 500, 'pen_C_growth': 1.01, 'pen_Q_decay': 0.99, 'init_pen_C': 10.0, 'init_pen_Q': 20.0, 'max_pen_C': 500.0, 'min_pen_Q': 20.0}
      ‚è±Ô∏è Time: 56.21s | üìâ MSE: 0.0007

   >>> Config 3/36: {'lr': 0.001, 'num_epochs': 500, 'pen_C_growth': 1.01, 'pen_Q_decay': 0.99, 'init_pen_C': 10.0, 'init_pen_Q': 20.0, 'max_pen_C': 500.0, 'min_pen_Q': 50.0}
      ‚è±Ô∏è Time: 56.58s | üìâ MSE: 0.0006

   >>> Config 4/36: {'lr': 0.001, 'num_epochs': 500, 'pen_C_growth': 1.01, 'pen_Q_decay': 0.99, 'init_pen_C': 10.0, 'init_pen_Q': 50.0, 'max_pen_C': 500.0, 'min_pen_Q': 10.0}
      ‚è±Ô∏è Time: 57.12s | üìâ MSE: 0.0010

   >>> Config 5/36: {'lr': 0.001, 'num_epochs': 500, 'pen_C_growth': 1.01, 'pen_Q_decay': 0.99, 'init_pen_C': 10.0, 'init_pen_Q': 50.0, 'max_pen_C': 500.0, 'min_pen_Q': 20.0}
      ‚è±Ô∏è Time: 56.70s | üìâ MSE: 0.002

In [40]:
df_results = pd.DataFrame(results)
print("\n=== T·ªîNG H·ª¢P K·∫æT QU·∫¢ ===")
df_results.sort_values(by=['metric_type', 'model_type', 'score'])


=== T·ªîNG H·ª¢P K·∫æT QU·∫¢ ===


Unnamed: 0,model_type,metric_type,config_id,params,score,time
129,MLP,MSE,21,"{'lr': 0.001, 'num_epochs': 500, 'pen_C_growth...",0.000978,24.960053
134,MLP,MSE,26,"{'lr': 0.001, 'num_epochs': 500, 'pen_C_growth...",0.002487,26.212012
173,MLP,MSE,65,"{'lr': 0.0001, 'num_epochs': 500, 'pen_C_growt...",0.003188,27.223085
159,MLP,MSE,51,"{'lr': 0.001, 'num_epochs': 800, 'pen_C_growth...",0.003311,41.300932
155,MLP,MSE,47,"{'lr': 0.001, 'num_epochs': 800, 'pen_C_growth...",0.003603,42.033796
...,...,...,...,...,...,...
216,trans,MSE_IGD,0,"{'lr': 0.001, 'num_epochs': 500, 'pen_C_growth...",0.000162,27.464283
219,trans,MSE_IGD,3,"{'lr': 0.001, 'num_epochs': 500, 'pen_C_growth...",0.000169,28.853848
220,trans,MSE_IGD,4,"{'lr': 0.001, 'num_epochs': 500, 'pen_C_growth...",0.000193,28.213165
222,trans,MSE_IGD,6,"{'lr': 0.001, 'num_epochs': 500, 'pen_C_growth...",0.001464,26.028553


# Viz

In [41]:
print("\n" + "="*40)
print(f"üèÜ BEST CONFIGURATION FOUND (IGD={best_igd:.4f})")
print("="*40)
for k, v in best_config.items():
    print(f"{k}: {v}")

# S·∫Øp x·∫øp k·∫øt qu·∫£ theo IGD
sorted_results = sorted(results, key=lambda x: x['igd'])
print("\nTop 5 Configs:")
for i in range(min(5, len(sorted_results))):
    r = sorted_results[i]
    print(f"Rank {i+1}: IGD={r['igd']:.4f} | Params={r['params']}")

# --- Plotting ---
plt.figure(figsize=(10, 6))

# Plot Ground Truth
if pf_true is not None:
    plt.scatter(pf_true[:, 0], pf_true[:, 1], c='gray', alpha=0.5, label='Ground Truth', s=20)

# Plot Best Prediction
if best_pf_pred is not None:
    plt.scatter(best_pf_pred[:, 0], best_pf_pred[:, 1], c='red', marker='x', label='Best Prediction', s=40)

# Plot Ideal Point
plt.scatter(z_star[0], z_star[1], c='green', marker='*', s=150, label='Ideal Point (z*)')

plt.xlabel('f1')
plt.ylabel('f2')
plt.title(f'Pareto Front Approximation (Best IGD: {best_igd:.4f})')
plt.legend()
plt.grid(True, linestyle='--', alpha=0.6)
plt.show()


üèÜ BEST CONFIGURATION FOUND (IGD=inf)


AttributeError: 'NoneType' object has no attribute 'items'