In [4]:
import torch
import pandas as pd
from torch.utils.data import DataLoader, Dataset
from transformers import RobertaTokenizer, RobertaForSequenceClassification, AdamW, get_linear_schedule_with_warmup
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [5]:
# Load the custom training data from a CSV file in Google Drive with specific encoding
custom_training_data = pd.read_csv('/content/drive/MyDrive/bert_custom_training_data.csv', encoding='ISO-8859-1')

In [6]:
# Count and display the frequency of each unique value in the 'sentiment' column
custom_training_data['sentiment'].value_counts()

Unnamed: 0_level_0,count
sentiment,Unnamed: 1_level_1
Neutral,1280
Negative,450
Positive,322
positive,1


In [7]:
# Filter and sample the required number of rows for each sentiment
positive_samples = custom_training_data[custom_training_data['sentiment'] == 'Positive'].sample(n=322, random_state=42)
negative_samples = custom_training_data[custom_training_data['sentiment'] == 'Negative'].sample(n=340, random_state=42)
neutral_samples = custom_training_data[custom_training_data['sentiment'] == 'Neutral'].sample(n=338, random_state=42)

# Concatenate the samples into a single DataFrame
custom_balanced_data = pd.concat([positive_samples, negative_samples, neutral_samples], ignore_index=True)

# Display the resulting DataFrame
custom_balanced_data

Unnamed: 0,listing_id,comments,keywords,sentiment
0,3.994421e+07,This house is extremely comfortable and beauti...,wheelchair,Positive
1,4.860920e+07,We loved this home. The home was exactly what...,elderly,Positive
2,3.382268e+07,"You are renting a floor in the house, not the ...",disability,Positive
3,1.026391e+07,We really enjoyed how the place was set up. It...,disabled,Positive
4,4.992154e+17,My wife is handicap and it was very accessible...,handicap,Positive
...,...,...,...,...
995,4.893358e+07,"Good location, nice spot for my sister, her hu...",elevator,Neutral
996,1.064886e+07,Very spacious apartment. Quick walk to the con...,"step, spacious",Neutral
997,5.689689e+17,This place is wonderful! Perfect for 5 night s...,"stair, large",Neutral
998,2.607599e+07,My team and I stayed here for our annual plann...,"accessible, spacious",Neutral


In [13]:
import pandas as pd
import torch
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import RobertaTokenizer, RobertaForSequenceClassification, Trainer, TrainingArguments
import numpy as np

# Label encoding
label_encoder = LabelEncoder()
custom_balanced_data['Sentiment'] = label_encoder.fit_transform(custom_balanced_data['sentiment'])

# Splitting train and test data
X_train, X_test, y_train, y_test = train_test_split(custom_balanced_data['comments'], custom_balanced_data['Sentiment'], test_size=0.2, random_state=42)

# Creating datasets
tokenizer = RobertaTokenizer.from_pretrained('roberta-base')
train_encodings = tokenizer(X_train.tolist(), truncation=True, padding=True, max_length=512)
test_encodings = tokenizer(X_test.tolist(), truncation=True, padding=True, max_length=512)

class ReviewsDataset(torch.utils.data.Dataset):
    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx])
        return item

    def __len__(self):
        return len(self.labels)

train_dataset = ReviewsDataset(train_encodings, y_train.tolist())
test_dataset = ReviewsDataset(test_encodings, y_test.tolist())

# Set weights based on class distribution
class_counts = np.bincount(y_train)
class_weights = 1. / class_counts
class_weights = torch.tensor(class_weights, dtype=torch.float)

# Load RoBERTa model
model = RobertaForSequenceClassification.from_pretrained('roberta-base', num_labels=len(label_encoder.classes_))

# Custom Trainer class to include weighted loss
class WeightedTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        labels = inputs.get("labels")
        # Forward pass
        outputs = model(**inputs)
        logits = outputs.get("logits")
        # Compute loss with class weights
        loss_fct = torch.nn.CrossEntropyLoss(weight=class_weights.to(logits.device))
        loss = loss_fct(logits.view(-1, model.config.num_labels), labels.view(-1))
        return (loss, outputs) if return_outputs else loss

# Set model training arguments
training_args = TrainingArguments(
    output_dir='./results',
    num_train_epochs=3,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    warmup_steps=500,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    evaluation_strategy="epoch",
)

# Define evaluation metrics
def compute_metrics(p):
    preds = np.argmax(p.predictions, axis=1)
    precision = np.sum((preds == p.label_ids) & (preds == 1)) / np.sum(preds == 1)
    recall = np.sum((preds == p.label_ids) & (preds == 1)) / np.sum(p.label_ids == 1)
    f1 = 2 * (precision * recall) / (precision + recall) if precision + recall > 0 else 0.0
    accuracy = np.mean(preds == p.label_ids)
    return {"accuracy": accuracy, "precision": precision, "recall": recall, "f1": f1}

