## Importing libraries

In [None]:
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from torch.utils.data import DataLoader, TensorDataset, random_split
from transformers import BertTokenizer, BertForSequenceClassification, AdamW
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, confusion_matrix, precision_recall_fscore_support
import pandas as pd
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import train_test_split
from torch.cuda.amp import GradScaler, autocast
print("Libraries and functions imported.")


## Function to preprocess text for BERT

In [None]:
def preprocess_for_bert(texts, tokenizer, max_len=128):
    """Encode a batch of texts into fixed-length BERT inputs.

    Args:
        texts: Sequence of strings passed to ``tokenizer.batch_encode_plus``.
        tokenizer: Object exposing ``batch_encode_plus`` (this file passes a
            ``BertTokenizer``).
        max_len: Length every sequence is padded or truncated to.

    Returns:
        The ``(input_ids, attention_mask)`` pair read from the encoder output.
    """
    encoding_options = dict(
        add_special_tokens=True,
        padding='max_length',
        truncation=True,
        max_length=max_len,
        return_attention_mask=True,
        return_tensors='pt',
    )
    batch = tokenizer.batch_encode_plus(texts, **encoding_options)
    input_ids = batch['input_ids']
    attention_mask = batch['attention_mask']
    return input_ids, attention_mask

# Load the BERT tokenizer
print("Loading BERT tokenizer...")
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Loading and preprocessing dataset
print("Start: Loading and preprocessing dataset...")
# clean_file.csv is produced by the data cleaning and data transformation
# steps in the CNN-BiLSTM.ipynb file.
df = pd.read_csv('clean_file.csv')
# Work on at most 100,000 rows. Capping at len(df) avoids the ValueError that
# DataFrame.sample raises when n exceeds the number of rows, and the fixed seed
# keeps the subsample reproducible, like the seeded splits further down.
df = df.sample(n=min(100000, len(df)), random_state=42)
df['content'] = df['content'].astype(str)

# Map the category column to integer class ids.
label_encoder = LabelEncoder()
labels = label_encoder.fit_transform(df['category'])

# Walk one sample through the tokenizer step by step (illustration only; the
# real encoding for training happens in preprocess_for_bert).

# Select a sample text from the dataset
sample_text = df['content'].iloc[1]

# Step 1: Original Text
print("Original Text:")
print(sample_text)

# Step 2: Tokenization
tokens = tokenizer.tokenize(sample_text)
print("\nTokens:")
print(tokens)

# Step 3: Convert Tokens to IDs
token_ids = tokenizer.convert_tokens_to_ids(tokens)
print("\nToken IDs:")
print(token_ids)

# Step 4: Add Special Tokens and Create Attention Mask
# BERT requires special tokens at the start and end of each sentence
input_ids = tokenizer.encode(sample_text, add_special_tokens=True, max_length=128, truncation=True)
# No padding is applied in this step, so every position is a real token and
# gets mask value 1 (attention masks are 1 for tokens, 0 for padding).
attention_mask = [1] * len(input_ids)
print("\nInput IDs with Special Tokens:")
print(input_ids)
print("\nAttention Mask:")
print(attention_mask)

# Step 5: Final Encoded Representation
# encode_plus pads/truncates to 128 tokens and returns the matching mask.
encoded_representation = tokenizer.encode_plus(
    sample_text,
    add_special_tokens=True,
    max_length=128,
    padding='max_length',
    truncation=True,
    return_attention_mask=True
)
print("\nFinal Encoded Representation:")
print(encoded_representation)


# Stratified 70/10/20 split: first hold out 20% for testing, then take 12.5%
# of the remaining 80% (0.125 x 0.8 = 0.1 of the data) for validation.
split_seed = 42
remaining_texts, test_texts, remaining_labels, test_labels = train_test_split(
    df['content'],
    labels,
    test_size=0.2,
    stratify=labels,
    random_state=split_seed,
)
train_texts, val_texts, train_labels, val_labels = train_test_split(
    remaining_texts,
    remaining_labels,
    test_size=0.125,
    stratify=remaining_labels,
    random_state=split_seed,
)

# Encode each split into padded input ids and attention masks.
train_input_ids, train_attention_masks = preprocess_for_bert(train_texts.tolist(), tokenizer)
val_input_ids, val_attention_masks = preprocess_for_bert(val_texts.tolist(), tokenizer)
test_input_ids, test_attention_masks = preprocess_for_bert(test_texts.tolist(), tokenizer)

