<a href="https://colab.research.google.com/github/vincentjordan27/Named-Entity-Recognition-BILSTM-CRF/blob/main/main.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import numpy as np
import sys
import csv
from sklearn.model_selection import train_test_split



In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
!pip install torchtext==0.6.0

import time
import torch
from torch import nn
from torch.optim import Adam
from torchtext.data import Field, BucketIterator
from torchtext.datasets import SequenceTaggingDataset
from spacy.lang.id import Indonesian



In [4]:
maxInt = sys.maxsize

while True:
   
    try:
        csv.field_size_limit(maxInt)
        break
    except OverflowError:
        maxInt = int(maxInt / 10)

dataset = '/content/drive/My Drive/Colab Notebooks/SINGGALANG.tsv'
data_df = pd.read_csv(dataset, sep='\t', error_bad_lines=False, header=None, engine="python", names=['word', 'tag'],
                      quoting=csv.QUOTE_NONE)

train_size = 0.6
validate_size = 0.2
train, validate, test = np.split(data_df.sample(frac=1), [int(train_size * len(data_df)), int((validate_size + train_size) * len(data_df))])
train.to_csv(F"/content/drive/My Drive/Colab Notebooks/train.csv", sep='\t', index = False)
validate.to_csv(F"/content/drive/My Drive/Colab Notebooks/validate.csv", sep='\t',  index = False)
test.to_csv(F"/content/drive/My Drive/Colab Notebooks/test.csv", sep='\t', index = False)

dataset_train = '/content/drive/My Drive/Colab Notebooks/train.csv'
data_df_train = pd.read_csv(dataset, error_bad_lines=False, header=None, engine="python", names=['word', 'tag'],
                      quoting=csv.QUOTE_NONE)
print(data_df_train.head())


Skipping line 235842: Expected 2 fields in line 235842, saw 3
Skipping line 554785: Expected 2 fields in line 554785, saw 3
Skipping line 597624: Expected 2 fields in line 597624, saw 3
Skipping line 612158: Expected 2 fields in line 612158, saw 6
Skipping line 653536: Expected 2 fields in line 653536, saw 3
Skipping line 750852: Expected 2 fields in line 750852, saw 3
Skipping line 750862: Expected 2 fields in line 750862, saw 3
Skipping line 750877: Expected 2 fields in line 750877, saw 3
Skipping line 779220: Expected 2 fields in line 779220, saw 3
Skipping line 866447: Expected 2 fields in line 866447, saw 3
Skipping line 1001113: Expected 2 fields in line 1001113, saw 3
Skipping line 1116605: Expected 2 fields in line 1116605, saw 3
Skipping line 1130262: Expected 2 fields in line 1130262, saw 3
Skipping line 1181274: Expected 2 fields in line 1181274, saw 3
Skipping line 1376400: Expected 2 fields in line 1376400, saw 3
Skipping line 1463189: Expected 2 fields in line 1463189, sa

          word   tag
0        Ia\tO  None
1  menjabat\tO  None
2   sebagai\tO  None
3  Presiden\tO  None
4    ketiga\tO  None


In [5]:
print(validate.head())


              word    tag
3259     Kecepatan      O
1333992      JKT48      O
293442       Utara  Place
973047           .      O
215836       tahun      O


In [6]:
class Corpus(object):

  def __init__(self, input_folder, min_word_freq, batch_size):
    # list all the fields
    self.word_field = Field(lower=True)
    self.tag_field = Field(unk_token=None)
    # create dataset using built-in parser from torchtext
    self.train_dataset, self.val_dataset, self.test_dataset = SequenceTaggingDataset.splits(
        path=input_folder,
        train="train.csv",
        validation="validate.csv",
        test="test.csv",
        fields=(("word", self.word_field), ("tag", self.tag_field))
    )

    # convert fields to vocabulary list
    self.word_field.build_vocab(self.train_dataset.word, min_freq=min_word_freq)
    self.tag_field.build_vocab(self.train_dataset.tag)
    # create iterator for batch input
    self.train_iter, self.val_iter, self.test_iter = BucketIterator.splits(
        datasets=(self.train_dataset, self.val_dataset, self.test_dataset),
        batch_size=batch_size
    )
    # prepare padding index to be ignored during model training/evaluation
    self.word_pad_idx = self.word_field.vocab.stoi[self.word_field.pad_token]
    self.tag_pad_idx = self.tag_field.vocab.stoi[self.tag_field.pad_token]
    

