<a href="https://colab.research.google.com/github/sarthak026/Advanced-Restaurant-Sentiment-Exploration/blob/main/Untitled26.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import pandas as pd
import numpy as np
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from transformers import RobertaTokenizer, RobertaForSequenceClassification, TrainingArguments, Trainer
import torch
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
from torch.utils.data import Dataset
from sklearn.utils.class_weight import compute_class_weight
import seaborn as sns
import matplotlib.pyplot as plt

# Preprocess the text
_ENGLISH_STOP_WORDS = None  # filled in on first use by _english_stop_words()


def _english_stop_words():
    """Return NLTK's English stop words as a set, reading the corpus only once."""
    global _ENGLISH_STOP_WORDS
    if _ENGLISH_STOP_WORDS is None:
        _ENGLISH_STOP_WORDS = frozenset(stopwords.words('english'))
    return _ENGLISH_STOP_WORDS


def preprocess_text(text):
    """Normalize a raw review into a space-joined string of content tokens.

    The text is lower-cased and tokenized with ``word_tokenize``; tokens that
    are not purely alphanumeric, or that are English stop words, are dropped.

    Args:
        text: The raw review. Any value that is not a ``str`` is treated as
            an empty review.

    Returns:
        The surviving tokens joined by single spaces, or ``''`` when *text*
        is not a string.
    """
    if not isinstance(text, str):
        return ''
    tokens = word_tokenize(text.lower())
    # The stop-word set is the same for every review, so it is built once and
    # reused instead of being re-read from the corpus on each call.
    stop_words = _english_stop_words()
    return ' '.join(
        token for token in tokens if token.isalnum() and token not in stop_words
    )

# Build the cleaned-text column that the labelling and tokenization steps below read.
# NOTE(review): `df` is not defined anywhere in the visible part of this file; it
# must already exist with a 'review' column before this line runs — confirm.
df['processed_reviews'] = df['review'].apply(preprocess_text)

# Keyword -> category rules for labelling reviews, checked top to bottom;
# the first rule with a matching keyword wins.
_LABEL_RULES = (
    (('taste', 'flavor'), 'Taste of food'),
    (('quality', 'fresh'), 'Quality of food'),
    (('quantity', 'portion'), 'Quantity of food'),
    (('location', 'nearby'), 'Location'),
    (('delivery', 'service'), 'Delivery Service'),
    (('health', 'nutritious'), 'Health Factor'),
    (('ambiance', 'decor'), 'Ambiance'),
    (('clean', 'hygiene'), 'Cleanliness'),
    (('wait', 'time'), 'Wait Time'),
    (('support', 'customer'), 'Customer Support'),
    (('presentation', 'plating'), 'Food Presentation'),
    (('accessibility', 'convenience'), 'Accessibility'),
    (('safety', 'compliance'), 'Health and Safety Compliance'),
)


def label_review(text):
    """Return the category of the first rule whose keyword occurs in *text*.

    Matching uses the ``in`` operator, so for a string any substring hit
    counts (``'wait'`` also matches ``'waiter'``). Rules are tried in the
    order listed in ``_LABEL_RULES``; when none match, ``'Other'`` is
    returned.
    """
    for keywords, category in _LABEL_RULES:
        if any(keyword in text for keyword in keywords):
            return category
    return 'Other'

# Assign every review a category by running the keyword rules over its cleaned text.
df['label'] = df['processed_reviews'].apply(label_review)

# Check the class distribution: print how many reviews landed in each category.
print("Class Distribution:\n", df['label'].value_counts())

# Visualize class distribution as a bar chart, one bar per category.
sns.countplot(x='label', data=df)
# Rotate the category names so they are drawn vertically along the x axis.
plt.xticks(rotation=90)
plt.show()

# Hold out a test set of 20% of the rows, capped at 500 rows.
dataset_size = len(df)
print(f"Dataset size: {dataset_size}")

num_test_samples = min(500, int(0.2 * dataset_size))

# Sample the test rows with a fixed seed, then drop their index labels from
# `df` to obtain the training rows.
# NOTE(review): this is a plain random sample, not stratified by label, and the
# LabelEncoder further down is fitted on the training labels only — verify that
# a rare category cannot end up exclusively in the test set.
test_df = df.sample(n=num_test_samples, random_state=42)
train_df = df.drop(test_df.index)

# Model inputs are the cleaned texts; targets are the keyword-derived labels.
train_texts = train_df['processed_reviews']
train_labels = train_df['label']
test_texts = test_df['processed_reviews']
test_labels = test_df['label']

print(f"Training set size: {len(train_df)}")
print(f"Test set size: {len(test_df)}")

tokenizer = RobertaTokenizer.from_pretrained('roberta-large')


def encode_texts(texts):
    """Tokenize *texts* with the module-level RoBERTa tokenizer.

    *texts* must provide ``.tolist()`` (the callers pass DataFrame columns).
    The tokenizer is called with truncation and padding enabled, a maximum
    length of 512, and PyTorch tensors as the return type.
    """
    batch = texts.tolist()
    return tokenizer(
        batch,
        max_length=512,
        truncation=True,
        padding=True,
        return_tensors='pt',
    )


# Encode the training split first, then the test split.
train_encodings, test_encodings = map(encode_texts, (train_texts, test_texts))