# Bundle (input ids, attention mask, label) triples per split.
train_label_tensor = torch.tensor(train_labels)
val_label_tensor = torch.tensor(val_labels)
test_label_tensor = torch.tensor(test_labels)

train_dataset = TensorDataset(train_input_ids, train_attention_masks, train_label_tensor)
val_dataset = TensorDataset(val_input_ids, val_attention_masks, val_label_tensor)
test_dataset = TensorDataset(test_input_ids, test_attention_masks, test_label_tensor)

print("End: Dataset loaded and preprocessed.")


## Checking if a GPU is available

In [None]:
# Use the CUDA GPU when PyTorch reports one is available; otherwise fall back
# to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(f"Using device: {device}")


## Calculating Class Weights for Imbalanced Data

In [None]:
print("Start: Calculating class weights...")

# 'balanced' weights computed over the classes that occur in the training labels.
observed_classes = np.unique(train_labels)
class_weights = compute_class_weight(
    'balanced',
    classes=observed_classes,
    y=train_labels,
)

# Float tensor on the selected device so it can be used as a loss weight.
weights = torch.tensor(class_weights, dtype=torch.float).to(device)

# Cross-entropy loss that applies the per-class weights.
criterion = torch.nn.CrossEntropyLoss(weight=weights)

print("End: Class weights calculated.")


## Efficient Data Loading

In [None]:
# Loader settings shared by all three splits; only the training loader shuffles.
loader_options = dict(batch_size=16, num_workers=4, pin_memory=True)

train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
val_loader = DataLoader(val_dataset, shuffle=False, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)


## Training with Gradient Accumulation and Mixed Precision

In [None]:
# Fine-tune BERT with gradient accumulation, mixed precision (CUDA only), the
# class-weighted loss and early stopping on the validation loss.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
use_cuda = torch.cuda.is_available()

# One output per encoded class so the logits line up with the class weights.
model = BertForSequenceClassification.from_pretrained(
    'bert-base-uncased', num_labels=len(label_encoder.classes_)
)
model.to(device)

# torch.optim.AdamW replaces the deprecated transformers.AdamW; same hyper-parameters.
optimizer = torch.optim.AdamW(
    model.parameters(), lr=2e-5, weight_decay=0.01, betas=(0.9, 0.999), eps=1e-9
)
# Apply the class weights computed earlier; the loss returned by the model
# itself (outputs.loss) is unweighted.
criterion = torch.nn.CrossEntropyLoss(weight=weights.to(device))
# With enabled=False (no CUDA) the scaler is a pass-through.
scaler = GradScaler(enabled=use_cuda)

epochs = 4
accumulation_steps = 4
early_stopping_patience = 3
best_val_loss = float('inf')
epochs_no_improve = 0

# Per-epoch history, consumed by the plotting cells.
train_accuracies = []
val_accuracies = []
train_losses = []
val_losses = []
# Labels/predictions of the most recent validation pass (for the confusion matrix).
val_true_labels = []
val_pred_labels = []

