# **Imports and Downloads**

In [None]:
from google.colab import drive
# Expose Google Drive under /content/drive; force_remount=True requests a
# remount even when that path is already mounted.
drive.mount(
    "/content/drive",
    force_remount=True,
)

Mounted at /content/drive


In [None]:
# Core libraries
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, Subset

import numpy as np
import pandas as pd
import pickle
import math
import re
import os
import random
import time
import matplotlib.pyplot as plt
from collections import Counter

# NLP and preprocessing
import nltk
from nltk.corpus import stopwords, opinion_lexicon
from nltk.util import ngrams
from nltk.sentiment import SentimentIntensityAnalyzer

# Sklearn tools
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.preprocessing import normalize, StandardScaler

# Reproducibility: push one shared seed into every RNG this notebook uses
# (Python's `random`, NumPy's legacy global RNG, torch CPU and all CUDA devices).
SEED = 42
for _seed_rng in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed_all,
):
    _seed_rng(SEED)

# Turn off the cuDNN autotuner and request deterministic cuDNN kernels.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

# **Data Loading and Preprocessing**

In [None]:
# Read the tab-separated review file: one "<sentence>\t<label>" record per line.
# The encoding is spelled out so parsing does not depend on the platform default.
with open(
    "/content/drive/MyDrive/QML-Research/Data/sentiment labelled sentences/amazon_cells_labelled.txt",
    "r",
    encoding="utf-8",
) as f:
    lines = f.readlines()

# Split every record exactly once, from the right, so the label is always the
# last column even if a sentence contains a tab. Blank lines (for example a
# trailing newline at the end of the file) are skipped instead of raising.
records = [line.rstrip("\n").rsplit("\t", 1) for line in lines if line.strip()]

# sentences[i] and labels[i] describe the same review.
sentences = [sentence for sentence, _ in records]
labels = [int(label) for _, label in records]

In [None]:
# Make sure the NLTK stopword corpus is available locally.
nltk.download("stopwords")

# Product-domain words the notebook treats as carrying no sentiment; they are
# filtered out together with the generic English stopwords.
domain_neutral_words = {
    "one", "use",
    "phone", "product", "battery", "headset", "quality",
}

# Final filter set used by the tokenizers below.
stop_words = set(stopwords.words("english")) | domain_neutral_words

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
def clean_and_tokenize(text):
    """Normalize a raw sentence and return its tokens with stopwords removed.

    Steps, in order: lowercase, delete anything starting with "http" up to the
    next whitespace, delete every character that is not a-z, 0-9 or whitespace
    (characters are removed, not replaced by a space), collapse whitespace,
    split on spaces and drop tokens found in the module-level ``stop_words``.

    Args:
        text: Raw sentence string.

    Returns:
        List of token strings (possibly empty).
    """
    substitutions = (
        (r"http\S+", ""),        # URLs
        (r"[^a-z0-9\s]", ""),    # punctuation and any non-ASCII symbol
        (r"\s+", " "),           # runs of whitespace
    )
    cleaned = text.lower()
    for pattern, replacement in substitutions:
        cleaned = re.sub(pattern, replacement, cleaned)

    return [token for token in cleaned.strip().split() if token not in stop_words]

# Tokenize every raw review; cleaned_sents[i] corresponds to sentences[i].
cleaned_sents = list(map(clean_and_tokenize, sentences))

In [None]:
# Force every token list to exactly max_len entries: longer sentences are cut,
# shorter ones are right-padded with the "<PAD>" marker.
max_len = 10
cleaned_sents = [
    (tokens + ["<PAD>"] * (max_len - len(tokens)))[:max_len]
    for tokens in cleaned_sents
]

# **GloVe Word Embeddings**

