### **Library**

In [1]:
# Library
# Time
from timeit import default_timer as timer
import time
from tqdm.auto import tqdm

# File
import warnings
import os
import requests
import zipfile
from pathlib import Path
import random
import chardet

# Numerical & Data Handling
import numpy as np
import pandas as pd
import scipy as sp
import math
from typing import List, Callable, Union, Dict, Any, Tuple
import itertools
from torch.utils.data import DataLoader, Dataset, random_split, TensorDataset

# Visualization
import seaborn as sns
from matplotlib import pyplot as plt
from matplotlib.lines import Line2D
%matplotlib inline
from sklearn.tree import plot_tree
from scipy.optimize import curve_fit
from mpl_toolkits.axes_grid1 import host_subplot
import mpl_toolkits.axisartist as AA
from pandas.plotting import lag_plot

# Classic ML Libraries
from sklearn.linear_model import LogisticRegression

# Neural Network Libraries
import torch
from torch import nn as nn
from torch.nn import functional as F
import tensorflow as tf
from transformers import pipeline

# NLP
import re
from collections import Counter, defaultdict
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from transformers import AutoTokenizer, AutoModel, BertTokenizer, BertForSequenceClassification
from datasets import Dataset

# Feature Engineering
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import (StandardScaler, RobustScaler, MinMaxScaler, MaxAbsScaler, Normalizer,
								   LabelEncoder, OneHotEncoder, OrdinalEncoder, LabelBinarizer)

# Evaluation
from sklearn.metrics import *
from torchmetrics import *

  from .autonotebook import tqdm as notebook_tqdm


### **Data Preprocessing**

In [None]:
# Load data: read the IMDB review CSV into a DataFrame
df = pd.read_csv("data/imdb/imdb_50k.csv")

In [None]:
def tokenizer(text):
    """
    Tokenize a given text string by:
    - Lowercasing all characters
    - Replacing HTML tags with a space
    - Removing non-alphabetic characters except whitespace
    - Splitting text into individual word tokens

    Args:
        text (str): Raw input text

    Returns:
        list of str: Tokenized list of cleaned words
    """

    text = text.lower()
    # Substitute a space (not an empty string) for each tag: otherwise the words on
    # either side of e.g. "<br />" are glued together into one bogus token.
    text = re.sub(pattern=r"<.*?>", repl=" ", string=text)
    # Drop everything that is neither a lowercase ASCII letter nor whitespace.
    text = re.sub(pattern=r"[^a-z\s]", repl="", string=text)

    # str.split() with no argument collapses runs of whitespace, so the extra
    # spaces introduced above never produce empty tokens.
    return text.split()

# Tokenize text: run the tokenizer over every review, giving a Series of token lists
review_column = df["review"]
reviews_tokenized = review_column.apply(tokenizer)

In [None]:
# Label encoding
df["label"] = df["sentiment"].map({"positive": 1, "negative": 0})
# Series.map yields NaN for any value that is not a key of the dict. Fail loudly here
# rather than letting NaN labels propagate into the label tensor and the loss.
assert df["label"].notna().all(), "Unexpected value(s) in the 'sentiment' column"

In [None]:
# Word counter: tally how often each token occurs across all tokenized reviews
counter = Counter(token for tokens in reviews_tokenized for token in tokens)

In [None]:
# Build vocab dict
min_freq = 2  # Minimum number of occurrences a token needs to enter the vocab
vocab = {'<pad>': 0, '<unk>': 1}  # Special tokens take the first two ids
# Most frequent tokens first, so that frequent words receive the smallest ids
tokens_sorted = sorted(counter.items(), key=lambda pair: pair[1], reverse=True)
kept_words = [word for word, freq in tokens_sorted if freq >= min_freq]
# Hand out consecutive ids, continuing right after the special tokens
vocab.update({word: idx for idx, word in enumerate(kept_words, start=len(vocab))})

