In [None]:
import torch
from torch import nn, optim
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, f1_score, roc_auc_score
import nltk

# nltk.download() only writes the tokenizer models into the given directory;
# lookups made later by word_tokenize search nltk.data.path, which does not
# contain a custom download_dir unless the NLTK_DATA environment variable
# points at it. Register the directory explicitly so a fresh kernel can find
# the models it just downloaded (the membership test keeps re-runs idempotent).
NLTK_DATA_DIR = "nltk-tokenizers"
if NLTK_DATA_DIR not in nltk.data.path:
    nltk.data.path.append(NLTK_DATA_DIR)
nltk.download("punkt", download_dir=NLTK_DATA_DIR)
nltk.download("punkt_tab", download_dir=NLTK_DATA_DIR)
from nltk.tokenize import word_tokenize
import numpy as np
import json
import re
from collections import Counter

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
MAX_ROWS = 1_000_000 # cap on lines read per data file (limiting to not crash)
MAX_FEATURES = 4096 # total vocabulary size, including the two reserved ids 0 and 1

In [None]:
# Matches every non-word character (anything \w does not cover).
NON_ALPHANUM = re.compile(r"[\W]")
# Matches everything except lowercase ASCII letters, digits and whitespace.
NON_ASCII = re.compile(r"[^a-z0-9\s]")


def normalize_text(text):
    """Lowercase ``text`` and reduce it to ASCII letters, digits and whitespace.

    Two passes are applied to the lowercased string:

    1. every non-word character is replaced by a single space;
    2. every remaining character that is not ``a-z``, ``0-9`` or whitespace
       (for example ``_`` or accented letters, which ``\\W`` does not match)
       is deleted outright.

    Whitespace is neither collapsed nor trimmed here.

    Args:
        text: The string to normalize.

    Returns:
        The normalized string.
    """
    lowered = text.lower()
    spaced = NON_ALPHANUM.sub(" ", lowered)
    return NON_ASCII.sub("", spaced)


def load_data(filepath):
    """Read a labelled review file into parallel label and text lists.

    Every non-blank line is parsed positionally: the character at index 9 is
    read as the class digit and everything from index 10 onwards is the review
    text. This matches a 9-character ``__label__`` prefix followed by one
    digit and a separator.

    Args:
        filepath: Path of the text file to read (decoded as UTF-8).

    Returns:
        A ``(labels, texts)`` tuple of equal-length lists. ``labels`` holds
        the class digit minus one as an int; ``texts`` holds the result of
        ``normalize_text`` with surrounding whitespace stripped. At most
        ``MAX_ROWS`` rows are returned; blank lines are skipped and do not
        count towards that limit.

    Raises:
        IndexError: if a non-blank line is shorter than 10 characters.
        ValueError: if the character at index 9 is not a digit.
    """
    labels = []
    texts = []

    # The context manager closes the file even when parsing fails, and the
    # explicit encoding keeps decoding independent of the platform default.
    with open(filepath, encoding="utf-8") as fh:
        for line in fh:
            if len(labels) >= MAX_ROWS:
                break
            if not line.strip():
                # A blank line (e.g. a trailing newline) carries no label.
                continue
            labels.append(int(line[9]) - 1)
            texts.append(normalize_text(line[10:]).strip())

    return labels, texts


# Load the train and test splits; load_data stops after MAX_ROWS lines per file.
train_labels, train_texts = load_data("reviews-dataset/train.ft.txt")
test_labels, test_texts = load_data("reviews-dataset/test.ft.txt")
# Report how many reviews were loaded for each split.
print(f"{len(train_texts) = }")
print(f"{len(test_texts) = }")

In [None]:
def tokenize_text(text):
    """Split ``text`` into tokens using NLTK's ``word_tokenize``.

    Kept as a single indirection so vocabulary building and encoding share
    the same tokenizer.

    Args:
        text: The string to tokenize.

    Returns:
        The value returned by ``word_tokenize(text)`` (a list of token strings).
    """
    return word_tokenize(text)


