In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import pickle

In [None]:
# Load the preprocessed reviews.
df = pd.read_csv("processed_dataset.csv")

# Discard rows missing either the review text or its label *before* reporting the
# class balance, so the printed counts describe the rows that are actually used.
# (The previous mid-cell `df.head()` was a discarded expression and showed nothing.)
df = df.dropna(subset=['cleaned_review', 'sentiment'])
print(df['sentiment'].value_counts())

# Feature text and target label columns used by the following cells.
X = df['cleaned_review']
y = df['sentiment']

In [None]:
# SECURITY NOTE: pickle.load can execute arbitrary code embedded in the file.
# Only load an embeddings file that you generated yourself / fully trust.
with open('w2v_embeddings.pkl', 'rb') as f:
    X_encoded = pickle.load(f)

# The labels come from `df` after rows with missing values were dropped, while the
# embeddings come from a separate file. Fail early with a pointed message if the
# two no longer have one row per sample, instead of training on misaligned pairs.
# NOTE(review): equal length does not prove identical row order — confirm the
# embeddings were produced from the same filtered rows.
if len(X_encoded) != len(y):
    raise ValueError(
        f"Embeddings/labels length mismatch: {len(X_encoded)} embeddings vs {len(y)} labels"
    )

# One-hot encode the sentiment labels (OneHotEncoder expects a 2-D column).
y_reshaped = y.values.reshape(-1, 1)
encoder = OneHotEncoder()
y_encoded = encoder.fit_transform(y_reshaped)

In [None]:
# Hold out 20% of the samples for testing; the fixed random_state keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X_encoded,
    y_encoded,
    test_size=0.2,
    random_state=42,
)

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")

In [None]:
# Sanity check on the encoded training labels: print the number of stored
# non-zero entries, then the number of rows. A one-hot matrix has exactly one
# non-zero per row, so the two printed values are expected to be equal.
num_non_zero_elements = y_train.nnz
print(num_non_zero_elements)
num_rows, _ = y_train.shape
print(num_rows)

In [None]:
# Features: go through np.asarray first. If the split produced a Python list of
# per-sample arrays, handing that list straight to torch.tensor is very slow;
# if it is already an ndarray, np.asarray is a no-op.
# NOTE(review): assumes every sample embedding has the same length — confirm.
X_train_tensor = torch.tensor(np.asarray(X_train), dtype=torch.float32).to(device)
X_test_tensor = torch.tensor(np.asarray(X_test), dtype=torch.float32).to(device)

# Labels: collapse each one-hot row to its class index, giving 1-D long tensors.
y_train_tensor = torch.tensor(np.argmax(y_train.toarray(), axis=1), dtype=torch.long).to(device)
y_test_tensor = torch.tensor(np.argmax(y_test.toarray(), axis=1), dtype=torch.long).to(device)

In [None]:
# Pair each feature row with its class index.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)

# The tensors are already materialised in memory (and already moved to `device`),
# so batches are served from the main process (num_workers=0): worker processes
# cannot safely hand out CUDA tensors and would only add inter-process overhead
# for in-memory data.
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=0, pin_memory=False)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=0, pin_memory=False)

In [None]:
class LSTMModel(nn.Module):
    """Single-layer LSTM followed by a linear classification head.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Size of the LSTM hidden state.
        output_dim: Number of output logits.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return logits of shape [batch_size, output_dim].

        ``x`` may be [batch_size, input_dim] (treated as a length-1 sequence, as
        before) or an actual sequence batch [batch_size, seq_length, input_dim].
        """
        # Only add the sequence axis when it is missing; previously it was added
        # unconditionally, which broke inputs that were already 3-D.
        if x.dim() < 3:
            x = x.unsqueeze(1)
        _, (hn, _) = self.lstm(x)
        # hn[-1] is the final hidden state of the last LSTM layer.
        return self.fc(hn[-1])