In [None]:
def load_glove_embeddings(file_path):
    """Parse a GloVe-style text file into a ``{word: vector}`` dictionary.

    Every non-blank line is read as ``<word> <v1> <v2> ...`` with whitespace
    separators. Blank lines are skipped; previously one (such as a stray
    empty line at the end of the file) raised ``IndexError``.

    Args:
        file_path: Path to the UTF-8 encoded embedding file.

    Returns:
        dict mapping each word to a 1-D ``float32`` NumPy array. If a word
        occurs more than once, the last occurrence wins.

    Raises:
        ValueError: If a vector component cannot be converted to a float.
    """
    embeddings = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            values = line.split()
            if not values:
                # Nothing on this line; skip it instead of indexing values[0].
                continue
            word, *components = values
            embeddings[word] = np.array(components, dtype='float32')
    return embeddings

In [None]:
# Location of the GloVe text file on the mounted Drive.
glove_path = "/content/drive/MyDrive/QML-Research/Data/glove.6B.100d.txt"

# word -> float32 vector lookup consumed by the autoencoder cells below.
glove = load_glove_embeddings(glove_path)

# **AutoEncoder**

In [None]:
class GloVeAutoencoder(nn.Module):
    """Two-layer MLP encoder/decoder pair that squeezes vectors through a bottleneck.

    The encoder maps ``input_dim -> 64 -> latent_dim`` and the decoder mirrors
    it back to ``input_dim``, with a ReLU after each 64-unit layer.

    Args:
        input_dim: Size of each input vector.
        latent_dim: Size of the compressed representation.
    """

    def __init__(self, input_dim, latent_dim):
        super().__init__()
        hidden_width = 64

        # The position of each layer inside nn.Sequential determines the
        # state_dict keys (encoder.0, encoder.2, ...), which saved weights rely on.
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, latent_dim),
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, hidden_width),
            nn.ReLU(),
            nn.Linear(hidden_width, input_dim),
        )

    def forward(self, x):
        """Return the reconstruction of ``x`` after encoding and decoding it."""
        return self.decoder(self.encoder(x))

In [None]:
# Fix the vocabulary order once: row i of the matrices below belongs to all_words[i].
all_words = list(glove)

# Stack the raw vectors and rescale them with sklearn's `normalize` (default settings).
all_vectors = normalize(np.array([glove[token] for token in all_words]))

# float32 tensor fed to the autoencoder.
word_tensor = torch.tensor(all_vectors, dtype=torch.float32)

In [None]:
latent_dim = 8   # width of the autoencoder bottleneck
epochs = 100     # optimisation steps used when the autoencoder is trained from scratch

# Weights file on Drive; the latent size is part of the file name.
save_path = (
    "/content/drive/MyDrive/QML-Research/Autoencoder-weights/"
    f"glove_autoencoder_normalized_{latent_dim}.pth"
)

In [None]:
# Load cached autoencoder weights when present, otherwise train and cache them.
# The model is built once up front because both branches need the same architecture.
autoencoder = GloVeAutoencoder(input_dim=100, latent_dim=latent_dim)

if os.path.exists(save_path):
    print(f"Loading Autoencoder from {save_path}")
    autoencoder.load_state_dict(torch.load(save_path, map_location=torch.device('cpu')))
else:
    print("Training Autoencoder")

    # Create the target folder before training starts: if it is missing,
    # torch.save would otherwise fail only after every epoch has run and the
    # trained weights would be lost.
    save_dir = os.path.dirname(save_path)
    if save_dir:
        os.makedirs(save_dir, exist_ok=True)

    optimizer = torch.optim.Adam(autoencoder.parameters(), lr=1e-3)
    criterion = nn.MSELoss()

    # Each epoch is a single full-batch step over the whole `word_tensor`.
    for epoch in range(epochs):
        optimizer.zero_grad()
        reconstructed = autoencoder(word_tensor)
        loss = criterion(reconstructed, word_tensor)
        loss.backward()
        optimizer.step()

        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {loss.item():.6f}")

    torch.save(autoencoder.state_dict(), save_path)
    print(f"Saved Autoencoder serialized model in drive @ {save_path}")

Loading Autoencoder from /content/drive/MyDrive/QML-Research/Autoencoder-weights/glove_autoencoder_normalized_8.pth


In [None]:
# Run only the encoder half, without gradient tracking, to compress every vector.
autoencoder.eval()
with torch.no_grad():
    compressed_vectors = autoencoder.encoder(word_tensor).numpy()

