In [1]:
!pip install datasets transformers evaluate deap 



In [2]:
!pip install accelerate -U



In [3]:
!pip install transformers[torch]



In [4]:
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, DataCollatorWithPadding, TrainingArguments, Trainer
from datasets import load_dataset
import numpy as np
import random
import copy
import accelerate
import evaluate

# Run on the GPU when one is visible, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# SST-2 sentiment task from the GLUE benchmark.
dataset = load_dataset("glue", "sst2")

# Optional sanity check: mean of the training labels.
label_distribution = (
    np.array(dataset['train']['label']).sum()
    / len(dataset['train']['label'])
)
print(f"Label distribution in training set: {label_distribution}")

# Checkpoint every model and the tokenizer are loaded from.
model_checkpoint = 'roberta-base'

# Class index <-> human-readable name.
id2label = {0: "Negative", 1: "Positive"}
label2id = {label: index for index, label in id2label.items()}

# Pretrained classifier and its matching tokenizer.
model = AutoModelForSequenceClassification.from_pretrained(
    model_checkpoint,
    num_labels=2,
    id2label=id2label,
    label2id=label2id,
)
tokenizer = AutoTokenizer.from_pretrained(
    model_checkpoint,
    add_prefix_space=True,
)

# Register a padding token (and grow the embedding table to fit it) only
# when the tokenizer does not already define one.
if tokenizer.pad_token is None:
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})
    model.resize_token_embeddings(len(tokenizer))

# Preprocess data
def tokenize_function(examples):
    """Tokenize a batch of examples, truncating each sentence to 128 tokens.

    No padding is applied here: the file builds a ``DataCollatorWithPadding``
    and hands it to the ``Trainer``, so each batch is padded only to the length
    of its longest member instead of always being padded to 128 tokens.

    Args:
        examples: Batch mapping that contains a ``"sentence"`` entry.

    Returns:
        The tokenizer output for the batch (unpadded, truncated).
    """
    return tokenizer(examples["sentence"], truncation=True, max_length=128)

# Tokenize every split in batches, then expose the targets under the column
# name "labels".
tokenized_dataset = (
    dataset
    .map(tokenize_function, batched=True)
    .rename_column("label", "labels")
)

# Collator used to assemble padded batches.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# Accuracy metric used to score predictions.
accuracy = evaluate.load("accuracy")

def compute_metrics(eval_preds):
    """Score a ``(logits, labels)`` pair with the accuracy metric.

    The predicted class of each row is the index of its largest logit along
    the last axis.

    Args:
        eval_preds: Two-item iterable of ``(logits, labels)``.

    Returns:
        Whatever ``accuracy.compute`` returns for those predictions.
    """
    raw_scores, gold_labels = eval_preds
    predicted_classes = np.argmax(raw_scores, axis=-1)
    return accuracy.compute(
        predictions=predicted_classes,
        references=gold_labels,
    )

