In [None]:
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch.nn.utils.rnn import pad_sequence
import numpy as np

In [None]:
# Load the tweets CSV into the notebook-wide DataFrame `df`.
df = pd.read_csv(filepath_or_buffer='/content/final_tweets.csv')

In [None]:
# Re-code the raw `sentiment` values as class indices: 1 -> 2, -1 -> 0, 0 -> 1.
# Series.map leaves NaN for any value that is not a key of the mapping.
sentiment_mapping = dict([(1, 2), (-1, 0), (0, 1)])
df['polarity'] = df.loc[:, 'sentiment'].map(sentiment_mapping)

In [None]:
import nltk

# Fetch the NLTK tokenizer resources ('punkt_tab' first, then 'punkt') before tokenizing.
for resource in ('punkt_tab', 'punkt'):
    nltk.download(resource)

from nltk.tokenize import word_tokenize  # Explicitly import word_tokenize

# Tokenize each tweet; the resulting token lists are stored in a new 'tokens' column.
df['tokens'] = df['Tweets'].apply(word_tokenize)

In [None]:
from gensim.models import Word2Vec

# Train a Word2Vec model on the tokenized tweets (one token list per tweet).
sentences = df['tokens'].tolist()
word2vec = Word2Vec(
    sentences=sentences,
    vector_size=1000,  # dimensionality of each word vector
    window=5,
    min_count=1,
    workers=4,
)
# Expose the vector size under the name the later cells use.
embedding_dim = word2vec.vector_size

# Convert each tokenized sentence into a list of word embeddings
def text_to_embedding(tokens):
    """Map a list of tokens to a list of Word2Vec vectors, one per token.

    Tokens missing from the trained vocabulary are represented by an all-zero
    float32 vector of the model's vector size, so every element of the result
    has the same type and length.

    Args:
        tokens: iterable of token strings for one tweet.

    Returns:
        list of 1-D numpy arrays, in token order (empty list for no tokens).
    """
    vectors = word2vec.wv
    # Size is read from the model itself rather than the notebook-level
    # `embedding_dim`, which later cells rebind.
    oov_vector = np.zeros(word2vec.vector_size, dtype=np.float32)
    return [vectors[word] if word in vectors else oov_vector for word in tokens]

# Embed every tweet: each row's token list becomes a list holding one vector per token.
df['embedded_text'] = df['tokens'].apply(text_to_embedding)

In [None]:
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset, DataLoader

In [None]:
# Hold out 20% of the tweets for testing; the fixed random_state keeps the split repeatable.
all_embedded_texts = df['embedded_text'].tolist()
all_polarities = df['polarity'].tolist()
train_data, test_data, train_labels, test_labels = train_test_split(
    all_embedded_texts,
    all_polarities,
    test_size=0.2,
    random_state=42,
)

In [None]:
import torch
from torch.nn.utils.rnn import pad_sequence

