# DD2417 Final Project - Dating Historical Texts - LSTM 1-Layer RNN

## Libraries + Imports

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import os
import urllib.request
import zipfile
from tqdm import tqdm

## Embeddings - Download + Setup

In [None]:
# Directory that holds the downloaded GloVe embedding files.
embeddings_path = "./Embeddings"


def download_progress(block_num, block_size, total_size):
    """
    Report hook for `urllib.request.urlretrieve` that drives a tqdm progress bar.

    Parameters:
        block_num: number of blocks transferred so far (0 on the initial call).
        block_size: size of one block in bytes.
        total_size: total size of the download in bytes, or a non-positive
            value when the server does not report it.

    The bar is stored on the function object. It is recreated whenever a new
    download starts (block_num == 0) and closed once the reported total has
    been reached, so consecutive downloads do not share a stale bar.
    """
    pbar = getattr(download_progress, "pbar", None)
    if pbar is None or block_num == 0:
        if pbar is not None:
            pbar.close()
        # tqdm treats total=None as "unknown length".
        pbar = tqdm(
            total=total_size if total_size > 0 else None, unit="B", unit_scale=True
        )
        download_progress.pbar = pbar

    downloaded = block_num * block_size
    if total_size > 0:
        # The last block is usually only partially filled; do not overshoot.
        downloaded = min(downloaded, total_size)
    pbar.update(downloaded - pbar.n)

    if total_size > 0 and downloaded >= total_size:
        pbar.close()
        del download_progress.pbar


# Download and unpack the pre-trained GloVe 6B vectors on first use.
# The check looks for an extracted vector file instead of only the directory,
# so a download that failed part-way is retried rather than silently skipped.
glove_zip_path = os.path.join(embeddings_path, "glove.6B.zip")
glove_present = os.path.isdir(embeddings_path) and any(
    name.startswith("glove.6B.") and name.endswith("d.txt")
    for name in os.listdir(embeddings_path)
)

if not glove_present:
    print("create directory to store pre-trained glove embeddings")
    os.makedirs(embeddings_path, exist_ok=True)
    print("download pre-trained Glove Embeddings")
    urllib.request.urlretrieve(
        "http://nlp.stanford.edu/data/glove.6B.zip",
        glove_zip_path,
        download_progress,
    )
    print("unpack embeddings")
    with zipfile.ZipFile(glove_zip_path, "r") as zip_ref:
        zip_ref.extractall(embeddings_path)
    # The archive is no longer needed once its contents are extracted.
    os.remove(glove_zip_path)

    print("embeddings download complete")

In [None]:
# Paths to the GloVe 6B vector files, one per embedding dimension (50/100/200/300).
glove_6b_50_path = "./Embeddings/glove.6B.50d.txt"
glove_6b_100_path = "./Embeddings/glove.6B.100d.txt"
glove_6b_200_path = "./Embeddings/glove.6B.200d.txt"
glove_6b_300_path = "./Embeddings/glove.6B.300d.txt"
# Root directories of the train and test splits that are passed to the tokenizer.
clean_train_split_path = "./Datasets/clean_train_split/"
clean_test_split_path = "./Datasets/clean_test_split"

## Tokenization - NLTK Tokenizer

In [None]:
import nltk

nltk.download("punkt_tab")
from collections import defaultdict