# word -> compressed vector; rows follow the order of all_words.
reduced_embeddings = dict(zip(all_words, compressed_vectors))

# **Opinion Lexicons, Negators & VADER Compound**

In [None]:
# Make sure the opinion-lexicon corpus is available locally.
nltk.download("opinion_lexicon")

# Sets give O(1) membership checks when counting lexicon hits per sentence.
POS_SET, NEG_SET = (
    set(word_list)
    for word_list in (opinion_lexicon.positive(), opinion_lexicon.negative())
)

print(f"Positive Lexicons length: {len(POS_SET)}")
print(f"Negative Lexicons length: {len(NEG_SET)}")

Positive Lexicons length: 2006
Negative Lexicons length: 4783


[nltk_data] Downloading package opinion_lexicon to /root/nltk_data...
[nltk_data]   Package opinion_lexicon is already up-to-date!


In [None]:
# VADER needs its lexicon on disk before the analyzer can be constructed.
nltk.download("vader_lexicon")
SIA = SentimentIntensityAnalyzer()

# Negation cues, grouped by spelling: plain words, contractions written
# without an apostrophe, and the same contractions with the apostrophe.
_NEGATION_WORDS = {
    "no", "not", "never", "hardly", "scarcely", "barely", "cannot",
}
_CONTRACTIONS_PLAIN = {
    "cant", "isnt", "arent", "werent", "wasnt", "dont", "doesnt", "didnt", "wont",
}
_CONTRACTIONS_APOSTROPHE = {
    "can't", "isn't", "aren't", "weren't", "wasn't", "don't", "doesn't", "didn't", "won't",
}
NEGATORS = _NEGATION_WORDS | _CONTRACTIONS_PLAIN | _CONTRACTIONS_APOSTROPHE

[nltk_data] Downloading package vader_lexicon to /root/nltk_data...
[nltk_data]   Package vader_lexicon is already up-to-date!


In [None]:
def simple_feats_from_tokens(tokens, pos_set=POS_SET, neg_set=NEG_SET, raw_text=None):
    """Build four hand-crafted sentiment features for one sentence.

    Args:
        tokens: Token strings for the sentence. Entries that are not purely
            alphabetic (for example "<PAD>" or numbers) are ignored.
        pos_set: Collection of positive lexicon words.
        neg_set: Collection of negative lexicon words.
        raw_text: Original, unprocessed sentence. When given it is used for
            both the VADER score and the negation count. When ``None`` both
            fall back to the alphabetic tokens.

    Returns:
        ``[pos_ratio, neg_ratio, negation_count, vader_compound]`` where the
        ratios are lexicon hits divided by the number of alphabetic tokens
        (denominator 1 when there are none).

    Note:
        Negations are counted on ``raw_text`` because the token lists built in
        this notebook have already had stopwords and apostrophes removed by
        the tokenizer. NLTK's English stopword list contains words such as
        "no" and "not", so counting on those tokens could never see them, nor
        any apostrophe spelling in ``NEGATORS``. Feature values therefore
        differ from runs made before this change; refit the scaler and
        retrain the classifier.
    """
    toks = [t for t in tokens if t.isalpha()]
    total = len(toks) if toks else 1
    pos_ratio = sum(1 for t in toks if t in pos_set) / total
    neg_ratio = sum(1 for t in toks if t in neg_set) / total

    if raw_text is None:
        raw_text = " ".join(toks)
        negation_source = toks
    else:
        # Re-tokenize the untouched sentence, keeping in-word apostrophes
        # (typographic ones are normalized to "'") so "don't" stays one token.
        normalized = raw_text.lower().replace("\u2019", "'")
        negation_source = re.findall(r"[a-z]+(?:'[a-z]+)?", normalized)

    negation_count = float(sum(1 for t in negation_source if t in NEGATORS))
    vader_compound = float(SIA.polarity_scores(raw_text)["compound"])

    return [pos_ratio, neg_ratio, negation_count, vader_compound]

# **Embedding**