class TweetsDataset(torch.utils.data.Dataset):
    """Dataset of embedded tweets paired with integer class labels.

    Samples whose label is NaN are dropped at construction time. Data and
    labels are filtered as pairs, so they always stay aligned.

    Args:
        data: sequence of samples; each sample is expected to be a sequence
            of equally sized embedding vectors (one per token).
        labels: sequence of numeric labels, parallel to ``data``.
    """

    def __init__(self, data, labels):
        kept = [(sample, label) for sample, label in zip(data, labels) if not np.isnan(label)]
        self.data = [sample for sample, _ in kept]
        self.labels = [label for _, label in kept]

        # Remember the per-token vector shape so that an empty sample can still
        # be returned as a (0, dim) tensor that pad_sequence can batch.
        first_non_empty = next((sample for sample in self.data if len(sample) > 0), None)
        self._feature_shape = tuple(np.shape(first_non_empty[0])) if first_non_empty is not None else ()

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(features, label)``: a float32 tensor and a 0-d long tensor."""
        sample = self.data[idx]
        if len(sample) == 0:
            data = torch.zeros((0, *self._feature_shape), dtype=torch.float32)
        else:
            # Build one ndarray first: torch.tensor() on a list of ndarrays is
            # very slow and emits a UserWarning.
            data = torch.from_numpy(np.array(sample, dtype=np.float32))
        label = torch.tensor(int(self.labels[idx]), dtype=torch.long)
        return data, label

In [None]:
def collate_fn(batch):
    """Collate ``(features, label)`` samples into one padded batch.

    Feature tensors are zero-padded along their first dimension to the longest
    sample in the batch and stacked batch-first; labels are gathered into a
    single tensor.

    Returns:
        tuple ``(features_padded, labels)``.
    """
    sample_features = []
    sample_labels = []
    for sample in batch:
        sample_features.append(sample[0])
        sample_labels.append(sample[1])

    features_padded = pad_sequence(sample_features, batch_first=True, padding_value=0)
    return features_padded, torch.tensor(sample_labels)

# Wrap each split in a dataset (NaN-labelled samples are dropped by TweetsDataset).
train_dataset = TweetsDataset(train_data, train_labels)
test_dataset = TweetsDataset(test_data, test_labels)

# Both loaders share the batch size and padding collate function;
# only the training loader shuffles.
loader_options = dict(batch_size=32, collate_fn=collate_fn)
train_loader = DataLoader(train_dataset, shuffle=True, **loader_options)
test_loader = DataLoader(test_dataset, shuffle=False, **loader_options)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class LSTMModel(nn.Module):
    """Single-layer LSTM classifier over sequences of embedding vectors.

    The final hidden state of the LSTM is passed through dropout and a linear
    layer to produce raw class scores (no softmax).

    Args:
        embedding_dim: size of each input vector.
        hidden_dim: size of the LSTM hidden state.
        output_dim: number of classes.
        dropout: dropout probability applied to the final hidden state.
    """

    def __init__(self, embedding_dim, hidden_dim, output_dim, dropout=0.2):
        super().__init__()
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, lengths=None):
        """Return class scores of shape ``(batch, output_dim)``.

        Args:
            x: batch-first input tensor ``(batch, seq_len, embedding_dim)``.
            lengths: optional true length of each sequence in ``x`` (list or
                1-D tensor, every entry >= 1). When given, padded timesteps
                are skipped so the final hidden state is taken at each
                sequence's last real token. When omitted, the whole padded
                sequence is processed, as before.
        """
        if lengths is not None:
            lengths = torch.as_tensor(lengths, dtype=torch.int64).cpu()
            x = nn.utils.rnn.pack_padded_sequence(
                x, lengths, batch_first=True, enforce_sorted=False
            )
        _, (hidden, _) = self.lstm(x)
        out = self.dropout(hidden[-1])  # hidden[-1]: final state of the last layer
        return self.fc(out)


In [None]:
# Hyperparameters
# Input size is read from the trained Word2Vec model instead of being re-typed,
# so the two values cannot drift apart.
embedding_dim = word2vec.vector_size
hidden_dim = 32
output_dim = 3  # For three classes: negative, neutral, positive
learning_rate = 0.001
num_epochs = 10


In [None]:
# Initialize model, loss function, and optimizer
# Seed PyTorch's global RNG first so weight initialization (and the shuffle
# order of loaders iterated afterwards) is repeatable across runs.
torch.manual_seed(42)
lstm_model = LSTMModel(embedding_dim, hidden_dim, output_dim)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(lstm_model.parameters(), lr=learning_rate)

In [None]:
import matplotlib.pyplot as plt

epoch_accuracy = []  # Training accuracy of each epoch, appended in order by the training loop below


def train_model(model, optimizer, loader, loss_fn=None):
    """Train ``model`` for one pass over ``loader``.

    Args:
        model: module mapping a feature batch to class scores.
        optimizer: optimizer over ``model``'s parameters.
        loader: iterable of ``(x, y)`` batches.
        loss_fn: loss callable ``(output, y) -> scalar``. Defaults to the
            notebook-level ``criterion``. It is expected to return the mean
            loss over the batch.

    Returns:
        tuple ``(average_loss, accuracy)``, both averaged per sample.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    if loss_fn is None:
        loss_fn = criterion  # notebook-level loss, as used before this parameter existed

    model.train()  # Set the model to training mode
    loss_sum = 0.0
    correct_predictions = 0
    total_samples = 0

    for x, y in loader:
        optimizer.zero_grad()  # Clear gradients
        output = model(x)  # Forward pass
        loss = loss_fn(output, y)  # Compute loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights

        batch_size = y.size(0)
        # Weight each batch mean by its size so a smaller final batch does not
        # skew the epoch average.
        loss_sum += loss.item() * batch_size
        total_samples += batch_size
        correct_predictions += (output.argmax(dim=1) == y).sum().item()

    if total_samples == 0:
        raise ValueError("train_model received a loader with no samples")

    return loss_sum / total_samples, correct_predictions / total_samples


