In [None]:
!pip install evaluate

In [None]:
import torch
import gc
import random
import numpy as np
import pandas as pd
import evaluate
from typing import Dict, Any, List, Optional, Tuple
from dataclasses import dataclass, asdict

# Hugging Face Libraries
from datasets import load_dataset, DatasetDict
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    DataCollatorWithPadding,
    EvalPrediction
)
from peft import LoraConfig, TaskType, get_peft_model

In [None]:
# Reproducibility seed and compute device
SEED = 42
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Hyperparameter search space: the learning rate is continuous,
# every other hyperparameter is picked from a discrete set.
LR_MIN = 1e-5
LR_MAX = 2e-4
WARMUP_OPTIONS = [0.0, 0.06, 0.1]
RANK_OPTIONS = [2, 4, 8, 16, 24]
ALPHA_OPTIONS = [8, 16, 32, 64, 96]
DROPOUT_OPTIONS = [0.0, 0.05, 0.1, 0.2]
TARGET_MODULE_OPTIONS = [
    ["q_lin", "v_lin"],
    ["q_lin", "v_lin", "ffn.lin1", "ffn.lin2"],
]  # -> binary choice

# Search bounds: actual values for the continuous gene (LR), fractional
# indices for the discrete genes. The upper bound sits just below len(options)
# so that truncating a gene always yields a valid list index.
_DISCRETE_OPTION_SETS = (
    WARMUP_OPTIONS,
    RANK_OPTIONS,
    ALPHA_OPTIONS,
    DROPOUT_OPTIONS,
    TARGET_MODULE_OPTIONS,
)
MIN_BOUNDS = [LR_MIN] + [0] * len(_DISCRETE_OPTION_SETS)
MAX_BOUNDS = [LR_MAX] + [len(options) - 0.01 for options in _DISCRETE_OPTION_SETS]

In [None]:
# Data Loading
dataset = load_dataset("dair-ai/emotion")

# Training uses a seeded 3000-example subset
# (recommendation from cw brief to reduce compute time).
shuffled_train = dataset["train"].shuffle(seed=SEED)
train_dataset = shuffled_train.select(range(3000))

# The validation split is used in full.
val_dataset = dataset["validation"]

In [None]:
# Tokenization
tokenizer = AutoTokenizer.from_pretrained("distilbert-base-uncased")


def tokenize_func(examples):
    """Tokenize a batch of examples, truncating each text to at most 128 tokens.

    No padding is applied here. With ``batched=True`` the map call hands this
    function large chunks of examples, so padding at this stage would stretch
    every sequence to the longest one in its chunk. Padding is instead left to
    the ``DataCollatorWithPadding`` passed to the Trainer, which pads each
    mini-batch only to that mini-batch's longest sequence.

    Args:
        examples: batch dict from ``Dataset.map``; its ``'text'`` entry is tokenized.

    Returns:
        The tokenizer output for the batch.
    """
    return tokenizer(
        examples['text'],
        truncation=True,
        max_length=128
    )


tokenized_train = train_dataset.map(tokenize_func, batched=True)
tokenized_val = val_dataset.map(tokenize_func, batched=True)

In [None]:
# Helper functions
def set_global_seed(seed: int):
    """Seed Python's `random`, NumPy and PyTorch (plus CUDA when available) with `seed`."""
    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(seed)

    if not torch.cuda.is_available():
        return
    torch.cuda.manual_seed(seed)


def cleanup_memory():
    """Release unreachable Python objects, then return cached GPU memory.

    The garbage collector runs first so that objects kept alive only by
    reference cycles are destroyed before the CUDA caching allocator is asked
    to release its unused blocks; flushing the cache first would leave the
    memory of those objects reserved.
    """
    gc.collect()
    torch.cuda.empty_cache()


def find_nearest(value, options):
    """Return the element of `options` closest to `value` (first one wins on ties)."""
    def _distance(candidate):
        return abs(candidate - value)

    return min(options, key=_distance)