# Define CNN model
class CNNModel(nn.Module):
    """1-D convolution + global max pooling + linear classification head.

    Args:
        input_dim: Size of each input feature vector (used as Conv1d channels).
        output_dim: Number of output logits.
    """

    def __init__(self, input_dim, output_dim):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels=input_dim, out_channels=100, kernel_size=3)
        self.fc = nn.Linear(100, output_dim)

    def forward(self, x):
        """Return logits of shape [batch_size, output_dim].

        ``x`` may be [batch_size, input_dim] (treated as a length-1 sequence, like
        the recurrent models in this notebook) or [batch_size, seq_length, input_dim].
        """
        # A 2-D batch has no sequence axis, so transpose(1, 2) would raise; add it.
        if x.dim() == 2:
            x = x.unsqueeze(1)
        x = x.transpose(1, 2)  # Shape: [batch_size, input_dim, seq_length]

        # Conv1d rejects inputs shorter than its kernel. Zero-pad such inputs
        # (centred) up to the kernel size; longer inputs are left untouched.
        shortfall = self.conv1.kernel_size[0] - x.size(2)
        if shortfall > 0:
            left = shortfall // 2
            x = nn.functional.pad(x, (left, shortfall - left))

        x = nn.functional.relu(self.conv1(x))
        # Global max pool over the sequence axis -> [batch_size, 100]
        x = nn.functional.max_pool1d(x, x.size(2)).squeeze(2)
        return self.fc(x)

# Define RNN model
class RNNModel(nn.Module):
    """Single-layer vanilla RNN followed by a linear classification head.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Size of the RNN hidden state.
        output_dim: Number of output logits.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.rnn = nn.RNN(input_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x):
        """Return logits of shape [batch_size, output_dim].

        ``x`` may be [batch_size, input_dim] (treated as a length-1 sequence, as
        before) or an actual sequence batch [batch_size, seq_length, input_dim].
        """
        # Only add the sequence axis when it is missing; previously it was added
        # unconditionally, which broke inputs that were already 3-D.
        if x.dim() < 3:
            x = x.unsqueeze(1)
        _, hn = self.rnn(x)
        # hn[-1] is the final hidden state of the last RNN layer.
        return self.fc(hn[-1])

# Define LSTM-CNN model
class LSTMCNNModel(nn.Module):
    """LSTM whose per-step outputs feed a 1-D convolution, global max pool and linear head.

    Args:
        input_dim: Size of each input feature vector.
        hidden_dim: Size of the LSTM hidden state (used as Conv1d input channels).
        output_dim: Number of output logits.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)
        self.conv = nn.Conv1d(in_channels=hidden_dim, out_channels=100, kernel_size=3)
        self.fc = nn.Linear(100, output_dim)

    def forward(self, x):
        """Return logits of shape [batch_size, output_dim].

        ``x`` may be [batch_size, input_dim] (treated as a length-1 sequence) or
        [batch_size, seq_length, input_dim].
        """
        # Only add the sequence axis when it is missing.
        if x.dim() < 3:
            x = x.unsqueeze(1)
        lstm_out, _ = self.lstm(x)
        lstm_out = lstm_out.permute(0, 2, 1)  # [batch_size, hidden_dim, seq_length]

        # With a length-1 sequence the LSTM output is shorter than the conv kernel
        # and Conv1d raises. Zero-pad (centred) up to the kernel size in that case;
        # longer sequences are left untouched.
        shortfall = self.conv.kernel_size[0] - lstm_out.size(2)
        if shortfall > 0:
            left = shortfall // 2
            lstm_out = nn.functional.pad(lstm_out, (left, shortfall - left))

        conv_out = nn.functional.relu(self.conv(lstm_out))
        # Global max pool over the sequence axis -> [batch_size, 100]
        conv_out = nn.functional.max_pool1d(conv_out, conv_out.size(2)).squeeze(2)
        return self.fc(conv_out)

In [None]:
def train_model(model, train_loader, criterion, optimizer, device, num_epochs=10):
    """Run a plain mini-batch training loop and hand back the same model.

    Args:
        model: Module to optimise; switched to training mode before the loop.
        train_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Callable invoked as ``criterion(outputs, labels)`` returning the loss.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device each batch is moved to before the forward pass.
        num_epochs: Number of full passes over ``train_loader``.

    Returns:
        The ``model`` object that was passed in, updated in place.
    """
    model.train()

    for epoch_number in range(1, num_epochs + 1):
        step = 0
        for batch_inputs, batch_labels in train_loader:
            batch_inputs = batch_inputs.to(device)
            batch_labels = batch_labels.to(device)

            # Clear stale gradients, then forward / backward / update.
            optimizer.zero_grad()
            batch_loss = criterion(model(batch_inputs), batch_labels)
            batch_loss.backward()
            optimizer.step()

            # Report the loss on every 10th batch (batch 0, 10, 20, ...).
            if step % 10 == 0:
                print(f"Epoch [{epoch_number}/{num_epochs}], Batch [{step}], Loss: {batch_loss.item()}")
            step += 1

    return model

