In [None]:
import os, re, random

# Root directory of the dataset; the "train" and "test" subdirectories are joined onto it below.
DATA_PATH  = "data/aclImdb/"

def load_inidv_dataset(set_path: str) -> tuple:
    """Read every review file under ``set_path/pos`` and ``set_path/neg``.

    Args:
        set_path: Directory that contains a ``pos`` and a ``neg`` subdirectory,
            each holding one text file per review.

    Returns:
        A ``(texts, labels)`` tuple of two parallel lists. ``texts[i]`` is the
        raw content of one file and ``labels[i]`` is ``1`` for a file found
        under ``pos`` and ``0`` for a file found under ``neg``. All ``pos``
        entries come first, each category ordered by file name.

    Raises:
        OSError: If a category directory or one of its entries cannot be read.
        UnicodeDecodeError: If a file is not valid UTF-8.
    """
    texts = []
    labels = []

    for label in ['pos', 'neg']:
        cat_path = os.path.join(set_path, label)
        # os.listdir returns entries in arbitrary order; sorting makes the
        # result (and any seeded shuffle applied to it later) reproducible.
        for file_name in sorted(os.listdir(cat_path)):
            file_path = os.path.join(cat_path, file_name)
            # Decode explicitly as UTF-8 instead of relying on the platform's
            # default encoding, which differs between operating systems.
            with open(file_path, 'r', encoding='utf-8') as file:
                texts.append(file.read())
            labels.append(0 if label == 'neg' else 1)

    return (texts, labels)


def _shuffle_and_clean(texts, labels, rng):
    """Apply one shared random permutation to ``texts``/``labels`` and strip HTML tags."""
    if len(texts) != len(labels):
        raise ValueError(
            f"texts and labels must have the same length, got {len(texts)} and {len(labels)}"
        )

    # Shuffle positions once and gather both lists through the same order, so
    # a text can never be separated from its label.
    order = list(range(len(texts)))
    rng.shuffle(order)

    # Replace each tag with a space rather than nothing: tags such as
    # "<br />" often sit directly between two words, which would otherwise
    # be fused into a single token.
    cleaned_texts = [re.sub('<.*?>', ' ', texts[i]) for i in order]
    shuffled_labels = [labels[i] for i in order]
    return (cleaned_texts, shuffled_labels)


def load_dataset(train_path: str, test_path: str, seed=1) -> tuple:
    """Load the train and test splits, shuffle each one and strip HTML tags.

    Args:
        train_path: Directory of the training split (passed to ``load_inidv_dataset``).
        test_path: Directory of the test split (passed to ``load_inidv_dataset``).
        seed: Seed for the shuffle; the same seed yields the same order for
            the same loaded data.

    Returns:
        ``((train_texts, train_labels), (test_texts, test_labels))`` where the
        texts and labels of each split are parallel lists in shuffled order.

    Raises:
        ValueError: If a split's texts and labels differ in length.
    """
    (train_texts, train_labels) = load_inidv_dataset(set_path=train_path)
    (test_texts, test_labels)   = load_inidv_dataset(set_path=test_path)

    # A private generator keeps the shuffle reproducible without reseeding the
    # module-level `random` state that other code may rely on.
    rng = random.Random(seed)

    train_split = _shuffle_and_clean(train_texts, train_labels, rng)
    test_split = _shuffle_and_clean(test_texts, test_labels, rng)

    return (train_split, test_split)


def get_smaller_dataset(size: int, texts: list[str], labels: list[int], seed=10) -> tuple:
    """Draw a random subset of ``size`` (text, label) pairs without replacement.

    Args:
        size: Number of pairs to draw.
        texts: Review texts.
        labels: Labels, parallel to ``texts``.
        seed: Seed for the draw; the same seed selects the same positions.

    Returns:
        ``(smaller_texts, smaller_labels)``: two parallel lists of length ``size``.

    Raises:
        ValueError: If ``texts`` and ``labels`` differ in length, or if
            ``size`` is negative or larger than the number of pairs.
    """
    if len(texts) != len(labels):
        raise ValueError(
            f"texts and labels must have the same length, got {len(texts)} and {len(labels)}"
        )

    # Sample positions once and index both lists with them, so the pairing
    # does not depend on two separate draws happening to agree. A private
    # generator avoids reseeding the module-level `random` state.
    rng = random.Random(seed)
    chosen = rng.sample(range(len(texts)), size)

    smaller_texts = [texts[i] for i in chosen]
    smaller_labels = [labels[i] for i in chosen]

    return (smaller_texts, smaller_labels)


