<a href="https://colab.research.google.com/github/sureshiitp/Assignment-1-APR/blob/main/surjii.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install torch torchvision torchtext pandas numpy scikit-learn nltk matplotlib seaborn


In [None]:
from google.colab import files
# Opens Colab's upload widget; later cells read the first entry of `uploaded.keys()` as the CSV path.
uploaded = files.upload()  # Upload your Flipkart CSV here


In [None]:
# Step 4: Load and Preprocess Dataset (Safe Manual Version)
import pandas as pd
import numpy as np
import torch
from sklearn.model_selection import train_test_split
import nltk
from nltk.corpus import stopwords
import re

# Download NLTK stopwords
nltk.download('stopwords')

# Load CSV safely: try UTF-8 first and fall back to Latin-1 only when the file
# is not valid UTF-8. (`pd.read_csv` has no `errors` keyword, so passing it made
# the UTF-8 attempt fail unconditionally and the bare `except` hid that.)
if not uploaded:
    raise ValueError("No file was uploaded; re-run the upload cell and pick a CSV.")
csv_name = list(uploaded.keys())[0]
try:
    df = pd.read_csv(csv_name, encoding='utf-8')
except UnicodeDecodeError:
    df = pd.read_csv(csv_name, encoding='ISO-8859-1')

print("CSV Loaded. Columns:", df.columns)

# ------------------- MANUAL COLUMN SELECTION -------------------
review_col = 'Review'
rating_col = 'Rate'

# Fail early with a readable message instead of a bare KeyError further down.
missing_cols = [col for col in (review_col, rating_col) if col not in df.columns]
if missing_cols:
    raise KeyError(
        f"Column(s) {missing_cols} not found in {csv_name}; "
        f"available columns: {list(df.columns)}"
    )

# Basic cleaning of review text
# Cache so the NLTK stopword list is read once instead of once per word.
_STOPWORDS_CACHE = {}


def _english_stopwords():
    """Return NLTK's English stopwords as a frozenset, loading them on first use."""
    if 'english' not in _STOPWORDS_CACHE:
        _STOPWORDS_CACHE['english'] = frozenset(stopwords.words('english'))
    return _STOPWORDS_CACHE['english']


def clean_text(text, stop_words=None):
    """Normalise one review for the downstream vectoriser/tokeniser.

    The text is lower-cased, every character other than ASCII letters, digits
    and whitespace is removed, and stopwords are dropped.

    Args:
        text: Raw review. Non-string values are converted with ``str``;
            ``None`` and float NaN (missing values) yield an empty string
            rather than the literal token ``"nan"``/``"none"``.
        stop_words: Optional collection of words to remove. Defaults to the
            NLTK English stopword list.

    Returns:
        The cleaned review with the remaining words joined by single spaces.
    """
    # `text != text` is only true for NaN.
    if text is None or (isinstance(text, float) and text != text):
        return ''
    if stop_words is None:
        stop_words = _english_stopwords()
    text = re.sub(r'[^a-zA-Z0-9\s]', '', str(text).lower())
    return ' '.join(word for word in text.split() if word not in stop_words)

# Store the cleaned text next to the raw review column.
df['clean_review'] = df[review_col].map(clean_text)

# Convert ratings to sentiment labels
def rating_to_sentiment(r):
    """Map a star rating to a sentiment class id.

    Args:
        r: Rating as a number or numeric string.

    Returns:
        0 (Negative) for ratings <= 2, 1 (Neutral) for ratings in (2, 3],
        2 (Positive) for anything higher. Values that cannot be parsed, and
        NaN (the usual missing-value marker), are treated as Neutral.
    """
    try:
        rating = float(r)
    except (TypeError, ValueError):
        return 1  # Treat invalid/missing ratings as Neutral
    # float() accepts NaN, and every comparison with NaN is False, so without
    # this check a missing rating would fall through to Positive.
    if rating != rating:
        return 1
    if rating <= 2:
        return 0  # Negative
    if rating <= 3:
        return 1  # Neutral (also covers fractional ratings between 2 and 3)
    return 2  # Positive

# Derive the three-class target from the rating column.
df['label'] = df[rating_col].map(rating_to_sentiment)