for epoch_i in range(epochs):
    # ---- Training phase with gradient accumulation ----
    model.train()
    # Clear gradients once per accumulation group, never per micro-batch:
    # zeroing on every step would discard the gradients being accumulated.
    optimizer.zero_grad()
    total_train_loss, total_correct, total = 0.0, 0, 0
    num_train_batches = len(train_loader)
    for step, batch in tqdm(enumerate(train_loader), total=num_train_batches):
        b_input_ids, b_input_mask, b_labels = tuple(t.to(device) for t in batch)

        with autocast(enabled=use_cuda):
            outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)
            logits = outputs.logits
            loss = criterion(logits, b_labels)

        # Track the unscaled batch loss so the reported average is comparable
        # with the validation loss.
        total_train_loss += loss.item()

        # Divide by accumulation_steps so the summed gradients average the group.
        scaler.scale(loss / accumulation_steps).backward()

        # Step at the end of each group, and on the last batch so a trailing
        # partial group is not thrown away.
        is_last_batch = (step + 1) == num_train_batches
        if (step + 1) % accumulation_steps == 0 or is_last_batch:
            scaler.step(optimizer)
            scaler.update()
            optimizer.zero_grad()

        # Calculate training accuracy
        predictions = torch.argmax(logits, dim=1)
        total_correct += (predictions == b_labels).sum().item()
        total += b_labels.size(0)

    train_accuracy = total_correct / total
    train_accuracies.append(train_accuracy)
    print(f"Epoch {epoch_i + 1}/{epochs} Training Accuracy: {train_accuracy}")
    avg_train_loss = total_train_loss / num_train_batches
    train_losses.append(avg_train_loss)
    print(f"Epoch {epoch_i + 1}/{epochs} Training Loss: {avg_train_loss}")

    # ---- Validation phase ----
    model.eval()
    total_val_loss, total_correct, total = 0.0, 0, 0
    val_true_labels = []
    val_pred_labels = []
    for batch in tqdm(val_loader, desc="Validating"):
        b_input_ids, b_input_mask, b_labels = tuple(t.to(device) for t in batch)
        with torch.no_grad(), autocast(enabled=use_cuda):
            outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)
            logits = outputs.logits
            loss = criterion(logits, b_labels)

        total_val_loss += loss.item()

        # For accuracy and confusion matrix
        predictions = torch.argmax(logits, dim=1)
        total_correct += (predictions == b_labels).sum().item()
        total += b_labels.size(0)
        # Store both as CPU numpy values so sklearn can consume them later.
        val_true_labels.extend(b_labels.cpu().numpy())
        val_pred_labels.extend(predictions.cpu().numpy())

    val_accuracy = total_correct / total
    val_accuracies.append(val_accuracy)
    print(f"Epoch {epoch_i + 1}/{epochs} Validation Accuracy: {val_accuracy}")
    avg_val_loss = total_val_loss / len(val_loader)
    val_losses.append(avg_val_loss)
    print(f"Epoch {epoch_i + 1}/{epochs} Validation Loss: {avg_val_loss}")

    # Early Stopping Check: stop after `early_stopping_patience` consecutive
    # epochs without a new best validation loss.
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1
        if epochs_no_improve >= early_stopping_patience:
            print("Early stopping triggered.")
            break

print("Training and validation completed.")


## Plotting Training and Validation Accuracy

In [None]:
# Plotting training and validation accuracy
# Build the x-axis from the recorded history rather than the configured
# `epochs`: if training stopped early the lists are shorter than `epochs`
# and matplotlib would raise on the length mismatch.
train_epoch_axis = range(1, len(train_accuracies) + 1)
val_epoch_axis = range(1, len(val_accuracies) + 1)