# Train the LSTM for num_epochs passes, recording and printing the metrics of each epoch.
for epoch in range(num_epochs):
    lstm_loss, accuracy = train_model(model=lstm_model, optimizer=optimizer, loader=train_loader)
    epoch_accuracy += [accuracy]
    print(f'Epoch [{epoch+1}/{num_epochs}], LSTM Loss: {lstm_loss:.4f}, Accuracy: {accuracy * 100:.2f}%')




  data = torch.tensor(self.data[idx], dtype=torch.float32)


Epoch [1/10], LSTM Loss: 0.9225, Accuracy: 55.87%
Epoch [2/10], LSTM Loss: 0.8149, Accuracy: 60.94%
Epoch [3/10], LSTM Loss: 0.7576, Accuracy: 64.87%
Epoch [4/10], LSTM Loss: 0.6983, Accuracy: 68.88%
Epoch [5/10], LSTM Loss: 0.6678, Accuracy: 70.85%
Epoch [6/10], LSTM Loss: 0.6437, Accuracy: 72.38%
Epoch [7/10], LSTM Loss: 0.6292, Accuracy: 73.05%
Epoch [8/10], LSTM Loss: 0.6136, Accuracy: 73.82%
Epoch [9/10], LSTM Loss: 0.6030, Accuracy: 74.43%
Epoch [10/10], LSTM Loss: 0.5918, Accuracy: 74.97%


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

def evaluate_model_with_metrics(model, loader):
    """Evaluate ``model`` on ``loader`` and return classification metrics.

    Args:
        model: module mapping a feature batch to class scores.
        loader: iterable of ``(x, y)`` batches.

    Returns:
        tuple ``(accuracy, precision, recall, f1)``. Precision, recall and F1
        are support-weighted averages over the classes.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    model.eval()  # Set the model to evaluation mode
    all_targets = []
    all_predictions = []

    with torch.no_grad():  # No need to calculate gradients
        for x, y in loader:
            predicted = model(x).argmax(dim=1)  # Highest-scoring class per sample
            all_predictions.extend(predicted.cpu().numpy())
            all_targets.extend(y.cpu().numpy())

    if not all_targets:
        raise ValueError("evaluate_model_with_metrics received a loader with no samples")

    accuracy = (np.array(all_predictions) == np.array(all_targets)).sum() / len(all_targets)

    # zero_division=0 scores a class with no predicted (or no true) samples as
    # 0 explicitly instead of emitting an UndefinedMetricWarning.
    precision = precision_score(all_targets, all_predictions, average='weighted', zero_division=0)
    recall = recall_score(all_targets, all_predictions, average='weighted', zero_division=0)
    f1 = f1_score(all_targets, all_predictions, average='weighted', zero_division=0)

    return accuracy, precision, recall, f1

# Score the trained LSTM on the held-out test loader.
lstm_test_metrics = evaluate_model_with_metrics(lstm_model, test_loader)
accuracy, precision, recall, f1 = lstm_test_metrics

# Accuracy is reported as a percentage; the other scores as fractions.
print(f'Test Accuracy: {accuracy*100:.2f}%')
print(f'Precision: {precision:.4f}')
print(f'Recall: {recall:.4f}')
print(f'F1-Score: {f1:.4f}')


Test Accuracy: 73.08%
Precision: 0.7355
Recall: 0.7308
F1-Score: 0.7189


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

class RNNModel(nn.Module):
    """Single-layer vanilla RNN classifier over sequences of embedding vectors.

    The final hidden state of the RNN is passed through a linear layer to
    produce raw class scores (no softmax).

    Args:
        embedding_dim: size of each input vector.
        hidden_dim: size of the RNN hidden state.
        output_dim: number of classes.
    """

    def __init__(self, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.rnn = nn.RNN(embedding_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, lengths=None):
        """Return class scores of shape ``(batch, output_dim)``.

        Args:
            x: batch-first input tensor ``(batch, seq_len, embedding_dim)``.
            lengths: optional true length of each sequence in ``x`` (list or
                1-D tensor, every entry >= 1). When given, padded timesteps
                are skipped so the final hidden state is taken at each
                sequence's last real token. When omitted, the whole padded
                sequence is processed, as before.
        """
        if lengths is not None:
            lengths = torch.as_tensor(lengths, dtype=torch.int64).cpu()
            x = nn.utils.rnn.pack_padded_sequence(
                x, lengths, batch_first=True, enforce_sorted=False
            )
        _, hidden = self.rnn(x)
        return self.fc(hidden[-1])  # hidden[-1]: final state of the last layer


In [None]:
# Hyperparameters
# Input size is read from the trained Word2Vec model instead of being re-typed,
# so the two values cannot drift apart.
embedding_dim = word2vec.vector_size
hidden_dim = 32
output_dim = 3  # For three classes: negative, neutral, positive
learning_rate = 0.001
num_epochs = 10


In [None]:
# Initialize the RNN model, loss function, and optimizer
# Seed PyTorch's global RNG first so weight initialization (and the shuffle
# order of loaders iterated afterwards) is repeatable across runs.
torch.manual_seed(42)
rnn_model = RNNModel(embedding_dim, hidden_dim, output_dim)
# NOTE: `criterion` and `optimizer` are rebound here; from this cell on,
# `optimizer` updates rnn_model's parameters only.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(rnn_model.parameters(), lr=learning_rate)


In [None]:
import matplotlib.pyplot as plt

# Function to train the model and calculate accuracy
def train_model(model, optimizer, loader, loss_fn=None):
    """Train ``model`` for one pass over ``loader``.

    Args:
        model: module mapping a feature batch to class scores.
        optimizer: optimizer over ``model``'s parameters.
        loader: iterable of ``(x, y)`` batches.
        loss_fn: loss callable ``(output, y) -> scalar``. Defaults to the
            notebook-level ``criterion``. It is expected to return the mean
            loss over the batch.

    Returns:
        tuple ``(average_loss, accuracy)``, both averaged per sample;
        accuracy is a fraction in [0, 1].

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    if loss_fn is None:
        loss_fn = criterion  # notebook-level loss, as used before this parameter existed

    model.train()  # Set the model to training mode
    loss_sum = 0.0
    correct_predictions = 0
    total_samples = 0

    for x, y in loader:
        optimizer.zero_grad()  # Clear gradients
        output = model(x)  # Forward pass
        loss = loss_fn(output, y)  # Compute loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights

        batch_size = y.size(0)
        # Weight each batch mean by its size so a smaller final batch does not
        # skew the epoch average.
        loss_sum += loss.item() * batch_size
        total_samples += batch_size
        correct_predictions += (output.argmax(dim=1) == y).sum().item()

    if total_samples == 0:
        raise ValueError("train_model received a loader with no samples")

    average_loss = loss_sum / total_samples
    accuracy = correct_predictions / total_samples  # Accuracy in decimal form
    return average_loss, accuracy