class HistoricalTextTokenizer:
    """
    Word-level tokenizer that builds a vocabulary and splits texts into paragraphs.

    Adapted from the tokenizers developed by Professor Johan Boye for the
    DD2417 assignments.

    Attributes:
        word2id / id2word: vocabulary mappings. Both are defaultdicts whose
            default is None, so indexing an unseen key returns None (and
            stores that None entry) instead of raising KeyError.
        latest_new_word: highest word id assigned so far.
        tokens_processed: running count of tokens seen by `process_file`.
        tokens: tokens of the most recently processed file.
    """

    def __init__(self):
        self.word2id = defaultdict(lambda: None)
        self.id2word = defaultdict(lambda: None)
        self.latest_new_word = -1
        self.tokens_processed = 0

        self.UNKNOWN = "<unk>"
        self.PADDING_WORD = "<pad>"

        # Registered first, so padding gets id 0 and the unknown word id 1.
        self.get_word_id(self.PADDING_WORD)
        self.get_word_id(self.UNKNOWN)

    @staticmethod
    def _word_tokenize(text):
        """Tokenize `text` with NLTK, fetching the tokenizer models if missing."""
        try:
            return nltk.word_tokenize(text)
        except LookupError:
            # Which resource is required depends on the installed NLTK version,
            # so fetch both before retrying.
            nltk.download("punkt")
            nltk.download("punkt_tab")
            return nltk.word_tokenize(text)

    def get_word_id(self, word):
        """Return the id of `word` (lower-cased), adding it to the vocabulary if new."""
        word = word.lower()
        if word in self.word2id:
            return self.word2id[word]

        self.latest_new_word += 1
        self.id2word[self.latest_new_word] = word
        self.word2id[word] = self.latest_new_word
        return self.latest_new_word

    def process_files(self, file_or_dir):
        """
        Process a dataset directory, or a single file, into paragraphs and labels.

        When `file_or_dir` is a directory, every sub-directory is treated as a
        decade (its name is parsed with `int`) and every `.txt` file inside it
        is processed with that decade as label. Otherwise `file_or_dir` is
        processed as a single file labelled with decade 0.

        Returns:
            (paragraphs, labels): two parallel lists.
        """
        all_texts = []
        all_labels = []

        if os.path.isdir(file_or_dir):
            decade_dirs = sorted(
                d
                for d in os.listdir(file_or_dir)
                if os.path.isdir(os.path.join(file_or_dir, d))
            )
            for decade_dir in decade_dirs:
                decade_path = os.path.join(file_or_dir, decade_dir)
                decade = int(decade_dir)
                print(f"Processing decade: {decade}")
                text_files = sorted(
                    f for f in os.listdir(decade_path) if f.endswith(".txt")
                )
                print(f"number of files in {decade} directory: {len(text_files)}")

                for file in text_files:
                    filepath = os.path.join(decade_path, file)
                    print(f"tokenize file {file}")
                    texts, labels = self.process_file(filepath, decade)
                    all_texts.extend(texts)
                    all_labels.extend(labels)
        else:
            texts, labels = self.process_file(file_or_dir, 0)
            all_texts.extend(texts)
            all_labels.extend(labels)

        return all_texts, all_labels

    def process_file(self, filepath, decade):
        """
        Add the words of one file to the vocabulary and split it into paragraphs.

        The file is read as UTF-8, ignoring undecodable bytes.

        Returns:
            (paragraphs, labels): the paragraphs produced by `create_paragraphs`
            and a list repeating `decade` once per paragraph.
        """
        with open(filepath, mode="r", encoding="utf-8", errors="ignore") as stream:
            text = stream.read()

        self.tokens = self._word_tokenize(text)

        for token in self.tokens:
            self.tokens_processed += 1
            self.get_word_id(token)

            if self.tokens_processed % 1000000000 == 0:
                print("Processed", "{:,}".format(self.tokens_processed), "tokens")

        paragraphs = self.create_paragraphs(text)
        labels = [decade] * len(paragraphs)

        return paragraphs, labels

    def create_paragraphs(self, text, min_words=10, max_words=210):
        """
        Split `text` into consecutive paragraphs of at most `max_words` words.

        Adapts the paragraph strategy of the "Deep Learning for Period
        Classification of Historical Texts" paper. Words are obtained by
        whitespace splitting; a chunk with fewer than `min_words` words is
        dropped.

        Raises:
            ValueError: if `max_words` is smaller than 1 (no progress could be
                made through the text).
        """
        if max_words < 1:
            raise ValueError("max_words must be at least 1")

        words = text.split()
        paragraphs = []
        for start in range(0, len(words), max_words):
            paragraph_words = words[start : start + max_words]
            if len(paragraph_words) >= min_words:
                paragraphs.append(" ".join(paragraph_words))

        return paragraphs

    def tokenize_text_to_id(self, text):
        """
        Convert `text` into a list of word ids.

        The text is lower-cased and tokenized with NLTK; tokens that are not
        in the vocabulary are mapped to the id of the unknown word.
        """
        tokens = self._word_tokenize(text.lower())
        unknown_id = self.word2id[self.UNKNOWN]
        # .get() never inserts into the defaultdict, unlike indexing.
        return [self.word2id.get(token, unknown_id) for token in tokens]

    def get_vocab_size(self):
        """Return the number of entries in the vocabulary."""
        return len(self.word2id)


