In [None]:
%%capture
# Install/upgrade the fine-tuning stack; %%capture hides pip's output for this cell.
# NOTE(review): `-U` without version pins installs whatever is newest at run time, so a
# later run may pick up different library versions — consider pinning a known-good set.
%pip install -U bitsandbytes
%pip install -U transformers
%pip install -U accelerate
%pip install -U peft
%pip install -U trl
%pip install -U wandb

In [None]:
import numpy as np
import pandas as pd
import os
from tqdm import tqdm
import bitsandbytes as bnb
import torch
import torch.nn as nn
import transformers
from datasets import Dataset
from peft import LoraConfig, PeftConfig
from trl import SFTTrainer
from transformers import (AutoModelForCausalLM, 
                          AutoTokenizer, 
                          BitsAndBytesConfig, 
                          TrainingArguments, 
                          pipeline, 
                          logging)
from sklearn.metrics import (accuracy_score, 
                             classification_report, 
                             confusion_matrix)
from sklearn.model_selection import train_test_split

# Load your dataset
# Both CSVs come from the Kaggle input mount; later cells read their
# 'comment' and 'Sentiment' columns.
train_data = pd.read_csv('/kaggle/input/bengali-hate/RawData.csv')
test_data = pd.read_csv('/kaggle/input/bengali-hate/testing_data.csv')

# Shuffle the DataFrame
# frac=1 keeps every row; the fixed random_state makes the order reproducible.
train_data = train_data.sample(frac=1, random_state=85)

# Split the training data into training and validation sets
# 10% of the shuffled rows become val_data; the seed (85) fixes the split.
# NOTE(review): no `stratify=` is passed, so class balance between the two
# parts is left to chance — confirm that is intended.
train_data, val_data = train_test_split(train_data, test_size=0.1, random_state=85)


In [None]:
# Define the prompt generation functions
def generate_prompt(data_point):
    """Build the supervised training prompt for a single row.

    The prompt consists of three newline-separated parts: the classification
    instruction, the row's text, and the row's gold label. Leading and
    trailing whitespace of the assembled prompt is removed.

    Args:
        data_point: Mapping-like row exposing "comment" and "Sentiment".

    Returns:
        str: The formatted prompt.
    """
    instruction = "Classify the text into Hate Speech, Normal Speech and return the answer as the corresponding Sentiment label."
    text_line = f'text: {data_point["comment"]}'
    label_line = f'label: {data_point["Sentiment"]}'
    return "\n".join([instruction, text_line, label_line]).strip()

def generate_test_prompt(data_point):
    """Build the inference prompt for a single row (label left blank).

    The instruction and layout mirror the training template produced by
    ``generate_prompt`` so the model sees the same format at inference time;
    only the label value is omitted for the model to complete.

    Args:
        data_point: Mapping-like row exposing "comment".

    Returns:
        str: The prompt, ending in "label:" with surrounding whitespace removed.
    """
    # ``str.strip(chars)`` removes any of the given *characters*, not a word, so
    # the previous ``.strip("Sentiment")`` left the leading newline/indent and the
    # trailing space in place. A plain ``.strip()`` trims whitespace, matching how
    # the training prompts are built. The instruction text is also kept
    # character-for-character identical to the training one (single spaces).
    return f"""
            Classify the text into Hate Speech, Normal Speech and return the answer as the corresponding Sentiment label.
text: {data_point["comment"]}
label: """.strip()

# Generate prompts for training, validation, and test data
# The 'comment' column is overwritten in place with the full prompt
# (instruction + text + gold label), so it no longer holds the raw text.
# NOTE(review): not idempotent — re-running this cell would wrap the
# already-formatted prompts a second time.
train_data.loc[:,'comment'] = train_data.apply(generate_prompt, axis=1)
val_data.loc[:,'comment'] = val_data.apply(generate_prompt, axis=1)

# Generate test prompts and extract true labels
# y_true has to be taken first: the following line replaces test_data with a
# frame that only contains the prompt column, so 'Sentiment' is gone afterwards.
y_true = test_data.loc[:,'Sentiment']
test_data = pd.DataFrame(test_data.apply(generate_test_prompt, axis=1), columns=["comment"])



In [None]:
# Convert to datasets
# Only the prompt column is kept; the trainer below is pointed at it through
# dataset_text_field="comment".
train_dataset = Dataset.from_pandas(train_data[["comment"]])
val_dataset = Dataset.from_pandas(val_data[["comment"]])
test_dataset = Dataset.from_pandas(test_data[["comment"]])