# Train the RNN for num_epochs passes, keeping the loss and accuracy of every epoch.
train_losses, train_accuracies = [], []

for epoch in range(num_epochs):
    rnn_loss, train_accuracy = train_model(model=rnn_model, optimizer=optimizer, loader=train_loader)
    train_losses += [rnn_loss]
    train_accuracies += [train_accuracy]

    print(f'Epoch [{epoch+1}/{num_epochs}], RNN Loss: {rnn_loss:.4f}, Training Accuracy: {train_accuracy * 100:.2f}%')



Epoch [1/10], RNN Loss: 1.0056, Training Accuracy: 50.43%
Epoch [2/10], RNN Loss: 0.9896, Training Accuracy: 52.13%
Epoch [3/10], RNN Loss: 0.9890, Training Accuracy: 51.97%
Epoch [4/10], RNN Loss: 0.9877, Training Accuracy: 52.04%
Epoch [5/10], RNN Loss: 0.9854, Training Accuracy: 52.53%
Epoch [6/10], RNN Loss: 0.9850, Training Accuracy: 52.50%
Epoch [7/10], RNN Loss: 0.9840, Training Accuracy: 52.57%
Epoch [8/10], RNN Loss: 0.9815, Training Accuracy: 52.94%
Epoch [9/10], RNN Loss: 0.9812, Training Accuracy: 52.79%
Epoch [10/10], RNN Loss: 0.9799, Training Accuracy: 52.98%


In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score