# Define the Genetic Algorithm
class GeneticAlgorithm:
    """Evolve a population of models by averaging and perturbing their weights.

    Fitness is whatever ``objective_function`` returns for a model; the search
    minimises it, so lower is better.

    Args:
        objective_function: Callable that takes a model and returns a sortable
            fitness score to minimise.
        population_size: Number of individuals kept per generation. Must be at
            least 2 because crossover needs two parents.
        num_generations: Number of evolution rounds performed by ``optimize``.
        target_modules: Dotted module paths whose parameters stay trainable in
            newly created individuals; every other parameter is frozen.
    """

    def __init__(self, objective_function, population_size=6, num_generations=3, target_modules=None):
        self.objective_function = objective_function
        self.population_size = population_size
        self.num_generations = num_generations
        self.target_modules = target_modules if target_modules is not None else []

    def optimize(self):
        """Run the evolutionary loop and return the fittest final individual.

        Returns:
            The individual with the lowest fitness in the last population.

        Raises:
            ValueError: If ``population_size`` is smaller than 2.
        """
        if self.population_size < 2:
            raise ValueError("population_size must be at least 2 to select two parents")

        # Initialize the population
        print("Initializing population...")
        population = [self.create_individual() for _ in range(self.population_size)]

        # Score every individual on its own. Copying one score across the whole
        # population would mis-rank individuals and would tie the fitness list
        # to a fixed length instead of population_size.
        print("Evaluating initial population...")
        fitness = [self.objective_function(individual) for individual in population]
        print(f"Initial population fitness: {fitness}")

        # Evolve the population
        for generation in range(self.num_generations):
            print(f"\nGeneration {generation + 1}/{self.num_generations}")
            new_population = []
            while len(new_population) < self.population_size:
                # The two fittest members of the current population breed.
                parents = self.select_parents(population, fitness)
                print(f"Selected parents for crossover: {parents}")

                child1, child2 = self.crossover_and_mutate(parents[0], parents[1])

                print("Evaluating offspring...")
                child1_fitness = self.objective_function(child1)
                child2_fitness = self.objective_function(child2)
                print(f"Offspring fitness: {[child1_fitness, child2_fitness]}")

                new_population.extend([(child1, child1_fitness), (child2, child2_fitness)])

            # Children arrive in pairs, so an odd population_size overshoots by
            # one: rank by fitness (ascending, best first) and drop the surplus.
            new_population.sort(key=lambda pair: pair[1])
            new_population = new_population[:self.population_size]

            # The offspring fully replace the previous generation.
            population = [individual for individual, _ in new_population]
            fitness = [fit for _, fit in new_population]
            print(f"Population fitness after generation {generation + 1}: {fitness}")

        # Return the best individual
        best_index = fitness.index(min(fitness))
        print(f"\nBest individual found with fitness: {fitness[best_index]}")
        return population[best_index]

    def create_individual(self):
        """Load a fresh pretrained classifier with only the target modules trainable.

        Returns:
            The model, moved to ``device``.
        """
        individual = AutoModelForSequenceClassification.from_pretrained(
            model_checkpoint, num_labels=2, id2label=id2label, label2id=label2id
        )
        for name, param in individual.named_parameters():
            param.requires_grad = self._is_trainable(name)
        return individual.to(device)

    def _is_trainable(self, name):
        """Return True when parameter ``name`` lies inside one of ``target_modules``."""
        for target in self.target_modules:
            prefix = target.rstrip(".")
            # Match on a dotted-path boundary so that e.g. "...layer.1" does
            # not also select "...layer.10" and "...layer.11".
            if name == prefix or name.startswith(prefix + "."):
                return True
        return False

    def select_parents(self, population, fitness):
        """Return the two individuals with the lowest fitness, best first.

        Args:
            population: Sequence of individuals.
            fitness: Fitness scores aligned with ``population``.
        """
        # Sort on the score only, so the individuals themselves are never compared.
        ranked = sorted(zip(fitness, population), key=lambda pair: pair[0])
        return [individual for _, individual in ranked[:2]]

    def crossover_and_mutate(self, parent1, parent2):
        """Create two children from the parents' averaged, noise-perturbed weights.

        Every parameter whose name does not contain ``'classifier'`` is set to
        the mean of the two parents plus Gaussian noise scaled by 0.05.
        Parameters with ``'classifier'`` in their name keep the value copied
        from the respective parent (``child1`` from ``parent1``, ``child2``
        from ``parent2``).

        Returns:
            Tuple ``(child1, child2)``; the parents are left unmodified.
        """
        child1 = copy.deepcopy(parent1)
        child2 = copy.deepcopy(parent2)

        # Build each parent's state dict once rather than once per parameter.
        state1 = parent1.state_dict()
        state2 = parent2.state_dict()

        for child in (child1, child2):
            for name, param in child.named_parameters():
                if 'classifier' in name:  # Avoid changing the classifier layer directly
                    continue
                blended = 0.5 * state1[name].data + 0.5 * state2[name].data
                param.data = blended + torch.randn_like(param) * 0.05  # mutation

        return child1, child2