In [None]:
# Tokenizer instance used by the exploratory cells below.
text_tokenizer = HistoricalTextTokenizer()

In [None]:
# Split every training file into paragraphs; each paragraph is labelled with its decade.
train_text_data, train_labels = text_tokenizer.process_files(clean_train_split_path)

In [None]:
# Same for the test split. Note that this also adds the test words to text_tokenizer's vocabulary.
test_text_data, test_labels = text_tokenizer.process_files(clean_test_split_path)

In [None]:
# Map every decade found in either split to a consecutive class index (sorted ascending).
labels = sorted(set(train_labels + test_labels))
decade_to_label = {decade: i for i, decade in enumerate(labels)}
print(f"{decade_to_label}")

In [None]:
# Sanity check: paragraphs and labels must have the same length in each split.
print(f"number of train labels -> {len(train_labels)}")
print(f"length of train text(paragraphs) -> {len(train_text_data)}")
print()

print(f"number of test labels -> {len(test_labels)}")
print(f"length of test text -> {len(test_text_data)}")
print()

# Show the first paragraph and its decade label from each split.
print(f"train text {train_text_data[0]}")
print(f"train label {train_labels[0]}")
print()

print(f"test text(paragraphs) {test_text_data[0]}")
print(f"test label {test_labels[0]}")

In [None]:
# Inspect how the first training paragraph is converted into word ids.
train_sample = train_text_data[0]
train_sample_label = train_labels[0]
word_ids = text_tokenizer.tokenize_text_to_id(train_sample)

print(f"train sample -> {train_sample}")
print(f"train sample label -> {train_sample_label}")
print(f"tokenized train_sample -> {word_ids}")
print(f"length of tokenized word {len(word_ids)}")

In [None]:
# Inspect how the first test paragraph is converted into word ids.
test_sample = test_text_data[0]
test_sample_label = test_labels[0]
word_ids = text_tokenizer.tokenize_text_to_id(test_sample)

print(f"test sample -> {test_sample}")
print(f"test sample label -> {test_sample_label}")
print(f"tokenized test_sample -> {word_ids}")
print(f"length of tokenized word {len(word_ids)}")

In [None]:
from collections import Counter, defaultdict
import random

"""
This function rebuilds the training set so that no single book or decade dominates it.
Books contribute different numbers of paragraphs within each decade, so books with more
paragraphs can outweigh the others and skew the model's classification.

The paragraphs of each decade are grouped into fixed-size pseudo-books, the number of paragraphs
taken from each pseudo-book is capped, and every decade is then down-sampled to the same size:
the smaller of `decade_paragraphs` and the number of paragraphs available in the smallest decade.
The downside of this approach is that it discards data that could be really helpful for training the model.
"""

