In [194]:
import torch
from torch import nn
from torch.optim import Adam
from torch.utils.data import DataLoader, Dataset
from sqlalchemy.orm import Session
from src.sql.models import WikiArticle
from src.sql import engine
from nltk.tokenize import word_tokenize
from typing import List, Tuple, Dict
import numpy as np
import re
from tqdm import tqdm

In [195]:
def clean_str(string):
    """Reduce raw text to lowercase ASCII words separated by single spaces.

    Every character that is not ``A-Z``/``a-z`` (digits, punctuation,
    whitespace, non-ASCII letters) is turned into a separator first; runs of
    separators are then collapsed and the ends trimmed.
    """
    non_letters = re.compile(r"[^A-Za-z]")
    space_runs = re.compile(r"\s{2,}")

    letters_only = non_letters.sub(" ", string)
    single_spaced = space_runs.sub(" ", letters_only)
    return single_spaced.strip().lower()

In [196]:
def load_glove_data(path: str = "data/glove.6B/glove.6B.50d.txt"):
    """Load a GloVe text file into lookup tables and an embedding matrix.

    Each non-empty line is expected to be ``<word> <float> <float> ...``.
    Words are numbered in file order, starting at 0.

    Args:
        path: Location of the GloVe text file. Defaults to the 50-dimensional
            6B vectors this notebook was written against.

    Returns:
        A ``(word2idx, idx2word, embs)`` tuple where ``embs`` is a NumPy array
        whose row ``i`` is the vector of ``idx2word[i]``.
    """
    word2idx: Dict[str, int] = {}
    idx2word: Dict[int, str] = {}
    embs: List[List[float]] = []
    with open(path, "r", encoding="utf-8") as file:
        for line in file:
            fields = line.split()
            if not fields:
                # A blank (or trailing empty) line carries no vector; skipping
                # it keeps the indices dense instead of raising IndexError.
                continue
            # Number rows by how many vectors were kept so far, so skipped
            # lines never leave a gap between idx2word and embs.
            row = len(embs)
            word = fields[0]
            word2idx[word] = row
            idx2word[row] = word
            embs.append([float(value) for value in fields[1:]])

    return word2idx, idx2word, np.array(embs)

In [197]:
def get_train_test_ids(test_portion_size: float = 0.1, limit: int = 1000):
    """Select article ids and split them into train and test samples.

    Every selected article contributes two samples, ``(id, "standard")`` and
    ``(id, "simple")``. The split is made on articles rather than on samples,
    so both versions of an article always land on the same side and no
    article is shared between train and test.

    Args:
        test_portion_size: Fraction of the articles reserved for testing;
            must lie in ``[0, 1]``.
        limit: Maximum number of articles to read from the database.

    Returns:
        ``(train, test)`` - two lists of ``(article_id, level)`` tuples.

    Raises:
        ValueError: If ``test_portion_size`` is outside ``[0, 1]``.
    """
    if not 0.0 <= test_portion_size <= 1.0:
        raise ValueError(
            f"test_portion_size must be between 0 and 1, got {test_portion_size}"
        )

    with Session(engine) as session:
        # Without an explicit ORDER BY the rows returned under LIMIT are
        # unspecified, which would make the split differ between runs.
        rows = (
            session.query(WikiArticle.id)
            .order_by(WikiArticle.id)
            .limit(limit)
            .all()
        )
    article_ids = [row[0] for row in rows]

    # Computed once so both halves of the split use the same boundary.
    split_at = int(len(article_ids) * (1 - test_portion_size))

    def _expand(ids):
        # Keep the original per-article order: standard first, then simple.
        return [(_id, level) for _id in ids for level in ("standard", "simple")]

    return _expand(article_ids[:split_at]), _expand(article_ids[split_at:])