# Resolve the directory of each split underneath the dataset root.
train_path, test_path = (
    os.path.join(DATA_PATH, split_name) for split_name in ("train", "test")
)

# Load both splits with load_dataset's default seed.
(train_texts, train_labels), (test_texts, test_labels) = load_dataset(
    train_path=train_path,
    test_path=test_path,
)

In [None]:
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.feature_extraction.text import TfidfVectorizer

# Upper bound on the number of feature columns kept after selection.
MAX_FEATURES = 5000

# Reseed without an argument so every run draws a different subsample seed.
random.seed()
seed = random.randint(1, 1000)

# Work on a 12000-review subset of the training split.
sm_texts, sm_labels = get_smaller_dataset(
    size=12000, texts=train_texts, labels=train_labels, seed=seed
)

# TF-IDF over unigrams and bigrams, with min_df=5 as the document-frequency cutoff.
# NOTE(review): the vectorizer and the selector below are both fit on the whole
# subset, before it is split into train/test in a later cell, so held-out rows
# influence the features — confirm this is intended.
vectorizer = TfidfVectorizer(ngram_range=(1, 2), min_df=5)
X_sm = vectorizer.fit_transform(sm_texts)
y_sm = np.array(sm_labels)

# Keep at most MAX_FEATURES columns, or every column when fewer are available.
selector = SelectKBest(
    score_func=f_classif,
    k=min(MAX_FEATURES, X_sm.shape[1]),
)
X_sm = selector.fit_transform(X_sm, y_sm)

In [None]:
# Placeholder baseline: a Complement Naive Bayes model to benchmark against
from sklearn.naive_bayes import ComplementNB
from sklearn.metrics import accuracy_score

# Hold out 33% of the subset for evaluation; the fixed random_state keeps the
# split itself repeatable for a given X_sm / y_sm.
X_sm_train, X_sm_test, y_sm_train, y_sm_test = train_test_split(
    X_sm,
    y_sm,
    test_size=0.33,
    random_state=42,
)

# Fit the baseline and score it on the held-out rows.
clf = ComplementNB()
clf.fit(X_sm_train, y_sm_train)
y_pred = clf.predict(X_sm_test)

# Last expression of the cell: the baseline accuracy is shown as its output.
accuracy_score(y_true=y_sm_test, y_pred=y_pred)

In [None]:
# Display the training labels produced by the split as this cell's output.
y_sm_train

In [None]:
import torch
from torch.utils.data import Dataset, DataLoader

#For now just wrapping already processed vectors
class NgramMovieDataset(Dataset):
    """Dataset over pre-computed feature vectors and their labels.

    Both arrays are converted to float32 tensors and moved to ``device`` once,
    at construction time; indexing then returns slices of those tensors.

    Args:
        X: NumPy array of features, indexed by sample along the first axis.
        y: NumPy array of labels, parallel to ``X``.
        device: Target device for both tensors.
    """

    def __init__(self, X, y, device):
        features = torch.from_numpy(X)
        targets = torch.from_numpy(y)
        self.x = features.to(device, dtype=torch.float32)
        self.y = targets.to(device, dtype=torch.float32)

    def __len__(self):
        # Number of samples = size of the first axis of the feature tensor.
        return self.x.size(0)

    def __getitem__(self, idx):
        # One (features, label) pair.
        sample, target = self.x[idx], self.y[idx]
        return (sample, target)

    