def balance_paragraphs(
    train_data,
    train_labels,
    max_paragraphs_per_book=25,
    decade_paragraphs=600,
    book_size=50,
    seed=None,
):
    """
    Down-sample paragraphs so that no pseudo-book or decade dominates the data.

    The paragraphs of each decade are cut, in order, into pseudo-books of
    `book_size` paragraphs. At most `max_paragraphs_per_book` paragraphs are
    kept from each pseudo-book (never more than the paragraph count of the
    smallest decade). Afterwards every decade is randomly reduced to the same
    number of paragraphs: the smaller of `decade_paragraphs` and the size of
    the smallest decade after capping.

    Parameters:
        train_data: list of paragraph texts.
        train_labels: list of decade labels, parallel to `train_data`.
        max_paragraphs_per_book: cap on paragraphs kept per pseudo-book.
        decade_paragraphs: upper bound on paragraphs kept per decade.
        book_size: number of consecutive paragraphs forming one pseudo-book.
        seed: optional seed for reproducible sampling; when None the global
            `random` module state is used.

    Returns:
        (paragraphs, labels): parallel lists ordered by ascending decade.
        Two empty lists are returned for empty input.

    Raises:
        ValueError: if `book_size` is smaller than 1.
    """
    if book_size < 1:
        raise ValueError("book_size must be at least 1")
    if not train_data or not train_labels:
        return [], []

    rng = random if seed is None else random.Random(seed)

    original_paragraph_count = Counter(train_labels)
    for decade, count in sorted(original_paragraph_count.items()):
        print(f"{decade}: {count} paragraphs")

    # Honour the caller's cap, but never exceed the smallest decade's size.
    per_book_cap = min(max_paragraphs_per_book, min(original_paragraph_count.values()))

    # group by decade
    decade_data = defaultdict(list)
    for text, label in zip(train_data, train_labels):
        decade_data[label].append(text)

    # cap the contribution of every pseudo-book
    capped_decade_data = {}
    for decade, texts in decade_data.items():
        kept = []
        for start in range(0, len(texts), book_size):
            book = texts[start : start + book_size]
            if len(book) > per_book_cap:
                book = rng.sample(book, per_book_cap)
            kept.extend(book)
        capped_decade_data[decade] = kept

    # balance decades: the target never exceeds the smallest decade, so
    # sampling without replacement is always possible
    smallest_decade = min(len(texts) for texts in capped_decade_data.values())
    target = min(decade_paragraphs, smallest_decade)

    balanced_texts = []
    balanced_labels = []
    for decade in sorted(capped_decade_data):
        balanced_texts.extend(rng.sample(capped_decade_data[decade], target))
        balanced_labels.extend([decade] * target)

    return balanced_texts, balanced_labels

## Libraries + Imports for building neural network architecture

In [None]:
import csv
from tqdm import tqdm
import string
import codecs
import torch
import torch.optim as optim
from torch import nn
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils import clip_grad_norm_

### Tokenize Text

In [None]:
# Build the vocabulary and the paragraph datasets used by the model.
print("create new tokenizer")
tokenizer = HistoricalTextTokenizer()

print(f"tokenize -> {clean_train_split_path}")
original_train_data, original_train_labels = tokenizer.process_files(
    clean_train_split_path
)
print(f"successfully tokenized <- {clean_train_split_path}")
print(
    f"balance training data -> {len(original_train_data)} and balance labels: {len(original_train_labels)}"
)
balance_train_data, balance_train_labels = balance_paragraphs(
    original_train_data, original_train_labels
)
print("successfully balanced train data and labels <-")

# create train/validation splits
from sklearn.model_selection import train_test_split

# test_size=0.2 below gives an 80/20 split; the message states it literally
# (an f-string expression {80 - 20} would print "60").
print("create cross validation splits from the training data using 80-20 split")
model_train_data, model_valid_data, model_train_labels, model_valid_labels = (
    train_test_split(
        balance_train_data,
        balance_train_labels,
        test_size=0.2,
        random_state=42,
        # keep the decade proportions identical in both splits
        stratify=balance_train_labels,
    )
)

# Processing the test split with the same tokenizer also adds its words to the vocabulary.
print(f"tokenize -> {clean_test_split_path}")
test_data, test_labels = tokenizer.process_files(clean_test_split_path)
print(f"successfully tokenized <- {clean_test_split_path}")

# Map each decade to a consecutive class index and back.
print("create decade labels")
decades = sorted(set(model_train_labels + test_labels))
decade_to_label = {decade: i for i, decade in enumerate(decades)}
label_to_decade = {i: decade for i, decade in enumerate(decades)}
print("successfully created decades labels")

UNKNOWN = "<unk>"  # Unknown char or unknown word
PADDING_WORD = "<pad>"
id_to_label = [f"decade_{decade}" for decade in decades]


def label_to_id(decade):
    """Return the class index stored for `decade` in the module-level `decade_to_label`."""
    return decade_to_label[decade]


### Load Glove Embeddings

