## Oğuz Kağan Hitit, Koç University, 2024

## Installing the dependencies

In [None]:
!pip install datasets
!pip install git+https://github.com/huggingface/peft
!pip install wandb

## Generating the model


In [None]:
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer, GPT2Config
from datasets import load_dataset
from peft import LoraConfig, get_peft_model
from torch.utils.data import DataLoader
import wandb
from tqdm import tqdm
from sklearn.metrics import precision_score, recall_score, f1_score

class SentimentModel:
    def __init__(self, model_name='gpt2-medium-peft-lora', config=None):
        self.model_name = model_name
        self.model_dir = '/content/drive/MyDrive/comp542_assignment3/lora-gpt-peft'
        self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        self.tokenizer.pad_token = self.tokenizer.eos_token

        self.config = GPT2Config.from_pretrained('gpt2', **config)
        base_model = GPT2LMHeadModel.from_pretrained('gpt2', config=self.config)
        print('Using GPT2-medium...')
        lora_config = LoraConfig(r=config['lora_r'], lora_alpha=config['lora_alpha'],
                                 lora_dropout=config['dropout'], task_type="CAUSAL_LM")
        print('lora_r:', config['lora_r'])
        print('alpha:', config['lora_alpha'])
        print('dropout:', config['dropout'])
        print('lr:', config['learning_rate'])
        self.model = get_peft_model(base_model, lora_config)
        self.model.to(torch.device("cuda" if torch.cuda.is_available() else "cpu"))

        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=config['learning_rate'])
        self.train_loader = None
        self.val_loader = None
        self.test_dataset = None

        wandb.init(project="lora_finetune", entity="ohitit20", config=config)
        wandb.watch(self.model, log='all')

    def prepare_data(self):
        dataset = load_dataset("rotten_tomatoes")

        train_dataset = dataset["train"].map(self.add_instruction_finetuning)
        train_dataset = train_dataset.map(self.preprocess_data, batched=True)
        self.train_loader = DataLoader(train_dataset.with_format("torch"), batch_size=config['batch_size'], shuffle=True)

        val_dataset = dataset['validation'].map(self.add_instruction_finetuning)
        val_dataset = val_dataset.map(self.preprocess_data, batched=True)
        self.val_loader = DataLoader(val_dataset.with_format('torch'), batch_size=config['batch_size'], shuffle=True)

        self.test_dataset = dataset['test']
        print(f'Dataset loaded with {len(self.train_loader.dataset)} training samples.')

    def add_instruction_finetuning(self, rec):
        instruction = "Label the sentiment of the following sentence"
        INSTRUCTION_TEMPLATE = "{}\nSentence:{}\nLabel:{}"
        label = 'positive' if rec['label'] == 1 else 'negative'
        rec["instr_tuned_text"] = INSTRUCTION_TEMPLATE.format(instruction, rec['text'], label)
        return rec

    def preprocess_data(self, data):
        return self.tokenizer(data['instr_tuned_text'], return_tensors='pt', padding=True, truncation=True, max_length=512)

    def train(self, epochs):
      print('Training loop:')
      self.model.train()
      for epoch in range(epochs):
          self.optimizer.zero_grad()
          total_loss = 0
          total_val_loss = 0

          for step, batch in enumerate(tqdm(self.train_loader, desc=f'Epoch {epoch+1}')):
              inputs = {k: v.to(self.model.device) for k, v in batch.items() if k in self.tokenizer.model_input_names}
              outputs = self.model(**inputs, labels=inputs["input_ids"])
              loss = outputs.loss
              loss = loss / config['gradient_accumulation_steps']
              loss.backward()

              if (step + 1) % config['gradient_accumulation_steps'] == 0:
                  self.optimizer.step()
                  self.optimizer.zero_grad()

              total_loss += loss.item()

              if (step + 1) % 50 == 0:
                  print(f'Batch {step+1}, Training Loss: {loss.item()}')

          self.model.eval()
          with torch.no_grad():
              for val_batch in self.val_loader:
                  val_inputs = {k: v.to(self.model.device) for k, v in val_batch.items() if k in self.tokenizer.model_input_names}
                  val_outputs = self.model(**val_inputs, labels=val_inputs["input_ids"])
                  val_loss = val_outputs.loss
                  total_val_loss += val_loss.item()

          avg_train_loss = total_loss / len(self.train_loader)
          avg_val_loss = total_val_loss / len(self.val_loader)
          wandb.log({"epoch": epoch + 1, "training_loss": avg_train_loss, "validation_loss": avg_val_loss,
                       "learning_rate": config['learning_rate'], "lora_r": config['lora_r'], "lora_alpha": config['lora_alpha']})
          print(f"Epoch {epoch + 1}, Training Loss: {avg_train_loss}, Validation Loss: {avg_val_loss}")
          torch.save(self.model.state_dict(), f"{self.model_dir}/{self.model_name}_epoch{epoch+1}.pt")
          self.model.train()

    def evaluate(self):
      predictions = []
      targets = []
      positive_predictions = 0
      negative_predictions = 0
      counter = 0

      instruction = 'Label the sentiment of following sentence'
      INSTRUCTION_TEMPLATE = "{}:\nSentence:{}\nLabel:"
      for example in self.test_dataset:
          prompt_text = INSTRUCTION_TEMPLATE.format(instruction, example['text'])
          target_label = 'positive' if example['label'] == 1 else 'negative'
          targets.append(target_label)

          generated_text = self.generate_text(prompt_text)
          if('<|endoftext|>' in generated_text):
              generated_text = generated_text.split('<|endoftext|>')[0]
          positive = 'positive' in generated_text
          negative = 'negative' in generated_text

          if positive and negative:
              predicted_label = 'null'
          elif positive:
              predicted_label = 'positive'
              positive_predictions += 1
          elif negative:
              predicted_label = 'negative'
              negative_predictions += 1
          else:
              predicted_label = 'null'
          predictions.append(predicted_label)
          if(counter % 5 == 2):
              print('Prompt text:' + prompt_text)
              print('Generated text:' + generated_text)
              print('Target label:' + target_label)
              print('Predicted label:' + predicted_label)
          counter += 1

      accuracy = sum([1 for pred, target in zip(predictions, targets) if pred == target]) / len(targets)
      precision = precision_score(targets, predictions, labels=["positive", "negative"], average='micro')
      recall = recall_score(targets, predictions, labels=["positive", "negative"], average='micro')
      f1 = f1_score(targets, predictions, labels=["positive", "negative"], average='micro')
      print('Accuracy:', accuracy, 'Precision:', precision, 'Recall:', recall, 'F1:', f1)
      return accuracy, positive_predictions, negative_predictions

    def generate_text(self, input_text):
        inputs = self.tokenizer.encode(input_text, return_tensors='pt')
        inputs = inputs.to(self.model.device)
        with torch.no_grad():
            outputs = self.model.generate(inputs, max_length=512, num_return_sequences=1)
        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        return generated_text

## Training and evaluation


In [None]:
config = {
    "learning_rate": 5e-4,
    "epochs": 1,
    "batch_size": 1,
    "lora_r": 16,
    "lora_alpha": 64,
    "dropout": 0.05,
    "max_iters": 50,
    "gradient_accumulation_steps": 32,
}

model = SentimentModel(config=config)
model.prepare_data()
model.train(epochs=config['epochs'])
model.evaluate()
