In [None]:
# Mount Google Drive into the Colab filesystem at /content/gdrive; the dataset
# path used later in this notebook lives under this mount point.
# force_remount=True is passed so that re-running this cell requests a fresh mount.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from torch.optim.lr_scheduler import ReduceLROnPlateau
import random

# Define a simple dataset class
class EmotionDataset(Dataset):
    """Dataset of whitespace-tokenised texts paired with integer class labels.

    Args:
        texts: Sequence of raw text strings.
        labels: Sequence of labels, parallel to ``texts``; each entry is
            parsed with ``int()``.
        word_to_idx: Mapping from token to integer index.
    """

    def __init__(self, texts, labels, word_to_idx):
        self.texts = texts
        self.labels = labels
        self.word_to_idx = word_to_idx

    def __len__(self):
        """Return the number of samples."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return sample ``idx`` as ``{'text': LongTensor, 'label': LongTensor}``.

        Tokens that are not present in ``word_to_idx`` are skipped instead of
        raising ``KeyError``, so the dataset also works with a vocabulary that
        does not cover every text (for example one built from a training split
        only). A text with no known tokens yields an empty ``'text'`` tensor.
        """
        token_ids = [
            self.word_to_idx[word]
            for word in self.texts[idx].split()
            if word in self.word_to_idx
        ]

        # Labels that cannot be parsed as integers fall back to class 0.
        # NOTE(review): this fallback silently relabels malformed rows; consider
        # filtering such rows out when the data is loaded.
        try:
            label_value = int(self.labels[idx])
        except ValueError:
            label_value = 0

        return {
            'text': torch.tensor(token_ids, dtype=torch.long),
            'label': torch.tensor(label_value, dtype=torch.long),
        }

def collate_batch(batch):
    """Merge a list of samples into one padded batch.

    Args:
        batch: List of dicts, each with a 1-D ``'text'`` tensor and a scalar
            ``'label'`` tensor.

    Returns:
        Dict with ``'text'`` (sequences right-padded with 0 to the longest
        sequence in the batch, batch dimension first) and ``'label'`` (labels
        stacked into a 1-D tensor).
    """
    sequences, targets = [], []
    for sample in batch:
        sequences.append(sample['text'])
        targets.append(sample['label'])

    return {
        'text': pad_sequence(sequences, batch_first=True, padding_value=0),
        'label': torch.stack(targets),
    }

def read_data_from_csv(file_path):
    """Read texts and labels from the first two columns of a CSV file.

    The first row is treated as a header and skipped. Rows with fewer than two
    columns (including blank lines) are ignored.

    Parsing uses the standard ``csv`` module so that quoted fields containing
    commas stay in one piece and surrounding quotes are removed from both the
    text and the label.

    Args:
        file_path: Path to the CSV file (read as UTF-8).

    Returns:
        Tuple ``(texts, labels)`` of two parallel lists of strings. Both lists
        are empty when the file has no data rows.
    """
    import csv  # local import keeps this function self-contained

    texts = []
    labels = []
    # newline='' is what the csv module expects so it can handle embedded
    # newlines inside quoted fields itself.
    with open(file_path, 'r', encoding='utf-8', newline='') as file:
        reader = csv.reader(file)
        next(reader, None)  # Skip header if exists; tolerate an empty file
        for row in reader:
            if len(row) >= 2:
                texts.append(row[0].strip())
                labels.append(row[1].strip().strip('"'))
    return texts, labels