# Train-test split: 20% of the rows are held out, with a fixed seed.
train_texts, test_texts, train_labels, test_labels = train_test_split(
    df['clean_review'],
    df['label'],
    random_state=42,
    test_size=0.2,
)

print("Training samples:", len(train_texts))
print("Testing samples:", len(test_texts))


In [None]:
# Step 5: Corrected FNN for Bag-of-Words
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import classification_report

# Use the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print("Using device:", device)

# Dataset class
class ReviewDataset(Dataset):
    """Bag-of-words dataset yielding one ``(features, label)`` pair per review.

    Args:
        texts: Iterable of cleaned review strings.
        labels: Object exposing ``.values`` (e.g. a pandas Series) with one
            integer class id per review.
        vectorizer: Fitted vectoriser whose ``transform`` returns a SciPy
            sparse document-term matrix (as ``CountVectorizer`` does).
    """

    def __init__(self, texts, labels, vectorizer):
        # Keep the matrix sparse. Densifying every row up front costs
        # n_reviews * n_features cells and can exhaust RAM on large corpora;
        # rows are densified one at a time in __getitem__ instead.
        self.X = vectorizer.transform(texts).tocsr()
        self.y = labels.values

    def __len__(self):
        """Number of reviews in the dataset."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return the float32 count vector and the int64 label for row ``idx``."""
        row = self.X[idx].toarray().ravel()
        return torch.tensor(row, dtype=torch.float32), torch.tensor(self.y[idx], dtype=torch.long)

# Vectorization: learn a vocabulary of at most 5000 terms from the training split only.
vectorizer = CountVectorizer(max_features=5000).fit(train_texts)

train_dataset = ReviewDataset(texts=train_texts, labels=train_labels, vectorizer=vectorizer)
test_dataset = ReviewDataset(texts=test_texts, labels=test_labels, vectorizer=vectorizer)

# Only the training batches are shuffled.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=64)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=64)

# Feed-Forward Network sizes; the input width follows the fitted vocabulary.
input_dim, hidden_dim, output_dim = len(vectorizer.vocabulary_), 128, 3