In [None]:
def encode_and_pad(tokens, max_len=256):
    """
    Map word tokens to integer ids from the module-level `vocab` and force the
    result to a fixed length.

    Tokens missing from `vocab` are encoded as the '<unk>' id. Sequences longer
    than `max_len` are cut off at the end; shorter ones are right-padded with
    the '<pad>' id.

    Args:
        tokens (list of str): Tokenized words to encode
        max_len (int): Length of the returned sequence (default is 256)

    Returns:
        list of int: Encoded sequence, truncated or padded to `max_len`
    """

    ids = []
    for tok in tokens:
        # Unknown words fall back to the '<unk>' id
        ids.append(vocab.get(tok, vocab['<unk>']))

    # Truncate in place, then fill whatever is missing with '<pad>'
    del ids[max_len:]
    ids += [vocab['<pad>']] * (max_len - len(ids))
    return ids

# Encode and pad: every review becomes a list of exactly MAX_LEN vocabulary ids
MAX_LEN = 256
reviews_encoded = reviews_tokenized.apply(encode_and_pad, max_len=MAX_LEN)

In [None]:
# Data split
# Hold out 20% for test, then 25% of the remaining 80% for validation.
X = torch.tensor(reviews_encoded.tolist())
y = torch.tensor(df['label'].to_numpy())
X_train_val, X_test, y_train_val, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
X_train, X_val, y_train, y_val = train_test_split(
    X_train_val, y_train_val, test_size=0.25, random_state=42
)

In [None]:
# Tensor -> TensorDataset (pairs each input with its label) -> DataLoader (yields mini-batches)
BATCH_SIZE = 64
ds_train = TensorDataset(X_train, y_train)
ds_val = TensorDataset(X_val, y_val)
ds_test = TensorDataset(X_test, y_test)
# Only the training loader shuffles; validation and test keep their order.
dl_train = DataLoader(ds_train, batch_size=BATCH_SIZE, shuffle=True)
dl_val = DataLoader(ds_val, batch_size=BATCH_SIZE)
dl_test = DataLoader(ds_test, batch_size=BATCH_SIZE)

### **Classic ML**

In [None]:
# Bag of words
# CountVectorizer tokenizes raw strings itself, so it has to be fed the review text,
# not X_train_val / X_test (padded integer-id tensors prepared for the LSTM).
# Same test_size and random_state as the tensor split above, so this is expected to
# select the same rows for the test set.
reviews_train_val, reviews_test, labels_train_val, labels_test = train_test_split(
    df["review"], df["label"], test_size=0.2, random_state=42
)
vectorizer = CountVectorizer(max_features=5000)
X_train_val_vec = vectorizer.fit_transform(reviews_train_val)  # Learn the vocabulary on train+val only
X_test_vec = vectorizer.transform(reviews_test)  # Reuse the fitted vocabulary for test

# Logistic Regression
model_logistic_regression = LogisticRegression(max_iter=1000)
model_logistic_regression.fit(X_train_val_vec, labels_train_val)
y_pred = model_logistic_regression.predict(X_test_vec)

print(classification_report(labels_test, y_pred))

In [None]:
# TF-IDF
# TfidfVectorizer tokenizes raw strings itself, so it has to be fed the review text,
# not X_train_val / X_test (padded integer-id tensors prepared for the LSTM).
# Same test_size and random_state as the tensor split above, so this is expected to
# select the same rows for the test set.
reviews_train_val, reviews_test, labels_train_val, labels_test = train_test_split(
    df["review"], df["label"], test_size=0.2, random_state=42
)
vectorizer = TfidfVectorizer(max_features=5000)
X_train_val_vec = vectorizer.fit_transform(reviews_train_val)  # Learn vocabulary and IDF weights on train+val only
X_test_vec = vectorizer.transform(reviews_test)  # Reuse the fitted vocabulary/IDF for test

# Logistic Regression
model_logistic_regression = LogisticRegression(max_iter=1000)
model_logistic_regression.fit(X_train_val_vec, labels_train_val)
y_pred = model_logistic_regression.predict(X_test_vec)

print(classification_report(labels_test, y_pred))

### **NN**

In [None]:
# Device agnostic code: use the GPU when CUDA is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

