# Simple NN for text classification

Download the IMDB data as described [here](https://towardsdatascience.com/sentiment-analysis-with-python-part-1-5ce197074184).

In [None]:
# Load the raw reviews: every line of each file becomes one entry, with
# surrounding whitespace stripped. The context managers make sure the file
# handles are closed as soon as reading is finished.
with open("../data/imdb/full_train.txt", "r") as train_file:
    reviews_train = [line.strip() for line in train_file]

with open("../data/imdb/full_test.txt", "r") as test_file:
    reviews_test = [line.strip() for line in test_file]

In [None]:
# Labels for both splits: the first 12,500 entries are labelled 1 and the
# remaining 12,500 are labelled 0.
# NOTE(review): presumably the data files list all positive reviews first — confirm.
train_target = [1] * 12500 + [0] * 12500
test_target = [1] * 12500 + [0] * 12500

In [None]:
# Report how many reviews were loaded for each split.
train_size = len(reviews_train)
test_size = len(reviews_test)
print(f"Train size – {train_size}")
print(f"Test size – {test_size}")

In [None]:
import torch
import torch.nn as nn

class SentimentClassificationModel(nn.Module):
    """Small feed-forward text classifier.

    Pipeline: token embedding -> linear layer applied to every position ->
    max over dimension 1 of the result -> linear layer producing one score
    per class.

    Args:
        vocab_size: number of rows in the embedding table.
        embed_dim: size of each embedding vector (also the hidden size).
        num_class: number of output scores per sample.
    """

    def __init__(self, vocab_size, embed_dim, num_class):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)  # vocab_size x embed_dim
        self.fc1 = nn.Linear(embed_dim, embed_dim)  # embed_dim -> embed_dim
        self.fc2 = nn.Linear(embed_dim, num_class)  # embed_dim -> num_class
        self.init_weights()

    def init_weights(self):
        """Re-initialise all weights uniformly in [-0.5, 0.5) and zero the biases."""
        bound = 0.5
        # Weights are drawn in the order embedding, fc1, fc2 so that a seeded
        # run consumes the random stream in a fixed sequence.
        for weight in (self.embedding.weight, self.fc1.weight, self.fc2.weight):
            weight.data.uniform_(-bound, bound)
        for bias in (self.fc1.bias, self.fc2.bias):
            bias.data.zero_()

    def forward(self, text):
        """Return class scores for a batch of token-index sequences.

        Example: ``text`` of shape (2, 5) yields an output of shape (2, num_class).
        """
        token_vectors = self.embedding(text)  # e.g. 2 x 5 x embed_dim
        projected = self.fc1(token_vectors)  # same shape as token_vectors
        # max() over dim 1 returns (values, indices); only the values are kept.
        pooled = projected.max(dim=1)[0]  # e.g. 2 x embed_dim
        return self.fc2(pooled)  # e.g. 2 x num_class

In [None]:
from itertools import chain

# Flatten all training reviews into a single list of lower-cased,
# whitespace-separated tokens. chain.from_iterable walks the reviews lazily
# instead of building a nested list and unpacking one argument per review.
train_tokens = list(chain.from_iterable(sample.lower().split() for sample in reviews_train))

In [None]:
from collections import Counter

# Count how often each token occurs in the training reviews.
train_vocabulary = Counter(train_tokens)

In [None]:
# Show the ten most frequent training tokens together with their counts.
train_vocabulary.most_common(10)

In [None]:
# Number of distinct tokens seen in the training reviews.
len(train_vocabulary)

In [None]:
# Reserved vocabulary entries: PADDING_TOKEN always gets index 0 and
# UNKNOWN_TOKEN always gets index 1.
# NOTE(review): both markers are ordinary English words, so a real occurrence
# of "unknown"/"padding" in a review shares the reserved index. Dedicated
# markers such as "<unk>"/"<pad>" would avoid that, but require updating every
# place that spells the marker as a string literal.
UNKNOWN_TOKEN = "unknown"
PADDING_TOKEN = "padding"

