In [1]:
import time
import torch
import torch.nn as nn
import numpy as np
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from transformers import AutoModelForSequenceClassification, AutoTokenizer, logging

logging.set_verbosity_error()
device = 'cuda' if torch.cuda.is_available() else 'mps' if torch.backends.mps.is_available() else 'cpu'

In [2]:
def print_count_parameters(model):
    """
    This function takes a PyTorch model and returns the number of trainable and non-trainable parameters.
    
    Args:
    model (torch.nn.Module): The PyTorch model to inspect.

    """
    trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
    non_trainable_params = sum(p.numel() for p in model.parameters() if not p.requires_grad)

    percent_trainable = (trainable_params / (trainable_params + non_trainable_params)) * 100
    percent_frozen = 100 - percent_trainable

    print('Total Parameters:', trainable_params + non_trainable_params)
    print('Trainable:', trainable_params, f'({percent_trainable:.2f}%)')
    print('Frozen:', non_trainable_params, f'({percent_frozen:.2f}%)')



In [3]:
# Load the Gemma 2B tokenizer and model
model_name = "openai-community/gpt2-large" 

tokenizer = AutoTokenizer.from_pretrained(model_name)
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token  # Using eos_token as the pad_token if it's not defined


In [4]:
model = AutoModelForSequenceClassification.from_pretrained(model_name, ignore_mismatched_sizes=True).to(device)
model.config.pad_token_id = tokenizer.pad_token_id

In [5]:
# Load the SST-2 dataset
dataset = load_dataset("glue", "sst2")

In [6]:
class SST2Dataset(Dataset):
    def __init__(self, dataset, tokenizer, max_length):
        self.dataset = dataset
        self.tokenizer = tokenizer

        self.max_length = max_length
        
        # Tokenize the sentences in the dataset during initialization
        self.tokenized_data = self.tokenizer(
            [example['sentence'] for example in self.dataset],
            padding=True,
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt'
        )
        
        # Store the labels separately
        self.labels = [example['label'] for example in self.dataset]

    
    def __getitem__(self, idx):
        input_ids = self.tokenized_data['input_ids'][idx]
        attention_mask = self.tokenized_data['attention_mask'][idx]
        label = self.labels[idx]
        return input_ids, attention_mask, label

    def __len__(self):
        # Return the total number of samples
        return len(self.dataset)




In [7]:
dataset_10000 = dataset['train'].select(range(5000))

In [8]:
train_data = SST2Dataset(dataset_10000, tokenizer=tokenizer, max_length=32)
val_data = SST2Dataset(dataset['validation'], tokenizer=tokenizer, max_length=32)
test_data = SST2Dataset(dataset['test'], tokenizer=tokenizer, max_length=32)

train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32)
test_loader = DataLoader(test_data, batch_size=32)

# Full Fine-Tuning


In [9]:
print_count_parameters(model)

Total Parameters: 774032640
Trainable: 774032640 (100.00%)
Frozen: 0 (0.00%)


In [10]:
def train_model(model, train_loader, epochs, lr=5e-5, print_loss_per_step=0):
    accuracies, losses = [], []
    optimizer = torch.optim.AdamW(model.parameters(), lr=lr)
    model.train()
    for i in range(epochs):
        step = 0
        total_loss = 0
        predictions, truth_values = [], []
        for input_ids, attention_mask, labels in tqdm(train_loader):
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
            loss = outputs.loss
            
            batch_predictions = torch.argmax(outputs.logits, dim=1).tolist()
            predictions.extend(batch_predictions)
            truth_values.extend(labels.tolist())

            # Perform backpropagation
            loss.backward()

            # Update the weights
            optimizer.step()

            # Clear the gradients
            optimizer.zero_grad()

            if print_loss_per_step > 0 and step % print_loss_per_step == 0:
                print(f"Step {step}, Loss: {loss.item()}")
            step += 1
            total_loss += loss.item()



        loss = total_loss / len(train_loader)
        
        predictions = np.array(predictions)
        truth_values = np.array(truth_values)
        accuracy = np.mean(predictions == truth_values)
        print(f"Epoch {i+1}, Loss: {loss}")
        print(f"Accuracy: {accuracy}")
        print('--'*20)

        accuracies.append(accuracy)
        losses.append(loss)
    return accuracies, losses



In [11]:
def evaluate_model(model, data_loader):
    model.eval()
    predictions, truth_values = [], []
    with torch.no_grad():
        for input_ids, attention_mask, labels in tqdm(data_loader):
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            labels = labels.to(device)

            outputs = model(input_ids, attention_mask=attention_mask)
            batch_predictions = torch.argmax(outputs.logits, dim=1).tolist()
            predictions.extend(batch_predictions)
            truth_values.extend(labels.tolist())

    predictions = np.array(predictions)
    truth_values = np.array(truth_values)
    accuracy = np.mean(predictions == truth_values)
    
    return round(float(accuracy), 2)

