In [93]:
import os
import sys
from pathlib import Path
from collections import Counter

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split

from nltk.corpus import brown

import torch
import torch.nn as nn
import torch.nn.functional as F
from torchtext.vocab import vocab
from torchinfo import summary
from torch.utils.data import DataLoader
import torch.nn.utils.rnn as rnn_utils

import random

## Define Paths

In [22]:
base_path = Path("C:/Users/shaur/Desktop/Research/low_resource_training/pos_tagging")
artifacts_path = os.path.join(base_path, "artifacts")

### Download dataset

In [23]:
brown_corpus = brown.tagged_sents(tagset='universal')
tagged_sentences = brown_corpus

In [24]:
X = [] # store input sequence
Y = [] # store output sequence

for sentence in tagged_sentences:

    for entity in sentence:         
        X.append(entity[0].lower())  # entity[0] contains the word
        Y.append(entity[1])  # entity[1] contains corresponding tag

### Map targets to integers

In [25]:
class_names = ['CONJ', 'DET', 'PRT', 'VERB', 'ADV', 'X', 'ADJ', 'NUM', 'PRON', 'NOUN', '.', 'ADP']

id2label = {}
for id_, label_ in enumerate(class_names):
    id2label[str(id_)] = label_

label2id = {}
for id_, label_ in enumerate(class_names):
    label2id[label_] = id_

In [26]:
Y_id = []
for i in range(len(Y)):
    Y_id.append(label2id[Y[i]])


### Train Test Split

In [27]:
X_train, X_val, y_train, y_val = train_test_split(X, Y_id, test_size=0.2, random_state=0)

### Custom Dataset Class

In [28]:
class CustomDataset(torch.utils.data.Dataset):

    def __init__(self, X, y):
        self.X = X
        self.y = y

    def __len__(self):
        return len(self.X)
    
    def __getitem__(self, idx):
        texts = self.X[idx]
        labels = self.y[idx]
        sample = (labels, texts)
        return sample
    
train_dataset = CustomDataset(X_train, y_train)
val_dataset = CustomDataset(X_val, y_val)

### Develop Tokenizer

#### Create Vocab

In [29]:
def get_vocab(dataset, min_freq=1):

    counter_word = Counter()
    for word in dataset: 
        counter_word.update([word])
    my_vocab_word = vocab(counter_word, min_freq=min_freq)
    my_vocab_word.insert_token('<unk>', 0)
    my_vocab_word.set_default_index(0)

    counter_character = Counter()
    for word in dataset:
        for i in range(0, len(word)):
            counter_character.update([word[i]])
    my_vocab_character = vocab(counter_character, min_freq=min_freq)
    my_vocab_character.insert_token('<unk>', 0)
    my_vocab_character.set_default_index(0)

    counter_bigram = Counter()
    for word in dataset:
        for i in range(0, len(word)):
            if i<len(word)-1:
                counter_bigram.update([word[i:i+2]])
    my_vocab_bigram = vocab(counter_bigram, min_freq=min_freq)
    my_vocab_bigram.insert_token('<unk>', 0)
    my_vocab_bigram.set_default_index(0)

    counter_trigram = Counter()
    for word in dataset:
        for i in range(0, len(word)):
            if i<len(word)-2:
                counter_trigram.update([word[i:i+3]])
    my_vocab_trigram = vocab(counter_trigram, min_freq=min_freq)
    my_vocab_trigram.insert_token('<unk>', 0)
    my_vocab_trigram.set_default_index(0)

    return my_vocab_word, my_vocab_character, my_vocab_bigram, my_vocab_trigram

In [30]:
vocab_word, vocab_character, vocab_bigram, vocab_trigram = get_vocab(X, min_freq=2)

### Collate Function for Dataloaders