class FNN(nn.Module):
    """Feed-forward classifier with one hidden layer: Linear -> ReLU -> Linear.

    Args:
        input_dim: Width of each input feature vector.
        hidden_dim: Width of the hidden layer.
        output_dim: Number of output logits.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.fc1 = nn.Linear(in_features=input_dim, out_features=hidden_dim)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(in_features=hidden_dim, out_features=output_dim)

    def forward(self, x):
        """Return the unnormalised logits for ``x``."""
        return self.fc2(self.relu(self.fc1(x)))

# Build the bag-of-words classifier on the selected device, with a cross-entropy
# loss and an Adam optimiser over all of its parameters.
model = FNN(input_dim, hidden_dim, output_dim).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        # Clear gradients left over from the previous step before backprop.
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Reported value is the mean of the per-batch losses for this epoch.
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader):.4f}")

# Evaluation
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        outputs = model(X_batch)
        # Predicted class = index of the largest logit in each row.
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())

# Class ids 0/1/2 correspond to Negative/Neutral/Positive, as produced by rating_to_sentiment.
print(classification_report(all_labels, all_preds, target_names=['Negative','Neutral','Positive']))


In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

# Use the GPU when CUDA is available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print("Using device:", device)


In [None]:
# Tokenize reviews
def tokenize(text):
    """Lower-case ``text`` and split it on runs of whitespace."""
    lowered = text.lower()
    return lowered.split()

tokenized_train = list(map(tokenize, train_texts))
tokenized_test = list(map(tokenize, test_texts))

# Build vocabulary manually
from collections import Counter

# Flatten the training reviews into one token stream and count frequencies.
all_tokens = [tok for toks in tokenized_train for tok in toks]
token_counts = Counter(all_tokens)

# Index 0 is reserved for "<unk>"; the 5000 most frequent training tokens follow,
# so the vocabulary holds up to 5001 entries.
vocab_tokens = ["<unk>"]
vocab_tokens += [tok for tok, _ in token_counts.most_common(5000)]  # top 5000 words
vocab = dict(zip(vocab_tokens, range(len(vocab_tokens))))
vocab_size = len(vocab)
print("Vocabulary size:", vocab_size)

# Encode reviews to indices
def encode(texts):
    """Convert each text into a 1-D ``torch.long`` tensor of vocabulary indices.

    Each text is split with ``tokenize``; words missing from ``vocab`` map to
    index 0, the position of ``"<unk>"``.

    Args:
        texts: Iterable of strings.

    Returns:
        A list with one index tensor per input text, in input order.
    """
    unk_index = 0  # 0 = <unk>
    return [
        torch.tensor([vocab.get(word, unk_index) for word in tokenize(text)], dtype=torch.long)
        for text in texts
    ]

# Encode both splits with the vocabulary built from the training split;
# each result is a list of variable-length index tensors.
train_encoded = encode(train_texts)
test_encoded = encode(test_texts)


In [None]:
from torch.nn.utils.rnn import pad_sequence  # <- add this line

# Pad sequences
# Each split is padded separately, to the length of its own longest review.
# NOTE(review): pad_sequence pads with 0 by default, which is also the index the
# vocabulary gives to "<unk>", so padding and unknown words look identical to the
# model — confirm this is intended.
train_padded = pad_sequence(train_encoded, batch_first=True)
test_padded = pad_sequence(test_encoded, batch_first=True)

# Dataset class
class ReviewSeqDataset(Dataset):
    """Dataset pairing pre-built sequences with integer class labels.

    Args:
        sequences: Indexable collection of samples, stored as given.
        labels: Object exposing ``.values`` (e.g. a pandas Series); copied
            into a ``torch.long`` tensor.
    """

    def __init__(self, sequences, labels):
        self.X = sequences
        self.y = torch.tensor(labels.values, dtype=torch.long)

    def __len__(self):
        """Number of labelled samples."""
        return self.y.shape[0]

    def __getitem__(self, idx):
        """Return the ``(sequence, label)`` pair stored at ``idx``."""
        sample = self.X[idx]
        target = self.y[idx]
        return sample, target

# Wrap the padded index tensors and their labels for batching.
train_dataset = ReviewSeqDataset(sequences=train_padded, labels=train_labels)
test_dataset = ReviewSeqDataset(sequences=test_padded, labels=test_labels)

# Only the training batches are shuffled.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=64)
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=64)


In [None]:
class Attention(nn.Module):
    """Additive attention pooling over dimension 1 of a bidirectional LSTM output.

    A single linear layer scores every position; the scores are softmax-normalised
    along dimension 1 and used as weights for a sum over that dimension.

    Args:
        hidden_dim: Per-direction LSTM hidden size (the input has ``2 * hidden_dim`` features).
    """

    def __init__(self, hidden_dim):
        super().__init__()
        self.attn = nn.Linear(2 * hidden_dim, 1)

    def forward(self, lstm_out):
        """Return the attention-weighted sum of ``lstm_out`` over dimension 1."""
        scores = self.attn(lstm_out)
        weights = scores.softmax(dim=1)
        return (weights * lstm_out).sum(dim=1)

class BiLSTM_Attention(nn.Module):
    """Embedding -> bidirectional LSTM -> attention pooling -> linear classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Embedding width.
        hidden_dim: Per-direction LSTM hidden size.
        output_dim: Number of output logits.
    """

    def __init__(self, vocab_size, embed_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(
            input_size=embed_dim,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=True,
        )
        self.attention = Attention(hidden_dim)
        self.fc = nn.Linear(2 * hidden_dim, output_dim)

    def forward(self, x):
        """Return class logits for a batch of token-index sequences ``x``."""
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)
        pooled = self.attention(lstm_out)
        return self.fc(pooled)

# Hyperparameters for the BiLSTM + attention classifier.
embed_dim, hidden_dim = 100, 128
output_dim = 3  # Negative, Neutral, Positive

# vocab_size is the size of the vocabulary built in the tokenisation cell.
model = BiLSTM_Attention(vocab_size, embed_dim, hidden_dim, output_dim)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=1e-3)


In [None]:
# Train whichever `model`, `criterion` and `optimizer` are currently bound in the
# notebook for a fixed number of passes over `train_loader`.
num_epochs = 5

for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        # Clear gradients left over from the previous step before backprop.
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    # Reported value is the mean of the per-batch losses for this epoch.
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader):.4f}")


In [None]:
# Evaluate the current `model` on `test_loader` and print per-class metrics.
model.eval()
all_preds, all_labels = [], []
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        outputs = model(X_batch)
        # Predicted class = index of the largest logit in each row.
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(y_batch.cpu().numpy())

# Class ids 0/1/2 correspond to Negative/Neutral/Positive, as produced by rating_to_sentiment.
print(classification_report(all_labels, all_preds, target_names=['Negative','Neutral','Positive']))


In [None]:
# Persist only the parameters (state_dict) of the current `model`; loading them
# back requires constructing the same architecture first.
torch.save(model.state_dict(), "bilstm_attention_model.pth")
print("BiLSTM + Attention model saved successfully!")


In [None]:

!pip install torch-geometric -q

In [None]:
import torch, torch_geometric
# Print the installed PyTorch and PyTorch Geometric versions for reference.
print(torch.__version__)
print(torch_geometric.__version__)


In [None]:
import torch
# Use the GPU when CUDA is available, otherwise the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("Using device:", device)


In [None]:
# The embedding table must have a row for every index encode() can emit.
# The vocabulary built earlier holds "<unk>" plus up to 5000 words, so a
# hard-coded 5000 can be one row short and trigger an out-of-range lookup.
vocab_size = len(vocab)
embedding_dim = 128
hidden_dim = 64
output_dim = 3  # Negative, Neutral, Positive


In [None]:
import torch.nn as nn
import torch.nn.functional as F

In [None]:
class BiLSTMAttention(nn.Module):
    """Embedding -> bidirectional LSTM -> attention pooling -> linear classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Embedding width.
        hidden_dim: Per-direction LSTM hidden size.
        output_dim: Number of output logits.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=True,
        )
        self.attention = nn.Linear(2 * hidden_dim, 1)
        self.fc = nn.Linear(2 * hidden_dim, output_dim)

    def forward(self, x):
        """Return class logits of shape (batch, output_dim) for token indices ``x``."""
        token_vectors = self.embedding(x)              # (batch, seq_len, embed_dim)
        lstm_out, _ = self.lstm(token_vectors)         # (batch, seq_len, hidden*2)
        scores = self.attention(lstm_out)              # (batch, seq_len, 1)
        attn_weights = scores.softmax(dim=1)           # normalised over seq_len
        context = (attn_weights * lstm_out).sum(dim=1) # (batch, hidden*2)
        return self.fc(context)