def random_individual() -> List[float]:
    """Sample one random search-space vector.

    Gene 0 is the learning rate drawn from its continuous range; genes 1-5 are
    fractional indices into the warmup, rank, alpha, dropout and target-module
    option lists. Each gene is drawn uniformly between its entry in MIN_BOUNDS
    and its entry in MAX_BOUNDS.
    """
    genes = []
    for lower, upper in zip(MIN_BOUNDS, MAX_BOUNDS):
        genes.append(random.uniform(lower, upper))
    return genes


# takes in an individual and repairs it -> need to change name later
def repair_pop_list(pop_list: list) -> list:
    """Clip a search-space individual to its bounds and decode it to hyperparameters.

    Args:
        pop_list: ``[lr, warmup_idx, rank_idx, alpha_idx, dropout_idx, modules_idx]``
            where ``lr`` is the actual learning rate and the remaining genes are
            (possibly fractional, possibly out-of-range) indices into the option lists.

    Returns:
        ``[lr, warmup_ratio, rank, alpha, dropout, modules_choice]`` with ``lr``
        clipped to ``[LR_MIN, LR_MAX]``, the next four entries taken from their
        option lists, and ``modules_choice`` the integer index into
        ``TARGET_MODULE_OPTIONS``.

    Note:
        The output holds decoded values, not indices, so it must not be fed back
        into this function: e.g. a decoded rank of 8 would be read as index 8.
    """
    def _index(position: int) -> int:
        # Clip the gene to its bounds, then truncate to get a valid list index.
        # Every discrete gene (including target modules) uses the same rule so
        # that each option owns an equally wide slice of the search range.
        clipped = np.clip(pop_list[position], MIN_BOUNDS[position], MAX_BOUNDS[position])
        return int(clipped)

    return [
        # LR - continuous, just clip
        float(np.clip(pop_list[0], MIN_BOUNDS[0], MAX_BOUNDS[0])),
        # discrete params - clip index then map to actual value
        WARMUP_OPTIONS[_index(1)],
        RANK_OPTIONS[_index(2)],
        ALPHA_OPTIONS[_index(3)],
        DROPOUT_OPTIONS[_index(4)],
        # target modules stay encoded as the option index (0 or 1)
        _index(5),
    ]

In [None]:
# SHADE Settings
SHADE_POPULATION_SIZE = 20
MAX_GENERATIONS = 4
MEMORY_SIZE = 20       # H parameter (matching pop_size)
ARCHIVE_RATE = 1.0     # Archive size = pop_size × archive_rate
P_BEST_RATE = 0.4      # Top 40% for pbest selection

# Collect all individuals -> this will be sorted and used for final training loop
#                            of top 5 best solutions trained on 3 different seeds