RESERVED_TOKENS = (PADDING_TOKEN, UNKNOWN_TOKEN)

# Build the index -> token list first and derive the reverse map from it, so
# the two structures are exact inverses of each other. Corpus tokens that
# equal a reserved marker are skipped here; otherwise they would occupy a
# second, never-referenced slot in the list.
index_to_token = list(RESERVED_TOKENS) + [
    token for token in train_vocabulary if token not in RESERVED_TOKENS
]
token_to_index = {token: index for index, token in enumerate(index_to_token)}

In [None]:
# Example lookup: index assigned to the token "hi" (KeyError if it is not in the vocabulary).
token_to_index["hi"]

In [None]:
# Example reverse lookup: token stored at position 24615.
index_to_token[24615]

In [None]:
EMBEDDING_DIM = 100  # passed as embed_dim when the model is constructed
BATCH_SIZE = 64  # batch size of the DataLoaders created in train() and test()
MAX_INPUT_LENGTH = 100  # default max_length of generate_batch: samples are cut/padded to this many tokens

### Define batching

In [None]:
def generate_batch(input_data, max_length = MAX_INPUT_LENGTH):
    """Collate a list of samples into a ``(texts, labels)`` pair of long tensors.

    Each sample is a mapping with a ``"text"`` entry (list of token indices)
    and a ``"label"`` entry. Every text is cut or padded to ``max_length``
    via ``padding`` so the rows can be stacked into one tensor.
    """
    padded_rows = [padding(sample["text"], max_length) for sample in input_data]
    texts = torch.tensor(padded_rows, dtype=torch.long)

    label_values = [sample["label"] for sample in input_data]
    labels = torch.tensor(label_values, dtype=torch.long)

    return texts, labels

def padding(text_tokens, max_length, padding_token = 0):
    """Return ``text_tokens`` cut or right-padded to exactly ``max_length`` items.

    Sequences shorter than ``max_length`` get ``padding_token`` appended;
    all others are truncated to their first ``max_length`` items.
    """
    missing = max_length - len(text_tokens)
    if missing > 0:
        return text_tokens + [padding_token] * missing
    return text_tokens[:max_length]

In [None]:
# Encode every training review as a list of vocabulary indices (tokens are
# whitespace-split and lower-cased) and pair it with its label.
prepared_data = [
    {"label": label, "text": [token_to_index[token.lower()] for token in text.split()]}
    for label, text in zip(train_target, reviews_train)
]

### Train and test strategy

In [None]:
from torch.utils.data import DataLoader

def train(input_data):
    """Run one training epoch over ``input_data``.

    Uses the module-level ``model``, ``optimizer``, ``criterion`` and
    ``device``. Batches are shuffled and collated with ``generate_batch``.

    Returns:
        A tuple ``(loss, accuracy)`` where ``loss`` is the sum of per-batch
        losses divided by the number of batches and ``accuracy`` is the number
        of correct predictions divided by ``len(input_data)``.
    """
    # Explicitly select training mode so mode-dependent layers behave
    # correctly no matter what state the model was left in beforehand.
    model.train()

    train_loss = 0
    train_acc = 0
    data = DataLoader(input_data, batch_size=BATCH_SIZE, shuffle=True, collate_fn=generate_batch)
    for text, label in data:
        optimizer.zero_grad()
        text, label = text.to(device), label.to(device)
        output = model(text)
        loss = criterion(output, label)
        train_loss += loss.item()
        loss.backward()
        optimizer.step()
        # argmax over dim 1 picks the highest-scoring class for each sample.
        train_acc += (output.argmax(1) == label).sum().item()

    return train_loss / len(data), train_acc / len(input_data)

