In [None]:
# Install the libraries
# !pip install transformers datasets peft torch accelerate evaluate
!pip install  evaluate # '-q' flag to quietly install the packages without showing the output logs

In [None]:
import torch
import random
import numpy as np
import pandas as pd
import gc

# Import Hugging Face libraries
import evaluate
from datasets import load_dataset, Dataset, DatasetDict, IterableDataset, IterableDatasetDict
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, DataCollatorWithPadding, EvalPrediction
from peft import LoraConfig, TaskType, get_peft_model

In [None]:
# Import the Python types
from typing import List, Dict, Any, Tuple, cast, Optional

from dataclasses import dataclass, asdict

Constants

In [None]:
SEED = 42
TRAIN_SAMPLE_SIZE = 3000
TOTAL_TRIALS = 20
NUM_LABELS = 6
MAX_LENGTH = 128
MODEL = "distilbert-base-uncased"

Hyperparameters

In [None]:
# Search Space (Discrete Options)

LR_MIN, LR_MAX = 1e-5, 2e-4
WARMUP_OPTIONS = [0.0, 0.03, 0.06, 0.1]

WARMUP_OPTIONS = [0.0, 0.06, 0.1]

RANK_OPTIONS = [2, 4, 8, 16, 24]

ALPHA_OPTIONS = [8, 16, 32, 64, 96]

DROPOUT_OPTIONS = [0.0, 0.05, 0.1, 0.2]

TARGET_MODULE_OPTIONS = [
    ["q_lin", "v_lin"],
    ["q_lin", "v_lin", "ffn.lin1", "ffn.lin2"]
]

In [None]:
def set_global_seed(seed: int):
  """
  Set the global seed for reproducibility.
  """
  random.seed(seed)
  np.random.seed(seed)
  torch.manual_seed(seed)

  # Check if CUDA GPU is available
  if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)


In [None]:
set_global_seed(SEED)

In [None]:
class Individual:
  def __init__(self, genes: np.ndarray = None):
    if genes is None:
      self.genes = np.array([
        np.random.uniform(LR_MIN, LR_MAX), # learning rate
        np.random.randint(0, len(WARMUP_OPTIONS)), # warmup ratio
        np.random.randint(0, len(RANK_OPTIONS)), # rank
        np.random.randint(0, len(ALPHA_OPTIONS)), # alpha
        np.random.randint(0, len(DROPOUT_OPTIONS)), # dropout
        np.random.randint(0, len(TARGET_MODULE_OPTIONS)) # target modules
      ], dtype=object)

    else:
      self.genes = genes.copy()

    self.fitness = None

  def decode(self):
    # Continuous gene
    self.learning_rate = self.genes[0]

    warmup_index = np.clip(int(round(self.genes[1])), 0, len(WARMUP_OPTIONS) - 1)
    self.warmup_ratio = WARMUP_OPTIONS[warmup_index]

    rank_index = np.clip(int(round(self.genes[2])), 0, len(RANK_OPTIONS) - 1)
    self.rank = RANK_OPTIONS[rank_index]

    alpha_index = np.clip(int(round(self.genes[3])), 0, len(ALPHA_OPTIONS) - 1)
    self.alpha = ALPHA_OPTIONS[alpha_index]

    dropout_index = np.clip(int(round(self.genes[4])), 0, len(DROPOUT_OPTIONS) - 1)
    self.dropout = DROPOUT_OPTIONS[dropout_index]

    target_modules_index = np.clip(int(round(self.genes[5])), 0, len(TARGET_MODULE_OPTIONS) - 1)
    self.target_modules = TARGET_MODULE_OPTIONS[target_modules_index]

In [None]:
# for the parents
def selection(population, num_parents):
  parents = []
  for i in range(num_parents):
    idx1, idx2 = random.sample(range(len(population)), 2)
    if population[idx1].fitness > population[idx2].fitness:
      parents.append(population[idx1])
    else:
      parents.append(population[idx2])
  return parents

# for the children
def crossover(parents, offspring_size):
  offspring = []
  for _ in range(offspring_size):
    parent1, parent2 = random.sample(parents, 2)
    crossover_point = random.randint(1, len(parent1.genes)-1)
    child_genes = np.concatenate([parent1.genes[:crossover_point], parent2.genes[crossover_point:]])
    offspring.append(Individual(child_genes))
  return offspring