In [None]:
def sentence_to_vec(sentence, embeddings, dim):
    """Look up one vector per token, substituting zeros for unknown tokens.

    Args:
        sentence: Iterable of token strings.
        embeddings: Mapping from token to vector.
        dim: Length of the zero vector used for tokens missing from ``embeddings``.

    Returns:
        List with one vector per token, in token order.
    """
    return [
        embeddings[token] if token in embeddings else np.zeros(dim)
        for token in sentence
    ]

def embed_sentences(cleaned_sents, embeddings, dim):
    """Embed every token list and stack the results into a single NumPy array.

    Args:
        cleaned_sents: Iterable of token lists.
        embeddings: Mapping from token to vector.
        dim: Length of the zero vector used for unknown tokens.

    Returns:
        ``np.array`` built from the per-sentence vector lists produced by
        ``sentence_to_vec``.
    """
    per_sentence = []
    for tokens in cleaned_sents:
        per_sentence.append(sentence_to_vec(tokens, embeddings, dim))
    return np.array(per_sentence)

In [None]:
# Token-level inputs: one compressed vector per token. The zero vector used for
# unknown tokens must have the same width as the autoencoder bottleneck, so it
# is taken from `latent_dim` rather than repeated as a literal.
X_embed_np = embed_sentences(cleaned_sents, reduced_embeddings, dim=latent_dim)

# Sentence-level inputs: four hand-crafted features per review, computed from
# the cleaned tokens together with the matching raw sentence.
X_feats_np = np.array(
    [
        simple_feats_from_tokens(tokens, raw_text=raw_sentence)
        for tokens, raw_sentence in zip(cleaned_sents, sentences)
    ],
    dtype="float32"
)
labels_np  = np.array(labels, dtype=np.int64)

# Stratified 80/20 split over row indices, seeded with the notebook-wide SEED
# so it stays in step with the other RNGs.
idx_all = np.arange(len(labels_np))
idx_tr, idx_te = train_test_split(
    idx_all, test_size=0.2, random_state=SEED, stratify=labels_np
)

# Standardize the embeddings per channel using training rows only; the same
# statistics are then applied to every row. The epsilon guards against a
# zero standard deviation.
mu = X_embed_np[idx_tr].mean(axis=(0, 1), keepdims=True)
sd = X_embed_np[idx_tr].std(axis=(0, 1), keepdims=True) + 1e-8
X_embed_std = (X_embed_np - mu) / sd

# Same idea for the hand-crafted features: fit on train, transform everything.
feat_scaler = StandardScaler().fit(X_feats_np[idx_tr])
X_feats_std = feat_scaler.transform(X_feats_np)

# **Dataset and DataLoader**

In [None]:
class AmazonDataset(Dataset):
    """Index-aligned view over token embeddings, extra features and labels.

    Args:
        X_seq: Indexable container of per-sentence token embeddings.
        X_feats: Indexable container of per-sentence feature vectors.
        y: Indexable container of labels; its length defines the dataset size.
    """

    def __init__(self, X_seq, X_feats, y):
        self.X_seq, self.X_feats, self.y = X_seq, X_feats, y

    def __len__(self):
        """Number of samples, taken from the label container."""
        return len(self.y)

    def __getitem__(self, idx):
        """Return the ``(X_seq[idx], X_feats[idx], y[idx])`` triple."""
        sample = (self.X_seq[idx], self.X_feats[idx], self.y[idx])
        return sample

In [None]:
# Slice the standardized arrays by split and convert them to tensors:
# float32 for both inputs, int64 for the class labels.
X_seq_train, X_seq_test = (
    torch.tensor(X_embed_std[split_idx], dtype=torch.float32)
    for split_idx in (idx_tr, idx_te)
)
X_feats_train, X_feats_test = (
    torch.tensor(X_feats_std[split_idx], dtype=torch.float32)
    for split_idx in (idx_tr, idx_te)
)
y_train, y_test = (
    torch.tensor(labels_np[split_idx], dtype=torch.int64)
    for split_idx in (idx_tr, idx_te)
)

print("Embeddings (std) train shape:", X_seq_train.shape)
print("Extra features (std) train shape:", X_feats_train.shape)

train_dataset = AmazonDataset(X_seq_train, X_feats_train, y_train)
test_dataset = AmazonDataset(X_seq_test, X_feats_test, y_test)