plt.figure(figsize=(8, 4))
plt.plot(train_epoch_axis, train_accuracies, label='Training Accuracy')
plt.plot(val_epoch_axis, val_accuracies, label='Validation Accuracy')
plt.title('Training & Validation Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.legend()
plt.show()


## Plot Training and Validation Loss

In [None]:
# Plot the per-epoch training and validation loss.
# The x-axis follows the recorded history length instead of the configured
# `epochs`, so a run that stopped early still plots without a shape mismatch.
train_epoch_axis = range(1, len(train_losses) + 1)
val_epoch_axis = range(1, len(val_losses) + 1)

plt.figure(figsize=(8, 4))

# Plot training loss
plt.plot(train_epoch_axis, train_losses, label='Training Loss')

# Plot validation loss
plt.plot(val_epoch_axis, val_losses, label='Validation Loss')

# Adding title
plt.title('Training and Validation Loss')

# Adding labels
plt.xlabel('Epochs')
plt.ylabel('Loss')

# Adding legend
plt.legend()

# Show plot
plt.show()


## Plotting the Confusion Matrix

In [None]:
# Compute confusion matrix for the most recent validation pass.
# int() accepts numpy integers as well as 0-d tensors (including CUDA ones),
# so the collected labels work regardless of how they were stored.
y_true = [int(label) for label in val_true_labels]
y_pred = [int(label) for label in val_pred_labels]
# Pin the label order to the encoder's classes so the matrix always matches
# the tick labels, even if a class is missing from the validation predictions.
class_indices = list(range(len(label_encoder.classes_)))
# sklearn expects (y_true, y_pred): rows are actual classes, columns predicted.
cm = confusion_matrix(y_true, y_pred, labels=class_indices)

# Plot confusion matrix
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.title('Confusion Matrix')
plt.ylabel('Actual')
plt.xlabel('Predicted')
plt.show()


## Saving the model

In [None]:
# Directory that receives both the model files and the tokenizer files.
model_save_path = 'model_BERT'  # Specify your path here

# Save the model first, then the tokenizer that goes with it, to the same path.
for artifact in (model, tokenizer):
    artifact.save_pretrained(model_save_path)

print(f"Model and tokenizer have been saved to {model_save_path}")


## Loading the model

In [None]:
# Restore the saved classifier and its tokenizer from the save directory.
model_save_path = 'model_BERT'
model, tokenizer = (
    BertForSequenceClassification.from_pretrained(model_save_path),
    BertTokenizer.from_pretrained(model_save_path),
)
print("Model is loaded")

## Testing and Evaluation Metrics

In [None]:
# Evaluate the model on the held-out test set and report loss, accuracy,
# precision, recall and F1.

# Check if CUDA is available and set the `use_cuda` variable
use_cuda = torch.cuda.is_available()
print("Start: Testing loop...")
# Unweighted cross-entropy for reporting the test loss.
criterion = torch.nn.CrossEntropyLoss()

model.eval()
# Send batches to whichever device the model's parameters live on. A model
# that was just reloaded from disk sits on the CPU, so moving batches to a
# separately chosen device would fail with a device-mismatch error.
eval_device = next(model.parameters()).device
# CUDA autocast only makes sense when the model itself runs on CUDA.
use_amp = use_cuda and eval_device.type == "cuda"

test_preds = []
test_true_labels = []
test_loss = 0.0

# Iterate over the test data using 'test_loader'
for batch in tqdm(test_loader, desc="Testing"):
    b_input_ids, b_input_mask, b_labels = tuple(t.to(eval_device) for t in batch)

    # Forward pass, calculate logit predictions
    with torch.no_grad():
        with autocast(enabled=use_amp):
            outputs = model(b_input_ids, token_type_ids=None, attention_mask=b_input_mask)
            logits = outputs.logits

        # Calculate the loss in float32 (logits may be half precision under autocast)
        loss = criterion(logits.float(), b_labels)
        test_loss += loss.item()

    # Store predictions and true labels as CPU numpy values
    test_preds.extend(torch.argmax(logits, dim=1).cpu().numpy())
    test_true_labels.extend(b_labels.cpu().numpy())

# Calculate the average loss over the test set
avg_test_loss = test_loss / len(test_loader)
print(f"Average Test Loss: {avg_test_loss}")

print("End: Testing completed.")

# Calculate and print other metrics (precision/recall/F1 are support-weighted averages)
accuracy = accuracy_score(test_true_labels, test_preds)
precision, recall, f1, _ = precision_recall_fscore_support(test_true_labels, test_preds, average='weighted')

print(f"Testing Accuracy: {accuracy}")
print(f"Testing Precision: {precision}")
print(f"Testing Recall: {recall}")
print(f"Testing F1 Score: {f1}")


## Predicting Sentiments of given sentences

In [None]:
def predict_class(text):
    '''Print the sentiment class the model predicts for the passed text.

    Uses the module-level ``tokenizer`` and ``model``. The text is encoded to
    at most 250 tokens, run through the model in eval mode, and the name of
    the highest-scoring class is printed. Nothing is returned.

    NOTE(review): assumes class ids 0/1/2 correspond to
    Negative/Neutral/Positive -- confirm against ``label_encoder.classes_``.
    '''
    sentiment_classes = ['Negative', 'Neutral', 'Positive']
    max_len = 250

    # Tokenizing and encoding the text for BERT
    inputs = tokenizer.encode_plus(text, add_special_tokens=True, max_length=max_len, padding='max_length', truncation=True, return_tensors='pt')

    # Put the inputs on the same device as the model; otherwise the forward
    # pass fails whenever the model has been moved to the GPU.
    model_device = next(model.parameters()).device
    input_ids = inputs['input_ids'].to(model_device)
    attention_mask = inputs['attention_mask'].to(model_device)

    # Prediction (eval mode disables dropout so the result is deterministic)
    model.eval()
    with torch.no_grad():
        outputs = model(input_ids, attention_mask=attention_mask)
        predictions = torch.nn.functional.softmax(outputs.logits, dim=1)
        predicted_class = torch.argmax(predictions, dim=1).item()

    print('The predicted sentiment is', sentiment_classes[predicted_class])


In [None]:
# Demo: run the classifier on an enthusiastic coffee-shop review.
predict_class("The coffee, food, and service were all excellent! They offer a light and dark roast of coffee, I chose the light roast and it was great. I also had the Green Eggs & Ham sandwich which was made fresh to order and very delicious. As a cute touch they also gave me and other customers mini cupcakes for free. Will definitely be going back!")

In [None]:
# Demo: run the classifier on a short, informally written review.
predict_class("the service was okieshh! ~ ~ ~ consider only you don't have any option")