In [None]:
def load_glove_embeddings(
    embedding_file, tokenizer, padding_word=PADDING_WORD, unknown_word=UNKNOWN
):
    """
    Reads Glove embeddings from a file and aligns them with tokenizer vocabulary.

    Only the vectors of words present in `tokenizer.word2id` are kept in
    memory; all other lines of the file are counted but not parsed.

    Parameters:
        embedding_file: path to a text file with one `word v1 v2 ... vD` entry
            per line.
        tokenizer: object exposing `word2id` and `id2word` mappings.
        padding_word: vocabulary word that receives an all-zero vector.
        unknown_word: vocabulary word that receives an all -1 vector.

    Returns:
        (D, word2id, embeddings): the embedding dimension, the tokenizer's
        `word2id` mapping, and a list with one vector per word id.

    Raises:
        ValueError: if the file contains no embedding vectors.
    """
    vocabulary = tokenizer.word2id
    glove_vectors = {}
    D = None
    vectors_in_file = 0

    with open(embedding_file, encoding="utf8") as f:
        for line in f:
            data = line.split()
            if not data:
                continue  # tolerate blank lines
            vectors_in_file += 1
            if D is None:
                D = len(data) - 1
            word = data[0]
            # Parsing floats is the expensive part; skip words we never look up.
            if word in vocabulary:
                glove_vectors[word] = [float(x) for x in data[1:]]

    if D is None:
        raise ValueError(f"no embedding vectors found in {embedding_file}")

    print(f"Loaded {vectors_in_file} GloVe vectors with dimension {D}")

    embeddings = []
    vocab_size = len(vocabulary)
    found_in_glove = 0

    # each word in tokenizer vocabulary, in id order
    for word_id in range(vocab_size):
        word = tokenizer.id2word[word_id]

        if word == padding_word:
            embeddings.append([0] * D)
        elif word == unknown_word:
            embeddings.append([-1] * D)
        elif word in glove_vectors:
            embeddings.append(glove_vectors[word])
            found_in_glove += 1
        else:
            # Not covered by GloVe: every such word shares the same constant vector.
            embeddings.append([0.1] * D)

    coverage = 100 * found_in_glove / vocab_size if vocab_size else 0.0
    print(f"Found {found_in_glove}/{vocab_size} words from vocabulary in GloVe")
    print(f"Glove token Coverage: {coverage:.2f}%")

    return D, tokenizer.word2id, embeddings

### Create Dataset and Dataloaders

In [None]:
class HistoricalTextDataset(Dataset):
    """
    Dataset of text paragraphs paired with decade labels, to be used as an
    input to PyTorch DataLoader.

    Each datapoint is a tuple (text, label_id): the raw paragraph string and
    the class index that `decade_to_label` assigns to the paragraph's decade.
    The texts are returned untokenized; `word_to_id` is only stored on the
    instance.
    """

    def __init__(self, texts, labels, word_to_id, decade_to_label):
        self.texts, self.labels = texts, labels
        self.word_to_id, self.decade_to_label = word_to_id, decade_to_label

    def __len__(self):
        """Number of paragraphs in the dataset."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the paragraph at `idx` together with its class index."""
        return self.texts[idx], self.decade_to_label[self.labels[idx]]

In [None]:
# Let's check out some of these data structures
dim, word_to_id, embeddings = load_glove_embeddings(glove_6b_100_path, tokenizer)

print("The embedding for the word 'good' looks like this:")
print(embeddings[word_to_id["good"]])
print()

# Read the data we are going to use for testing the model
test_set = HistoricalTextDataset(test_data, test_labels, word_to_id, decade_to_label)
print("There are", len(test_set), "documents in the testset")
dp = 0
text, label = test_set[dp]
print("Document", dp, "starts with:", text[:100], "...")
print("It has the label", label, "which corresponds to decade", label_to_decade[label])

In [None]:
def pad_sequence_documents(batch, padding_word=PADDING_WORD):
    """
    Collate function implementing dynamic padding.

    `batch` is a sequence of (text, label) pairs. Every text is converted to
    word ids with the module-level `tokenizer`, and all id sequences are
    right-padded with the id of `padding_word` to the length of the longest
    sequence in the batch.

    Returns:
        (padded_ids, labels): a list of equally long id lists and the list of
        labels, in batch order.
    """
    texts, labels = zip(*batch)

    # Convert documents to word ID sequences
    sequences = [tokenizer.tokenize_text_to_id(document) for document in texts]
    longest = max(len(sequence) for sequence in sequences)
    pad_id = tokenizer.word2id[padding_word]

    padded = [
        sequence + [pad_id] * (longest - len(sequence)) for sequence in sequences
    ]

    return padded, list(labels)

