In [2]:
pip install torchtext==0.16.0

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install portalocker==2.8.2

Note: you may need to restart the kernel to use updated packages.


In [4]:
import numpy as np

import torch
import torchtext

import spacy
nlp = spacy.load('en_core_web_sm')

from torchtext.vocab import build_vocab_from_iterator
import torchtext.transforms as T

from torch.utils.data import DataLoader

In [5]:
train_dataset = torchtext.datasets.UDPOS(split = 'train')
test_dataset = torchtext.datasets.UDPOS(split = 'test')

In [6]:
train_text = []
train_tags = []

for data_idx,data in enumerate(train_dataset):
  train_text.append(data[0])
  train_tags.append(data[1])

In [7]:
test_text = []
test_tags = []

for data_idx,data in enumerate(test_dataset):
  test_text.append(data[0])
  test_tags.append(data[1])

In [8]:
class Text_Tokenizer:
  def __init__(self,nlp):
    self.nlp = nlp

  def _spacy_tokenizer(self,doc):
    doc = ' '.join(doc)
    return [token.lemma_ for token in self.nlp(doc)]

  def _yield_tokens(self,doc):
    for text in doc:
      text = self._spacy_tokenizer(text)
      yield text

  def _vocab(self,doc):
    vocab = build_vocab_from_iterator(
        self._yield_tokens(doc),
        specials = ['<pad>','<unk>']
    )
    vocab.set_default_index(vocab['<unk>'])
    self.vocab = vocab

    return vocab

  def tokenize(self,doc,maxlen,vocab = None):
    if vocab == None:
      self._vocab(doc)

    transforms = T.Sequential(
        T.VocabTransform(self.vocab),
        T.Truncate(max_seq_len = maxlen),
        T.ToTensor(padding_value = 0),
        T.PadTransform(max_length = maxlen,pad_value = 0)
    )

    output = np.array([transforms(text) for text in doc])
    return output

In [9]:
class Tagging_rnn(torch.nn.Module):
  def __init__(self,sequence_length,text_vocab_size,embedding_size,hidden_size,num_layers,tags_vocab_size):
    super().__init__()
    self.sequence_length = sequence_length
    self.text_vocab_size = text_vocab_size
    self.embedding_size = embedding_size
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    self.tags_vocab_size = tags_vocab_size

    self.embedding = torch.nn.Embedding(text_vocab_size,embedding_size,padding_idx = 0)
    self.rnn = torch.nn.RNN(embedding_size,hidden_size,num_layers,batch_first = True,bidirectional = True)


    self.linear_1 = torch.nn.Linear(in_features = 2 * hidden_size , out_features = 128)
    self.linear_2 = torch.nn.Linear(in_features = 128 , out_features = tags_vocab_size)
    self.sigmoid = torch.nn.Sigmoid()

  def forward(self,X):
    X = self.embedding(X)

    output,_ = self.rnn(X)


    output = self.linear_1(output)
    output = self.sigmoid(output)

    output = self.linear_2(output)

    return output

In [10]:
class Tagging_lstm(torch.nn.Module):
  def __init__(self,sequence_length,text_vocab_size,embedding_size,hidden_size,num_layers,tags_vocab_size):
    super().__init__()
    self.sequence_length = sequence_length
    self.text_vocab_size = text_vocab_size
    self.embedding_size = embedding_size
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    self.tags_vocab_size = tags_vocab_size

    self.embedding = torch.nn.Embedding(text_vocab_size,embedding_size,padding_idx = 0)
    self.lstm = torch.nn.LSTM(embedding_size,hidden_size,num_layers,batch_first = True,bidirectional = True)


    self.linear_1 = torch.nn.Linear(in_features = 2 * hidden_size , out_features = 128)
    self.linear_2 = torch.nn.Linear(in_features = 128 , out_features = tags_vocab_size)
    self.sigmoid = torch.nn.Sigmoid()

  def forward(self,X):

    X = self.embedding(X)

    output,_ = self.lstm(X)

    output = self.linear_1(output)
    output = self.sigmoid(output)

    output = self.linear_2(output)

    return output

In [11]:
class Train_Model:
  def __init__(self,model,loss_function,optimizer,epochs):
    self.model = model
    self.loss_function = loss_function
    self.optimizer = optimizer
    self.epochs = epochs

  def fit(self,train_dataset_batched):
    from sklearn.metrics import accuracy_score as accuracy
    from tqdm.auto import tqdm

    model = self.model

    for epoch in tqdm(range(self.epochs)):
      print(f'\n')
      model.train()

      train_batch_loss = 0
      train_batch_acc = 0

      for batch,(text,tag) in tqdm(enumerate(train_dataset_batched)):
        train_prediction = model(text).permute(0,2,1)
        train_labels = train_prediction.argmax(1)

        train_loss = self.loss_function(train_prediction,tag)
        train_acc = accuracy(tag.flatten(),train_labels.flatten())

        train_batch_loss += train_loss
        train_batch_acc += train_acc

        self.optimizer.zero_grad()
        train_loss.backward()
        self.optimizer.step()


      train_batch_loss /= len(train_dataset_batched)
      train_batch_acc /= len(train_dataset_batched)

      print(f'Epoch: {epoch} | Train Loss: {train_batch_loss} | Train Accuracy: {train_batch_acc}')

  def eval(self,test_dataset_batched):
    from sklearn.metrics import accuracy_score as accuracy
    from tqdm.auto import tqdm
    model = self.model

    model.eval()

    test_batch_loss = 0
    test_batch_acc = 0

    for batch,(text,tag) in enumerate(test_dataset_batched):
      test_prediction = model(text).permute(0,2,1)
      test_labels = test_prediction.argmax(1)

      test_loss = self.loss_function(test_prediction,tag)
      test_acc = accuracy(tag.flatten(),test_labels.flatten())

      test_batch_loss += test_loss
      test_batch_acc += test_acc

    test_batch_loss /= len(test_dataset_batched)
    test_batch_acc /= len(test_dataset_batched)

    print(f'Test Loss: {test_batch_loss} | Test Accuracy: {test_batch_acc}')

