In [1]:
# Install the libraries
# !pip install transformers datasets peft torch accelerate evaluate
!pip install -q transformers datasets peft torch accelerate evaluate # '-q' flag to quietly install the packages without showing the output logs

In [2]:
import torch
import random
import numpy as np
import pandas as pd
import gc

# Import Hugging Face libraries
import evaluate
from datasets import load_dataset, Dataset, DatasetDict, IterableDataset, IterableDatasetDict
from transformers import AutoTokenizer, AutoModelForSequenceClassification, TrainingArguments, Trainer, DataCollatorWithPadding, EvalPrediction
from peft import LoraConfig, TaskType, get_peft_model

In [3]:
# Import the Python types
from typing import List, Dict, Any, Tuple, cast, Optional

from dataclasses import dataclass, asdict

In [4]:
import shutil
import os

if os.path.exists("/content/results"):
  shutil.rmtree("/content/results")

Constants

In [5]:
SEED = 42
TRAIN_SAMPLE_SIZE = 3000
TOTAL_TRIALS = 20
NUM_LABELS = 6
MAX_LENGTH = 128
MODEL = "distilbert-base-uncased"

Hyperparameters

In [6]:
# Search Space (Discrete Options)
WARMUP_OPTIONS = [0.0, 0.06, 0.1]
RANK_OPTIONS = [2, 4, 8, 16, 24, 32, 48]
ALPHA_OPTIONS = [8, 16, 32, 64, 96]
DROPOUT_OPTIONS = [0, 0.05, 0.1, 0.2]
TARGET_MODULE_OPTIONS = [
    ["q_lin", "v_lin"],                           # Index 0
    ["q_lin", "v_lin", "ffn.lin1", "ffn.lin2"]    # Index 1
]

In [7]:
def set_global_seed(seed: int):
  """
  Set the global seed for reproducibility.
  """
  random.seed(seed)
  np.random.seed(seed)
  torch.manual_seed(seed)

  # Check if CUDA GPU is available
  if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)


In [8]:
set_global_seed(SEED)

In [9]:
class Individual:
  def __init__(self, genes: np.ndarray = None):
    if genes is None:
      self.genes = np.array([
        np.random.uniform(5e-6, 5e-4), # learning rate
        np.random.uniform(0.0, 0.1), # warmup ratio
        np.random.uniform(2, 24), # rank
        np.random.uniform(8, 96), # alpha
        np.random.uniform(0.0, 0.2), # dropout
        np.random.random() #v to choose between the target modules
      ])
    else:
      self.genes = genes.copy()

    self.fitness = None

  def decode(self):
    self.learning_rate = self.genes[0]
    self.warmup_ratio  = min(WARMUP_OPTIONS, key=lambda x: abs(x - self.genes[1]))
    self.rank = min(RANK_OPTIONS, key=lambda x: abs(x - self.genes[2]))
    self.alpha = min(ALPHA_OPTIONS, key=lambda x : abs(x - self.genes[3]))
    self.dropout = min(DROPOUT_OPTIONS, key=lambda x : abs(x - self.genes[4]))
    if self.genes[5] < 0.5:
      self.target_modules = ["q_lin", "v_lin"]
    else:
        self.target_modules = ["q_lin", "v_lin", "ffn.lin1", "ffn.lin2"]

In [10]:
# for the parents
def selection(population, num_parents):
  parents = []
  for i in range(num_parents):
    idx1, idx2 = random.sample(range(len(population)), 2)
    if population[idx1].fitness > population[idx2].fitness:
      parents.append(population[idx1])
    else:
      parents.append(population[idx2])
  return parents

# for the children
def crossover(parents, offspring_size):
  offspring = []
  for _ in range(offspring_size):
    parent1, parent2 = random.sample(parents, 2)
    crossover_point = random.randint(1, len(parent1.genes)-1)
    child_genes = np.concatenate([parent1.genes[:crossover_point], parent2.genes[crossover_point:]])
    offspring.append(Individual(child_genes))
  return offspring

# to create new children from current children (with different characteristics)
def mutation(individual: Individual, mutation_rate: float = 0.1, mutation_strength: float = 0.1):
    bounds = [(5e-6, 5e-4), (0.0, 0.1), (2, 24), (8, 96), (0.0, 0.2), (0.0, 1.0)]

    for i in range(len(individual.genes)):
        if random.random() < mutation_rate:
            low, high = bounds[i]
            range_size = high - low
            noise = np.random.normal(0, mutation_strength * range_size)
            individual.genes[i] = np.clip(individual.genes[i] + noise, low, high)