In [None]:
class SHADE_HyperparameterOptimizer:
    """SHADE search over LoRA fine-tuning hyperparameters for DistilBERT.

    Individuals are kept in *search space* for the whole run:
    ``[lr, warmup_idx, rank_idx, alpha_idx, dropout_idx, modules_idx]`` where the
    index genes are continuous floats. They are decoded into concrete
    hyperparameters with ``repair_pop_list`` only when a model is trained or a
    result is reported, so differential-evolution arithmetic always operates on
    vectors that live in the same space.
    """

    def __init__(self):
        """Load the accuracy metric and set up SHADE state with a random population."""
        self.metric = evaluate.load("accuracy")

        # SHADE Parameters
        self.pop_size = SHADE_POPULATION_SIZE
        self.H = MEMORY_SIZE
        self.arc_size = int(ARCHIVE_RATE * self.pop_size)
        # At least 2 candidates for pbest, but never more than the population holds
        self.p_num = min(self.pop_size, max(2, int(P_BEST_RATE * self.pop_size)))

        # Initialize Memory (Historical CR and F values)
        self.M_cr = [0.5] * self.H
        self.M_f = [0.5] * self.H
        self.mem_k = 0

        # Population & Archive
        self.population = [random_individual() for _ in range(self.pop_size)]
        self.fitness = [0.0] * self.pop_size
        self.archive = []

        # Tracking
        self.nfes = 0  # Number of function evaluations
        self.results = []
        self.best_solution = None
        self.best_fitness = -float('inf')

        # Result Collection
        self.all_individuals = {}  # search-space tuple -> validation accuracy
        self.final_results = []

    @staticmethod
    def _target_modules(choice) -> List[str]:
        """Return a copy of the LoRA target-module list for a decoded module choice (0 or 1)."""
        return list(TARGET_MODULE_OPTIONS[0 if choice == 0 else 1])

    def _make_record(self, params: list, trial_id: int, generation: int, accuracy: float) -> Dict[str, Any]:
        """Build one row of the optimisation log from decoded hyperparameters."""
        return {
            "learning_rate": params[0],
            "warmup_ratio": params[1],
            "rank": params[2],
            "alpha": params[3],
            "dropout": params[4],
            "target_modules": self._target_modules(params[5]),
            "trial_id": trial_id,
            "generation": generation,
            "accuracy": accuracy,
        }

    def _compute_metrics(self, eval_pred: EvalPrediction):
        """Turn logits into class predictions and score them with the accuracy metric."""
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=1)
        return self.metric.compute(predictions=predictions, references=labels)

    def evaluate_individual(self, individual: List[float], trial_id: int) -> float:
        """Train a LoRA model with the given hyperparameters and return validation accuracy.

        Args:
            individual: search-space vector; it is decoded here with ``repair_pop_list``.
            trial_id: used for the output directory and added to ``SEED`` to form
                the training seed of this trial.

        Returns:
            The ``eval_accuracy`` reported by the trainer after the last epoch.
        """
        params = repair_pop_list(individual)

        print(f"   > LR={params[0]:.2e}, Rank={params[2]}, "
              f"Alpha={params[3]}, Dropout={params[4]}")

        model = peft_model = trainer = None
        try:
            # Load fresh model each time
            model = AutoModelForSequenceClassification.from_pretrained(
                "distilbert-base-uncased",
                num_labels=6  # for 6 emotions
            )

            peft_config = LoraConfig(
                task_type=TaskType.SEQ_CLS,
                r=params[2],
                lora_alpha=params[3],
                lora_dropout=params[4],
                target_modules=self._target_modules(params[5]),
            )

            peft_model = get_peft_model(model, peft_config)
            args = TrainingArguments(
                output_dir=f"./results/trial_{trial_id}",
                learning_rate=params[0],
                per_device_train_batch_size=16,
                per_device_eval_batch_size=32,
                num_train_epochs=3,
                warmup_ratio=params[1],
                logging_steps=100,
                weight_decay=0.01,
                eval_strategy="epoch",
                save_strategy="no",
                logging_strategy="epoch",
                seed=SEED + trial_id,
                report_to="none",
                load_best_model_at_end=False
            )

            data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

            trainer = Trainer(
                model=peft_model,
                args=args,
                train_dataset=tokenized_train,
                data_collator=data_collator,
                eval_dataset=tokenized_val,
                compute_metrics=self._compute_metrics
            )

            trainer.train()
            eval_results = trainer.evaluate()
            return eval_results["eval_accuracy"]
        finally:
            # Cleanup runs even when training fails or is interrupted, so a
            # broken trial does not leave its model on the GPU.
            del model, peft_model, trainer
            cleanup_memory()

    # Reevaluate top solutions with 3 seeds to get robust accuracy estimates
    def evaluate_top_solutions_with_seeds(self, all_individuals, num_top=5, num_seeds=3):
        """Re-train the best individuals on several seeds and summarise their accuracy.

        Args:
            all_individuals: mapping of search-space tuple -> accuracy from the search.
            num_top: how many of the highest-accuracy individuals to re-evaluate.
            num_seeds: number of training runs (each with its own seed) per individual.

        Returns:
            One dict per re-evaluated individual with keys ``rank``, ``params``,
            ``original_accuracy``, ``mean_accuracy``, ``std_accuracy`` and
            ``seed_accuracies``.
        """
        print(f"\n{'='*60}")
        print(f"EVALUATING TOP {num_top} SOLUTIONS WITH {num_seeds} DIFFERENT SEEDS")
        print(f"{'='*60}")

        # Sort by fitness (descending)
        sorted_solutions = sorted(all_individuals.items(), key=lambda x: x[1], reverse=True)

        final_results = []

        for rank, (individual_tuple, original_accuracy) in enumerate(sorted_solutions[:num_top], 1):
            individual = list(individual_tuple)
            params = repair_pop_list(individual)

            print(f"\n{'='*60}")
            print(f"RANK {rank} - Original Accuracy: {original_accuracy:.4%}")
            print(f"{'='*60}")
            print(f"Params: LR={params[0]:.2e}, Warmup={params[1]}, Rank={params[2]}, "
                f"Alpha={params[3]}, Dropout={params[4]}, Modules={params[5]}")
            print(f"\nRunning {num_seeds} evaluations with different seeds...")

            seed_accuracies = []

            for seed_run in range(num_seeds):
                # high trial_id offset to avoid collision with optimization trials
                trial_id = 10000 + (rank * 100) + seed_run

                # the training seed used by evaluate_individual is SEED + trial_id;
                # it is computed here only so it can be shown in the log
                current_seed = SEED + trial_id

                print(f"  Seed run {seed_run + 1}/{num_seeds} (seed={current_seed})...", end=" ")

                accuracy = self.evaluate_individual(individual, trial_id)
                seed_accuracies.append(accuracy)

                print(f"Accuracy: {accuracy:.4%}")

            # Calculate stats
            mean_acc = np.mean(seed_accuracies)
            std_acc = np.std(seed_accuracies)

            print(f"\n  Results: {mean_acc:.4%} ± {std_acc:.4%}")
            print(f"  Individual runs: {[f'{acc:.4%}' for acc in seed_accuracies]}")

            final_results.append({
                'rank': rank,
                'params': params,
                'original_accuracy': original_accuracy,
                'mean_accuracy': mean_acc,
                'std_accuracy': std_acc,
                'seed_accuracies': seed_accuracies
            })

        # final summary
        print(f"\n{'='*60}")
        print(f"FINAL SUMMARY - TOP {num_top} SOLUTIONS")
        print(f"{'='*60}")

        for result in final_results:
            modules = self._target_modules(result['params'][5])
            print(f"\n{result['rank']}. Mean Accuracy: {result['mean_accuracy']:.4%} ± {result['std_accuracy']:.4%}")
            print(f"   LR: {result['params'][0]:.2e}, Warmup: {result['params'][1]}, Rank: {result['params'][2]}")
            print(f"   Alpha: {result['params'][3]}, Dropout: {result['params'][4]}, Modules: {modules}")
            runs_str = [f"{acc:.4%}" for acc in result['seed_accuracies']]
            print(f"   Runs: {runs_str}")

        return final_results

    def generate_child(self, target_idx: int, sorted_indices: np.ndarray) -> Tuple[List[float], float, float]:
        """Generate one child using current-to-pbest/1 with archive mutation + crossover.

        Args:
            target_idx: index of the parent in ``self.population``.
            sorted_indices: population indices ordered best-first by fitness.

        Returns:
            ``(child, cr_i, f_i)`` where ``child`` is a search-space vector clipped
            to ``MIN_BOUNDS``/``MAX_BOUNDS`` and ``cr_i``/``f_i`` are the crossover
            rate and scale factor sampled for it.
        """
        ri = random.randint(0, self.H - 1)  # H spaces, pick a random one & account for no. indices
        mu_cr = self.M_cr[ri]  # -> get mean from random point in memory
        mu_f = self.M_f[ri]

        # generate CR_i (-1 is the terminal value set by the memory update)
        if mu_cr == -1:
            cr_i = 0.0
        else:
            cr_i = np.clip(np.random.normal(mu_cr, 0.1), 0, 1)

        # generate F_i: resample until positive, then truncate at 1
        while True:
            f_i = np.random.standard_cauchy() * 0.1 + mu_f  # scale = 0.1 | location = M_f[ri]
            if f_i > 0:
                break
        f_i = min(f_i, 1.0)

        # select p-best (from top p_num individuals)
        p_best_idx = sorted_indices[random.randint(0, self.p_num - 1)]

        # select r1 (different from target)
        available = [i for i in range(self.pop_size) if i != target_idx]
        r1_idx = random.choice(available)

        # select r2 (from population + archive, different from target and r1)
        combined_pop = self.population + self.archive
        combined_size = len(combined_pop)

        r2_idx = random.randint(0, combined_size - 1)
        while (r2_idx == target_idx or
               (r2_idx < self.pop_size and r2_idx == r1_idx)):
            r2_idx = random.randint(0, combined_size - 1)

        # create the mutation using -> current-to-pbest/1
        x_i = np.array(self.population[target_idx])
        x_pbest = np.array(self.population[p_best_idx])
        x_r1 = np.array(self.population[r1_idx])
        x_r2 = np.array(combined_pop[r2_idx])

        mutant = x_i + f_i * (x_pbest - x_i) + f_i * (x_r1 - x_r2)

        # binomial crossover
        child = []
        j_rand = random.randint(0, len(x_i) - 1)  # ensures at least one change

        for j in range(len(x_i)):
            if random.random() < cr_i or j == j_rand:
                child.append(mutant[j])
            else:
                child.append(x_i[j])

        # Repair bounds only. The child must stay in search space (fractional
        # indices): decoding it here would store concrete values such as rank=8
        # in the population, which would then be read as indices again when the
        # child is evaluated or used as a parent in a later mutation.
        child = [
            float(np.clip(gene, lower, upper))
            for gene, lower, upper in zip(child, MIN_BOUNDS, MAX_BOUNDS)
        ]

        return child, cr_i, f_i

    def run_optimization(self):
        """Evaluate the initial population, then run ``MAX_GENERATIONS`` SHADE generations.

        Every evaluated individual is logged to ``self.results`` and stored in
        ``self.all_individuals`` as soon as its accuracy is known, so partial
        results survive an interrupted run.
        """
        # The initial population is evaluated too, hence MAX_GENERATIONS + 1 rounds
        total_evals = self.pop_size * (MAX_GENERATIONS + 1)
        print(f"Starting SHADE: Pop={self.pop_size}, H={self.H}, Budget={total_evals}\n")

        self.all_individuals = {}
        all_individuals = self.all_individuals

        # Initialise Population
        print("=== INITIALIZATION ===")
        for i in range(self.pop_size):
            self.nfes += 1
            print(f"[Eval {self.nfes}] Initial Individual {i+1}/{self.pop_size}")

            self.fitness[i] = self.evaluate_individual(self.population[i], self.nfes)
            all_individuals[tuple(self.population[i])] = self.fitness[i]

            print(f"   > Accuracy: {self.fitness[i]:.4%}")

            if self.fitness[i] > self.best_fitness:
                self.best_fitness = self.fitness[i]
                self.best_solution = self.population[i].copy()
                print(f"   >>> New Best: {self.best_fitness:.4%}")

            # The initial population is logged as generation 0 so it can be told
            # apart from the children produced in generation 1.
            self.results.append(
                self._make_record(repair_pop_list(self.population[i]), self.nfes, 0, self.fitness[i])
            )

        # Main Loop
        for generation in range(1, MAX_GENERATIONS + 1):
            print(f"\n{'='*60}")
            print(f"GENERATION {generation}/{MAX_GENERATIONS}")
            print(f"{'='*60}")

            # Sort population by fitness (descending -> best first)
            sorted_indices = np.argsort(self.fitness)[::-1]

            # Generate all children
            children = []
            children_cr = []
            children_f = []
            children_fitness = []

            for i in range(self.pop_size):

                self.nfes += 1
                print(f"\n[Eval {self.nfes}] Individual {i+1}/{self.pop_size}")

                child, cr_i, f_i = self.generate_child(i, sorted_indices)
                child_fitness = self.evaluate_individual(child, self.nfes)

                all_individuals[tuple(child)] = child_fitness

                children.append(child)
                children_cr.append(cr_i)
                children_f.append(f_i)
                children_fitness.append(child_fitness)

                print(f"   > Child Accuracy: {child_fitness:.4%} vs Parent: {self.fitness[i]:.4%}")

                # Track results
                self.results.append(
                    self._make_record(repair_pop_list(child), self.nfes, generation, child_fitness)
                )

            # Selection and Memory Update
            S_cr, S_f, dif_fitness = [], [], []

            for i in range(len(children)):
                parent_fit = self.fitness[i]
                child_fit = children_fitness[i]

                # Selection: parent survives unless the child wins or ties
                if child_fit < parent_fit:
                    continue

                if child_fit > parent_fit:
                    # archive the old parent (loser); skipped when the archive is disabled
                    if self.arc_size > 0:
                        if len(self.archive) < self.arc_size:
                            self.archive.append(self.population[i].copy())
                        else:
                            # replace random archive member
                            rand_idx = random.randint(0, self.arc_size - 1)
                            self.archive[rand_idx] = self.population[i].copy()

                    # track successful parameters
                    dif_fitness.append(abs(child_fit - parent_fit))
                    S_cr.append(children_cr[i])
                    S_f.append(children_f[i])

                # replace parent with child
                self.population[i] = children[i]
                self.fitness[i] = child_fit

                # update global best
                if child_fit > self.best_fitness:
                    self.best_fitness = child_fit
                    self.best_solution = children[i].copy()
                    print(f"NEW GLOBAL BEST: {self.best_fitness:.4%}")

            # Update Memory using -> Weighted Lehmer Mean
            if len(S_cr) > 0:
                # only strict improvements are recorded, so the total is > 0
                total_improvement = sum(dif_fitness)
                weights = [df / total_improvement for df in dif_fitness]

                # Weighted Lehmer Mean for F (every F is > 0, so f_den > 0)
                f_num = sum(w * f**2 for w, f in zip(weights, S_f))
                f_den = sum(w * f for w, f in zip(weights, S_f))
                self.M_f[self.mem_k] = f_num / f_den

                # Weighted Lehmer Mean for CR
                cr_sum = sum(S_cr)
                if cr_sum == 0 or self.M_cr[self.mem_k] == -1:
                    self.M_cr[self.mem_k] = -1  # terminal value
                else:
                    cr_num = sum(w * cr**2 for w, cr in zip(weights, S_cr))
                    cr_den = sum(w * cr for w, cr in zip(weights, S_cr))
                    self.M_cr[self.mem_k] = cr_num / cr_den

                # increment memory position -> (circular)
                self.mem_k = (self.mem_k + 1) % self.H

                print(f"\nMemory updated: {len(S_cr)} improvements this generation")

        print(f"\n{'='*60}")
        print(f"OPTIMIZATION COMPLETE")
        print(f"{'='*60}")
        print(f"Total Evals: {self.nfes}")
        print(f"Best Acc: {self.best_fitness:.4%}")

    def save_top_solutions_results(self, final_results, filename: str):
        """Write the multi-seed evaluation results to CSV and print the best configuration.

        Args:
            final_results: list of dicts as returned by ``evaluate_top_solutions_with_seeds``.
            filename: path of the CSV file to write.
        """
        if not final_results:
            print("No top solutions results to save.")
            return

        # flatten results
        rows = []
        for result in final_results:
            params = result['params']
            row = {
                'rank': result['rank'],
                'learning_rate': params[0],
                'warmup_ratio': params[1],
                'rank_r': params[2],
                'alpha': params[3],
                'dropout': params[4],
                'target_modules': 0 if params[5] == 0 else 1,  # Binary encoding
                'original_accuracy': result['original_accuracy'],
                'mean_accuracy': result['mean_accuracy'],
                'std_accuracy': result['std_accuracy'],
            }

            # individual seed runs as separate columns
            for i, acc in enumerate(result['seed_accuracies'], 1):
                row[f'seed_{i}_accuracy'] = acc

            rows.append(row)

        df = pd.DataFrame(rows)
        df.to_csv(filename, index=False)
        print(f"\nTop solutions results saved to {filename}")

        # Print summary
        best_mean = df.loc[df['mean_accuracy'].idxmax()]
        print("\n" + "="*60)
        print("BEST CONFIGURATION (by mean accuracy):")
        print("="*60)
        print(f"Rank: {best_mean['rank']}")
        print(f"Mean Accuracy: {best_mean['mean_accuracy']:.4%} ± {best_mean['std_accuracy']:.4%}")
        print(f"Learning Rate: {best_mean['learning_rate']:.2e}")
        print(f"Warmup Ratio: {best_mean['warmup_ratio']}")
        print(f"Rank: {best_mean['rank_r']}")
        print(f"Alpha: {best_mean['alpha']}")
        print(f"Dropout: {best_mean['dropout']}")
        print(f"Target Modules: {'attention-only' if best_mean['target_modules']==0 else 'attention+feedforward'}")
        print("="*60)

    def save_results(self, filename: str):
        """Write the per-trial optimisation log to CSV and print the best single trial.

        Args:
            filename: path of the CSV file to write.
        """
        if not self.results:
            print("No results to save.")
            return

        df = pd.DataFrame(self.results)
        df.to_csv(filename, index=False)
        print(f"\nResults saved to {filename}")

        # Print best result
        best_run = df.loc[df['accuracy'].idxmax()]
        print("\n" + "="*60)
        print("BEST CONFIGURATION FOUND:")
        print("="*60)
        print(f"Accuracy: {best_run['accuracy']:.4%}")
        print(f"Learning Rate: {best_run['learning_rate']:.2e}")
        print(f"Warmup Ratio: {best_run['warmup_ratio']}")
        print(f"Rank: {best_run['rank']}")
        print(f"Alpha: {best_run['alpha']}")
        print(f"Dropout: {best_run['dropout']}")
        print(f"Target Modules: {best_run['target_modules']}")
        print("="*60)