In [198]:
def build_bocabulary(train_ids: List[Tuple[int, str]], glove_word2idx:Dict[str,int],  glove_idx2word: Dict[int, str]):
    """Extend the GloVe vocabulary with the words found in the training texts.

    The GloVe entries keep their indices. Training tokens that GloVe does not
    know are appended in alphabetical order, followed by ``<UNK>`` and
    ``<PAD>``. The alphabetical order makes the index assignment reproducible:
    iterating the token ``set`` directly would depend on string hashing, which
    can differ between interpreter sessions.

    Args:
        train_ids: ``(article_id, level)`` samples; ``level`` is ``"simple"``
            for the simple text, anything else selects the standard text.
        glove_word2idx: GloVe word -> index table, used to detect known words.
        glove_idx2word: GloVe index -> word table, copied into the result.

    Returns:
        ``(word2idx, idx2word, unk_idx, pad_idx, max_tokens)`` where
        ``max_tokens`` is the token count of the longest training text.
    """
    word2idx: Dict[str, int] = {}
    idx2word = {}

    for idx, word in glove_idx2word.items():
        word2idx[word] = idx
        idx2word[idx] = word

    def _next_free(start: int) -> int:
        # First index >= start that idx2word does not use yet.
        while idx2word.get(start) is not None:
            start += 1
        return start

    voc = set()
    max_tokens = 0
    with Session(engine) as session:
        # Samples of one article are usually adjacent (standard, then simple),
        # so remembering the last fetched row avoids querying it twice.
        fetched_id = None
        doc = None
        for _id, level in tqdm(train_ids):
            if doc is None or _id != fetched_id:
                doc = session.query(WikiArticle).filter_by(id=_id).first()
                fetched_id = _id
            text = doc.simple_text if level == "simple" else doc.standard_text
            tokens = word_tokenize(clean_str(text))
            voc.update(tokens)
            max_tokens = max(max_tokens, len(tokens))

    unprocessed = sorted(word for word in voc if glove_word2idx.get(word) is None)

    i = len(word2idx)
    for word in unprocessed:
        i = _next_free(i)
        word2idx[word] = i
        idx2word[i] = word
        i += 1

    unk_idx = _next_free(len(word2idx))
    unk = "<UNK>"
    word2idx[unk] = unk_idx
    idx2word[unk_idx] = unk

    pad_idx = _next_free(len(word2idx))
    pad = "<PAD>"
    word2idx[pad] = pad_idx
    idx2word[pad_idx] = pad

    return word2idx, idx2word, unk_idx, pad_idx, max_tokens



In [199]:
def random_vector(size:int) -> torch.Tensor:
    """Return ``size`` values sampled by ``torch.rand`` (uniform on [0, 1))."""
    sample = torch.rand(size)
    return sample

def get_embedding_weights(embs: np.ndarray, idx2word: Dict[int, str]):
    """Build the initial embedding matrix for the whole vocabulary.

    Rows whose index is below ``len(embs)`` receive the matching pretrained
    vector; every other index present in ``idx2word`` receives a vector from
    ``random_vector``.

    Args:
        embs: Pretrained vectors, one row per pretrained word.
        idx2word: Full vocabulary (index -> word); its size fixes the number
            of rows of the result.

    Returns:
        A float32 tensor of shape ``(len(idx2word), embedding_dim)``, where
        ``embedding_dim`` is the width of ``embs`` (50 if ``embs`` is empty).
    """
    embs = np.asarray(embs)
    n_pretrained = len(embs)
    # Take the width from the data so other GloVe sizes (100d, 200d, ...) work.
    embedding_dim = int(embs.shape[1]) if embs.ndim == 2 and n_pretrained else 50
    vocab_size = len(idx2word)
    weight_matrix = torch.zeros((vocab_size, embedding_dim))

    # Copy all pretrained rows in one indexed assignment instead of creating
    # one small tensor per vocabulary entry.
    pretrained_rows = [idx for idx in idx2word if idx < n_pretrained]
    if pretrained_rows:
        row_index = torch.tensor(pretrained_rows, dtype=torch.long)
        weight_matrix[row_index] = torch.as_tensor(
            embs[pretrained_rows], dtype=weight_matrix.dtype
        )

    # Remaining entries are drawn in idx2word order, one vector at a time,
    # which is the same sequence of random draws as the per-row loop it replaces.
    for idx in idx2word:
        if idx >= n_pretrained:
            weight_matrix[idx] = random_vector(embedding_dim)
    return weight_matrix