In [None]:
# Smoke test: run the collate function on a batch holding only the first training paragraph.
x = [(model_train_data[0], model_train_labels[0])]
pad_sequence_documents(x)

### LSTM Architecture

In [None]:
class DocumentClassifier(nn.Module):
    """
    LSTM document classifier on top of frozen pre-trained word embeddings.

    A batch of word-id sequences is embedded, passed through a unidirectional
    LSTM, and the LSTM output at the last non-padding position of each
    sequence is fed to a linear layer that produces one logit per class.

    Parameters:
        word_embeddings: list of embedding vectors, indexed by word id.
        word_to_id: mapping from words to ids; used to find the padding id.
        num_classes: number of output classes.
        word_hidden_size: hidden size of the LSTM.
        padding_word: vocabulary entry used for padding.
        unknown_word: vocabulary entry used for unknown words (stored only).
        dropout_rate: dropout applied before the classifier; the embeddings
            use 0.3 times this rate.
        num_layers: number of stacked LSTM layers.
        device: stored on the instance; the module itself is moved by the
            caller via `.to(device)`.
    """

    def __init__(
        self,
        word_embeddings,  # Pre-trained word embeddings
        word_to_id,  # Mapping from words to ids
        num_classes,
        word_hidden_size=128,
        padding_word=PADDING_WORD,
        unknown_word=UNKNOWN,
        dropout_rate=0.3,
        num_layers=1,
        device="cpu",
    ):
        super().__init__()
        self.padding_word = padding_word
        self.unknown_word = unknown_word
        self.word_to_id = word_to_id
        self.word_hidden_size = word_hidden_size
        self.device = device
        self.num_classes = num_classes
        self.dropout_rate = dropout_rate
        self.num_layers = num_layers

        # Id of the padding word, or None when it cannot be determined.
        # .get() is used so that a defaultdict vocabulary is not modified.
        lookup = getattr(word_to_id, "get", None)
        self.padding_id = lookup(padding_word) if callable(lookup) else None

        # Create an embedding tensor for the words and import the Glove embeddings
        vocabulary_size = len(word_embeddings)
        self.word_emb_size = len(word_embeddings[0])

        self.word_emb = nn.Embedding(vocabulary_size, self.word_emb_size)
        # requires_grad=False keeps the pre-trained vectors frozen during training.
        self.word_emb.weight = nn.Parameter(
            torch.tensor(word_embeddings, dtype=torch.float), requires_grad=False
        )
        self.embedding_dropout = nn.Dropout(dropout_rate * 0.3)
        self.output_dropout = nn.Dropout(dropout_rate)

        # Unidirectional LSTM with the requested number of layers
        self.word_lstm = nn.LSTM(
            self.word_emb_size,
            self.word_hidden_size,
            num_layers=num_layers,
            batch_first=True,
            dropout=0,
        )

        # Document Classification
        self.final_pred = nn.Linear(self.word_hidden_size, num_classes)

    def forward(self, x):
        """
        Compute class logits for a batch of padded word-id sequences.

        Parameters:
            x: integer tensor of shape (batch, sequence length); padding is
               assumed to appear only at the end of each sequence.

        Returns:
            Tensor of shape (batch, num_classes) with unnormalized scores.
        """
        word_embeddings = self.embedding_dropout(self.word_emb(x))

        # lstm_output holds the top layer's hidden state at every time step.
        lstm_output, _ = self.word_lstm(word_embeddings)

        if self.padding_id is None:
            complete_doc = lstm_output[:, -1]
        else:
            # Read the state at the last real token, so that the document
            # vector does not depend on how much padding the batch added.
            lengths = (x != self.padding_id).sum(dim=1).clamp(min=1)
            batch_index = torch.arange(x.size(0), device=x.device)
            complete_doc = lstm_output[batch_index, lengths - 1]

        complete_doc = self.output_dropout(complete_doc)
        logits = self.final_pred(complete_doc)

        return logits