# Trainer setup using WeightedTrainer
trainer = WeightedTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics
)

trainer.train()  # Train model

# Model evaluation
results = trainer.evaluate()
print("Evaluation Results:", results)


Some weights of RobertaForSequenceClassification were not initialized from the model checkpoint at roberta-base and are newly initialized: ['classifier.dense.bias', 'classifier.dense.weight', 'classifier.out_proj.bias', 'classifier.out_proj.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss,Accuracy,Precision,Recall,F1
1,0.5977,0.440335,0.845,0.855072,0.880597,0.867647
2,0.5907,0.525876,0.875,0.921875,0.880597,0.900763
3,0.3109,0.747765,0.83,0.948276,0.820896,0.88


Evaluation Results: {'eval_loss': 0.7477653622627258, 'eval_accuracy': 0.83, 'eval_precision': 0.9482758620689655, 'eval_recall': 0.8208955223880597, 'eval_f1': 0.8799999999999999, 'eval_runtime': 6.2823, 'eval_samples_per_second': 31.836, 'eval_steps_per_second': 7.959, 'epoch': 3.0}


In [14]:
model.save_pretrained('content/drive/MyDrive/roberta_custom_saved_model')
tokenizer.save_pretrained('content/drive/MyDrive/tokenizer_roberta_custom_saved_model')

('content/drive/MyDrive/tokenizer_roberta_custom_saved_model/tokenizer_config.json',
 'content/drive/MyDrive/tokenizer_roberta_custom_saved_model/special_tokens_map.json',
 'content/drive/MyDrive/tokenizer_roberta_custom_saved_model/vocab.json',
 'content/drive/MyDrive/tokenizer_roberta_custom_saved_model/merges.txt',
 'content/drive/MyDrive/tokenizer_roberta_custom_saved_model/added_tokens.json')

### LSTM Model

In [44]:
from google.colab import drive
import pandas as pd
import re
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
from sklearn.model_selection import train_test_split
from nltk.corpus import stopwords
import numpy as np
import nltk

nltk.download('stopwords')

# Mount Google Drive
drive.mount('/content/drive')

# Load and balance dataset
custom_training_data = pd.read_csv('/content/drive/MyDrive/bert_custom_training_data.csv', encoding='ISO-8859-1')
positive_samples = custom_training_data[custom_training_data['sentiment'] == 'Positive'].sample(n=322, random_state=42)
negative_samples = custom_training_data[custom_training_data['sentiment'] == 'Negative'].sample(n=340, random_state=42)
neutral_samples = custom_training_data[custom_training_data['sentiment'] == 'Neutral'].sample(n=338, random_state=42)
custom_balanced_data = pd.concat([positive_samples, negative_samples, neutral_samples], ignore_index=True)

# Map sentiment to numerical values
custom_balanced_data['sentiment'] = custom_balanced_data['sentiment'].map({'Positive': 2, 'Negative': 0, 'Neutral': 1})

# Define hyperparameters
class HyperParams:
    PAD_INDEX = 0
    UNK_INDEX = 1
    PAD_TOKEN = '<pad>'
    UNK_TOKEN = '<unk>'
    STOP_WORDS = set(stopwords.words('english'))
    MAX_LENGTH = 128
    BATCH_SIZE = 32
    EMBEDDING_DIM = 300  # Increased embedding dimension
    HIDDEN_DIM = 256     # Increased hidden dimension
    OUTPUT_DIM = 3       # Positive, Negative, Neutral
    N_LAYERS = 3         # Increased LSTM layers
    DROPOUT_RATE = 0.5   # Increased dropout for regularization
    LR = 0.0003        # Lower learning rate
    N_EPOCHS = 15        # More epochs
    WD = 0
    SEED = 42
    BIDIRECTIONAL = True

hparams = HyperParams()

# Text Preprocessing
def preprocess_text(text):
    text = re.sub(r'[^a-zA-Z\s]', '', text, re.I | re.A).lower().strip()
    tokens = text.split()
    tokens = [word for word in tokens if word not in hparams.STOP_WORDS]
    return ' '.join(tokens)

custom_balanced_data['comments'] = custom_balanced_data['comments'].apply(preprocess_text)

# Split the data
X = custom_balanced_data['comments'].values
y = custom_balanced_data['sentiment'].values
x_train, x_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=hparams.SEED, stratify=y)
x_valid, x_test, y_valid, y_test = train_test_split(x_temp, y_temp, test_size=2/3, random_state=hparams.SEED, stratify=y_temp)