In [7]:

corpus = Corpus(
    input_folder="/content/drive/My Drive/Colab Notebooks/",
    min_word_freq=3, 
    batch_size=64,
)
print(f"Train set: {len(corpus.train_dataset)} sentences")
print(f"Val set: {len(corpus.val_dataset)} sentences")
print(f"Test set: {len(corpus.test_dataset)} sentences")

Train set: 1 sentences
Val set: 1 sentences
Test set: 1 sentences


In [8]:
class BiLSTM(nn.Module):

  def __init__(self, input_dim, embedding_dim, hidden_dim, output_dim, lstm_layers,
               emb_dropout, lstm_dropout, fc_dropout, word_pad_idx):
    super().__init__()
    self.embedding_dim = embedding_dim
    self.embedding = nn.Embedding(
        num_embeddings=input_dim, 
        embedding_dim=embedding_dim, 
        padding_idx=word_pad_idx
    )
    self.emb_dropout = nn.Dropout(emb_dropout)
    self.lstm = nn.LSTM(
        input_size=embedding_dim,
        hidden_size=hidden_dim,
        num_layers=lstm_layers,
        bidirectional=True,
        dropout=lstm_dropout if lstm_layers > 1 else 0
    )
    self.fc_dropout = nn.Dropout(fc_dropout)
    self.fc = nn.Linear(hidden_dim * 2, output_dim)  

  def forward(self, sentence):
    embedding_out = self.emb_dropout(self.embedding(sentence))
    lstm_out, _ = self.lstm(embedding_out)
    ner_out = self.fc(self.fc_dropout(lstm_out))
    return ner_out

  def init_weights(self):
    for name, param in self.named_parameters():
      nn.init.normal_(param.data, mean=0, std=0.1)

  def init_embeddings(self, word_pad_idx):
    self.embedding.weight.data[word_pad_idx] = torch.zeros(self.embedding_dim)

  def count_parameters(self):
    return sum(p.numel() for p in self.parameters() if p.requires_grad)

In [9]:
bilstm = BiLSTM(
    input_dim=len(corpus.word_field.vocab),
    embedding_dim=300,
    hidden_dim=64,
    output_dim=len(corpus.tag_field.vocab),
    lstm_layers=2,
    emb_dropout=0.5,
    lstm_dropout=0.1,
    fc_dropout=0.25,
    word_pad_idx=corpus.word_pad_idx
)
bilstm.init_weights()
bilstm.init_embeddings(word_pad_idx=corpus.word_pad_idx)
print(f"The model has {bilstm.count_parameters():,} trainable parameters.")
print(bilstm)

The model has 6,310,294 trainable parameters.
BiLSTM(
  (embedding): Embedding(20076, 300, padding_idx=1)
  (emb_dropout): Dropout(p=0.5, inplace=False)
  (lstm): LSTM(300, 64, num_layers=2, dropout=0.1, bidirectional=True)
  (fc_dropout): Dropout(p=0.25, inplace=False)
  (fc): Linear(in_features=128, out_features=6, bias=True)
)