def blx_alpha_crossover(parent1: Individual, parent2: Individual, alpha: float=0.5) -> Individual:
  child_genes = np.zeros(len(parent1.genes))

  for i in range(len(parent1.genes)):
    g1, g2 = parent1.genes[i], parent2.genes[i]
    cmin, cmax = min(g1, g2), max(g1, g2)
    interval = cmax - cmin

    child_genes[i] = np.random.uniform(cmin - alpha * interval, cmax + alpha * interval)

    bounds = [(5e-6, 5e-4), (0.0, 0.1), (2, 24), (8, 96), (0.0, 0.2), (0.0, 1.0)]
    for i, (low, high) in enumerate(bounds):
        child_genes[i] = np.clip(child_genes[i], low, high)

    return Individual(child_genes)



# def fitness_function(individual):
#   individual.decode()
#   learning_rate = individual.learning_rate
#   warmup_ratio = individual.warmup_ratio
#   rank = individual.warmup_ratio
#   alpha = individual.alpha
#   dropout = individual.dropout
#   target_modules = individual.target_modules

#   print





In [11]:
class DataManager:
  def __init__(self, model_name: str = MODEL):
    self.tokenizer = AutoTokenizer.from_pretrained(model_name)
    self.dataset: Optional[Dict[str, Any]] = None

  def prepare_data(self) -> Dict[str, Any]:
    """
    Loads the dataset and processes it.
    """

    # Check if the dataset is correctly loaded into the instance memory
    if self.dataset is not None:
        return self.dataset

    print("Loading and processing data...")

    # Load full dataset
    full_dataset = cast(DatasetDict, load_dataset("dair-ai/emotion"))

    # Use seed to ensure every run uses the SAME subset of data
    train_subset = full_dataset["train"].shuffle(seed=SEED).select(range(TRAIN_SAMPLE_SIZE))

    # Private helper method for text embeddings
    def _tokenize(examples):
      return self.tokenizer(
        examples["text"],
        truncation=True,
        padding="max_length",
        max_length=MAX_LENGTH
      )

    tokenized_train_dataset = train_subset.map(_tokenize, batched=True)
    tokenized_validation_dataset = full_dataset["validation"].map(_tokenize, batched=True)

    self.dataset = {
        "train": tokenized_train_dataset,
        "validation": tokenized_validation_dataset,
        "tokenizer": self.tokenizer,
        "num_labels": NUM_LABELS
    }

    print("Data preparation complete.")

    return self.dataset

In [12]:
data_manager = DataManager()
data_bundle = data_manager.prepare_data()

# **RCGA-BLX-ALPHA Engine**