# Build Vocabulary
def build_vocab(x_train, min_freq=5):
    word_counter = Counter()
    for review in x_train:
        word_counter.update(review.split())
    vocab = {word: i+2 for i, word in enumerate([w for w, f in word_counter.items() if f >= min_freq])}
    vocab[hparams.PAD_TOKEN] = hparams.PAD_INDEX
    vocab[hparams.UNK_TOKEN] = hparams.UNK_INDEX
    return vocab

vocab = build_vocab(x_train)

# Tokenize
def tokenize(vocab, text):
    return [vocab.get(word, vocab['<unk>']) for word in text.split()]

# Dataset Class
class CustomDataset(Dataset):
    def __init__(self, x, y, vocab):
        self.x = x
        self.y = y
        self.vocab = vocab

    def __getitem__(self, idx):
        text, label = self.x[idx], self.y[idx]
        tokenized_text = tokenize(self.vocab, text)
        if len(tokenized_text) > hparams.MAX_LENGTH:
            tokenized_text = tokenized_text[:hparams.MAX_LENGTH]
        else:
            tokenized_text += [self.vocab[hparams.PAD_TOKEN]] * (hparams.MAX_LENGTH - len(tokenized_text))
        return torch.tensor(tokenized_text), torch.tensor(label)

    def __len__(self):
        return len(self.x)

# Collate Function
def collate_fn(batch):
    texts, labels = zip(*batch)
    texts = torch.stack(texts)
    labels = torch.stack(labels)
    return texts, labels

# LSTM Model
class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, n_layers, dropout_rate, pad_index, bidirectional):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_index)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=n_layers, bidirectional=bidirectional, dropout=dropout_rate, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2 if bidirectional else hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, text):
        embedded = self.dropout(self.embedding(text))
        lstm_out, (hidden, _) = self.lstm(embedded)
        if self.lstm.bidirectional:
            hidden = torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim=1)
        else:
            hidden = hidden[-1,:,:]
        return self.fc(hidden)

# Prepare Datasets and DataLoaders
train_dataset = CustomDataset(x_train, y_train, vocab)
valid_dataset = CustomDataset(x_valid, y_valid, vocab)
test_dataset = CustomDataset(x_test, y_test, vocab)
train_loader = DataLoader(train_dataset, batch_size=hparams.BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_dataset, batch_size=hparams.BATCH_SIZE, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=hparams.BATCH_SIZE, collate_fn=collate_fn)

# Initialize Model, Optimizer, and Loss Function
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = LSTMModel(len(vocab), hparams.EMBEDDING_DIM, hparams.HIDDEN_DIM, hparams.OUTPUT_DIM, hparams.N_LAYERS, hparams.DROPOUT_RATE, hparams.PAD_INDEX, hparams.BIDIRECTIONAL).to(device)

optimizer = optim.Adam(model.parameters(), lr=hparams.LR)
criterion = nn.CrossEntropyLoss()

# Training and Evaluation
def train(model, loader, optimizer, criterion):
    model.train()
    epoch_loss, epoch_acc = 0, 0
    for texts, labels in loader:
        texts, labels = texts.to(device), labels.to(device)
        optimizer.zero_grad()
        predictions = model(texts)
        loss = criterion(predictions, labels)
        acc = (predictions.argmax(1) == labels).sum().item() / len(labels)
        loss.backward()
        optimizer.step()
        epoch_loss += loss.item()
        epoch_acc += acc
    return epoch_loss / len(loader), epoch_acc / len(loader)

def evaluate(model, loader, criterion):
    model.eval()
    epoch_loss, epoch_acc = 0, 0
    with torch.no_grad():
        for texts, labels in loader:
            texts, labels = texts.to(device), labels.to(device)
            predictions = model(texts)
            loss = criterion(predictions, labels)
            acc = (predictions.argmax(1) == labels).sum().item() / len(labels)
            epoch_loss += loss.item()
            epoch_acc += acc
    return epoch_loss / len(loader), epoch_acc / len(loader)

# Training Loop
for epoch in range(hparams.N_EPOCHS):
    train_loss, train_acc = train(model, train_loader, optimizer, criterion)
    valid_loss, valid_acc = evaluate(model, valid_loader, criterion)
    print(f"Epoch {epoch+1}/{hparams.N_EPOCHS}")
    print(f"Train Loss: {train_loss:.3f}, Train Acc: {train_acc*100:.2f}%")
    print(f"Valid Loss: {valid_loss:.3f}, Valid Acc: {valid_acc*100:.2f}%")

# Test Evaluation
test_loss, test_acc = evaluate(model, test_loader, criterion)
print(f"Test Loss: {test_loss:.3f}, Test Acc: {test_acc*100:.2f}%")