base_model_name = "facebook/opt-350m"  # Example base model, replace with your desired model

# Quantization settings: load the base model weights in 8-bit via bitsandbytes.
bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,
)

# device_map="auto" lets the loader decide device placement; non-quantized
# parts are requested in float16.
model = AutoModelForCausalLM.from_pretrained(
    base_model_name,
    device_map="auto",
    torch_dtype=torch.float16,
    quantization_config=bnb_config, 
)

# The generation cache is switched off for training (gradient checkpointing is
# enabled in the TrainingArguments further down) and re-enabled after training.
model.config.use_cache = False
# NOTE(review): pretraining_tp looks like a Llama-specific config field and is
# presumably a no-op for OPT — confirm or remove.
model.config.pretraining_tp = 1

tokenizer = AutoTokenizer.from_pretrained(base_model_name)
# Pad with the EOS token id.
# NOTE(review): confirm the OPT tokenizer actually lacks a usable pad token;
# if it ships one, this override may be unnecessary.
tokenizer.pad_token_id = tokenizer.eos_token_id

def predict(test, model, tokenizer):
    """Generate a label for every prompt in ``test`` and map it to a category.

    Args:
        test: Table of prompts with a "comment" column. Any object whose
            ``test["comment"]`` yields the prompt strings is accepted, e.g. a
            pandas DataFrame or a Hugging Face ``Dataset``.
        model: Causal language model handed to the text-generation pipeline.
        tokenizer: Tokenizer matching ``model``.

    Returns:
        list[str]: One entry per prompt, in input order: "Hate Speech",
        "Normal Speech", or "none" when neither category name occurs in the
        generated answer.
    """
    categories = ["Hate Speech", "Normal Speech"]

    # Column access instead of positional ``.iloc`` lookup: ``.iloc`` only exists
    # on pandas objects, while the notebook passes a ``datasets.Dataset``.
    prompts = list(test["comment"])

    # Build the pipeline once; it does not depend on the individual prompt.
    # Decoding is greedy: ``temperature`` only has an effect when sampling is
    # enabled, so the intent is stated explicitly with ``do_sample=False``.
    pipe = pipeline(task="text-generation",
                    model=model,
                    tokenizer=tokenizer,
                    max_new_tokens=2,
                    do_sample=False)

    y_pred = []
    for prompt in tqdm(prompts):
        result = pipe(prompt)
        # The generated text contains the prompt; keep only what follows the
        # last "label:" marker, i.e. the model's completion.
        answer = result[0]['generated_text'].split("label:")[-1].strip().lower()

        # First category whose name appears in the answer wins; otherwise "none".
        matched = next((c for c in categories if c.lower() in answer), "none")
        y_pred.append(matched)

    return y_pred

def evaluate(y_true, y_pred):
    """Print accuracy, per-label accuracy, a classification report and a confusion matrix.

    Labels are encoded as 0 ("Hate Speech") and 1 ("Normal Speech"); any other
    value (for example the "none" fallback emitted by ``predict``) is encoded
    as -1. The classification report and confusion matrix are restricted to
    the two known labels.

    Args:
        y_true: Iterable of gold label strings.
        y_pred: Iterable of predicted label strings, same length as ``y_true``.

    Returns:
        None. All results are printed.
    """
    labels = ["Hate Speech", "Normal Speech"]
    mapping = {label: idx for idx, label in enumerate(labels)}
    label_ids = list(range(len(labels)))

    # Unknown values map to -1 so they can never count as a correct known label.
    y_true_mapped = np.array([mapping.get(value, -1) for value in y_true], dtype=int)
    y_pred_mapped = np.array([mapping.get(value, -1) for value in y_pred], dtype=int)

    # Calculate accuracy
    accuracy = accuracy_score(y_true=y_true_mapped, y_pred=y_pred_mapped)
    print(f'Accuracy: {accuracy:.3f}')

    # Per-label accuracy: share of correct predictions among the rows whose
    # true label is that class. np.unique gives a sorted, deterministic order.
    for label_id in np.unique(y_true_mapped).tolist():
        mask = y_true_mapped == label_id
        label_accuracy = accuracy_score(y_true_mapped[mask], y_pred_mapped[mask])
        # -1 must not be used as a list index: labels[-1] would silently report
        # unknown gold labels under the name of the last known class.
        label_name = labels[label_id] if label_id >= 0 else "<unknown>"
        print(f'Accuracy for label {label_name}: {label_accuracy:.3f}')

    class_report = classification_report(y_true=y_true_mapped, y_pred=y_pred_mapped, target_names=labels, labels=label_ids)
    print('\nClassification Report:')
    print(class_report)

    # Generate confusion matrix
    conf_matrix = confusion_matrix(y_true=y_true_mapped, y_pred=y_pred_mapped, labels=label_ids)
    print('\nConfusion Matrix:')
    print(conf_matrix)