def test(input_data):
    """Evaluate the module-level ``model`` on ``input_data`` without updating it.

    Uses the module-level ``criterion`` and ``device``. Batches are collated
    with ``generate_batch`` and processed in their original order.

    Returns:
        A tuple ``(loss, accuracy)`` where ``loss`` is the sum of per-batch
        losses divided by the number of batches and ``accuracy`` is the number
        of correct predictions divided by ``len(input_data)``.
    """
    test_loss = 0
    acc = 0
    data = DataLoader(input_data, batch_size=BATCH_SIZE, collate_fn=generate_batch)

    # Evaluate in eval mode, then restore whatever mode the model was in so
    # that a following training step is not silently affected.
    was_training = model.training
    model.eval()
    try:
        # No gradients are needed for evaluation; one context covers all batches.
        with torch.no_grad():
            for text, label in data:
                text, label = text.to(device), label.to(device)
                output = model(text)
                loss = criterion(output, label)
                test_loss += loss.item()
                acc += (output.argmax(1) == label).sum().item()
    finally:
        model.train(was_training)

    return test_loss / len(data), acc / len(input_data)

## Training

In [None]:
import random
import numpy

def set_seed(seed: int, n_gpu: int):
    """Seed Python's ``random``, NumPy and PyTorch with ``seed``.

    When ``n_gpu`` is positive the CUDA generators are seeded as well.
    """
    for seeder in (random.seed, numpy.random.seed, torch.manual_seed):
        seeder(seed)

    if n_gpu <= 0:
        return
    torch.cuda.manual_seed_all(seed)

In [None]:
# Seed every random number generator with 42; n_gpu=1 also seeds the CUDA generators.
set_seed(42, 1)

In [None]:
# Pick the compute device. The second GPU ("cuda:1") is preferred when it
# exists; a machine with exactly one GPU falls back to "cuda:0" instead of
# requesting a device index that is not there. Without CUDA the CPU is used.
if torch.cuda.is_available():
    gpu_index = 1 if torch.cuda.device_count() > 1 else 0
    device = torch.device(f"cuda:{gpu_index}")
else:
    device = torch.device("cpu")

In [None]:
# Build the classifier: one embedding row per entry of index_to_token and one
# output score per distinct label value, then move its parameters to the selected device.
model = SentimentClassificationModel(vocab_size=len(index_to_token), embed_dim=EMBEDDING_DIM, num_class=len(set(train_target)))
model.to(device)

In [None]:
import time
from torch.utils.data.dataset import random_split

N_EPOCHS = 5
# NOTE(review): min_valid_loss is initialised but never updated or read in this cell.
min_valid_loss = float('inf')

criterion = torch.nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

# Randomly hold out 5% of the prepared training samples for validation.
train_len = int(len(prepared_data) * 0.95)
train_data, validation_data = random_split(
    prepared_data, [train_len, len(prepared_data) - train_len]
)

for epoch in range(N_EPOCHS):

    start_time = time.time()
    train_loss, train_acc = train(train_data)
    valid_loss, valid_acc = test(validation_data)

    # Split the elapsed whole seconds into integer minutes and leftover seconds.
    mins, secs = divmod(int(time.time() - start_time), 60)

    print('Epoch: %d' %(epoch + 1), " | time in %d minutes, %d seconds" %(mins, secs))
    print(f'\tLoss: {train_loss:.4f}(train)\t|\tAcc: {train_acc * 100:.1f}%(train)')
    print(f'\tLoss: {valid_loss:.4f}(valid)\t|\tAcc: {valid_acc * 100:.1f}%(valid)')

In [None]:
# Encode the test reviews the same way as the training reviews. Tokens that
# are missing from the training vocabulary map to the UNKNOWN_TOKEN index,
# which is looked up once through the shared constant rather than a literal.
unknown_index = token_to_index[UNKNOWN_TOKEN]
prepared_test_data = []

for label, text in zip(test_target, reviews_test):
    text_tokens = [token_to_index.get(token.lower(), unknown_index) for token in text.split()]
    prepared_test_data.append({"label": label, "text": text_tokens})

In [None]:
# Evaluate the model on the encoded test reviews; the cell shows the (loss, accuracy) tuple returned by test().
test(prepared_test_data)