In [10]:
class NER(object):

  def __init__(self, model, data, optimizer_cls, loss_fn_cls):
    self.model = model
    self.data = data
    self.optimizer = optimizer_cls(model.parameters())
    self.loss_fn = loss_fn_cls(ignore_index=self.data.tag_pad_idx)

  @staticmethod
  def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

  def accuracy(self, preds, y):
    max_preds = preds.argmax(dim=1, keepdim=True) 
    non_pad_elements = (y != self.data.tag_pad_idx).nonzero() 
    correct = max_preds[non_pad_elements].squeeze(1).eq(y[non_pad_elements])
    return correct.sum() / torch.FloatTensor([y[non_pad_elements].shape[0]])

  def epoch(self):
      epoch_loss = 0
      epoch_acc = 0
      self.model.train()
      for batch in self.data.train_iter:
        text = batch.word
        true_tags = batch.tag
        self.optimizer.zero_grad()
        pred_tags = self.model(text)
        pred_tags = pred_tags.view(-1, pred_tags.shape[-1])
        true_tags = true_tags.view(-1)
        batch_loss = self.loss_fn(pred_tags, true_tags)
        batch_acc = self.accuracy(pred_tags, true_tags)
        print
        batch_loss.backward()
        self.optimizer.step()
        epoch_loss += batch_loss.item()
        epoch_acc += batch_acc.item()
      return epoch_loss / len(self.data.train_iter), epoch_acc / len(self.data.train_iter)

  def evaluate(self, iterator):
      epoch_loss = 0
      epoch_acc = 0
      self.model.eval()
      with torch.no_grad():

          for batch in iterator:
              text = batch.word
              true_tags = batch.tag
              pred_tags = self.model(text)
              pred_tags = pred_tags.view(-1, pred_tags.shape[-1])
              true_tags = true_tags.view(-1)
              batch_loss = self.loss_fn(pred_tags, true_tags)
              batch_acc = self.accuracy(pred_tags, true_tags)
              epoch_loss += batch_loss.item()
              epoch_acc += batch_acc.item()
      return epoch_loss / len(iterator), epoch_acc / len(iterator)

  def train(self, n_epochs):
    for epoch in range(n_epochs):
        start_time = time.time()
        train_loss, train_acc = self.epoch()
        end_time = time.time()
        epoch_mins, epoch_secs = NER.epoch_time(start_time, end_time)
        print(f"Epoch: {epoch + 1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s")
        print(f"\tTrn Loss: {train_loss:.3f} | Trn Acc: {train_acc * 100:.2f}%")
        val_loss, val_acc = self.evaluate(self.data.val_iter)
        print(f"\tVal Loss: {val_loss:.3f} | Val Acc: {val_acc * 100:.2f}%")
    test_loss, test_acc = self.evaluate(self.data.test_iter)
    print(f"Test Loss: {test_loss:.3f} |  Test Acc: {test_acc * 100:.2f}%")

  def infer(self, sentence, true_tags=None):
    self.model.eval()

    nlp = Indonesian()
    tokens = [token.text.lower() for token in nlp(sentence)]

    numericalized_tokens = [self.data.word_field.vocab.stoi[t] for t in tokens]

    unk_idx = self.data.word_field.vocab.stoi[self.data.word_field.unk_token]
    unks = [t for t, n in zip(tokens, numericalized_tokens) if n == unk_idx]

    token_tensor = torch.LongTensor(numericalized_tokens)
    token_tensor = token_tensor.unsqueeze(-1)
    predictions = self.model(token_tensor)

    top_predictions = predictions.argmax(-1)
    predicted_tags = [self.data.tag_field.vocab.itos[t.item()] for t in top_predictions]

    max_len_token = max([len(token) for token in tokens] + [len("word")])
    max_len_tag = max([len(tag) for tag in predicted_tags] + [len("pred")])
    print(
        f"{'word'.ljust(max_len_token)}\t{'unk'.ljust(max_len_token)}\t{'pred tag'.ljust(max_len_tag)}" 
        + ("\ttrue tag" if true_tags else "")
        )
    for i, token in enumerate(tokens):
      is_unk = "✓" if token in unks else ""
      print(
          f"{token.ljust(max_len_token)}\t{is_unk.ljust(max_len_token)}\t{predicted_tags[i].ljust(max_len_tag)}" 
          + (f"\t{true_tags[i]}" if true_tags else "")
          )
    return tokens, predicted_tags, unks

In [None]:
ner = NER(
  model=bilstm,
  data=corpus,
  optimizer_cls=Adam,
  loss_fn_cls=nn.CrossEntropyLoss
)
ner.train(1)