In [13]:
class RCGAExperiment:
    def __init__(self, data_bundle: Dict[str, Any]):
        self.data = data_bundle
        self.results: List[Dict[str, Any]] = []
        self.metric = evaluate.load("accuracy")
        self.trial_counter = 0

        # Bounds for the 6 genes
        self.min_bounds = np.array([5e-6, 0.0, 2, 8, 0.0, 0.0])
        self.max_bounds = np.array([5e-4, 0.1, 24, 96, 0.2, 1.0])

    def _compute_metrics(self, eval_pred: EvalPrediction) -> Dict[str, float]:
        predictions, labels = eval_pred
        predictions = np.argmax(predictions, axis=1)
        result = self.metric.compute(predictions=predictions, references=labels)
        return cast(Dict[str, float], result)

    def _cleanup_memory(self):
        torch.cuda.empty_cache()
        gc.collect()

    def train_model(self, trial_id: int, params: Individual) -> float:
        print(f"Params: LR={params.learning_rate:.2e}, Rank={params.rank}, Alpha={params.alpha}")

        model = AutoModelForSequenceClassification.from_pretrained(
            MODEL, num_labels=self.data["num_labels"]
        )

        peft_config = LoraConfig(
            task_type=TaskType.SEQ_CLS,
            r=params.rank,
            lora_alpha=params.alpha,
            lora_dropout=params.dropout,
            target_modules=params.target_modules
        )
        model = get_peft_model(model, peft_config)

        current_seed = SEED + trial_id

        args = TrainingArguments(
            output_dir=f"./results/rcga_trial_{trial_id}",
            learning_rate=params.learning_rate,
            per_device_train_batch_size=16,
            per_device_eval_batch_size=16,
            num_train_epochs=3,
            warmup_ratio=params.warmup_ratio,
            weight_decay=0.01,
            eval_strategy="epoch",
            save_strategy="no",
            logging_strategy="epoch",
            seed=current_seed,
            report_to="none",
            load_best_model_at_end=False,
            optim="adamw_torch"
        )

        data_collator = DataCollatorWithPadding(tokenizer=self.data["tokenizer"])

        trainer = Trainer(
            model=model,
            args=args,
            train_dataset=self.data["train"],
            eval_dataset=self.data["validation"],
            data_collator=data_collator,
            compute_metrics=self._compute_metrics
        )

        trainer.train()
        eval_results = trainer.evaluate()

        del model
        del trainer
        self._cleanup_memory()

        return eval_results["eval_accuracy"]

    def fitness_function(self, individual: Individual) -> float:
        self.trial_counter += 1

        individual.decode()  # Decode the genes to set learning_rate, rank, etc.

        try:
            accuracy = self.train_model(self.trial_counter, individual)  # ✅ Pass the individual directly
            print(f"Accuracy: {accuracy:.4%}")
            return accuracy
        except Exception as e:
          print(f"Error in Trial {self.trial_counter}: {e}")
          self._cleanup_memory()
          return 0.0

    def run_rcga(self, population_size: int = 20, generations: int = 10):
        print(f"Starting RCGA: {population_size} individuals, {generations} generations.")

        # 1. Initialization
        population = [Individual() for _ in range(population_size)]

        best_individual = None
        best_fitness = -float('inf')

        for gen in range(generations):
            print(f"\n{'='*50}")
            print(f"GENERATION {gen + 1}/{generations}")
            print(f"{'='*50}")

            # Evaluation
            for i, individual in enumerate(population):
                print(f"\n[Gen {gen+1}, Individual {i+1}/{population_size}]")
                individual.fitness = self.fitness_function(individual)

                # tracking best solution
                if individual.fitness > best_fitness:
                    best_fitness = individual.fitness
                    best_individual = Individual(individual.genes.copy())
                    print(f"NEW BEST FITNESS: {best_fitness:.4%}")

                # Log result
                individual.decode()
                record = {
                    "generation": gen + 1,
                    "trial_id": self.trial_counter,
                    "accuracy": individual.fitness,
                    "learning_rate": individual.learning_rate,
                    "warmup_ratio": individual.warmup_ratio,
                    "rank": individual.rank,
                    "alpha": individual.alpha,
                    "dropout": individual.dropout,
                    "target_modules": str(individual.target_modules)
                }
                self.results.append(record)

            # Stopping if last generation
            if gen == generations - 1:
                break

            # Selection
            num_parents = population_size // 2
            parents = selection(population, num_parents)

            # Crossover + Mutation
            offspring = []
            offspring.append(Individual(best_individual.genes.copy()))  # Elitism

            while len(offspring) < population_size:
                parent1, parent2 = random.sample(parents, 2)
                child = blx_alpha_crossover(parent1, parent2, alpha=0.5)
                mutation(child)
                offspring.append(child)

            # Replacing the population
            population = offspring

        print(f"\n{'='*50}")
        print("RCGA OPTIMIZATION COMPLETE")
        print(f"{'='*50}")
        print(f"Best Fitness: {best_fitness:.4%}")

        return best_individual, best_fitness

    def save_results(self, filename: str = "rcga_results.csv"):
        """Saves results to CSV and prints summary"""
        if not self.results:
            print("No results to save.")
            return

        df = pd.DataFrame(self.results)
        df.to_csv(filename, index=False)
        print(f"\nResults saved to {filename}")

        best_run = df.loc[df['accuracy'].idxmax()]
        print("\n" + "="*40)
        print("TOP RESULT:")
        print("="*40)
        print(f"Accuracy: {best_run['accuracy']:.4%}")
        print(f"Generation: {int(best_run['generation'])}")
        print(f"Rank: {int(best_run['rank'])}, Alpha: {int(best_run['alpha'])}")
        print(f"LR: {best_run['learning_rate']:.2e}")
        print(f"Warmup: {best_run['warmup_ratio']}")
        print(f"Dropout: {best_run['dropout']}")

In [14]:
experiment = RCGAExperiment(data_bundle)

# Use run_rcga() instead of run_experiment()
best_individual, best_fitness = experiment.run_rcga(population_size=20, generations=5)

experiment.save_results()