# Full fine-tuning

In [12]:
start = time.time()

full_tuning__accuracy, full_tuning_loss = train_model(model, train_loader, epochs=5)

end = time.time()

full_tuning_time = end - start
print(f"Training time: {full_tuning_time:.2f} seconds")

 48%|████▊     | 76/157 [01:20<01:23,  1.03s/it]

# LoRA Fine Tuning with peft from HuggingFace

In [13]:
from peft import get_peft_model, LoraConfig, TaskType


lora_config = LoraConfig(
    task_type=TaskType.SEQ_CLS,  # Sequence classification task
    r=4,  # Low-rank dimension
    lora_alpha=4,  # Alpha scaling factor
    lora_dropout=0.1,  # Dropout rate to use in LoRA
    target_modules=["c_attn"]  # Apply LoRA to attention layers (can be adjusted)
)

In [14]:
# del model
model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device)
model.config.pad_token_id = tokenizer.pad_token_id

model = get_peft_model(model, lora_config).to(device)




In [15]:
print_count_parameters(model)

Total Parameters: 774772480
Trainable: 739840 (0.10%)
Frozen: 774032640 (99.90%)


In [16]:
start = time.time()

lora_accuracy, lora_loss = train_model(model, train_loader, lr=5e-5, epochs=5)

end = time.time()

lora_tuning_time = end - start
print(f"Training time: {lora_tuning_time:.2f} seconds")

100%|██████████| 313/313 [03:40<00:00,  1.42it/s]


Epoch 1, Loss: 0.5008679023518349
Accuracy: 0.7501
----------------------------------------


100%|██████████| 313/313 [03:47<00:00,  1.38it/s]


Epoch 2, Loss: 0.6714163522560376
Accuracy: 0.6102
----------------------------------------


100%|██████████| 313/313 [03:45<00:00,  1.39it/s]


Epoch 3, Loss: 0.7231310015669266
Accuracy: 0.5354
----------------------------------------


 82%|████████▏ | 257/313 [03:05<00:40,  1.39it/s]


KeyboardInterrupt: 

In [20]:
saved_time = full_tuning_time - lora_tuning_time
saved_percent = (saved_time / full_tuning_time) * 100

print(f"LoRA saved {saved_time:.2f} seconds ({saved_percent:.2f}%) in training time.")

NameError: name 'full_tuning_time' is not defined

In [52]:
lora_val_accuracy = evaluate_model(model, val_loader)
print(f"LoRA Validation Accuracy: {lora_val_accuracy}")

  0%|          | 0/28 [00:00<?, ?it/s]

0.88

In [28]:
class LoRA(nn.Module):
    def __init__(self, original_layer, alpha, rank=8):
        super(LoRA, self).__init__()
        
        # Store the original layer's weight
        self.original_weight = original_layer.weight
        self.alpha = alpha
        
        # Perform weight decomposition into two low-rank matrices A and B
        # We initialize A and B with random values
        self.rank = rank

        std_dev = 1 / torch.sqrt(torch.tensor(rank).float())
        self.A = nn.Parameter(torch.randn(original_layer.weight.shape[0], rank) * std_dev)
        self.B = nn.Parameter(torch.zeros(rank, original_layer.weight.shape[1]))
        
        self.original_weight.requires_grad = False
    
    def forward(self, x):
        # Approximate the original weight as the product of A and B
        low_rank_weight = self.alpha * torch.matmul(self.A, self.B)
        adapted_weight = self.original_weight + low_rank_weight
        
        # Apply the adapted weight to the input
        return torch.matmul(x, adapted_weight)
    
class DoRA(nn.Module):
    def __init__(self, original_layer, alpha, rank=8):
        super(DoRA, self).__init__()
        
        # Store the original layer's weight
        self.original_weight = original_layer.weight
        self.alpha = alpha
        
        # Perform weight decomposition into two low-rank matrices A and B
        # We initialize A and B with random values
        self.rank = rank

        std_dev = 1 / torch.sqrt(torch.tensor(rank).float())
        self.A = nn.Parameter(torch.randn(original_layer.weight.shape[0], rank) * std_dev)
        self.B = nn.Parameter(torch.zeros(rank, original_layer.weight.shape[1]))
        self.m = nn.Parameter(torch.ones(1, original_layer.weight.shape[1]))
        
        self.original_weight.requires_grad = False
    
    def forward(self, x):
        # Approximate the original weight as the product of A and B
        low_rank_weight = self.alpha * torch.matmul(self.A, self.B)

        low_rank_weight_norm = low_rank_weight / (low_rank_weight.norm(p=2, dim=1, keepdim=True) + 1e-9)
        
        # Add the original (frozen) weight back to the low-rank adaptation
        low_rank_weight = self.m * low_rank_weight_norm
        adapted_weight = self.original_weight + low_rank_weight
        
        # Apply the adapted weight to the input
        return torch.matmul(x, adapted_weight)