In [None]:
class LSTM_model_v0(nn.Module):
    """
    Embedding -> dropout -> single LSTM -> linear head producing one logit per sequence.

    Args:
        vocab_size (int): Number of rows in the embedding table
        embedding_dim (int): Size of each token embedding
        hidden_dim (int): Size of the LSTM hidden state
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim):
        super().__init__()
        # The '<pad>' id is read from the module-level `vocab`
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=vocab['<pad>'],
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_dim, out_features=1)
        self.dropout = nn.Dropout(p=0.3)

    def forward(self, x):
        """
        Compute one raw logit per input sequence.

        Args:
            x: Batch of token-id sequences (batch dimension first)

        Returns:
            Tensor of logits with the trailing size-1 dimension squeezed away
        """
        token_vectors = self.embedding(x)
        token_vectors = self.dropout(token_vectors)
        # Only the final hidden state is used; per-step outputs and the cell state are discarded
        _, (h_n, _) = self.lstm(token_vectors)
        last_layer_state = h_n[-1]
        logits = self.fc(last_layer_state)
        return logits.squeeze(1)

In [None]:
# Instantiation
model = LSTM_model_v0(len(vocab), embedding_dim=256, hidden_dim=128).to(device)
criterion = nn.BCEWithLogitsLoss()  # Takes raw logits; applies the sigmoid internally
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Early stopping setup
patience = 3  # Stop after this many consecutive epochs without a new best val accuracy
# Deliberately not named `counter`: that name holds the word-frequency Counter used to
# build `vocab`, and overwriting it would break re-running the vocab cell.
epochs_without_improvement = 0
best_acc_val = 0.0
best_model_state = None

EPOCHS = 20
for epoch in range(EPOCHS):
    # Training
    model.train()
    correct_train, total_train = 0, 0
    for batch_X, batch_y in dl_train:
        # BCEWithLogitsLoss needs float targets
        batch_X, batch_y = batch_X.to(device), batch_y.float().to(device)
        y_logits = model(batch_X)
        loss_train = criterion(y_logits, batch_y)
        optimizer.zero_grad()
        loss_train.backward()
        optimizer.step()
        # Accuracy bookkeeping only; detach so no autograd graph is built for it
        y_pred_prob = torch.sigmoid(y_logits.detach())
        y_pred = torch.round(y_pred_prob)
        correct_train += (y_pred == batch_y).sum().item()
        total_train += batch_y.size(0)
    acc_train = correct_train / total_train
    print(f"Epoch {epoch+1}, Train Accuracy: {acc_train:.4f}")

    # Evaluation
    model.eval()
    correct_val, total_val = 0, 0
    with torch.inference_mode():
        for batch_X, batch_y in dl_val:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            y_logits = model(batch_X)
            y_pred_prob = torch.sigmoid(y_logits)
            y_pred = torch.round(y_pred_prob)
            correct_val += (y_pred == batch_y).sum().item()
            total_val += batch_y.size(0)
    acc_val = correct_val / total_val
    print(f"Epoch {epoch+1}, Val Accuracy: {acc_val:.4f}")

    # Early stopping
    if acc_val > best_acc_val:
        best_acc_val = acc_val
        epochs_without_improvement = 0
        # state_dict() returns references to the live parameter tensors, which later
        # optimizer steps keep modifying. Clone them so the snapshot really is the
        # best model and not whatever the weights are at the end of training.
        best_model_state = {
            name: tensor.detach().clone() for name, tensor in model.state_dict().items()
        }
    else:
        epochs_without_improvement += 1
        if epochs_without_improvement >= patience:
            print(f"Early stopping triggered at epoch {epoch+1}")
            break

# Restore the best validation checkpoint so later evaluation uses it rather than the
# weights from the last (possibly worse) epoch.
if best_model_state is not None:
    model.load_state_dict(best_model_state)

In [None]:
# Test: accuracy of the current model on the held-out test loader
model.eval()  # Disable dropout
correct = 0
total = 0
with torch.inference_mode():
    for inputs, targets in dl_test:
        inputs = inputs.to(device)
        targets = targets.to(device)
        # Logit -> probability -> hard 0/1 prediction
        y_pred = torch.round(torch.sigmoid(model(inputs)))
        n_hits = (y_pred == targets).sum().item()
        correct += n_hits
        total += targets.size(0)
print(f"Test Accuracy: {correct / total:.4f}")