def make_vocab(texts=None, max_features=None, tokenizer=None):
    """Build the token -> integer id mapping from a corpus.

    Ids 0 and 1 are reserved for the padding and unknown-token entries. The
    remaining ``max_features - 2`` ids are assigned to the most frequent
    tokens in descending frequency order, starting at 2.

    Args:
        texts: Iterable of strings to count tokens over. Defaults to the
            module-level ``train_texts``.
        max_features: Upper bound on the vocabulary size, reserved entries
            included. Defaults to the module-level ``MAX_FEATURES``.
        tokenizer: Callable mapping a string to an iterable of tokens.
            Defaults to ``tokenize_text``.

    Returns:
        A dict mapping every kept token, plus ``'<PADDING>'`` and
        ``'<UNKOWN>'``, to its integer id.

    Raises:
        ValueError: if ``max_features`` is smaller than 2 and therefore cannot
            hold the two reserved entries.
    """
    # Defaults are resolved at call time so the zero-argument call keeps
    # reading the notebook globals exactly as before.
    if texts is None:
        texts = train_texts
    if max_features is None:
        max_features = MAX_FEATURES
    if tokenizer is None:
        tokenizer = tokenize_text

    if max_features < 2:
        raise ValueError(
            f"max_features must be at least 2 to hold the reserved ids, got {max_features}"
        )

    counter = Counter(tk for text in texts for tk in tokenizer(text))
    vocab = {
        tk: idx  # ids 0 and 1 are reserved, so real tokens start at 2
        for idx, (tk, _) in enumerate(counter.most_common(max_features - 2), start=2)
    }
    vocab['<PADDING>'] = 0
    # NOTE(review): '<UNKOWN>' is misspelled, but the key is written to
    # vocab.json, so it is kept byte-for-byte in case a consumer reads it.
    vocab['<UNKOWN>'] = 1
    return vocab


# Build the vocabulary once from train_texts; encode_text() reads this module-level mapping.
vocab = make_vocab()
def encode_text(text):
    """Convert ``text`` into a list of vocabulary ids.

    The text is split with ``tokenize_text`` and each token is looked up in
    the module-level ``vocab``. Tokens that are not in the vocabulary map to
    1, the id reserved for unknown tokens.

    Args:
        text: The string to encode.

    Returns:
        A list of integer ids, one per token (empty when there are no tokens).
    """
    ids = []
    for token in tokenize_text(text):
        ids.append(vocab.get(token, 1))
    return ids


class CustomDataset(Dataset):
    """Map-style dataset pairing encoded texts with float labels.

    Every text is encoded eagerly in ``__init__`` through ``encode_text``, so
    the dataset holds one 1-D integer tensor per sample in memory.
    """

    def __init__(self, texts, labels):
        """Encode ``texts`` and store them next to ``labels``.

        Args:
            texts: Iterable of strings to encode.
            labels: Sequence of numeric labels, one per text.

        Raises:
            ValueError: if the number of texts and labels differ.
        """
        # dtype is pinned to long: for a text with no tokens torch.tensor([])
        # would default to float32, and pad_sequence takes the batch dtype
        # from its first sequence, which nn.Embedding then rejects as indices.
        self.texts = [
            torch.tensor(encode_text(text), dtype=torch.long) for text in texts
        ]
        self.labels = torch.tensor(labels).float()
        if len(self.texts) != len(self.labels):
            raise ValueError(
                f"got {len(self.texts)} texts but {len(self.labels)} labels"
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.labels)

    def __getitem__(self, index):
        """Return the ``(token_ids, label)`` pair stored at ``index``."""
        return self.texts[index], self.labels[index]


def collate_fn(batch):
    """Merge ``(token_ids, label)`` samples into one padded batch.

    Args:
        batch: Non-empty sequence of ``(token_ids, label)`` pairs.

    Returns:
        A ``(padded, targets)`` tuple: ``padded`` stacks the sequences with
        the batch dimension first, padded by ``pad_sequence`` to the longest
        sequence in the batch; ``targets`` is a float tensor of the labels.
    """
    sequences, targets = zip(*batch)
    padded = pad_sequence(sequences, batch_first=True)
    target_tensor = torch.tensor(targets).float()
    return padded, target_tensor


# Encode both splits up front; CustomDataset encodes every text in its constructor.
train_ds = CustomDataset(train_texts, train_labels)
test_ds = CustomDataset(test_texts, test_labels)
# Only the training loader shuffles; both pad each batch through collate_fn.
train_dl = DataLoader(train_ds, batch_size=128, shuffle=True, collate_fn=collate_fn)
test_dl = DataLoader(test_ds, batch_size=128, shuffle=False, collate_fn=collate_fn)
print(f"{len(train_ds) = }")
print(f"{len(test_ds) = }")