# Create a custom dataset class for PyTorch
class ReviewsDataset(Dataset):
    """Dataset that pairs tokenizer encodings with integer class labels.

    Args:
        encodings: Mapping from feature name to a container indexed by
            example position; each entry may be a tensor or a nested list.
        labels: Sequence of integer class ids indexed by example position,
            one per example.
    """

    def __init__(self, encodings, labels):
        self.encodings = encodings
        self.labels = labels

    def __getitem__(self, idx):
        """Return the features of example *idx* plus its label under ``'labels'``."""
        item = {key: self._copy_as_tensor(val[idx]) for key, val in self.encodings.items()}
        item['labels'] = torch.tensor(self.labels[idx], dtype=torch.long)
        return item

    def __len__(self):
        """Return the number of examples (one per label)."""
        return len(self.labels)

    @staticmethod
    def _copy_as_tensor(value):
        """Return an independent, detached tensor holding *value*."""
        # Calling torch.tensor() on an existing tensor emits a UserWarning on
        # every access; clone().detach() makes the same copy without it.
        if isinstance(value, torch.Tensor):
            return value.clone().detach()
        return torch.tensor(value)

# Label encoding for the classes: the encoder is fitted on the training labels
# and then reused to encode the test labels.
# NOTE(review): the test labels go through transform() only — confirm how a
# category that never occurs in the training split is meant to be handled.
label_encoder = LabelEncoder()
train_labels = label_encoder.fit_transform(train_labels)
test_labels = label_encoder.transform(test_labels)

# Wrap the encodings and the encoded labels for use by the Trainer.
train_dataset = ReviewsDataset(train_encodings, train_labels)
test_dataset = ReviewsDataset(test_encodings, test_labels)

# Calculate class weights to handle imbalance; they are derived from the
# training labels only and converted to a float tensor for the loss function.
class_weights = compute_class_weight('balanced', classes=np.unique(train_labels), y=train_labels)
class_weights = torch.tensor(class_weights, dtype=torch.float)

# Load the model with one output per class known to the label encoder.
model = RobertaForSequenceClassification.from_pretrained('roberta-large', num_labels=len(label_encoder.classes_))

# Define a custom loss function with class weights
def custom_loss_fn(outputs, labels):
    """Return the class-weighted cross-entropy of *outputs* against *labels*.

    The module-level ``class_weights`` tensor is moved to the device of
    *outputs* before being handed to ``torch.nn.CrossEntropyLoss``.
    """
    weights_on_device = class_weights.to(outputs.device)
    criterion = torch.nn.CrossEntropyLoss(weight=weights_on_device)
    return criterion(outputs, labels)

# Compute metrics for evaluation
def compute_metrics(pred):
    """Compute accuracy and weighted precision/recall/F1 for an evaluation pass.

    Args:
        pred: Object exposing ``label_ids`` (the true class ids) and
            ``predictions`` (per-class scores; the arg-max over the last
            axis is taken as the predicted class).

    Returns:
        A dict with the keys ``'accuracy'``, ``'precision'``, ``'recall'``
        and ``'f1'``.
    """
    labels = pred.label_ids
    preds = pred.predictions.argmax(-1)
    # zero_division=0 yields the same scores as the default setting but does
    # not emit a warning when a class receives no predictions.
    precision, recall, f1, _ = precision_recall_fscore_support(
        labels, preds, average='weighted', zero_division=0
    )
    return {
        'accuracy': accuracy_score(labels, preds),
        'precision': precision,
        'recall': recall,
        'f1': f1,
    }

# Define training arguments
training_args = TrainingArguments(
    # Where checkpoints and logs are written.
    output_dir='./results',
    logging_dir='./logs',
    logging_steps=10,
    # Optimisation schedule.
    num_train_epochs=10,
    learning_rate=3e-5,
    warmup_steps=100,
    weight_decay=0.01,
    # Batch sizes per device.
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    # Evaluate and checkpoint once per epoch, then reload the checkpoint with
    # the best accuracy when training finishes.
    eval_strategy="epoch",
    save_strategy="epoch",
    load_best_model_at_end=True,
    metric_for_best_model='accuracy',
)

# Trainer with custom loss function
class WeightedLossTrainer(Trainer):
    """Trainer whose loss is the class-weighted cross-entropy from ``custom_loss_fn``.

    ``Trainer.__init__`` has no ``compute_loss`` keyword, so the custom loss is
    installed by overriding the ``compute_loss`` method instead.
    """

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Run a forward pass and return the weighted loss.

        Args:
            model: The model being trained or evaluated.
            inputs: Batch dict; must contain ``"labels"``.
            return_outputs: When true, return ``(loss, outputs)`` instead of
                the loss alone.
            **kwargs: Extra arguments some Trainer versions pass (for example
                ``num_items_in_batch``); accepted and ignored.
        """
        outputs = model(**inputs)
        loss = custom_loss_fn(outputs.logits, inputs["labels"])
        return (loss, outputs) if return_outputs else loss


trainer = WeightedLossTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    compute_metrics=compute_metrics,
)

# Train the model
trainer.train()

# Evaluate the model
eval_results = trainer.evaluate()
print("Evaluation Results:", eval_results)

# Pull out the individual metrics, falling back to a placeholder for any that are missing.
accuracy, precision, recall, f1 = (
    eval_results.get(key, 'Not available')
    for key in ('eval_accuracy', 'eval_precision', 'eval_recall', 'eval_f1')
)

# Report one metric per line.
print(
    f"Test Accuracy: {accuracy}",
    f"Test Precision: {precision}",
    f"Test Recall: {recall}",
    f"Test F1 Score: {f1}",
    sep="\n",
)

# Confusion Matrix
predictions = trainer.predict(test_dataset).predictions.argmax(-1)
# Pin the matrix to every encoded class id so that it always has one row and
# column per entry in label_encoder.classes_; without this, a class missing
# from both the test labels and the predictions would shrink the matrix and
# the tick labels below would no longer line up.
class_ids = np.arange(len(label_encoder.classes_))
cm = confusion_matrix(test_labels, predictions, labels=class_ids)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=label_encoder.classes_, yticklabels=label_encoder.classes_)
plt.xlabel('Predicted')
plt.ylabel('True')
plt.show()