import bitsandbytes as bnb

def find_all_linear_names(model):
    """Collect the leaf names of all bitsandbytes 8-bit linear layers in ``model``.

    Every submodule that is an instance of ``bnb.nn.Linear8bitLt`` contributes
    the last component of its dotted module path (e.g. "a.b.q_proj" gives
    "q_proj"). The name "lm_head" is never included in the result.

    Args:
        model: Object exposing ``named_modules()`` yielding (name, module) pairs.

    Returns:
        list[str]: The distinct leaf names, in no particular order.
    """
    linear_cls = bnb.nn.Linear8bitLt
    leaf_names = {
        qualified_name.rsplit('.', 1)[-1]
        for qualified_name, submodule in model.named_modules()
        if isinstance(submodule, linear_cls)
    }
    # The output head is excluded from the returned names.
    leaf_names.discard('lm_head')
    return list(leaf_names)

# Names of the 8-bit linear layers that will receive LoRA adapters.
modules = find_all_linear_names(model)
# NOTE(review): a bare expression is only displayed when it is the last
# statement of a cell; here more code follows, so nothing is shown.
modules

# Checkpoints, the saved adapter and the tokenizer are all written here.
output_dir="/kaggle/working/"

# LoRA adapter settings: rank-64 adapters on every module listed in `modules`,
# no adapter dropout, and bias terms left untouched.
peft_config = LoraConfig(
    lora_alpha=16,
    lora_dropout=0,
    r=64,
    bias="none",
    task_type="CAUSAL_LM",
    target_modules=modules,
)

# Trainer hyperparameters. With per-device batch size 1 and 8 accumulation
# steps, each optimizer step sees 8 examples per device.
training_arguments = TrainingArguments(
    output_dir=output_dir,                    
    num_train_epochs=10,                      
    per_device_train_batch_size=1,           
    gradient_accumulation_steps=8,           
    gradient_checkpointing=True,              
    optim="paged_adamw_32bit",
    logging_steps=1,                         
    learning_rate=2e-4,                       
    weight_decay=0.001,
    fp16=True,
    bf16=False,
    max_grad_norm=0.3,                        
    max_steps=-1,  # -1: training length is governed by num_train_epochs
    warmup_ratio=0.03,                      
    group_by_length=False,
    lr_scheduler_type="cosine",            
    report_to="wandb",  # NOTE(review): presumably needs a W&B login/API key in the environment — confirm
    eval_strategy="steps",              
    # NOTE(review): a float below 1 is presumably read as a fraction of the
    # total training steps (evaluate every 20%) — confirm against the docs.
    eval_steps = 0.2
)

# Supervised fine-tuning: the trainer receives the quantized base model plus
# the LoRA config and trains on the prompt text in each dataset's "comment" column.
# NOTE(review): dataset_text_field, tokenizer, max_seq_length, packing and
# dataset_kwargs are passed directly to SFTTrainer. Newer TRL releases appear to
# expect these on SFTConfig (and the tokenizer as `processing_class`); since the
# install cell upgrades trl without a version pin, confirm this call matches the
# installed version.
trainer = SFTTrainer(
    model=model,
    args=training_arguments,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    peft_config=peft_config,
    dataset_text_field="comment",
    tokenizer=tokenizer,
    max_seq_length=512,
    packing=False,  # one prompt per training sequence
    dataset_kwargs={
    "add_special_tokens": False,
    "append_concat_token": False,
    }
)

# Run fine-tuning with the configuration above.
trainer.train()

# Close the Weights & Biases run once training has finished.
# NOTE(review): this import sits mid-cell; consider moving it to the imports cell.
import wandb
wandb.finish()
# Re-enable the generation cache that was switched off before training.
model.config.use_cache = True

# Persist the trained model and the tokenizer to output_dir.
trainer.save_model(output_dir)
tokenizer.save_pretrained(output_dir)

# Score the fine-tuned model on the held-out test prompts. test_dataset is the
# Hugging Face Dataset built from the test prompts; y_true holds the matching
# gold labels that were extracted before the prompts were generated.
y_pred = predict(test_dataset, model, tokenizer)
evaluate(y_true, y_pred)