def print_trainable_params(model):
    """Print how many scalar parameters of ``model`` have ``requires_grad`` set.

    Args:
        model: Object exposing ``parameters()`` whose items provide
            ``requires_grad`` and ``numel()``.
    """
    total = 0
    for tensor in model.parameters():
        if tensor.requires_grad:
            total += tensor.numel()
    print(f"Number of trainable parameters: {total}")

def objective_function(model):
    """Fine-tune ``model`` for one epoch and return its negated validation accuracy.

    The model is trained on ``tokenized_dataset["train"]`` and scored on
    ``tokenized_dataset["validation"]``. The same ``model`` object is handed to
    the ``Trainer``, so the caller's model carries the trained weights after
    this returns.

    Args:
        model: Sequence-classification model to train and score.

    Returns:
        ``-accuracy``, so that a minimiser prefers more accurate models.
    """
    print_trainable_params(model)

    # No per-epoch evaluation is requested: with a single epoch it would run
    # one full validation pass at the end of training, and the explicit
    # trainer.evaluate() call below would then immediately repeat it. Leaving
    # the evaluation schedule at its default also avoids passing a keyword
    # whose name changed between transformers releases.
    training_args = TrainingArguments(
        output_dir="./results",
        learning_rate=1e-5,
        per_device_train_batch_size=32,
        per_device_eval_batch_size=32,
        num_train_epochs=1,  # Use a smaller number of epochs for faster fitness evaluation
        weight_decay=0.01,
        save_strategy="no",
        logging_strategy="no",
    )

    # Create the trainer object
    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=tokenized_dataset["train"],
        eval_dataset=tokenized_dataset["validation"],
        tokenizer=tokenizer,
        data_collator=data_collator,
        compute_metrics=compute_metrics,
    )

    # Train the model
    print("Training model for fitness evaluation...")
    trainer.train()

    # Evaluate the model once, after training has finished
    results = trainer.evaluate()
    acc = results["eval_accuracy"]
    print(f"Evaluated model accuracy: {acc}")

    return -acc  # Negative accuracy for minimization

# Define the GA parameters
population_size = 6
num_generations = 3

# Parameter-name prefixes that are kept TRAINABLE in every individual:
# GeneticAlgorithm.create_individual sets requires_grad=True for parameters
# under these prefixes and freezes all others. Here: encoder layers 9-11 plus
# the classifier head.
target_modules = ['roberta.encoder.layer.11','roberta.encoder.layer.10', 'roberta.encoder.layer.9','classifier']

# Create the GA optimizer
optimizer = GeneticAlgorithm(
    objective_function,
    population_size=population_size,
    num_generations=num_generations,
    target_modules=target_modules
)

# Run the GA optimization
best_model = optimizer.optimize()
# NOTE: objective_function calls trainer.train() before evaluating, so this
# line trains best_model again and reports the accuracy reached afterwards.
print(f"Best model accuracy: {-objective_function(best_model)}")

# Generate predictions
best_model.to(device)
# Put the model in inference mode explicitly so that dropout is disabled no
# matter what state earlier training/evaluation calls left it in.
best_model.eval()

print("\nTrained model predictions:")
print("--------------------------")
text_list = [
    "a feel-good picture in the best sense of the term .",
    "resourceful and ingenious entertainment .",
    "it 's just incredibly dull .",
    "the movie 's biggest offense is its complete and utter lack of tension .",
    "impresses you with its open-endedness and surprises .",
    "unless you are in dire need of a diesel fix , there is no real reason to see it ."
]

# Gradients are not needed for prediction; skipping autograd bookkeeping saves
# memory and time.
with torch.no_grad():
    for text in text_list:
        # One sentence per forward pass, so no padding or attention mask is required.
        inputs = tokenizer.encode(text, return_tensors="pt").to(device)
        logits = best_model(inputs).logits
        predictions = torch.argmax(logits, dim=-1)
        print(text + " - " + id2label[predictions.item()])


ModuleNotFoundError: No module named 'accelerate'