def evaluate_model_with_metrics(model, loader):
    """Evaluate ``model`` on ``loader`` and return classification metrics.

    Args:
        model: module mapping a feature batch to class scores.
        loader: iterable of ``(x, y)`` batches.

    Returns:
        tuple ``(accuracy, precision, recall, f1)``. Precision, recall and F1
        are support-weighted averages over the classes.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    model.eval()  # Set the model to evaluation mode
    all_targets = []
    all_predictions = []

    with torch.no_grad():  # No need to calculate gradients
        for x, y in loader:
            predicted = model(x).argmax(dim=1)  # Highest-scoring class per sample
            all_predictions.extend(predicted.cpu().numpy())
            all_targets.extend(y.cpu().numpy())

    if not all_targets:
        raise ValueError("evaluate_model_with_metrics received a loader with no samples")

    accuracy = (np.array(all_predictions) == np.array(all_targets)).sum() / len(all_targets)

    # zero_division=0 scores a class with no predicted (or no true) samples as
    # 0 explicitly instead of emitting an UndefinedMetricWarning.
    precision = precision_score(all_targets, all_predictions, average='weighted', zero_division=0)
    recall = recall_score(all_targets, all_predictions, average='weighted', zero_division=0)
    f1 = f1_score(all_targets, all_predictions, average='weighted', zero_division=0)

    return accuracy, precision, recall, f1

# Final Test Evaluation with Metrics
accuracy, precision, recall, f1 = evaluate_model_with_metrics(rnn_model, test_loader)

# Print Metrics
# Accuracy is shown as a percentage. Precision/recall/F1 are shown as fractions,
# the same way the LSTM evaluation cell reports them, so the two models'
# numbers can be compared directly.
print(f"Final Test Accuracy of RNN Model: {accuracy*100:.4f}%")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1-Score: {f1:.4f}")


Final Test Accuracy of RNN Model: 50.7000%
Precision: 0.4051
Recall: 0.5070
F1-Score: 0.3818


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


In [None]:
class LSTMModel(nn.Module):
    """LSTM classifier without dropout: LSTM -> final hidden state -> linear layer.

    Defining this class rebinds the name ``LSTMModel``; model instances created
    from an earlier definition keep the class they were built with.
    """

    def __init__(self, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Return raw class scores (logits); no softmax is applied."""
        _, (final_hidden, _final_cell) = self.lstm(x)
        last_layer_state = final_hidden[-1]
        return self.fc(last_layer_state)


class RNNModel(nn.Module):
    """Vanilla RNN classifier: RNN -> final hidden state -> linear layer.

    Defining this class rebinds the name ``RNNModel``; model instances created
    from an earlier definition keep the class they were built with.
    """

    def __init__(self, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.rnn = nn.RNN(input_size=embedding_dim, hidden_size=hidden_dim, batch_first=True)
        self.fc = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Return raw class scores (logits); no softmax is applied."""
        _, final_hidden = self.rnn(x)
        last_layer_state = final_hidden[-1]
        return self.fc(last_layer_state)


In [None]:
def ensemble_predict_majority_voting(lstm_model, rnn_model, loader):
    """Return the accuracy of a two-model voting ensemble on ``loader``.

    Each model votes for its highest-scoring class. When the votes agree that
    class is used. When they disagree, the vote of the model with the higher
    softmax confidence wins; on an exact confidence tie the LSTM's vote is
    used. (Previously every disagreement fell back to the LSTM, so the RNN
    never influenced the result.)

    Args:
        lstm_model: module mapping a feature batch to class scores.
        rnn_model: module mapping a feature batch to class scores.
        loader: iterable of ``(x, y)`` batches.

    Returns:
        fraction of samples whose ensemble prediction equals the label.

    Raises:
        ValueError: if ``loader`` yields no samples.
    """
    lstm_model.eval()
    rnn_model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for x, y in loader:
            # Per-sample confidence (max softmax probability) and predicted class.
            lstm_conf, lstm_pred = torch.softmax(lstm_model(x), dim=1).max(dim=1)
            rnn_conf, rnn_pred = torch.softmax(rnn_model(x), dim=1).max(dim=1)

            # Agreement: both branches hold the same class. Disagreement: take
            # the RNN only where it is strictly more confident than the LSTM.
            final_pred = torch.where(rnn_conf > lstm_conf, rnn_pred, lstm_pred)

            correct += (final_pred == y).sum().item()
            total += y.size(0)

    if total == 0:
        raise ValueError("ensemble_predict_majority_voting received a loader with no samples")

    return correct / total


In [None]:
# Measure the accuracy of the combined LSTM + RNN predictions on the test loader.
ensemble_accuracy = ensemble_predict_majority_voting(
    lstm_model=lstm_model,
    rnn_model=rnn_model,
    loader=test_loader,
)
print(f'Ensemble Model Test Accuracy (Majority Voting): {ensemble_accuracy:.4f}')


Ensemble Model Test Accuracy (Majority Voting): 0.7308