# Only the training loader reshuffles; the test loader keeps dataset order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=32)

Embeddings (std) train shape: torch.Size([800, 10, 8])
Extra features (std) train shape: torch.Size([800, 4])


# **Model**

In [None]:
class SimpleSeqFeatClassifier(nn.Module):
    """Mean-pooled token MLP fused with sentence-level features.

    Each token embedding is projected to ``hidden`` units (Linear + ReLU), the
    projections are averaged over ``dim=1``, the average is concatenated with
    the extra features, and a small MLP head produces the class logits.

    Args:
        emb_dim: Size of each token embedding.
        feat_dim: Number of extra per-sentence features.
        hidden: Width of the token projection.
        num_classes: Number of output logits.
        p_drop: Dropout probability inside the classifier head.
    """

    def __init__(self, emb_dim=8, feat_dim=4, hidden=64, num_classes=2, p_drop=0.2):
        super().__init__()
        # Layers are created in the same order as before (projection first,
        # then the head) so seeded initialisation and state_dict keys match.
        self.token_proj = nn.Sequential(nn.Linear(emb_dim, hidden), nn.ReLU())

        fused_width = hidden + feat_dim
        self.classifier = nn.Sequential(
            nn.Linear(fused_width, 64),
            nn.ReLU(),
            nn.Dropout(p_drop),
            nn.Linear(64, num_classes),
        )

    def forward(self, x_seq, x_feats):
        """Return logits for a batch of token sequences and their extra features."""
        pooled = self.token_proj(x_seq).mean(dim=1)
        return self.classifier(torch.cat([pooled, x_feats], dim=-1))

# **Parameter Initialization**

In [None]:
# Prefer the GPU when one is visible, otherwise run on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Classifier sized for 8-d token embeddings plus 4 extra features, 2 classes.
model = SimpleSeqFeatClassifier(emb_dim=8, feat_dim=4, hidden=64, num_classes=2)
model = model.to(device)

EPOCHS = 100
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# **Training & Testing**

In [None]:
def train_one_epoch(model, loader, optimizer, criterion, device):
    """Run one optimisation pass over ``loader`` and report loss and accuracy.

    Args:
        model: Module called as ``model(x_seq, x_feats)`` returning class logits.
        loader: Iterable yielding ``(x_seq, x_feats, y)`` batches.
        optimizer: Optimizer stepping ``model``'s parameters.
        criterion: Loss taking ``(logits, y)``.
        device: Device every batch is moved to.

    Returns:
        ``(mean_loss, accuracy)``; the loss is averaged per sample by weighting
        each batch loss with its batch size.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch in loader:
        x_seq, x_feats, y = (part.to(device) for part in batch)
        batch_size = y.size(0)

        optimizer.zero_grad()
        logits = model(x_seq, x_feats)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()

        # Weight by batch size so a smaller last batch does not skew the mean.
        loss_sum += loss.item() * batch_size
        n_correct += (logits.argmax(dim=1) == y).sum().item()
        n_seen += batch_size

    return loss_sum / n_seen, n_correct / n_seen

In [None]:
@torch.no_grad()
def evaluate(model, loader, criterion, device):
    """Score ``model`` on ``loader`` without tracking gradients or updating weights.

    Args:
        model: Module called as ``model(x_seq, x_feats)`` returning class logits.
        loader: Iterable yielding ``(x_seq, x_feats, y)`` batches.
        criterion: Loss taking ``(logits, y)``.
        device: Device every batch is moved to.

    Returns:
        ``(mean_loss, accuracy)``, both averaged per sample.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch in loader:
        x_seq, x_feats, y = (part.to(device) for part in batch)
        batch_size = y.size(0)

        logits = model(x_seq, x_feats)
        loss_sum += criterion(logits, y).item() * batch_size
        n_correct += (logits.argmax(dim=1) == y).sum().item()
        n_seen += batch_size

    return loss_sum / n_seen, n_correct / n_seen