In [51]:
def tokenizer(x):
    """Converts text to a list of word, character, bigram, and trigram indices using vocabulary dictionaries."""
    
    word_indices_list, ch_indices_lists, bigram_indices_lists, trigram_indices_lists = 0, [], [], []

    for word in x:
        # Convert word to index using vocab_word
        word_indices_list= vocab_word([word])[0]  
        
        # ch_indices_list, bigram_indices_list, trigram_indices_list = [], [], []
        
        # Convert characters to indices
        for i in range(len(word)):
            ch_indices_lists.append(vocab_character([word[i]])[0])  # Using vocab_char for individual characters
            
            # Convert bigrams to indices if applicable
            if i < len(word) - 1:
                bigram = word[i:i+2]
                bigram_indices_lists.append(vocab_bigram([bigram])[0])  # Using vocab_bigram
            
            # Convert trigrams to indices if applicable
            if i < len(word) - 2:
                trigram = word[i:i+3]
                trigram_indices_lists.append(vocab_trigram([trigram])[0])  # Using vocab_trigram
        
        # ch_indices_lists.append(ch_indices_list)
        # bigram_indices_lists.append(bigram_indices_list)
        # trigram_indices_lists.append(trigram_indices_list)
    
    return word_indices_list, ch_indices_lists, bigram_indices_lists, trigram_indices_lists


In [66]:
class TokenizerDataLoader:
    def __init__(self, tokenizer, batch_size=32):
        self.tokenizer = tokenizer
        self.batch_size = batch_size
        self.data = []

    def add_data(self, texts):
        for text in texts:
            self.data.append(self.tokenizer([text]))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

    def get_batches(self):
        for i in range(0, len(self.data), self.batch_size):
            yield self.data[i:i + self.batch_size]

    def collate_fn(self, batch):
        merged_embeddings = {
            'word': [],
            'character': [],
            'bigram': [],
            'trigram': []
        }
        for item in batch:
            word_indices, ch_indices, bigram_indices, trigram_indices = item
            merged_embeddings['word'].append(word_indices)
            merged_embeddings['character'].append(ch_indices)
            merged_embeddings['bigram'].append(bigram_indices)
            merged_embeddings['trigram'].append(trigram_indices)
        return merged_embeddings

In [69]:
# Create an instance of TokenizerDataLoader
dataloader = TokenizerDataLoader(tokenizer, batch_size=32)

# Add data
texts = ["text", "example"]
dataloader.add_data(texts)

# Get batches
for batch in dataloader.get_batches():
    # Process the batch
    processed_batch = dataloader.collate_fn(batch)
    # Use the processed batch for training or further processing

In [70]:
processed_batch

{'word': [1420, 3037],
 'character': [[1, 3, 26, 1], [3, 26, 13, 21, 20, 6, 3]],
 'bigram': [[80, 89, 235], [89, 334, 186, 196, 76, 52]],
 'trigram': [[968, 473], [969, 2449, 1062, 452, 453]]}

### Define Custom Model