In [12]:
def Tag_Sentence(sentence,model):
  sentence = text_tokenizer.tokenize([sentence],80,text_vocab)
  sentence = torch.tensor(sentence)

  prediction = model(sentence)
  tags = prediction.argmax(-1)
  tags = np.array(tags)

  words = [inverse_text_vocab[token] for token in np.array(sentence[0]) if  token != 0]
  tags = [inverse_tag_vocab[tags[0][token_idx]] for token_idx in range(len(words))]
  
  taged_sentence = np.array(list(zip(words,tags)))
  
  return taged_sentence

In [13]:
text_tokenizer = Text_Tokenizer(nlp)
tags_tokenizer = Text_Tokenizer(nlp)

tokenized_train_text = text_tokenizer.tokenize(train_text,80)
tokenized_train_tags = tags_tokenizer.tokenize(train_tags,80)

text_vocab = text_tokenizer.vocab
inverse_text_vocab = {value:key for key,value in text_vocab.get_stoi().items()}

tags_vocab = tags_tokenizer.vocab
inverse_tag_vocab = {value:key for key,value in tags_vocab.get_stoi().items()}

tokenized_test_text = text_tokenizer.tokenize(test_text,80,text_vocab)
tokenized_test_tags = tags_tokenizer.tokenize(test_tags,80,tags_vocab)

In [14]:
batch_size = 32
train_dataset_batched = DataLoader(list(zip(tokenized_train_text,tokenized_train_tags)),batch_size = batch_size,shuffle = True)
test_dataset_batched = DataLoader(list(zip(tokenized_test_text,tokenized_test_tags)),batch_size = batch_size)

In [15]:
Test_text,Test_tags = next(iter(train_dataset_batched))

In [16]:
sequence_length = 80
text_vocab_size = len(text_vocab.get_stoi())
embedding_size = 164
hidden_size = 164
num_layers = 1
tags_vocab_size = len(tags_vocab.get_stoi())

In [17]:
model_rnn = Tagging_rnn(sequence_length,text_vocab_size,embedding_size,hidden_size,num_layers,tags_vocab_size)
model_lstm = Tagging_lstm(sequence_length,text_vocab_size,embedding_size,hidden_size,num_layers,tags_vocab_size)

In [18]:
lr = 0.0001

loss_function = torch.nn.CrossEntropyLoss()
optimizer_lstm = torch.optim.Adam(model_lstm.parameters(),lr = lr)
optimizer_rnn = torch.optim.Adam(model_rnn.parameters(),lr = lr)

epochs = 5

Trainer_rnn = Train_Model(model_rnn,loss_function,optimizer_rnn,epochs)
Trainer_lstm = Train_Model(model_lstm,loss_function,optimizer_lstm,epochs)

print(f'RNN model: ')
Trainer_rnn.fit(train_dataset_batched)
Trainer_rnn.eval(test_dataset_batched)


print(f'LSTM model: ')
Trainer_lstm.fit(train_dataset_batched)
Trainer_lstm.eval(test_dataset_batched)

RNN model: 


  0%|          | 0/5 [00:00<?, ?it/s]





0it [00:00, ?it/s]

Epoch: 0 | Train Loss: 1.1571853160858154 | Train Accuracy: 0.7734371464059415




0it [00:00, ?it/s]

Epoch: 1 | Train Loss: 0.530951738357544 | Train Accuracy: 0.8597926267281105




0it [00:00, ?it/s]

Epoch: 2 | Train Loss: 0.4284038543701172 | Train Accuracy: 0.8882275037030944




0it [00:00, ?it/s]

Epoch: 3 | Train Loss: 0.35990628600120544 | Train Accuracy: 0.9002603738067816




0it [00:00, ?it/s]

Epoch: 4 | Train Loss: 0.30723732709884644 | Train Accuracy: 0.9120598075419679
Test Loss: 0.2282353639602661 | Test Accuracy: 0.9361283570954905
LSTM model: 


  0%|          | 0/5 [00:00<?, ?it/s]





0it [00:00, ?it/s]

Epoch: 0 | Train Loss: 1.352070689201355 | Train Accuracy: 0.6839395649893024




0it [00:00, ?it/s]

Epoch: 1 | Train Loss: 0.5569143295288086 | Train Accuracy: 0.8494644978707203




0it [00:00, ?it/s]

Epoch: 2 | Train Loss: 0.4502781331539154 | Train Accuracy: 0.8757764925526667




0it [00:00, ?it/s]

Epoch: 3 | Train Loss: 0.37023913860321045 | Train Accuracy: 0.9026095884422314




0it [00:00, ?it/s]

Epoch: 4 | Train Loss: 0.30035918951034546 | Train Accuracy: 0.9183307660261684
Test Loss: 0.21578410267829895 | Test Accuracy: 0.9419512599469493


In [22]:
sentence = test_text[34]

Tag_Sentence(sentence,model_lstm)

array([['<unk>', 'VERB'],
       ['can', 'AUX'],
       ['anyone', 'VERB'],
       ['use', 'VERB'],
       ['military', 'NOUN'],
       ['pressure', 'NOUN'],
       ['without', 'NOUN'],
       ['proof', 'NOUN'],
       ['?', 'PUNCT']], dtype='<U8')