In [29]:
from transformers.pytorch_utils import Conv1D
def apply_peft_to_layer(module, alpha=4, rank=8, type='lora'):
    """
    Recursively applies LoRA/DoRA to the appropriate layers in the model.
    
    Args:
        module: The current module to examine and possibly replace.
        alpha: Scaling factor for DoRA.
        rank: The rank of the low-rank adaptation.
    
    Returns:
        None (modifies the module in place).
    """
    peft_module = LoRA if type == 'lora' else DoRA
    for name, child_module in module.named_children():        
        if isinstance(child_module, Conv1D) and 'c_attn' in name:
            # Replace with DoRA version of the module
            setattr(module, name, peft_module(child_module, alpha=alpha, rank=rank))
        
        # If the module has children, apply the function recursively
        if len(list(child_module.children())) > 0:
            apply_peft_to_layer(child_module, alpha, rank, type)

def get_peft_model(alpha=4, rank=8, type='lora'):
    """
    Load the model and apply LoRA/DoRA recursively to all applicable layers.
    
    Args:
        model_name: The name of the model to load.
        alpha: Scaling factor for DoRA.
        rank: Rank for low-rank adaptation in DoRA.
    
    Returns:
        The model with LoRA/DoRA applied.
    """
    # Load the model and set the pad token ID
    model = AutoModelForSequenceClassification.from_pretrained(model_name, ignore_mismatched_sizes=True).to(device)
    model.config.pad_token_id = tokenizer.pad_token_id
    for param in model.parameters():
        param.requires_grad = False

    # Apply DoRA recursively to all relevant layers
    apply_peft_to_layer(model, alpha=alpha, rank=rank, type=type)
    
    return model

# Custom LoRA Implementation

In [30]:
model = get_peft_model(alpha=4, rank=4, type='lora').to(device)
print_count_parameters(model)

Some weights of GPT2ForSequenceClassification were not initialized from the model checkpoint at openai-community/gpt2-large and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Total Parameters: 774631680
Trainable: 737280 (0.10%)
Frozen: 773894400 (99.90%)


In [31]:
start = time.time()

custom_lora_accuracy, custom_lora_loss = train_model(model, train_loader, epochs=5)

end = time.time()

custom_lora_tuning_time = end - start
print(f"Training time: {custom_lora_tuning_time:.2f} seconds")

  9%|▊         | 27/313 [00:35<06:18,  1.32s/it] 


KeyboardInterrupt: 

In [None]:
saved_time = full_tuning_time - custom_lora_tuning_time
saved_percent = (saved_time / full_tuning_time) * 100

print(f"Custom LoRA saved {saved_time:.2f} seconds ({saved_percent:.2f}%) in training time.")

In [None]:
custom_lora_val_accuracy = evaluate_model(model, val_loader)
print(f"Custom LoRA Validation Accuracy: {lora_val_accuracy}")

# DoRA Fine-tuning

In [20]:
model = get_peft_model(alpha=4, rank=4, type='dora').to(device)
print_count_parameters(model)

Some weights of GPT2ForSequenceClassification were not initialized from the model checkpoint at openai-community/gpt2-large and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Total Parameters: 774769920
Trainable: 875520 (0.11%)
Frozen: 773894400 (99.89%)


In [24]:
start = time.time()

dora_accuracy, dora_loss = train_model(model, train_loader, epochs=5)

end = time.time()

dora_tuning_time = end - start
print(f"Training time: {dora_tuning_time:.2f} seconds")

100%|██████████| 313/313 [04:15<00:00,  1.23it/s]


Epoch 1, Loss: 0.6422119356763248
Accuracy: 0.6478
----------------------------------------


100%|██████████| 313/313 [04:25<00:00,  1.18it/s]


Epoch 2, Loss: 0.32359869419909515
Accuracy: 0.8668
----------------------------------------


 13%|█▎        | 41/313 [00:38<04:14,  1.07it/s]


KeyboardInterrupt: 

In [None]:
saved_time = full_tuning_time - dora_tuning_time
saved_percent = (saved_time / full_tuning_time) * 100

print(f"DoRA saved {saved_time:.2f} seconds ({saved_percent:.2f}%) in training time.")

In [None]:
dora_val_accuracy = evaluate_model(model, val_loader)
print(f"DoRA Validation Accuracy: {dora_val_accuracy}")