In [None]:
input_dim = X_train_tensor.size(1)
# y_train_tensor holds 1-D class indices (argmax of the one-hot rows), so it has
# no second dimension and .size(1) raises IndexError. The number of classes is
# the column count of the one-hot matrix the indices were derived from.
output_dim = y_train.shape[1]
hidden_dim = 128

# Seed PyTorch's global RNG (same value as the split's random_state) so weight
# initialisation and the training loader's shuffling start from a fixed state.
torch.manual_seed(42)

# Initialize models
lstm_model = LSTMModel(input_dim, hidden_dim, output_dim).to(device)
cnn_model = CNNModel(input_dim, output_dim).to(device)
rnn_model = RNNModel(input_dim, hidden_dim, output_dim).to(device)
lstm_cnn_model = LSTMCNNModel(input_dim, hidden_dim, output_dim).to(device)

In [None]:
# The training targets are 1-D long class indices, which is the target format
# CrossEntropyLoss consumes for [batch_size, num_classes] logits. The former
# BCEWithLogitsLoss branch required float targets shaped like the logits and
# could never have worked with these labels, so it has been removed.
criterion = nn.CrossEntropyLoss()

# Optimizer function
def get_optimizer(model, lr=0.001):
    """Create a fresh Adam optimizer over all of ``model``'s parameters.

    Args:
        model: Module whose ``parameters()`` will be optimised.
        lr: Learning rate passed to Adam.

    Returns:
        A new ``optim.Adam`` instance.
    """
    trainable_params = model.parameters()
    adam = optim.Adam(trainable_params, lr=lr)
    return adam

In [None]:
# Train the four architectures one after another (LSTM, CNN, RNN, LSTM-CNN),
# each with its own freshly built optimizer, and rebind every name to the
# model object returned by train_model.
lstm_model, cnn_model, rnn_model, lstm_cnn_model = (
    train_model(net, train_loader, criterion, get_optimizer(net), device)
    for net in (lstm_model, cnn_model, rnn_model, lstm_cnn_model)
)


In [None]:
def evaluate_model(model, test_loader, device=None):
    """Score ``model`` on ``test_loader`` and print/plot the results.

    Prints the accuracy and a classification report, then shows the confusion
    matrix as an annotated heatmap.

    Args:
        model: Trained module producing one row of logits per sample.
        test_loader: Iterable yielding ``(inputs, labels)`` batches.
        device: Device the inputs are moved to. Defaults to the device of the
            model's first parameter (CPU for a parameter-free model), so the
            function no longer depends on a module-level ``device`` variable.
    """
    if device is None:
        first_param = next(iter(model.parameters()), None)
        device = first_param.device if first_param is not None else torch.device("cpu")

    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for inputs, labels in test_loader:
            outputs = model(inputs.to(device))
            # Predicted class = index of the largest logit in each row.
            preds = outputs.argmax(dim=1)
            all_preds.extend(preds.cpu().numpy())
            # Labels are only compared on the host, so they are not moved to `device`.
            all_labels.extend(labels.cpu().numpy())

    acc = accuracy_score(all_labels, all_preds)
    print("Accuracy:", acc)
    print(classification_report(all_labels, all_preds))

    # confusion_matrix puts true classes on rows and predictions on columns.
    ax = sns.heatmap(confusion_matrix(all_labels, all_preds), annot=True, fmt="d")
    ax.set_xlabel("Predicted label")
    ax.set_ylabel("True label")
    plt.show()

In [None]:
# Held-out evaluation of the LSTM classifier.
print("LSTM Model:")
evaluate_model(model=lstm_model, test_loader=test_loader)

In [None]:
# Held-out evaluation of the CNN classifier.
print("CNN Model:")
evaluate_model(model=cnn_model, test_loader=test_loader)

In [None]:
# Held-out evaluation of the RNN classifier.
print("RNN Model:")
evaluate_model(model=rnn_model, test_loader=test_loader)

In [None]:
# Held-out evaluation of the LSTM-CNN hybrid classifier.
print("LSTM-CNN Hybrid Model:")
evaluate_model(model=lstm_cnn_model, test_loader=test_loader)