# to create new children from current children (with different characteristics)
# to create new children from current children (with different characteristics)
def mutation(individual: Individual):
  if random.random() < 0.1:
    mutation_index = random.randint(0, len(individual.genes)-1)

    if mutation_index == 0:
      individual.genes[mutation_index] = np.random.uniform(LR_MIN, LR_MAX)

    elif mutation_index == 1:
      individual.genes[mutation_index] = np.random.randint(0, len(WARMUP_OPTIONS))

    elif mutation_index == 2:
      individual.genes[mutation_index] = np.random.randint(0, len(RANK_OPTIONS))

    elif mutation_index == 3:
      individual.genes[mutation_index] = np.random.randint(0, len(ALPHA_OPTIONS))

    elif mutation_index == 4:
      individual.genes[mutation_index] = np.random.randint(0, len(DROPOUT_OPTIONS))

    elif mutation_index == 5:
      individual.genes[mutation_index] = np.random.randint(0, len(TARGET_MODULE_OPTIONS))

  return individual # Return the mutated individual, not the list


def blx_alpha_crossover(parent1: Individual, parent2: Individual,
                       min_bounds: np.ndarray, max_bounds: np.ndarray,
                       alpha: float = 0.5) -> Individual:
    """
    BLX-α crossover for continuous and index-based discrete variables.
    """
    child_genes = np.zeros(len(parent1.genes), dtype=object)

    for i in range(len(parent1.genes)):
        g1, g2 = parent1.genes[i], parent2.genes[i]

        if i == 0: # Learning rate is continuous
            g1, g2 = float(g1), float(g2)
            cmin, cmax = min(g1, g2), max(g1, g2)
            interval = cmax - cmin

            child_genes[i] = np.random.uniform(
                cmin - alpha * interval,
                cmax + alpha * interval
            )
        else:
            g1, g2 = float(g1), float(g2)
            cmin, cmax = min(g1, g2), max(g1, g2)
            interval = cmax - cmin

            child_genes[i] = np.random.uniform(
                cmin - alpha * interval,
                cmax + alpha * interval)

    child_genes = np.clip(child_genes.astype(float), min_bounds.astype(float), max_bounds.astype(float))

    for i in range(1, len(child_genes)):
        child_genes[i] = int(round(child_genes[i]))

    return Individual(child_genes.astype(object))



# def fitness_function(individual):
#   individual.decode()
#   learning_rate = individual.learning_rate
#   warmup_ratio = individual.warmup_ratio
#   rank = individual.warmup_ratio
#   alpha = individual.alpha
#   dropout = individual.dropout
#   target_modules = individual.target_modules

#   print





In [None]:
class DataManager:
  def __init__(self, model_name: str = MODEL):
    self.tokenizer = AutoTokenizer.from_pretrained(model_name)
    self.dataset: Optional[Dict[str, Any]] = None

  def prepare_data(self) -> Dict[str, Any]:
    """
    Loads the dataset and processes it.
    """

    # Check if the dataset is correctly loaded into the instance memory
    if self.dataset is not None:
        return self.dataset

    print("Loading and processing data...")

    # Load full dataset
    full_dataset = cast(DatasetDict, load_dataset("dair-ai/emotion"))

    # Use seed to ensure every run uses the SAME subset of data
    train_subset = full_dataset["train"].shuffle(seed=SEED).select(range(TRAIN_SAMPLE_SIZE))

    # Private helper method for text embeddings
    def _tokenize(examples):
      return self.tokenizer(
        examples["text"],
        truncation=True,
        padding="max_length",
        max_length=MAX_LENGTH
      )

    tokenized_train_dataset = train_subset.map(_tokenize, batched=True)
    tokenized_validation_dataset = full_dataset["validation"].map(_tokenize, batched=True)

    self.dataset = {
        "train": tokenized_train_dataset,
        "validation": tokenized_validation_dataset,
        "tokenizer": self.tokenizer,
        "num_labels": NUM_LABELS
    }

    print("Data preparation complete.")

    return self.dataset

In [None]:
data_manager = DataManager()
data_bundle = data_manager.prepare_data()

# **RCGA-BLX-ALPHA Engine**