In [None]:
# Build the classifier and place its parameters on the selected device.
model = BiLSTMAttention(vocab_size, embedding_dim, hidden_dim, output_dim).to(device)

In [None]:
import torch.optim as optim
# The optimiser holds references to the current model's parameters, so it has to
# be created again whenever `model` is rebound to a new instance.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim


In [None]:
from torch.utils.data import TensorDataset, DataLoader

# Step 9: Convert padded sequences and labels to tensors
# Use the variables defined in the previously executed cells.
# clone().detach() makes independent copies of the padded index tensors.
X_train_tensor = train_padded.clone().detach()
y_train_tensor = torch.tensor(train_labels.values, dtype=torch.long)
X_test_tensor = test_padded.clone().detach()
y_test_tensor = torch.tensor(test_labels.values, dtype=torch.long)

# Step 9: Create DataLoader for batching
# These rebind train_dataset/train_loader/test_dataset/test_loader from earlier cells.
train_dataset = TensorDataset(X_train_tensor, y_train_tensor)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_dataset = TensorDataset(X_test_tensor, y_test_tensor)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False)

# Step 9: Training loop
num_epochs = 5
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        # Clear gradients left over from the previous step before backprop.
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    # Reported value is the mean of the per-batch losses for this epoch.
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader):.4f}")