# Prefer the GPU when CUDA is available, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# .toarray() hands the dataset a dense array for each feature matrix
# (presumably these are SciPy sparse matrices — confirm against the selector output).
train_dataset = NgramMovieDataset(X_sm_train.toarray(), y_sm_train, device)
test_dataset = NgramMovieDataset(X_sm_test.toarray(), y_sm_test, device)

# Batches of 64 samples, served in dataset order.
train_dataloader = DataLoader(train_dataset, batch_size=64)
test_dataloader = DataLoader(test_dataset, batch_size=64)

In [None]:
from torch import nn

# NOTE(review): neither constant is referenced in the visible cells — the DataLoaders
# are built with a literal batch_size=64 and the training cell sets its own `epochs = 5`.
N_EPOCHS = 10
BATCH_SIZE = 64


class SentimentCLF(nn.Module):
    """Feed-forward binary classifier that maps a feature vector to one raw logit.

    The network is two hidden layers with ReLU activations followed by a
    single-unit output layer. Without the activations the stacked ``Linear``
    layers would compose into one affine map and the hidden layers would add
    nothing.

    Args:
        in_features: Width of each input vector. Defaults to 5000.
        hidden_features: Width of both hidden layers. Defaults to 512.
    """

    def __init__(self, in_features: int = 5000, hidden_features: int = 512):
        super().__init__()
        self.linear_stack = nn.Sequential(
            nn.Linear(in_features, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, hidden_features),
            nn.ReLU(),
            nn.Linear(hidden_features, 1),
        )

    def forward(self, x):
        """Return the raw (pre-sigmoid) logits with a trailing dimension of size 1."""
        logits = self.linear_stack(x)
        return logits
    
            
model = SentimentCLF().to(device)

# The model emits one raw logit per sample and the targets are float 0/1
# labels, so the matching loss is binary cross-entropy on logits.
# CrossEntropyLoss would instead read the squeezed 1-D batch of logits as the
# class scores of a single sample and the labels as its class probabilities.
loss_fn   = nn.BCEWithLogitsLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)


def train(dataloader: torch.utils.data.DataLoader, model: nn.Module, loss_fn, optimizer):
    """Run one optimisation pass over every batch of ``dataloader``.

    For each batch the model's output is compared with the targets through
    ``loss_fn``, gradients are back-propagated, and ``optimizer`` takes a
    step. The current loss is printed every 100 batches.

    Args:
        dataloader: Yields ``(X, y)`` batches; ``dataloader.dataset`` must
            support ``len``.
        model: Module to train; it is left in training mode.
        loss_fn: Callable taking ``(pred, y)`` and returning a scalar loss tensor.
        optimizer: Optimizer that updates ``model``'s parameters.
    """
    size = len(dataloader.dataset)

    # Switching to training mode once is enough; nothing in the loop leaves it.
    model.train()

    for batch, (X, y) in enumerate(dataloader):
        # Drop only the trailing dimension. A bare squeeze() would also remove
        # the batch dimension of a single-sample batch, leaving a scalar that
        # no longer matches the shape of y.
        pred = model(X).squeeze(-1)
        loss = loss_fn(pred, y)

        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

        if batch % 100 == 0:
            current = (batch + 1) * len(X)
            print(f"loss: {loss.item():>7f}  [{current:>5d}/{size:>5d}]")
                   
    
def test(dataloader: torch.utils.data.DataLoader, model: nn.Module, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X).squeeze()
            test_loss += loss_fn(pred, y).item()
            correct += ((torch.round(torch.sigmoid(pred))) == y).sum().item()
            
    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
# Number of full passes over the training data.
epochs = 5

for t in range(epochs):
    # Banner for the epoch (1-based), then one training pass and one evaluation.
    print(f"Epoch {t+1}\n-------------------------------")
    train(
        dataloader=train_dataloader,
        model=model,
        loss_fn=loss_fn,
        optimizer=optimizer,
    )
    test(dataloader=test_dataloader, model=model, loss_fn=loss_fn)

print("Done!")