In [None]:
# Train for EPOCHS epochs, remember the weights of the best-scoring epoch,
# restore them at the end, save them to Drive and report the final metrics.
#
# NOTE(review): the "best" epoch is chosen by accuracy on the test split, so
# the final test accuracy printed below is optimistically biased. A separate
# validation split would be needed for an unbiased estimate.
best_state = None
best_acc = 0.0

for epoch in range(1, EPOCHS + 1):
    tr_loss, tr_acc = train_one_epoch(model, train_loader, optimizer, criterion, device)
    te_loss, te_acc = evaluate(model, test_loader, criterion, device)

    if te_acc > best_acc:
        best_acc = te_acc
        # clone() is required: state_dict() hands back tensors that share
        # storage with the live parameters, and .cpu() does not copy a tensor
        # that is already on the CPU. Without the copy, later optimizer steps
        # would overwrite this snapshot in place.
        best_state = {k: v.detach().cpu().clone() for k, v in model.state_dict().items()}

    print(f"Epoch {epoch:02d} | Train loss {tr_loss:.4f} acc {tr_acc:.3f} "
          f"| Test loss {te_loss:.4f} acc {te_acc:.3f}")

if best_state is not None:
    model.load_state_dict(best_state)

checkpoint_path = "/content/drive/MyDrive/QML-Research/Model Saves/Classical/simple_seq_feat_v1.pt"
# Make sure the folder exists so the save cannot fail after training has finished.
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
torch.save(model.state_dict(), checkpoint_path)

final_loss, final_acc = evaluate(model, test_loader, criterion, device)
print(f"Final Test — loss: {final_loss:.4f}, acc: {final_acc:.3f}")

Epoch 01 | Train loss 0.6014 acc 0.811 | Test loss 0.5436 acc 0.810
Epoch 02 | Train loss 0.4661 acc 0.845 | Test loss 0.4577 acc 0.815
Epoch 03 | Train loss 0.3888 acc 0.873 | Test loss 0.4076 acc 0.810
Epoch 04 | Train loss 0.3463 acc 0.870 | Test loss 0.3987 acc 0.815
Epoch 05 | Train loss 0.3263 acc 0.868 | Test loss 0.3839 acc 0.825
Epoch 06 | Train loss 0.3167 acc 0.873 | Test loss 0.3775 acc 0.825
Epoch 07 | Train loss 0.3179 acc 0.861 | Test loss 0.3737 acc 0.820
Epoch 08 | Train loss 0.3096 acc 0.870 | Test loss 0.3775 acc 0.835
Epoch 09 | Train loss 0.3029 acc 0.874 | Test loss 0.3751 acc 0.840
Epoch 10 | Train loss 0.3006 acc 0.864 | Test loss 0.3762 acc 0.840
Epoch 11 | Train loss 0.2908 acc 0.881 | Test loss 0.3741 acc 0.830
Epoch 12 | Train loss 0.2886 acc 0.881 | Test loss 0.3745 acc 0.835
Epoch 13 | Train loss 0.2918 acc 0.879 | Test loss 0.3722 acc 0.825
Epoch 14 | Train loss 0.2919 acc 0.870 | Test loss 0.3748 acc 0.830
Epoch 15 | Train loss 0.2871 acc 0.875 | Test lo

# **Test Inference**

In [None]:
def tokenize_and_pad(text, max_len=10):
    """Tokenize ``text`` exactly as the training data was, then fix its length.

    Cleaning is delegated to ``clean_and_tokenize`` (the function that built
    the training tokens) instead of repeating its regex pipeline here, so
    inference-time preprocessing cannot drift from training-time preprocessing.

    Args:
        text: Raw sentence string.
        max_len: Number of tokens to return.

    Returns:
        List of ``max_len`` tokens: the cleaned tokens, truncated if too long
        and right-padded with ``"<PAD>"`` if too short.
    """
    tokens = clean_and_tokenize(text)[:max_len]
    # The multiplier is zero or negative when no padding is needed, giving [].
    return tokens + ["<PAD>"] * (max_len - len(tokens))