In [None]:
import time

In [None]:
if __name__ == "__main__":
    set_global_seed(SEED)

    def _run_phase_two(opt, num_top=5, num_seeds=3):
        """Re-evaluate the best solutions collected by `opt` on several seeds and return the results."""
        print("\n" + "="*60)
        print("PHASE 2: ROBUST EVALUATION OF TOP SOLUTIONS")
        print("="*60)

        return opt.evaluate_top_solutions_with_seeds(
            opt.all_individuals,
            num_top=num_top,
            num_seeds=num_seeds
        )

    # Bind the name before the try block so the handlers below never fail with a
    # NameError and never pick up a stale `optimizer` left over in the kernel
    # namespace from an earlier run.
    optimizer = None

    try:
        # 1. Initialize SHADE Optimizer
        optimizer = SHADE_HyperparameterOptimizer()

        # 2. Run Optimization
        start_time = time.time()
        optimizer.run_optimization()
        end_time = time.time()

        elapsed_time = end_time - start_time
        print(f"Optimization completed in {elapsed_time/60:.2f} minutes")

    except KeyboardInterrupt:
        print("\n\n" + "="*60)
        print("OPTIMIZATION INTERRUPTED")
        print("="*60)

        # still evaluate what we have so far (only if something was collected)
        if optimizer is not None and optimizer.all_individuals:
            final_results = _run_phase_two(
                optimizer,
                num_top=min(5, len(optimizer.all_individuals)),  # In case fewer than 5
                num_seeds=3
            )
            optimizer.final_results = final_results
        else:
            print("No solutions to evaluate yet.")

    except Exception as e:
        print(f"\n\nCritical failure: {e}")
        import traceback
        traceback.print_exc()
    else:
        # Only run if no exception occurred
        final_results = _run_phase_two(optimizer, num_top=5, num_seeds=3)
        optimizer.final_results = final_results

    finally:
        # Always save whatever results we have; nothing to save if the
        # optimizer could not even be constructed.
        if optimizer is not None:
            optimizer.save_results("shade_optimization_results.csv")

            if optimizer.final_results:
                optimizer.save_top_solutions_results(
                    optimizer.final_results,
                    "shade_top_solutions_multiseed.csv"
                )
        cleanup_memory()
        print("\nProcess Complete.")