# LSTM-based emotion classification model
class EmotionAnalysisModel(nn.Module):
    """Embedding -> LSTM -> BatchNorm -> Dropout -> Linear text classifier.

    Token index 0 is treated as padding: its embedding is fixed at zero and it
    is ignored when locating the last real token of each sequence.

    Args:
        vocab_size: Number of rows in the embedding table (including index 0).
        embedding_dim: Size of each token embedding.
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of output classes (logits).
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.batch_norm = nn.BatchNorm1d(hidden_dim)  # Add batch normalization
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        """Return class logits of shape ``(batch, output_dim)``.

        Args:
            x: LongTensor of token indices, shape ``(batch, seq_len)``.
                Assumes padding (index 0) only appears after the real tokens,
                which is how ``pad_sequence`` lays out a batch.
        """
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)

        # Pick the LSTM output at each sequence's last real token rather than
        # at the last padded position; otherwise shorter sequences in a batch
        # are classified from a state that has been run over padding.
        # An all-padding row is clamped to length 1 so the index stays valid.
        lengths = (x != 0).sum(dim=1).clamp(min=1)
        batch_idx = torch.arange(x.size(0), device=x.device)
        last_out = lstm_out[batch_idx, lengths - 1]

        last_out = self.batch_norm(last_out)  # Apply batch normalization
        last_out = self.dropout(last_out)  # Apply dropout
        return self.fc(last_out)

# Read data from CSV file
file_path = '/content/gdrive/MyDrive/DS677 - Fall 23 - DL Project - Paresh, Ojaswi, Dinesh/TextAndEmotions.csv'
texts, labels = read_data_from_csv(file_path)

# Seed the Python and PyTorch RNGs so weight initialisation and batch shuffling
# are repeatable across kernel restarts (the split below uses the same seed).
SEED = 42
random.seed(SEED)
torch.manual_seed(SEED)

# Create vocabulary and word_to_idx mapping.
# The vocabulary is sorted before indexing: set iteration order for strings can
# change between interpreter sessions, which would give a different
# word -> index mapping on every run.
vocab = set(' '.join(texts).split())
word_to_idx = {word: idx + 1 for idx, word in enumerate(sorted(vocab))}  # Add 1 to reserve index 0 for padding

# Update the dataset and pad sequences
train_texts, test_texts, train_labels, test_labels = train_test_split(texts, labels, test_size=0.2, random_state=SEED)

train_dataset = EmotionDataset(train_texts, train_labels, word_to_idx)
test_dataset = EmotionDataset(test_texts, test_labels, word_to_idx)

batch_size = 64
# BatchNorm1d cannot normalise a single-sample batch in training mode, so the
# trailing batch is dropped only in the case where it would contain one sample.
train_loader = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_batch,
    drop_last=(len(train_dataset) % batch_size == 1),
)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_batch)


def _label_to_int(raw_label):
    """Parse a raw label the same way EmotionDataset does (non-integers -> 0)."""
    try:
        return int(raw_label)
    except ValueError:
        return 0


# The output layer must cover the largest integer class id. Counting distinct
# labels alone is too small when ids are not contiguous from 0 (e.g. 1..N),
# which would make CrossEntropyLoss fail on an out-of-range target.
num_classes = max(len(set(labels)), max(_label_to_int(raw) for raw in labels) + 1)

# Use GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = EmotionAnalysisModel(
    vocab_size=len(word_to_idx) + 1,  # +1 for the padding index 0
    embedding_dim=50,
    hidden_dim=100,
    output_dim=num_classes,
).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Learning rate scheduler
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5, verbose=True)

# Train for a fixed number of epochs, reporting the mean batch loss per epoch.
epochs = 50
for epoch in range(epochs):
    model.train()
    total_loss = 0

    for batch in train_loader:
        # Move the padded token ids and their labels to the training device.
        text = batch['text'].to(device)
        label = batch['label'].to(device)

        optimizer.zero_grad()
        output = model(text)
        loss = criterion(output, label)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    print(f'Epoch {epoch + 1}/{epochs}, Loss: {total_loss / len(train_loader)}')

    # The plateau scheduler is driven by the mean training loss of this epoch.
    epoch_loss = total_loss / len(train_loader)
    scheduler.step(epoch_loss)

# Evaluate on the held-out split: collect predicted and true class ids over all
# test batches, then report overall accuracy.
model.eval()
all_predictions, all_labels = [], []

with torch.no_grad():
    for batch in test_loader:
        text = batch['text'].to(device)
        label = batch['label'].to(device)

        output = model(text)
        predictions = output.argmax(dim=1)  # highest-scoring class per sample

        all_predictions.extend(predictions.cpu().numpy())
        all_labels.extend(label.cpu().numpy())

accuracy = accuracy_score(all_labels, all_predictions)
print(f'Test Accuracy: {accuracy * 100:.2f}%')


Epoch 1/50, Loss: 5.212891896565755
Epoch 2/50, Loss: 4.99224673377143
Epoch 3/50, Loss: 4.902364730834961
Epoch 4/50, Loss: 4.60579087999132
Epoch 5/50, Loss: 4.311837037404378
Epoch 6/50, Loss: 3.9687681198120117
Epoch 7/50, Loss: 3.5991027620103626
Epoch 8/50, Loss: 3.12363330523173
Epoch 9/50, Loss: 2.501327223247952
Epoch 10/50, Loss: 2.0091420544518366
Epoch 11/50, Loss: 1.4027652210659451
Epoch 12/50, Loss: 1.1353367567062378
Epoch 13/50, Loss: 0.675278902053833
Epoch 14/50, Loss: 0.5552616500192218
Epoch 15/50, Loss: 0.3590041597684224
Epoch 16/50, Loss: 0.19627143401238653
Epoch 17/50, Loss: 0.13862000323004192
Epoch 18/50, Loss: 0.12142143522699674
Epoch 19/50, Loss: 0.08268820266756746
Epoch 20/50, Loss: 0.059918465092778206
Epoch 21/50, Loss: 0.04743958595726225
Epoch 22/50, Loss: 0.09386506314492887
Epoch 23/50, Loss: 0.04212546803885036
Epoch 24/50, Loss: 0.034734474081132144
Epoch 25/50, Loss: 0.04235730232256982
Epoch 26/50, Loss: 0.023594688727623887
Epoch 27/50, Loss:

In [None]:
# Sample inputs
sample_inputs = [
    "I love the new feature!",
    "I am satisfied with the customer service.",
    "This movie is not good!",
    "This software have lots of features.",
    "Incredible customer experience!",
    "I am anxious"
]

# Map the predicted class to the corresponding emotion label.
# Built once here rather than on every loop iteration.
emotion_labels = {0: "Negative", 1: "Positive"}  # Update with your specific labels

# Switch to inference mode once, before any prediction is made.
model.eval()

# Predict emotions for each sample input
for sample_input in sample_inputs:
    # Preprocess the sample input: same whitespace tokenisation as training,
    # keeping only tokens present in the vocabulary.
    sample_input_indices = [word_to_idx[word] for word in sample_input.split() if word in word_to_idx]

    # Nothing to feed the model if every token was out of vocabulary.
    if not sample_input_indices:
        print(f"No valid words found in the sample input: {sample_input}")
        continue

    # Convert to tensor and add batch dimension
    sample_input_tensor = torch.LongTensor(sample_input_indices).unsqueeze(0).to(device)

    # Pass through the trained model
    with torch.no_grad():
        output = model(sample_input_tensor)

    # Interpret the model's output: class ids missing from the map print as "Unknown".
    predicted_class = torch.argmax(output, dim=1).item()
    predicted_emotion = emotion_labels.get(predicted_class, "Unknown")

    print(f"The predicted emotion for the sample input '{sample_input}' is: {predicted_emotion}")


The predicted emotion for the sample input 'I love the new feature!' is: Positive
The predicted emotion for the sample input 'I am satisfied with the customer service.' is: Positive
The predicted emotion for the sample input 'This movie is not good!' is: Negative
The predicted emotion for the sample input 'This software have lots of features.' is: Negative
The predicted emotion for the sample input 'Incredible customer experience!' is: Negative
The predicted emotion for the sample input 'I am anxious' is: Positive