In [None]:
@torch.no_grad()
def predict_texts(model, texts, device, emb_dim=8, max_len=10):
    """Classify raw sentences with the trained model.

    Inputs go through the same preprocessing as the training data:
    ``tokenize_and_pad``, the hand-crafted features scaled by the fitted
    ``feat_scaler``, and compressed embeddings standardized with the training
    statistics ``mu`` and ``sd``.

    Args:
        model: Trained classifier called as ``model(x_seq, x_feats)``.
        texts: Iterable of raw sentence strings. A single string is treated as
            one sentence rather than being iterated character by character.
        device: Device on which inference runs.
        emb_dim: Width of the compressed token embeddings.
        max_len: Number of tokens per sentence after padding or truncation.

    Returns:
        ``(preds, probs)`` as NumPy arrays: the arg-max class index per
        sentence and the softmax probabilities per sentence.
    """
    # Materialize the input once: it is walked twice below, so a generator
    # would otherwise be exhausted before the feature step.
    texts = [texts] if isinstance(texts, str) else list(texts)

    token_lists = [tokenize_and_pad(t, max_len=max_len) for t in texts]

    # simple_feats_from_tokens ignores non-alphabetic tokens such as "<PAD>"
    # itself, so the padded lists can be passed straight through.
    feats_np = np.array(
        [simple_feats_from_tokens(toks, raw_text=txt) for toks, txt in zip(token_lists, texts)],
        dtype="float32"
    )
    feats_np = feat_scaler.transform(feats_np)
    X_feats = torch.tensor(feats_np).float().to(device)

    X_seq_np = embed_sentences(token_lists, reduced_embeddings, dim=emb_dim)
    X_seq_np = (X_seq_np - mu) / sd
    X_seq = torch.tensor(X_seq_np).float().to(device)

    model.eval()
    logits = model(X_seq, X_feats)
    probs = torch.softmax(logits, dim=1).cpu().numpy()
    preds = logits.argmax(dim=1).cpu().numpy()
    return preds, probs

# Hand-written reviews used as a quick sanity check of the trained model.
trial_sentences = [
    "This charger is amazing, super fast and highly reliable.",
    "Worst headphones ever bought, awful sound and terrible build quality.",
    "Very satisfied with camera quality considering the affordable price range.",
    "Product stopped working completely within one week of regular usage.",
    "Excellent screen resolution, great brightness and strong battery backup too.",
    "This is the best light bulb I’ve ever used.",
    "Did not match the description, complete waste of my money.",
    "This chair feels sturdy, comfortable, and worth every single penny.",
    "Absolutely love this phone case, protective, stylish and well-made.",
    "Speaker quality is poor, distorts quickly at medium volume levels.",
    "Delivery arrived quickly with excellent packaging and no visible damages.",
    "Avoid this item entirely, cheap build and constantly stops working.",
    "Best pair of scissors ever purchased, sharp, durable and reliable.",
]

preds, probs = predict_texts(model, trial_sentences, device, emb_dim=8, max_len=10)

# One line per sentence: predicted label, class probabilities, original text.
for sentence, pred, prob in zip(trial_sentences, preds, probs):
    if int(pred) == 1:
        label = "POS"
    else:
        label = "NEG"
    print(f"[{label}] {prob.tolist()} :: {sentence}")

[POS] [0.0035899747163057327, 0.996410071849823] :: This charger is amazing, super fast and highly reliable.
[NEG] [0.9969472289085388, 0.0030527871567755938] :: Worst headphones ever bought, awful sound and terrible build quality.
[POS] [0.35570475459098816, 0.6442952156066895] :: Very satisfied with camera quality considering the affordable price range.
[NEG] [0.9653716087341309, 0.03462842106819153] :: Product stopped working completely within one week of regular usage.
[POS] [0.0027090604417026043, 0.9972909092903137] :: Excellent screen resolution, great brightness and strong battery backup too.
[POS] [0.08565100282430649, 0.9143490195274353] :: This is the best light bulb I’ve ever used.
[NEG] [0.9987524747848511, 0.0012475211406126618] :: Did not match the description, complete waste of my money.
[POS] [0.0037493137642741203, 0.9962506890296936] :: This chair feels sturdy, comfortable, and worth every single penny.
[POS] [0.0017618017736822367, 0.998238205909729] :: Absolutely l