In [None]:
class RCGAExperiment:
    def __init__(self, data_bundle: Dict[str, Any]):
        self.data = data_bundle
        self.results: List[Dict[str, Any]] = []
        self.metric = evaluate.load("accuracy")
        self.trial_counter = 0

        # Bounds for the 6 genes
        self.min_bounds = np.array([5e-6, 0.0, 2, 8, 0.0, 0.0])
        self.max_bounds = np.array([5e-4, 0.1, 24, 96, 0.2, 1.0])

    def _compute_metrics(self, eval_pred: EvalPrediction) -> Dict[str, float]:
        predictions, labels = eval_pred
        predictions = np.argmax(predictions, axis=1)
        result = self.metric.compute(predictions=predictions, references=labels)
        return cast(Dict[str, float], result)

    def _cleanup_memory(self):
        torch.cuda.empty_cache()
        gc.collect()

    def train_model(self, trial_id: int, params: Individual) -> float:
        print(f"Params: LR={params.learning_rate:.2e}, Rank={params.rank}, Alpha={params.alpha}")

        model = AutoModelForSequenceClassification.from_pretrained(
            MODEL, num_labels=self.data["num_labels"]
        )

        peft_config = LoraConfig(
            task_type=TaskType.SEQ_CLS,
            r=params.rank,
            lora_alpha=params.alpha,
            lora_dropout=params.dropout,
            target_modules=params.target_modules
        )
        model = get_peft_model(model, peft_config)

        current_seed = SEED + trial_id

        args = TrainingArguments(
            output_dir=f"./results/rcga_trial_{trial_id}",
            learning_rate=params.learning_rate,
            per_device_train_batch_size=16,
            per_device_eval_batch_size=16,
            num_train_epochs=3,
            warmup_ratio=params.warmup_ratio,
            weight_decay=0.01,
            eval_strategy="epoch",
            save_strategy="no",
            logging_strategy="epoch",
            seed=current_seed,
            report_to="none",
            load_best_model_at_end=False,
            optim="adamw_torch"
        )

        data_collator = DataCollatorWithPadding(tokenizer=self.data["tokenizer"])

        trainer = Trainer(
            model=model,
            args=args,
            train_dataset=self.data["train"],
            eval_dataset=self.data["validation"],
            data_collator=data_collator,
            compute_metrics=self._compute_metrics
        )

        trainer.train()
        eval_results = trainer.evaluate()

        del model
        del trainer
        self._cleanup_memory()

        return eval_results["eval_accuracy"]

    def fitness_function(self, individual: Individual) -> float:
        self.trial_counter += 1

        individual.decode()  # Decode the genes to set learning_rate, rank, etc.

        try:
            accuracy = self.train_model(self.trial_counter, individual)
            print(f"Accuracy: {accuracy:.4%}")
            return accuracy
        except Exception as e:
          print(f"Error in Trial {self.trial_counter}: {e}")
          self._cleanup_memory()
          return 0.0

    def run_rcga(self, population_size: int = 20, generations: int = 5):
        print(f"Starting RCGA: {population_size} individuals, {generations} generations.")

        # 1. Initialization
        population = [Individual() for _ in range(population_size)]

        best_individual = None
        best_fitness = -float('inf')

        for gen in range(generations):
            print(f"\n{'='*50}")
            print(f"GENERATION {gen + 1}/{generations}")
            print(f"{'='*50}")

            # Evaluation
            for i, individual in enumerate(population):
                print(f"\n[Gen {gen+1}, Individual {i+1}/{population_size}]")
                individual.fitness = self.fitness_function(individual)

                # tracking best solution
                if individual.fitness > best_fitness:
                    best_fitness = individual.fitness
                    best_individual = Individual(individual.genes.copy())
                    print(f"NEW BEST FITNESS: {best_fitness:.4%}")

                # Log result
                individual.decode()
                record = {
                    "generation": gen + 1,
                    "trial_id": self.trial_counter,
                    "accuracy": individual.fitness,
                    "learning_rate": individual.learning_rate,
                    "warmup_ratio": individual.warmup_ratio,
                    "rank": individual.rank,
                    "alpha": individual.alpha,
                    "dropout": individual.dropout,
                    "target_modules": str(individual.target_modules)
                }
                self.results.append(record)

            # Stopping if last generation
            if gen == generations - 1:
                break

            # Selection
            num_parents = population_size // 2
            parents = selection(population, num_parents)

            # Crossover + Mutation
            offspring = []
            offspring.append(Individual(best_individual.genes.copy()))  # Elitism

            while len(offspring) < population_size:
                parent1, parent2 = random.sample(parents, 2)
                child = blx_alpha_crossover(parent1, parent2, min_bounds=self.min_bounds,max_bounds=self.max_bounds, alpha=0.5)
                mutation(child)
                offspring.append(child)

            # Replacing the population
            population = offspring

        print(f"\n{'='*50}")
        print("RCGA OPTIMIZATION COMPLETE")
        print(f"{'='*50}")
        print(f"Best Fitness: {best_fitness:.4%}")

        return best_individual, best_fitness

    def save_results(self, filename: str = "rcga_results.csv"):
        """Saves results to CSV and prints summary"""
        if not self.results:
            print("No results to save.")
            return

        df = pd.DataFrame(self.results)
        df.to_csv(filename, index=False)
        print(f"\nResults saved to {filename}")

        best_run = df.loc[df['accuracy'].idxmax()]
        print("\n" + "="*40)
        print("TOP RESULT:")
        print("="*40)
        print(f"Accuracy: {best_run['accuracy']:.4%}")
        print(f"Generation: {int(best_run['generation'])}")
        print(f"Rank: {int(best_run['rank'])}, Alpha: {int(best_run['alpha'])}")
        print(f"LR: {best_run['learning_rate']:.2e}")
        print(f"Warmup: {best_run['warmup_ratio']}")
        print(f"Dropout: {best_run['dropout']}")