### Train and validate on a small subset of the data

In [None]:
# ================== Hyper-parameters ==================== #

learning_rate = 0.01
epochs = 10
batch_size = 8

# Pick the first available backend: Apple MPS, then CUDA, then CPU.
if torch.backends.mps.is_available():
    device = "mps"
    print("Running on MGPU")
elif torch.cuda.is_available():
    device = "cuda"
    print("Running on CUDA")
else:
    device = "cpu"
    print("Running on CPU")

dim, word_to_id, embeddings = load_glove_embeddings(glove_6b_300_path, tokenizer)

# Small slices of each split keep this sanity-check run short.
print("Using first 1000 documents for testing...")
train_data_small = model_train_data[:1000]
train_labels_small = model_train_labels[:1000]
valid_data_small = model_valid_data[:200]
valid_labels_small = model_valid_labels[:200]
test_data_small = test_data[:100]
test_labels_small = test_labels[:100]

training_set = HistoricalTextDataset(
    train_data_small, train_labels_small, word_to_id, decade_to_label
)
validation_set = HistoricalTextDataset(
    valid_data_small, valid_labels_small, word_to_id, decade_to_label
)
test_set = HistoricalTextDataset(
    test_data_small, test_labels_small, word_to_id, decade_to_label
)

# Reshuffle the training data every epoch, as the full-data run does.
training_loader = DataLoader(
    training_set,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=pad_sequence_documents,
)
validation_loader = DataLoader(
    validation_set,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=pad_sequence_documents,
)
test_loader = DataLoader(
    test_set, batch_size=batch_size, collate_fn=pad_sequence_documents
)

print(f"Training on {len(training_set)} documents")
print(f"Validation on {len(validation_set)} documents")
print(f"Testing on {len(test_set)} documents")

lstm_classifier = DocumentClassifier(
    word_embeddings=embeddings,
    word_to_id=word_to_id,
    num_classes=len(decades),
    word_hidden_size=64,
    dropout_rate=0.2,
    num_layers=1,
    device=device,
).to(device)

optimizer = optim.Adam(lstm_classifier.parameters(), lr=learning_rate)
# Halve the learning rate every 2 epochs.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.5)
criterion = nn.CrossEntropyLoss()

for epoch in range(epochs):
    # ---- training pass ----
    lstm_classifier.train()
    epoch_loss = 0
    correct = 0
    total = 0

    for x, y in tqdm(training_loader, desc="Epoch {}".format(epoch + 1)):
        # The collate function returns plain Python lists.
        x = torch.tensor(x, dtype=torch.long).to(device)
        y = torch.tensor(y, dtype=torch.long).to(device)

        optimizer.zero_grad()
        logits = lstm_classifier(x)
        loss = criterion(logits, y)
        loss.backward()
        clip_grad_norm_(lstm_classifier.parameters(), 1.0)
        optimizer.step()

        epoch_loss += loss.item()
        predicted = logits.argmax(dim=1)
        total += y.size(0)
        correct += (predicted == y).sum().item()

    # ---- validation pass (dropout disabled, no gradients) ----
    lstm_classifier.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for x, y in validation_loader:
            x = torch.tensor(x, dtype=torch.long).to(device)
            y = torch.tensor(y, dtype=torch.long).to(device)

            logits = lstm_classifier(x)
            loss = criterion(logits, y)

            val_loss += loss.item()
            predicted = logits.argmax(dim=1)
            val_total += y.size(0)
            val_correct += (predicted == y).sum().item()

    # Calculate metrics: accuracy in percent, loss averaged over batches
    val_acc = 100 * val_correct / val_total
    avg_val_loss = val_loss / len(validation_loader)
    train_acc = 100 * correct / total
    avg_loss = epoch_loss / len(training_loader)

    scheduler.step()

    print(
        f"Epoch [{epoch + 1}/{epochs}], Train Loss: {avg_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.2f}%"
    )

### Train and validate on all data

