# NMSU CSCI-5435 Assignment 4 Task 1 - LSTM with retrained tokenizer

## Relevant Information

In [1]:
#Name:               Tianjie Chen
#Email:              tvc5586@nmsu.edu
#File Creation Date: Mar/17/2025
#Purpose of File:    NMSU CSCI-5435 Assignment 4 Task 1
#Last Edit Date:     Mar/17/2025
#Last Edit Note:     File creation
#GenAI used:         False

## Load Libraries

In [11]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import gensim.downloader
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score
from torch.autograd import Variable

In [12]:
from huggingface_hub import login

# Authenticate with the Hugging Face Hub; in a notebook this shows an
# interactive token prompt, so the cell cannot run unattended.
login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

## Setup

In [4]:
# Report how many CUDA devices are visible, then train on the first GPU
# when one is available and fall back to the CPU otherwise.
print(torch.cuda.device_count())
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

1


In [5]:
# Training-loop configuration.
epochs     = 5   # number of full passes over the training sentences
batch_size = 64  # sentences per optimisation step

In [6]:
# The dataset is read as line-delimited JSON (one record per line);
# only the 'short_description' column is kept for language modelling.
DATA_PATH = "News_Category_Dataset_v2.json"

train_data = pd.read_json(path_or_buf=DATA_PATH, lines=True)['short_description']

## Preprocessing

In [7]:
# Normalise every description (lower-case, hyphens -> spaces, drop everything
# except apostrophes, ampersands, word characters and whitespace, trim), then
# wrap it in sentence markers. The result is a plain Python list of strings.
train_data = [
    " ".join(("<start>", sentence, "<end>"))
    for sentence in (
        train_data
        .str.lower()
        .str.replace("-", " ", regex=True)
        .str.replace(r"[^'\&\w\s]", "", regex=True)
        .str.strip()
    )
]

In [8]:
# Sanity check: show the first preprocessed sentence with its <start>/<end> markers.
print(train_data[0])

<start> she left her husband he killed their children just another day in america <end>


## LSTM

### Re-train Tokenizer

#### Tokenization

In [9]:
class Vocab:
    """Frequency-ordered token vocabulary with token <-> index lookup.

    The vocabulary is ``special_token`` followed by every distinct corpus
    token, most frequent first (ties keep first-seen order), optionally
    truncated to ``max_tokens`` entries.

    Args:
        list_of_sentence: iterable of raw sentences.
        tokenization: callable taking ``list_of_sentence`` and returning an
            iterable of token lists (one list per sentence).
        special_token: list of tokens placed at the front of the vocabulary.
            Index 0 is what ``__call__`` returns for unknown tokens, so the
            unknown-token marker should be listed first.
        max_tokens: optional cap on the vocabulary size (special tokens
            included). ``None`` or ``0`` means no cap.
    """

    def __init__(self, list_of_sentence, tokenization, special_token, max_tokens=None):
        from collections import Counter  # local import keeps this cell self-contained

        # Remember the tokenization so _get_tokens does not need a global tokenizer.
        self._tokenization = tokenization

        # count vocab frequency
        vocab_freq = Counter()
        for tokens in tokenization(list_of_sentence):
            vocab_freq.update(tokens)

        # Drop repeated special tokens, and skip corpus tokens that collide
        # with a special token, so every entry of `vocabs` has a unique index.
        special_token = list(dict.fromkeys(special_token))
        reserved = set(special_token)
        # most_common() sorts by descending count and is stable for ties.
        ranked = [tok for tok, _ in vocab_freq.most_common() if tok not in reserved]

        # create vocab list
        self.vocabs = special_token + ranked
        if max_tokens:
            self.vocabs = self.vocabs[:max_tokens]
        self.stoi = {v: i for i, v in enumerate(self.vocabs)}

    def _get_tokens(self, list_of_sentence):
        """Yield one token list per sentence using the constructor's tokenization."""
        yield from self._tokenization(list_of_sentence)

    def get_itos(self):
        """Return the index-to-token list (the live list, not a copy)."""
        return self.vocabs

    def get_stoi(self):
        """Return the token-to-index dict (the live dict, not a copy)."""
        return self.stoi

    def append_token(self, token):
        """Append ``token`` at the end of the vocabulary.

        The mapping is updated in place so that dicts previously obtained
        from ``get_stoi()`` also see the new token.
        """
        self.vocabs.append(token)
        self.stoi[token] = len(self.vocabs) - 1

    def __call__(self, list_of_tokens):
        """Map tokens to indices; tokens not in the vocabulary map to index 0."""
        return [self.stoi.get(t, 0) for t in list_of_tokens]

    def __len__(self):
        return len(self.vocabs)