In [200]:
class WikiArticleDataset(Dataset):
    """Dataset of tokenised article texts labelled by reading level.

    Each sample is identified by ``(article_id, level)``. The text is fetched
    from the database on access, cleaned, tokenised, mapped to vocabulary
    indices and right-padded or truncated to a fixed length.
    """

    def __init__(
        self, ids: List[Tuple[int, str]], 
        word2idx: Dict[str, int], 
        unk_tok_idx: int, 
        pad_token_idx: int,
        pad_to_size: int = 1000,
    ) -> None:
        """
        Args:
            ids: ``(article_id, level)`` samples; ``level`` is ``"standard"``
                or anything else for the simple text.
            word2idx: Token -> index vocabulary.
            unk_tok_idx: Index used for tokens missing from ``word2idx``.
            pad_token_idx: Index appended to texts shorter than ``pad_to_size``.
            pad_to_size: Exact length of every returned index sequence.
        """
        super().__init__()
        self._ids = ids
        self.word2idx = word2idx
        self.unk_idx = unk_tok_idx
        self.pad_size = pad_to_size
        self.pad_idx = pad_token_idx
    
    def __len__(self):
        return len(self._ids)

    def _encode(self, text: str) -> torch.Tensor:
        """Turn raw text into exactly ``pad_size`` int64 vocabulary indices."""
        toks = word_tokenize(clean_str(text), language="english")
        indices = [self.word2idx.get(tok, self.unk_idx) for tok in toks]
        indices = indices[:self.pad_size]
        indices.extend([self.pad_idx] * (self.pad_size - len(indices)))
        # The dtype is forced: an empty token list would otherwise produce a
        # float32 tensor, which cannot be used to index an embedding table.
        return torch.tensor(indices, dtype=torch.long)

    def __getitem__(self, index) -> Tuple[torch.Tensor, torch.Tensor]:
        """Return ``(token_indices, label)``; label is ``[1]`` for standard, ``[0]`` otherwise."""
        _id, level = self._ids[index]
        with Session(engine) as session:
            item = session.query(WikiArticle).filter_by(id=_id).first()
            if level == "standard":
                text = item.standard_text
            else:
                text = item.simple_text

        tok_idx = self._encode(text)
        label = torch.tensor([1]) if level == "standard" else torch.tensor([0])
        return tok_idx, label

