## Sentiment Analysis with Bag of Tricks
### (Embedding Layer, Average Pooling, Linear)

In [9]:
import torch
import torchtext
import functools
from tqdm import tqdm
import sys

import torch.nn as nn
from torch.utils.data import DataLoader
from torch.nn.functional import avg_pool2d
from torchtext.data import get_tokenizer
from torchtext.data.utils import ngrams_iterator
from torchtext.vocab import build_vocab_from_iterator
from torch.utils.data import random_split
from torchtext.data.functional import to_map_style_dataset
from torch.nn.utils.rnn import pad_sequence
from torch.optim.lr_scheduler import StepLR

### 1. Prepare Data
#### Build Vobulary

In [10]:
train_iter = torchtext.datasets.IMDB(split="train")

def yield_tokens(text_iter):
    tokenizer = get_tokenizer('spacy', language='en_core_web_sm')
    for _, text in text_iter:
        tokens = tokenizer(text)
        yield  list(ngrams_iterator(tokens, 2))

special_tokens = ['<unk>', '<pad>']        
vocab = build_vocab_from_iterator(yield_tokens(train_iter),
                                               min_freq=1,
                                               specials=special_tokens)
vocab.set_default_index(vocab['<unk>'])

#### Build Dataset and Dataloader

In [11]:
tokenizer = get_tokenizer('spacy', language='en_core_web_sm')
text_pipeline = lambda x : vocab(list(ngrams_iterator(tokenizer(x), 2)))
label_pipeline = lambda x: 0. if x=='neg' else 1.

BATCH_SIZE = 100

# Load Dataset
train_iter, test_iter = torchtext.datasets.IMDB()

train_dataset, test_dataset = to_map_style_dataset(train_iter), to_map_style_dataset(test_iter)
num_test = int(len(test_dataset)*0.90)
split_test, split_valid = random_split(test_dataset, [num_test, len(test_dataset)-num_test])

def collate_batch(batch, pad_index):
    label_list, text_list = [], []
    for (label, text) in batch:
        processed_text = torch.tensor(text_pipeline(text), dtype=torch.int64)
        text_list.append(processed_text)
        label_list.append(label_pipeline(label))
    seq_tensor = pad_sequence(text_list, batch_first=True, padding_value=pad_index)
    label_tensor = torch.tensor(label_list)
    return seq_tensor, label_tensor

pad_index = vocab["<pad>"]
collate_batch = functools.partial(collate_batch, pad_index=pad_index)

train_loader = DataLoader(dataset=train_dataset, 
                         batch_size=BATCH_SIZE,
                         shuffle=True,
                         collate_fn=collate_batch)
valid_loader = DataLoader(dataset=split_valid, 
                         batch_size=BATCH_SIZE,
                         shuffle=False,
                         collate_fn=collate_batch)
test_loader = DataLoader(dataset=split_test, 
                         batch_size=BATCH_SIZE,
                         shuffle=False,
                         collate_fn=collate_batch)

### 2. Define Model

In [12]:
class FastText(torch.nn.Module):
    def __init__(self, vocab_size, embedding_dim, pad_idx, output_dim):       
        super().__init__()        
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=pad_idx)      
        self.fc = nn.Linear(embedding_dim, output_dim)
        
    def forward(self, text):      
        #text = [sent len, batch size]       
        embedded = self.embedding(text)                
        #embedded = [batch size, sent len, emb dim]      
        pooled = avg_pool2d(embedded, (embedded.shape[1], 1)).squeeze(1)       
        #pooled = [batch size, embedding_dim]               
        return self.fc(pooled)
    

### Build and Train
#### Defien Hpyerdimensions

In [16]:
EMBED_DIM = 128
vocab_size = len(vocab)
OUTPUT_DIM = 1
pad_index = vocab['<pad>']

classifier = FastText(vocab_size, EMBED_DIM, OUTPUT_DIM, pad_index) 

UNK_IDX = vocab["<unk>"]
PAD_IDX = vocab["<pad>"]
classifier.embedding.weight.data[UNK_IDX] = torch.zeros(EMBED_DIM)
classifier.embedding.weight.data[PAD_IDX] = torch.zeros(EMBED_DIM)

#### Train Model

In [17]:
def train(dataloader, model):
    for texts, labels in tqdm(dataloader, desc='training...', file=sys.stdout):
        optimizer.zero_grad()
        outputs = model(texts)
        outputs = outputs.reshape(-1)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
def evaluate(dataloader, model):
    n_samples, n_accurates = 0, 0
    with torch.no_grad():
        for texts, labels in dataloader:
            outputs = model(texts)
            outputs = outputs.reshape(-1)
            n_samples += labels.size(0)
            n_accurates += (torch.round(outputs)==labels).sum().item()
    return n_accurates/n_samples

In [18]:
N_EPOCHS = 1

# Criterion, Optimizer, learning rate scheduler

criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(classifier.parameters())

for epoch in range(1, N_EPOCHS+1):
    train(train_loader, classifier)
    accu_train = evaluate(train_loader, classifier)
    accu_val = evaluate(valid_loader, classifier)
    print(f"| Epoch: {epoch}/{N_EPOCHS} | train_accuracy: {accu_train: .3f} | val_accuracy :  {accu_val: .3f}")
    
    # Test with test set
accu_test = evaluate(test_loader, classifier)
print('='*60)
print(f"Test Accuracy: {accu_test: .3f}")

training...: 100%|███████████████████████████████████████████████████████████████████| 250/250 [12:35<00:00,  3.02s/it]
| Epoch: 1/1 | train_accuracy:  0.502 | val_accuracy :   0.484
Test Accuracy:  0.502