# Step 9: Evaluate the model
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        outputs = model(X_batch)
        # torch.max over dim 1 returns (values, indices); the indices are the predicted classes.
        _, predicted = torch.max(outputs, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")

In [None]:
# Step 10: Evaluate the model
# Recomputes test accuracy for the current `model` over `test_loader`; this is the
# same calculation as the evaluation at the end of the Step 9 cell.
model.eval()  # set model to evaluation mode
correct = 0
total = 0

with torch.no_grad():
    for texts, labels in test_loader:
        texts, labels = texts.to(device), labels.to(device)
        outputs = model(texts)
        # torch.max over dim 1 returns (values, indices); the indices are the predicted classes.
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

accuracy = 100 * correct / total
print(f"Test Accuracy: {accuracy:.2f}%")


In [None]:
from torch_geometric.nn import GCNConv

class BiLSTM_Attn_GNN(nn.Module):
    """BiLSTM + attention encoder followed by one graph convolution and a classifier.

    Every input sequence is pooled to a single vector, which is treated as one
    graph node; ``edge_index`` describes the edges between those nodes.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Embedding width.
        hidden_dim: Per-direction LSTM hidden size and GCN output width.
        output_dim: Number of output logits.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super(BiLSTM_Attn_GNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True, batch_first=True)
        self.attention = nn.Linear(hidden_dim * 2, 1)

        # GNN Layer (Graph Convolution)
        self.gcn1 = GCNConv(hidden_dim * 2, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, edge_index=None):
        """Return class logits for token indices ``x``.

        Args:
            x: Batch of token-index sequences.
            edge_index: Optional ``[2, num_edges]`` tensor of edges between the
                samples in ``x``. When omitted, an empty edge set is used.
        """
        embedded = self.embedding(x)
        lstm_out, _ = self.lstm(embedded)

        # Attention
        attn_weights = torch.softmax(self.attention(lstm_out), dim=1)
        attn_out = torch.sum(attn_weights * lstm_out, dim=1)

        # GNN part
        # The GCN layer is what reduces hidden_dim*2 -> hidden_dim for `fc`, so it
        # must always run. Skipping it when edge_index was None fed a
        # hidden_dim*2 vector into `fc` and raised a shape error. With no edges
        # supplied, an empty edge set is passed; GCNConv adds self-loops by
        # default, so each sample is then transformed on its own.
        if edge_index is None:
            edge_index = torch.empty((2, 0), dtype=torch.long, device=attn_out.device)
        node_repr = self.gcn1(attn_out, edge_index)

        out = self.fc(node_repr)
        return out


In [None]:
# Step 12: Combine BiLSTM with GNN (Graph Neural Network)
import torch
import torch.nn as nn
from torch_geometric.nn import GCNConv

class BiLSTM_GNN(nn.Module):
    """BiLSTM encoder with mean pooling, one graph convolution and a linear classifier.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Embedding width.
        hidden_dim: Per-direction LSTM hidden size and GCN output width.
        output_dim: Number of output logits.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=True,
        )
        self.gcn1 = GCNConv(2 * hidden_dim, hidden_dim)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.relu = nn.ReLU()

    def forward(self, x, edge_index):
        """Return class logits for token indices ``x`` given graph edges ``edge_index``."""
        token_vectors = self.embedding(x)
        lstm_out, _ = self.lstm(token_vectors)
        pooled = torch.mean(lstm_out, dim=1)  # Global average pooling
        graph_out = self.gcn1(pooled, edge_index)
        return self.fc(self.relu(graph_out))

# Initialize model
# Uses the notebook-level vocab_size, embedding_dim, hidden_dim and output_dim
# values and places the model on the active device.
gnn_model = BiLSTM_GNN(vocab_size, embedding_dim, hidden_dim, output_dim).to(device)
print("✅ BiLSTM-GNN model is ready.")


In [None]:
# Step 13: Memory-efficient graph creation
import torch

# Limit number of nodes to avoid RAM crash
subset_size = min(500, X_train_tensor.size(0))  # take up to 500 samples
X_subset = X_train_tensor[:subset_size]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Create placeholder graph edges.
# Each node is linked, in both directions, to the next k nodes in row order.
# This is positional adjacency, not a similarity-based nearest-neighbour graph.
edges = []
k = 5  # number of following nodes each node is linked to
for i in range(subset_size):
    for j in range(1, k + 1):
        if i + j < subset_size:
            edges.append([i, i + j])
            edges.append([i + j, i])

# Convert to torch tensor of shape [2, num_edges].
# reshape(-1, 2) keeps that shape even when there are no edges (0 or 1 nodes).
# contiguous() is applied because transposing only changes the strides, and
# PyTorch Geometric's documentation builds edge_index as a contiguous tensor.
edge_index = (
    torch.tensor(edges, dtype=torch.long)
    .reshape(-1, 2)
    .t()
    .contiguous()
    .to(device)
)