In [201]:
class Classifier(nn.Module):
    """LSTM document classifier on top of frozen pretrained word embeddings.

    The model embeds a batch of token indices, runs a single-layer LSTM over
    them and maps the LSTM output at the last real (non-padding) token of each
    document to log-probabilities over two classes.
    """

    def __init__(
        self,
        hidden_size: int = 100,
        glove_embs: np.ndarray = None,
        word2idx: Dict[str, int] = None,
        idx2word: Dict[int, str] = None,
        pad_idx: int = None,
    ) -> None:
        """
        Args:
            hidden_size: Width of the LSTM hidden state.
            glove_embs: Pretrained embedding matrix (required).
            word2idx: Token -> index vocabulary; used to find the padding
                index when ``pad_idx`` is not given.
            idx2word: Index -> token vocabulary (required); its size is the
                number of embedding rows.
            pad_idx: Index of the padding token. Defaults to
                ``word2idx["<PAD>"]`` when available. If no padding index can
                be determined, the output at the final position is used.

        Raises:
            ValueError: If ``glove_embs`` or ``idx2word`` is missing.
        """
        super().__init__()
        if glove_embs is None or idx2word is None:
            raise ValueError("Classifier requires both `glove_embs` and `idx2word`.")
        self.glove_embs = glove_embs
        self.word2idx = word2idx
        self.idx2word = idx2word
        if pad_idx is None and word2idx is not None:
            pad_idx = word2idx.get("<PAD>")
        self.pad_idx = pad_idx
        # Follow the width of the pretrained vectors; 50 is the fallback.
        embs_shape = getattr(glove_embs, "shape", ())
        self.emb_size = int(embs_shape[1]) if len(embs_shape) == 2 else 50
        self.embedding = nn.Embedding(len(idx2word), self.emb_size)
        self._load_embedding_weight()
        self.lstm = nn.LSTM(
            input_size=self.emb_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        
        # emb_to_hidden, hidden_to_hidden and dropout are not used by forward();
        # they are kept so the parameter set / state_dict layout is unchanged.
        self.emb_to_hidden = nn.Linear(self.emb_size, hidden_size)
        self.hidden_to_hidden = nn.Linear(hidden_size, hidden_size)
        self.dropout = nn.Dropout(0.5)
        self.hidden_to_output = nn.Linear(hidden_size, 2)
        self.softmax = nn.LogSoftmax(dim=1)
        
    def _load_embedding_weight(self):
        """Initialise the embedding table and freeze it."""
        weight = get_embedding_weights(self.glove_embs, self.idx2word)
        self.embedding.load_state_dict({"weight": weight})
        self.embedding.weight.requires_grad = False
    
    def forward(self, document_batch: torch.Tensor):
        """Return log-probabilities of shape ``(batch, 2)``.

        Args:
            document_batch: Integer tensor of shape ``(batch, seq_len)``;
                padding, if any, is expected after the real tokens.
        """
        embedded = self.embedding(document_batch)
        lstm_out, _ = self.lstm(embedded)
        if self.pad_idx is None:
            summary = lstm_out[:, -1, :]
        else:
            # Reading the final position would return the state after all the
            # trailing padding has been consumed. Select the output at the
            # last real token of each document instead.
            lengths = (document_batch != self.pad_idx).sum(dim=1).clamp(min=1)
            rows = torch.arange(lstm_out.size(0), device=lstm_out.device)
            summary = lstm_out[rows, lengths - 1]
        # lstm_out = self.dropout(lstm_out)
        out = self.hidden_to_output(summary)
        out = self.softmax(out)
        return out

In [202]:
def train(
    model: nn.Module, 
    train_loader: DataLoader, 
    optimizer: torch.optim.Optimizer, 
    criterion: nn.Module,
    epochs: int = 10,
):
    """Train ``model`` for ``epochs`` full passes over ``train_loader``.

    For every batch the per-document losses are averaged, back-propagated and
    followed by one optimizer step. The running loss is shown on the progress
    bar of the current epoch.

    Args:
        model: Module called as ``model(document_batch)``; row ``i`` of its
            output is the prediction for document ``i``.
        train_loader: Iterable of ``(document_batch, level_batch)`` pairs.
        optimizer: Optimizer over the model parameters.
        criterion: Loss applied to a single prediction (with a leading batch
            dimension of 1) and its label.
        epochs: Number of passes over ``train_loader``.
    """
    model.train()
    for _ in range(epochs):
        # set_description must be called on the bar instance, not on the
        # tqdm class, so keep a handle on the bar of this epoch.
        progress = tqdm(train_loader)
        for document_batch, level_batch in progress:
            optimizer.zero_grad()
            out = model(document_batch)
            # combine losses: one loss per document, then the batch average
            per_document = [
                criterion(out[i].unsqueeze(0), level_batch[i])
                for i in range(out.shape[0])
            ]
            loss = torch.stack(per_document).mean()
            progress.set_description(f"Loss: {loss.item()}")
            loss.backward()
            optimizer.step()

In [203]:
def evaluate(model: nn.Module, test_dataloader: DataLoader):
    """Print and return the classification accuracy of ``model``.

    The model is switched to evaluation mode for the duration of the call and
    put back into its previous mode afterwards. Gradients are disabled.

    Args:
        model: Module called as ``model(document_batch)`` returning one row of
            class scores per document.
        test_dataloader: Iterable of ``(document_batch, level_batch)`` pairs,
            where ``level_batch`` holds one class index per document.

    Returns:
        The fraction of correctly classified documents, or NaN when the
        loader yields no documents.
    """
    was_training = model.training
    model.eval()
    correct = 0
    total = 0
    try:
        with torch.no_grad():
            for document_batch, level_batch in test_dataloader:
                out = model(document_batch)
                predictions = out.argmax(dim=1)
                # Labels may arrive as (batch, 1); flatten to match predictions.
                targets = level_batch.reshape(-1).to(predictions.device)
                correct += int((predictions == targets).sum().item())
                total += out.shape[0]
    finally:
        model.train(was_training)

    # An empty loader has no defined accuracy; avoid dividing by zero.
    accuracy = correct / total if total else float("nan")
    print(f"Accuracy: {accuracy}")
    return accuracy

In [204]:
# Data preparation: split the article ids, load the GloVe tables, then extend
# the GloVe vocabulary with the tokens that occur in the training articles.
train_ids, test_ids = get_train_test_ids()
glove_word2idx, glove_idx2word, glove_embs = load_glove_data()
(
    vocab_word2idx,
    vocab_idx2word,
    unk_idx,
    pad_idx,
    max_tokens,
) = build_bocabulary(train_ids, glove_word2idx, glove_idx2word)



100%|██████████| 1800/1800 [00:11<00:00, 157.71it/s]


In [205]:

# One batch size for both loaders, so it is changed in a single place.
BATCH_SIZE = 10

# Both datasets share the vocabulary and are padded to the longest training text.
train_dataset = WikiArticleDataset(train_ids, vocab_word2idx, unk_idx, pad_idx, pad_to_size=max_tokens)
test_dataset = WikiArticleDataset(test_ids, vocab_word2idx, unk_idx, pad_idx, pad_to_size=max_tokens)
train_dataloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
# Accuracy does not depend on sample order, so the test loader stays in a
# fixed order; this keeps per-batch inspection reproducible.
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [206]:
# Seed torch before anything random happens in this cell: the random vectors
# for out-of-GloVe words, the layer initialisation and the shuffling of the
# training loader all draw from the torch generator.
torch.manual_seed(42)

model = Classifier(
    glove_embs=glove_embs, 
    word2idx=vocab_word2idx, 
    idx2word=vocab_idx2word
)
optimizer = Adam(model.parameters(), lr=0.001)
# The model ends in LogSoftmax, so the matching loss is negative log-likelihood.
criterion = nn.NLLLoss()

train(model, train_dataloader, optimizer, criterion)

Accuracy: 0.5


 51%|█████     | 92/180 [13:08<12:34,  8.58s/it]


KeyboardInterrupt: 

In [None]:
# Print the accuracy of the model on the batches served by the test loader.
evaluate(model, test_dataloader)

Accuracy: 0.5