In [91]:
class SimpleMLP(nn.Module):
    def __init__(self, 
                 word_vocab_size, 
                 ch_vocab_size, 
                 bigram_vocab_size, 
                 trigram_vocab_size, 
                 embedding_dim, 
                 hidden_dim1, 
                 hidden_dim2, 
                 drop_prob1, 
                 drop_prob2, 
                 num_outputs):
        
        super().__init__()

        self.embedding_word = nn.Embedding(word_vocab_size, embedding_dim)
        self.embedding_ch = nn.Embedding(ch_vocab_size, embedding_dim)
        self.embedding_bigram = nn.Embedding(bigram_vocab_size, embedding_dim)
        self.embedding_trigram = nn.Embedding(trigram_vocab_size, embedding_dim)

        self.linear1 = nn.Linear(embedding_dim*4, hidden_dim1)
        # Batch normalization for first linear layer
        self.batchnorm1 = nn.BatchNorm1d(num_features=hidden_dim1)
        # Dropout for first linear layer
        self.dropout1 = nn.Dropout(p=drop_prob1)

        # Second Linear layer
        self.linear2 = nn.Linear(hidden_dim1, hidden_dim2)
        # Batch normalization for second linear layer
        self.batchnorm2 = nn.BatchNorm1d(num_features=hidden_dim2)
        # Dropout for second linear layer
        self.dropout2 = nn.Dropout(p=drop_prob2)

        # Final Linear layer
        self.linear3 = nn.Linear(hidden_dim2, num_outputs)

    def forward(self, input_tuple):

        print(input_tuple)

        indices_word, indices_ch, indices_bigram, indices_trigram = input_tuple['word'], input_tuple['character'], input_tuple['bigram'], input_tuple['trigram']

        emb_word, emb_ch, emb_bigram, emb_trigram = self.embedding_word(indices_word), self.embedding_ch(indices_ch), self.embedding_bigram(indices_bigram), self.embedding_trigram(indices_trigram)
        x = torch.cat((emb_word, emb_ch, emb_bigram, emb_trigram), dim=-1)
        
        x = self.linear1(x)
        x = nn.ReLU()(x)
        x = self.batchnorm1(x)
        x = self.dropout1(x)

        x = self.linear2(x)
        x = nn.ReLU()(x)
        x = self.batchnorm2(x)
        x = self.dropout2(x)

        x = self.linear3(x)

        return x



In [94]:
def pad_nested_list(list):
    list = [torch.tensor(sublist) for sublist in list]
    return rnn_utils.pad_sequence(list, batch_first=True)

In [96]:
# Define the device
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Define the sequential model
# this will invoke the __init__() function of the model
model = SimpleMLP(word_vocab_size = 7, 
                 ch_vocab_size = 7, 
                 bigram_vocab_size = 7, 
                 trigram_vocab_size = 7, 
                 embedding_dim = 20, 
                 hidden_dim1 = 10, 
                 hidden_dim2 = 5, 
                 drop_prob1 = 0.5, 
                 drop_prob2 = 0.5, 
                 num_outputs = 2)

# Move the model to the device
model = model.to(device)

# Generate some dummy input data and offsets, and move them to the device
indices_word, indices_ch, indices_bigram, indices_trigram = processed_batch['word'], processed_batch['character'], processed_batch['bigram'], processed_batch['trigram']
indices_word = torch.tensor(indices_word, dtype=torch.int64)
indices_ch = pad_nested_list(indices_ch)
indices_bigram = pad_nested_list(indices_bigram)
indices_trigram = pad_nested_list(indices_trigram)

# Generate summary
summary(model, input_data=[(indices_word, indices_ch, indices_bigram, indices_trigram)], device=device, depth =10, verbose = False)


(tensor([1420, 3037], device='cuda:0'), tensor([[ 1,  3, 26,  1,  0,  0,  0],
        [ 3, 26, 13, 21, 20,  6,  3]], device='cuda:0'), tensor([[ 80,  89, 235,   0,   0,   0],
        [ 89, 334, 186, 196,  76,  52]], device='cuda:0'), tensor([[ 968,  473,    0,    0,    0],
        [ 969, 2449, 1062,  452,  453]], device='cuda:0'))


RuntimeError: Failed to run torchinfo. See above stack traces for more details. Executed layers up to: []

In [89]:
import torch

nested_list = [[1, 3, 26, 1], [3, 26, 13, 21, 20, 6, 3]]
tensor_list = [torch.tensor(sublist) for sublist in nested_list]

print(tensor_list)

[tensor([ 1,  3, 26,  1]), tensor([ 3, 26, 13, 21, 20,  6,  3])]


In [73]:
output = model((word_ind, ch_ind, bigram_ind, trigram_ind))

print(output)

tensor([[-0.4889,  1.3869],
        [-1.6105,  0.5258],
        [ 1.0008, -1.2242],
        [-0.2340,  0.5366]], device='cuda:0', grad_fn=<AddmmBackward0>)