print("Subset size (nodes):", subset_size)
print("Edge index shape:", edge_index.shape)


In [None]:
# Step 14: Model Training (with memory-efficient graph edges)
import torch.nn.functional as F

# Re-initialise the BiLSTM + attention classifier on the active device.
# This model takes token indices only; the graph edges built in the previous
# step are not used here.
model = BiLSTMAttention(vocab_size, embedding_dim, hidden_dim, output_dim).to(device)

# Loss and optimizer (created after the model so the optimiser tracks the new parameters)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop
num_epochs = 5  # start with small number to check
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        # Clear gradients left over from the previous step before backprop.
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()
        # The original line was a bare `total_loss` expression, so the loss was
        # never accumulated and nothing was reported.
        total_loss += loss.item()
    # Mean of the per-batch losses for this epoch.
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {total_loss/len(train_loader):.4f}")


In [None]:
# Step 15: Model Evaluation
# Computes plain accuracy of the current `model` over `test_loader`.
model.eval()  # set model to evaluation mode
correct = 0
total = 0

with torch.no_grad():
    for X_batch, y_batch in test_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        outputs = model(X_batch)
        # torch.max over dim 1 returns (values, indices); the indices are the predicted classes.
        _, predicted = torch.max(outputs.data, 1)
        total += y_batch.size(0)
        correct += (predicted == y_batch).sum().item()

accuracy = 100 * correct / total
print(f"Test Accuracy: {accuracy:.2f}%")


In [None]:
# Step 16: Save the trained model
# NOTE(review): this is the same filename the earlier save cell writes
# ("bilstm_attention_model.pth"), so that checkpoint, which came from a model
# with different layer names and sizes, is overwritten here.
model_path = "bilstm_attention_model.pth"
torch.save(model.state_dict(), model_path)
print(f"Model saved to {model_path}")


In [None]:
# Step 17: Inference on new data
def predict_sentiment(text, vocab, model, device, preprocess=None):
    """Classify a single piece of text as Negative, Neutral or Positive.

    Args:
        text: Raw input string.
        vocab: Mapping from token to index; must contain ``"<unk>"``.
        model: Trained classifier that maps a ``(1, seq_len)`` long tensor of
            token indices to ``(1, 3)`` logits.
        device: Device the model lives on.
        preprocess: Optional callable applied to ``text`` before tokenising.
            Defaults to the notebook's ``clean_text``, so inference sees the
            same normalisation (punctuation and stopword removal) as training.

    Returns:
        One of ``"Negative"``, ``"Neutral"`` or ``"Positive"``.
    """
    if preprocess is None:
        preprocess = clean_text
    # Tokenize text the same way the training data was tokenised.
    tokens = preprocess(text).lower().split()
    # Encode tokens using the vocab; unseen words map to <unk>.
    unk_index = vocab["<unk>"]
    encoded = [vocab.get(token, unk_index) for token in tokens]
    if not encoded:
        # Nothing survived preprocessing; feed one <unk> so the LSTM does not
        # receive a zero-length sequence.
        encoded = [unk_index]
    # Convert to a batch of one sequence.
    tensor_input = torch.tensor(encoded, dtype=torch.long).unsqueeze(0).to(device)

    model.eval()  # inference mode (training loops switch back with model.train())
    with torch.no_grad():
        output = model(tensor_input)
        pred = torch.argmax(output, dim=1).item()

    labels = {0: "Negative", 1: "Neutral", 2: "Positive"}
    return labels[pred]


In [None]:
# One hand-written sample review per sentiment class, used for the inference demo.
positive_text = "Very happy with the purchase, highly recommend it."
neutral_text = "Average quality, nothing special or disappointing."
negative_text = "Very poor quality, broke after first use"

In [None]:
# Example usage:
# Classifies one of the sample sentences with whichever `model` and `vocab` are
# currently bound in the notebook.
sample_text = negative_text
print("Predicted Sentiment:", predict_sentiment(sample_text, vocab, model, device))

In [None]:
from google.colab import files
# Triggers a browser download of the saved weights file (Colab only).
files.download('bilstm_attention_model.pth')