In [10]:
from transformers import BertTokenizerFast

# Feed the corpus to the tokenizer trainer lazily, 1000 sentences at a time.
training_corpus = (
    train_data[i : i + 1000]
    for i in range(0, len(train_data), 1000)
)

# Load the pretrained BERT tokenizer, then learn a new 52000-entry
# vocabulary from this corpus.
tokenizer = BertTokenizerFast.from_pretrained('bert-base-cased')
tokenizer = tokenizer.train_new_from_iterator(training_corpus, vocab_size=52000)

# Register the sentence markers with the retrained tokenizer.
tokenizer.add_tokens(["<start>", "<end>"])

def yield_tokens(data):
    """Lazily tokenize each text in ``data`` with the retrained tokenizer."""
    for text in data:
        yield tokenizer.tokenize(text)

# Cap on the model vocabulary (special tokens included).
max_word = 50000

# build vocabulary list
vocab = Vocab(
    train_data,
    tokenization=yield_tokens,
    special_token=["<unk>"],
    max_tokens=max_word,
)

# Add <pad> as the last vocabulary entry.
vocab.append_token("<pad>")

# Take the lookup tables only after <pad> has been appended, so both the
# index-to-word list and the word-to-index dict are guaranteed to include it.
itos = vocab.get_itos()
stoi = vocab.get_stoi()

'(MaxRetryError('HTTPSConnectionPool(host=\'huggingface.co\', port=443): Max retries exceeded with url: /bert-base-cased/resolve/main/tokenizer_config.json (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x7a265326ee40>: Failed to resolve \'huggingface.co\' ([Errno -3] Temporary failure in name resolution)"))'), '(Request ID: fb5d9b49-6f03-48dc-8975-9af5513d2a89)')' thrown while requesting HEAD https://huggingface.co/bert-base-cased/resolve/main/tokenizer_config.json
Retrying in 1s [Retry 1/5].
'(MaxRetryError('HTTPSConnectionPool(host=\'huggingface.co\', port=443): Max retries exceeded with url: /bert-base-cased/resolve/main/tokenizer_config.json (Caused by NameResolutionError("<urllib3.connection.HTTPSConnection object at 0x7a265326fb30>: Failed to resolve \'huggingface.co\' ([Errno -3] Temporary failure in name resolution)"))'), '(Request ID: 60d74054-5700-4bc3-9d49-35e9672a221e)')' thrown while requesting HEAD https://huggingface.co/bert-base-cased/res

KeyboardInterrupt: 

#### Configure Model

In [None]:
class LSTM_LM(nn.Module):
    """Single-layer LSTM language model: embedding -> LSTM -> linear vocabulary scores.

    Args:
        vocab_size: number of entries in the embedding table and output layer.
        seq_len: length every output sequence is padded back to.
        embedding_dim: size of each token embedding.
        rnn_units: LSTM hidden size.
        padding_idx: token index treated as padding when measuring lengths.
    """

    def __init__(self, vocab_size, seq_len, embedding_dim, rnn_units, padding_idx):
        super().__init__()

        self.padding_idx = padding_idx
        self.seq_len = seq_len

        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)
        self.LSTM = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=rnn_units,
            num_layers=1,
            batch_first=True,
        )
        self.classify = nn.Linear(rnn_units, vocab_size)

    def forward(self, inputs, states=None, return_final_state=False):
        """Score the next token at every position.

        Args:
            inputs: integer tensor of token indices, one row per sequence.
            states: optional LSTM state to resume from; ``None`` starts fresh.
            return_final_state: when true, also return the LSTM's final state
                (used for step-by-step generation).

        Returns:
            ``logits`` of shape (batch_size, seq_len, vocab_size), or
            ``(logits, final_state)`` when ``return_final_state`` is true.
        """
        rnn_utils = torch.nn.utils.rnn

        # (batch_size, input_len, embedding_dim)
        embedded = self.embedding(inputs)

        # Number of non-padding tokens per row; packing keeps only that many
        # leading steps of each row, so padding is expected to be trailing.
        token_counts = (inputs != self.padding_idx).int().sum(dim=1)
        packed = rnn_utils.pack_padded_sequence(
            embedded,
            token_counts.cpu(),
            batch_first=True,
            enforce_sorted=False,
        )

        # Passing states=None is the same as starting from the default state.
        packed_hidden, final_state = self.LSTM(packed, states)

        # Back to a dense (batch_size, seq_len, rnn_units) tensor, zero-filled
        # at the padded positions.
        hidden, _ = rnn_utils.pad_packed_sequence(
            packed_hidden,
            batch_first=True,
            padding_value=0.0,
            total_length=self.seq_len,
        )

        # (batch_size, seq_len, vocab_size)
        logits = self.classify(hidden)

        return (logits, final_state) if return_final_state else logits