In [None]:
class ClassifierModel(nn.Module):
    """1-D convolutional binary classifier over sequences of token ids.

    Pipeline: embedding -> three ``Conv1d`` (kernel 3, padding 1) + ReLU
    stages, with ``MaxPool1d(2)`` after the first two -> global max pool over
    the sequence axis -> two linear layers -> sigmoid.

    Args:
        vocab_size: Number of rows in the embedding table.
        emb: Embedding dimension.
    """

    def __init__(self, vocab_size, emb=64):
        super().__init__()
        # Each MaxPool1d(2) halves the sequence length (rounding down), so
        # anything shorter than 4 positions would be pooled down to length 0
        # and raise. forward() pads up to this length.
        self.min_seq_len = 4
        self.embed = nn.Embedding(vocab_size, emb)
        self.conv1 = nn.Conv1d(emb, 64, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool1d(2)
        self.conv2 = nn.Conv1d(64, 64, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool1d(2)
        self.conv3 = nn.Conv1d(64, 64, kernel_size=3, padding=1)
        self.globpool = nn.AdaptiveMaxPool1d(1)
        self.fc1 = nn.Linear(64, 100)
        self.fc2 = nn.Linear(100, 1)

    def forward(self, x):
        """Score a batch of token-id sequences.

        Args:
            x: Integer tensor of shape ``(batch, seq_len)``. Sequences shorter
                than 4 positions (including empty ones) are right-padded
                with id 0.

        Returns:
            Float tensor of shape ``(batch,)`` with sigmoid outputs in [0, 1].
        """
        missing = self.min_seq_len - x.size(1)
        if missing > 0:
            # Right-pad with 0, the same value pad_sequence fills batches with,
            # so a short single input behaves like it would inside a batch.
            x = torch.cat([x, x.new_zeros([x.size(0), missing])], dim=1)
        # Conv1d expects (batch, channels, length), hence the transpose.
        x = self.embed(x).transpose(1, 2)
        x = self.pool1(torch.relu(self.conv1(x)))
        x = self.pool2(torch.relu(self.conv2(x)))
        x = torch.relu(self.conv3(x))
        x = self.globpool(x).squeeze(2)
        x = torch.relu(self.fc1(x))
        return torch.sigmoid(self.fc2(x)).squeeze(1)

In [None]:
def train(model, epochs=3):
    """Train ``model`` on the module-level ``train_dl`` loader.

    Uses Adam with its default settings and binary cross-entropy on the
    model's outputs. After every epoch a progress line is printed and
    ``evaluate`` is run on the module-level ``test_dl``.

    Args:
        model: Module to optimize in place; batches are moved to ``device``
            before the forward pass.
        epochs: Number of full passes over ``train_dl``.
    """
    criterion = nn.BCELoss()
    optimizer = optim.Adam(model.parameters())

    for e in range(epochs):
        # evaluate() may switch the model to eval mode, so re-enable training
        # mode at the start of every epoch.
        model.train()
        for inputs, targets in train_dl:
            inputs = inputs.to(device)
            targets = targets.to(device)

            optimizer.zero_grad()
            loss = criterion(model(inputs), targets)
            loss.backward()
            optimizer.step()

        print(f"Epoch {e+1} done")
        evaluate(model, test_dl)


def evaluate(model, loader):
    """Print accuracy, F1 and ROC AUC of ``model`` over ``loader``.

    The model is switched to eval mode and run without gradient tracking.
    Outputs above 0.5 count as the positive class for accuracy and F1; the
    raw outputs are used for AUC.

    Args:
        model: Module mapping a batch of inputs to one score per sample.
        loader: Iterable of ``(inputs, labels)`` batches; inputs are moved to
            ``device``, labels are read on the CPU.

    Returns:
        None. The metrics are printed, not returned.
    """
    model.eval()
    scores, targets = [], []
    with torch.no_grad():
        for inputs, labels in loader:
            batch_scores = model(inputs.to(device)).cpu().numpy()
            scores.extend(batch_scores)
            targets.extend(labels.numpy())

    preds, ys = np.array(scores), np.array(targets)
    print(f"Test Acc: {accuracy_score(ys, preds>0.5):.4f}, F1: {f1_score(ys, preds>0.5):.4f}, AUC: {roc_auc_score(ys, preds):.4f}")

In [None]:
# Size the embedding table to the vocabulary and move the model to the selected device.
classifier = ClassifierModel(len(vocab)).to(device)
print(classifier)
# train() also runs evaluate() on test_dl after each epoch.
train(classifier, epochs=1)

In [None]:
# Export the trained model as TorchScript together with the vocabulary needed
# to encode inputs for it.
scripted_model = torch.jit.script(classifier)
torch.jit.save(scripted_model, "sentiment-analysis.pt")
# The context manager guarantees vocab.json is flushed and closed once the
# cell finishes, instead of leaving an anonymous open handle behind.
with open("vocab.json", "w", encoding="utf-8") as vocab_file:
    json.dump(vocab, vocab_file)