In [None]:
# Create the Experiment
import time

experiment = RCGAExperiment(data_bundle)

start_time = time.time()

# Run the experiment
experiment.run_rcga()

end_time = time.time()
elapsed_time = (end_time - start_time)

print(f"\n{'='*50}")
print("Initial training loop - time taken", elapsed_time)
print(f"{'='*50}\n")

# Save the results
experiment.save_results()

# getting top 5 best individuals

In [None]:
    def evaluate_top_solutions_with_seeds(self, num_top: int = 5, num_seeds: int = 3,
                                            results_filename: str = "top5_reeval_results.csv"):
          if not self.results:
              print("No results available. Run the experiment first.")
              return
    
          print(f"\n{'='*60}")
          print(f"EVALUATING TOP {num_top} SOLUTIONS WITH {num_seeds} DIFFERENT SEEDS")
          print(f"{'='*60}")
    
          # Convert results to DataFrame and get top solutions
          df = pd.DataFrame(self.results)
          df_sorted = df.sort_values('accuracy', ascending=False).head(num_top)
    
          final_results = []
          detailed_results = []  # For CSV export
    
          for rank, (idx, row) in enumerate(df_sorted.iterrows(), 1):
              individual = Individual()
              individual.learning_rate = row['learning_rate']
              individual.warmup_ratio = row['warmup_ratio']
              individual.rank = row['rank']
              individual.alpha = row['alpha']
              individual.dropout = row['dropout']
    
              # parsing target_modules back from string
              target_modules_str = row['target_modules']
              if 'ffn' in target_modules_str:
                  individual.target_modules = ["q_lin", "v_lin", "ffn.lin1", "ffn.lin2"]
              else:
                  individual.target_modules = ["q_lin", "v_lin"]
    
              original_accuracy = row['accuracy']
    
              print(f"\n{'='*60}")
              print(f"RANK {rank} - Original Accuracy: {original_accuracy:.4%}")
              print(f"{'='*60}")
              print(f"Params: LR={individual.learning_rate:.2e}, "
                    f"Warmup={individual.warmup_ratio}, Rank={individual.rank}")
              print(f"        Alpha={individual.alpha}, Dropout={individual.dropout}, "
                    f"Modules={len(individual.target_modules)}")
              print(f"\nRunning {num_seeds} evaluations with different seeds...")
    
              seed_accuracies = []
    
              for seed_run in range(num_seeds):
                  # high trial_id offset to avoid collision with optimization trials
                  trial_id = 10000 + (rank * 100) + seed_run
    
                  # using different seed for each run
                  current_seed = SEED + trial_id
    
                  print(f"  Seed run {seed_run + 1}/{num_seeds} (seed={current_seed})...", end=" ")
    
                  try:
                      accuracy = self.train_model(trial_id, individual)
                      seed_accuracies.append(accuracy)
                      print(f"Accuracy: {accuracy:.4%}")
    
                      # Save detailed result
                      detailed_results.append({
                          'rank': rank,
                          'seed_run': seed_run + 1,
                          'seed': current_seed,
                          'accuracy': accuracy,
                          'learning_rate': individual.learning_rate,
                          'warmup_ratio': individual.warmup_ratio,
                          'rank_param': individual.rank,
                          'alpha': individual.alpha,
                          'dropout': individual.dropout,
                          'target_modules': str(individual.target_modules),
                          'original_accuracy': original_accuracy
                      })
    
                  except Exception as e:
                      print(f"ERROR: {e}")
                      self._cleanup_memory()
                      seed_accuracies.append(0.0)
    
              # calculating statistics
              mean_acc = np.mean(seed_accuracies)
              std_acc = np.std(seed_accuracies)
              min_acc = np.min(seed_accuracies)
              max_acc = np.max(seed_accuracies)
    
              print(f"\n  Results: {mean_acc:.4%} ± {std_acc:.4%}")
              print(f"  Range: [{min_acc:.4%}, {max_acc:.4%}]")
              print(f"  Individual runs: {[f'{acc:.4%}' for acc in seed_accuracies]}")
    
              final_results.append({
                  'rank': rank,
                  'learning_rate': individual.learning_rate,
                  'warmup_ratio': individual.warmup_ratio,
                  'rank_param': individual.rank,
                  'alpha': individual.alpha,
                  'dropout': individual.dropout,
                  'target_modules': str(individual.target_modules),
                  'original_accuracy': original_accuracy,
                  'mean_accuracy': mean_acc,
                  'std_accuracy': std_acc,
                  'min_accuracy': min_acc,
                  'max_accuracy': max_acc,
                  'seed_accuracies': seed_accuracies
              })
    
          # saving detailed results to CSV in the exemplar format
          if detailed_results:
              df_detailed = pd.DataFrame(detailed_results)
    
              # Pivot to match the exemplar format - one row per rank with seed columns
              df_pivot = df_detailed.pivot_table(
                  index=['rank', 'learning_rate', 'warmup_ratio', 'rank_param', 'alpha',
                         'dropout', 'target_modules', 'original_accuracy'],
                  columns='seed_run',
                  values='accuracy',
                  aggfunc='first'
              ).reset_index()
    
              # Rename the seed columns to match exemplar format
              seed_columns = {i: f'seed_{i}_accuracy' for i in range(1, num_seeds + 1)}
              df_pivot.rename(columns=seed_columns, inplace=True)
    
              # Add mean and std accuracy columns
              seed_cols = [f'seed_{i}_accuracy' for i in range(1, num_seeds + 1)]
              df_pivot['mean_accuracy'] = df_pivot[seed_cols].mean(axis=1)
              df_pivot['std_accuracy'] = df_pivot[seed_cols].std(axis=1)
    
              # Rename columns to match exemplar exactly
              df_pivot.rename(columns={'rank_param': 'rank_r'}, inplace=True)
    
              # Reorder columns to match exemplar format
              column_order = ['rank', 'learning_rate', 'warmup_ratio', 'rank_r', 'alpha',
                              'dropout', 'target_modules', 'original_accuracy', 'mean_accuracy',
                              'std_accuracy'] + seed_cols
              df_pivot = df_pivot[column_order]
    
              # Save to CSV
              df_pivot.to_csv(results_filename, index=False)
              print(f"\n{'='*60}")
              print(f"Detailed results saved to {results_filename}")
    
          # Print final summary
          print(f"\n{'='*60}")
          print(f"FINAL SUMMARY - TOP {num_top} SOLUTIONS")
          print(f"{'='*60}")
    
          for result in final_results:
              print(f"\n{result['rank']}. Mean Accuracy: {result['mean_accuracy']:.4%} ± {result['std_accuracy']:.4%}")
              print(f"   Original: {result['original_accuracy']:.4%}, "
                    f"Range: [{result['min_accuracy']:.4%}, {result['max_accuracy']:.4%}]")
              print(f"   LR: {result['learning_rate']:.2e}, Warmup: {result['warmup_ratio']}, "
                    f"Rank: {result['rank_param']}")
              print(f"   Alpha: {result['alpha']}, Dropout: {result['dropout']}, "
                    f"Modules: {result['target_modules']}")
              runs_str = [f"{acc:.4%}" for acc in result['seed_accuracies']]
              print(f"   Runs: {runs_str}")
    
          # identifying most robust solution (highest mean - std)
          best_robust_idx = max(range(len(final_results)),
                                key=lambda i: final_results[i]['mean_accuracy'] - final_results[i]['std_accuracy'])
    
          print(f"\n{'='*60}")
          print("MOST ROBUST SOLUTION (Mean - Std):")
          print(f"{'='*60}")
          robust_result = final_results[best_robust_idx]
          print(f"Rank {robust_result['rank']}: {robust_result['mean_accuracy']:.4%} ± {robust_result['std_accuracy']:.4%}")
          print(f"Robustness Score: {robust_result['mean_accuracy'] - robust_result['std_accuracy']:.4%}")
    
          return final_results

Evaluation of top 5 best individuals

In [None]:
# Import the types module
import types

# Bind the function to your existing experiment instance
experiment.evaluate_top_solutions_with_seeds = types.MethodType(
    evaluate_top_solutions_with_seeds, 
    experiment
)

In [None]:
top5_results = experiment.evaluate_top_solutions_with_seeds(
    num_top=5,
    num_seeds=3,
    results_filename="top5_re_evaluation_results.csv"
)

# Step 3: Access results
best_robust = top5_results[0]
print(f"Best robust solution: {best_robust['mean_accuracy']:.4%} ± {best_robust['std_accuracy']:.4%}")

# experiment.plot_top5_reevaluation(top5_results)