[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Epoch 1/15
Train Loss: 1.099, Train Acc: 30.93%
Valid Loss: 1.098, Valid Acc: 35.94%
Epoch 2/15
Train Loss: 1.093, Train Acc: 41.44%
Valid Loss: 1.098, Valid Acc: 30.47%
Epoch 3/15
Train Loss: 1.069, Train Acc: 48.07%
Valid Loss: 1.066, Valid Acc: 33.59%
Epoch 4/15
Train Loss: 0.988, Train Acc: 55.95%
Valid Loss: 1.076, Valid Acc: 43.75%
Epoch 5/15
Train Loss: 0.898, Train Acc: 58.44%
Valid Loss: 0.932, Valid Acc: 57.81%
Epoch 6/15
Train Loss: 0.795, Train Acc: 65.46%
Valid Loss: 1.022, Valid Acc: 48.44%
Epoch 7/15
Train Loss: 0.736, Train Acc: 68.71%
Valid Loss: 0.950, Valid Acc: 53.91%
Epoch 8/15
Train Loss: 0.686, Train Acc: 69.40%
Valid Loss: 1.003, Valid Acc: 56.25%
Epoch 9/15
Train Loss: 0.640, Train Acc: 73.15%
Valid Loss: 1.250, Valid Acc: 42.97%
Epoch 10/15
Train Loss: 0.648, Train Acc: 71.31%
Valid Loss: 1.037, Valid Acc: 53.12%
Epoch 11/15
Train Lo

### GPT2 Model

In [28]:
# Install necessary libraries
!pip install transformers datasets peft

# Import libraries
import os
import torch
import numpy as np
import pandas as pd
from torch.utils.data import DataLoader
from sklearn.preprocessing import LabelEncoder
from transformers import AutoTokenizer, GPT2Model, Trainer, TrainingArguments, DataCollatorWithPadding
from peft import LoraConfig, get_peft_model, TaskType
import torch.nn as nn



In [17]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cuda


In [45]:
# Split the data
X_train, X_temp, y_train, y_temp = train_test_split(
    custom_balanced_data['comments'].values, custom_balanced_data['sentiment'].values,
    test_size=0.3, random_state=42, stratify=custom_balanced_data['sentiment']
)
X_valid, X_test, y_valid, y_test = train_test_split(
    X_temp, y_temp, test_size=0.5, random_state=42, stratify=y_temp
)

# Load GPT-2 tokenizer and model
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token  # Set padding token to EOS token for compatibility

# Initialize the GPT-2 model for sequence classification
model = GPT2ForSequenceClassification.from_pretrained("gpt2", num_labels=3)
model.config.pad_token_id = tokenizer.eos_token_id  # Set padding token ID in model configuration

# Create a custom Dataset class
class CustomDataset(Dataset):
    def __init__(self, texts, labels, tokenizer, max_length=128):
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer(
            text, padding='max_length', truncation=True, max_length=self.max_length, return_tensors='pt'
        )
        input_ids = encoding['input_ids'].squeeze()  # Remove extra batch dimension
        attention_mask = encoding['attention_mask'].squeeze()  # Remove extra batch dimension
        return {'input_ids': input_ids, 'attention_mask': attention_mask, 'labels': torch.tensor(label)}

# Prepare datasets
train_dataset = CustomDataset(X_train, y_train, tokenizer)
valid_dataset = CustomDataset(X_valid, y_valid, tokenizer)
test_dataset = CustomDataset(X_test, y_test, tokenizer)

# Training arguments
training_args = TrainingArguments(
    output_dir='./results',
    evaluation_strategy="epoch",
    save_strategy="epoch",
    num_train_epochs=3,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    warmup_steps=100,
    weight_decay=0.01,
    logging_dir='./logs',
    logging_steps=10,
    load_best_model_at_end=True,
)

# Define a function to compute accuracy
def compute_metrics(pred):
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    accuracy = (preds == labels).mean()
    return {"accuracy": accuracy}

# Set up Trainer
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
    compute_metrics=compute_metrics
)

# Train the model
trainer.train()

# Evaluate the model on the test set
test_results = trainer.evaluate(test_dataset)
print("Test Results:", test_results)

Some weights of GPT2ForSequenceClassification were not initialized from the model checkpoint at gpt2 and are newly initialized: ['score.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Epoch,Training Loss,Validation Loss,Accuracy
1,1.2285,1.030749,0.5
2,0.5215,0.695812,0.713333
3,0.8474,0.913883,0.74


Test Results: {'eval_loss': 0.9189814329147339, 'eval_accuracy': 0.7066666666666667, 'eval_runtime': 1.2822, 'eval_samples_per_second': 116.985, 'eval_steps_per_second': 29.636, 'epoch': 3.0}