#### Train Model

In [None]:
# Model and batching hyper-parameters.
embedding_dim = 64   # size of each token embedding
rnn_units = 512      # LSTM hidden size
max_seq_len = 256    # every sequence is cut or padded to this many tokens
# Padding uses the last index of the vocabulary.
pad_index = len(vocab) - 1

In [None]:
def collate_batch(batch):
    """Turn a list of sentences into fixed-width label and feature tensors.

    Each sentence is tokenized and mapped to vocabulary indices. The features
    are those indices; the labels are the same indices shifted left by one
    (next-token targets). Both are truncated to ``max_seq_len`` and padded:
    features with ``pad_index``, labels with -100.

    Returns:
        ``(labels, features)`` -- two int64 tensors on ``device``, each of
        shape (len(batch), max_seq_len). Note that labels come first.
    """
    targets, features = [], []
    for sentence in batch:
        ids = vocab(tokenizer.tokenize(sentence))
        # Next-token targets: drop the first id; the last position has no target.
        shifted = (ids[1:] + [-100])[:max_seq_len]
        clipped = ids[:max_seq_len]
        # Right-pad both rows out to the fixed width.
        targets.append(shifted + [-100] * (max_seq_len - len(shifted)))
        features.append(clipped + [pad_index] * (max_seq_len - len(clipped)))
    label_list = torch.tensor(targets, dtype=torch.int64).to(device)
    feature_list = torch.tensor(features, dtype=torch.int64).to(device)
    return label_list, feature_list

In [None]:
# Sentences are tokenized, shifted and padded on the fly by collate_batch.
dataloader = DataLoader(
    train_data,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_batch,
)

model = LSTM_LM(
    vocab_size=len(vocab),
    seq_len=max_seq_len,
    embedding_dim=embedding_dim,
    rnn_units=rnn_units,
    padding_idx=pad_index,
).to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)

for epoch in range(epochs):
    for labels, seqs in dataloader:
        labels, seqs = labels.to(device), seqs.to(device)

        # One optimisation step on this batch.
        optimizer.zero_grad()
        logits = model(seqs)
        # cross_entropy expects the class dimension second: (batch, vocab, seq).
        loss = F.cross_entropy(logits.transpose(1, 2), labels)
        loss.backward()
        optimizer.step()

        # Per-batch accuracy over the positions that have a real target
        # (label -100 marks positions without one and never matches a prediction).
        hits = (logits.argmax(dim=2) == labels).float().sum()
        scored = (labels != -100).float().sum()
        accuracy = hits / scored
        print("Epoch {} - loss: {:2.4f} - accuracy: {:2.4f}".format(epoch+1, loss.item(), accuracy), end="\r")
    print("")

#### Text Generation

In [None]:
end_index = stoi["<end>"]
max_output = 128

def pred_output(text):
    """Greedily continue ``text`` with the trained language model.

    The prompt is prefixed with ``<start>``, run through the model once, and
    then extended one token at a time with the highest-scoring next token.
    Generation stops after ``<end>`` is produced (it is kept in the output),
    when the model predicts the padding index, or after ``max_output`` tokens.

    Args:
        text: prompt string, preprocessed the same way as the training data.

    Returns:
        The prompt followed by the generated tokens, separated by spaces.
    """
    generated_text = "<start> " + text
    _, inputs = collate_batch([generated_text])
    # Position of the last real (non-padding) prompt token.
    last_idx = (inputs[0] != pad_index).sum() - 1

    # Inference only: no gradients are needed.
    with torch.no_grad():
        outputs, final_states = model(inputs, None, return_final_state=True)
        pred_index = outputs[0][last_idx].argmax().item()
        for _ in range(max_output):
            # A predicted <pad> cannot be fed back (it would be an empty
            # sequence for the model), so stop there.
            if pred_index == pad_index:
                break
            generated_text += " " + itos[pred_index]
            if pred_index == end_index:
                break
            # Feed the predicted index straight back in. Re-tokenizing the
            # token's text could split it into several pieces, and the next
            # prediction would then be read from the wrong position.
            step_inputs = torch.full_like(inputs, pad_index)
            step_inputs[0, 0] = pred_index
            outputs, final_states = model(step_inputs, final_states, return_final_state=True)
            pred_index = outputs[0][0].argmax().item()
    return generated_text

In [None]:
# Generate a continuation for a few sample prompts.
for prompt in (
    "in the united states president",
    "the man has accused by",
    "now he was expected to",
):
    print(pred_output(prompt))