In [None]:
# ================== Hyper-parameters ==================== #
learning_rate = 0.001
epochs = 8
batch_size = 24
# ======================= Training ======================= #
# Pick the first available backend: Apple MPS, then CUDA, then CPU.
if torch.backends.mps.is_available():
    device = "mps"
    print("Running on MGPU")
elif torch.cuda.is_available():
    device = "cuda"
    print("Running on CUDA")
else:
    device = "cpu"
    print("Running on CPU")

# This run uses the 50-dimensional GloVe vectors.
dim, word_to_id, embeddings = load_glove_embeddings(glove_6b_50_path, tokenizer)

# Full train / validation / test splits (no sub-sampling here).
training_set = HistoricalTextDataset(
    model_train_data, model_train_labels, word_to_id, decade_to_label
)
validation_set = HistoricalTextDataset(
    model_valid_data, model_valid_labels, word_to_id, decade_to_label
)
test_set = HistoricalTextDataset(test_data, test_labels, word_to_id, decade_to_label)

# Only the training loader is shuffled; all loaders pad per batch via the collate function.
training_loader = DataLoader(
    training_set, batch_size=batch_size, shuffle=True, collate_fn=pad_sequence_documents
)
validation_loader = DataLoader(
    validation_set,
    batch_size=batch_size,
    shuffle=False,
    collate_fn=pad_sequence_documents,
)
test_loader = DataLoader(
    test_set, batch_size=batch_size, collate_fn=pad_sequence_documents
)

print(f"Training on {len(training_set)} documents")
print(f"Validation on {len(validation_set)} documents")
print(f"Testing on {len(test_set)} documents")

# dropout_rate is not passed, so the class default applies.
lstm_classifier = DocumentClassifier(
    word_embeddings=embeddings,
    word_to_id=word_to_id,
    num_classes=len(decades),
    word_hidden_size=64,
    num_layers=1,
    device=device,
).to(device)

optimizer = optim.Adam(lstm_classifier.parameters(), lr=learning_rate)
# Halve the learning rate every 2 epochs.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=2, gamma=0.5)
criterion = nn.CrossEntropyLoss()

for epoch in range(epochs):
    # ---- training pass ----
    lstm_classifier.train()
    epoch_loss = 0
    correct = 0
    total = 0

    for x, y in tqdm(training_loader, desc="Epoch {}".format(epoch + 1)):
        # The collate function returns plain Python lists; convert them to tensors.
        x = torch.tensor(x, dtype=torch.long).to(device)
        y = torch.tensor(y, dtype=torch.long).to(device)

        optimizer.zero_grad()
        logits = lstm_classifier(x)
        loss = criterion(logits, y)
        loss.backward()
        # Clip the gradient norm to 5 before the optimizer step.
        clip_grad_norm_(lstm_classifier.parameters(), 5)
        optimizer.step()

        epoch_loss += loss.item()
        # Index of the highest logit is the predicted class.
        _, predicted = torch.max(logits.data, 1)
        total += y.size(0)
        correct += (predicted == y).sum().item()

    # ---- validation pass (eval mode, no gradients) ----
    lstm_classifier.eval()
    val_loss = 0
    val_correct = 0
    val_total = 0

    with torch.no_grad():
        for x, y in validation_loader:
            x = torch.tensor(x, dtype=torch.long).to(device)
            y = torch.tensor(y, dtype=torch.long).to(device)

            logits = lstm_classifier(x)
            loss = criterion(logits, y)

            val_loss += loss.item()
            _, predicted = torch.max(logits.data, 1)
            val_total += y.size(0)
            val_correct += (predicted == y).sum().item()

    # calculate metrics: accuracy in percent, loss averaged over the number of batches
    val_acc = 100 * val_correct / val_total
    avg_val_loss = val_loss / len(validation_loader)
    train_acc = 100 * correct / total
    avg_loss = epoch_loss / len(training_loader)

    scheduler.step()

    print(
        f"Epoch [{epoch + 1}/{epochs}], Train Loss: {avg_loss:.4f}, Train Acc: {train_acc:.2f}%, Val Loss: {avg_val_loss:.4f}, Val Acc: {val_acc:.2f}%"
    )