In [25]:
import numpy as np
import pandas as pd

import torch
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.optim as optim
from TorchCRF import CRF
from torch.nn.utils.rnn import pad_sequence

from sklearn.model_selection import train_test_split

In [26]:
torch.__version__

'2.2.2+cu118'

### **1. Data Preparation**

In [4]:
def read_dataset():
    data = pd.read_csv('./ner_dataset/ner_dataset.csv', encoding='latin1')

    # remove white spaces from column names
    data.columns = data.columns.str.strip()

    print(data.columns)
    # print(data.columns)
    # Group by 'Sentence #' and aggregate
    grouped_data = data.groupby('Sentence #').agg({
        'Word': lambda x: ' '.join(x),  # Join words into a single sentence
        'Tag': lambda x: list(x),       # Collect tags into a list
        'Intent': lambda x: x     # Collect intents into a list
    }).reset_index()  # Reset index to make 'Sentence #' a regular column

    return data, grouped_data


def prepare_data(dataframe):
    dataset = []
    for _, row in dataframe.iterrows():
        sentence = row['Word']
        tags = row['Tag']
        intents = row['Intent'][0]
        dataset.append((sentence, tags, intents))

    return dataset

data, grouped_data = read_dataset()

prepared_dataset = prepare_data(grouped_data)

Index(['Sentence #', 'Word', 'Tag', 'Intent'], dtype='object')


In [5]:
print("Number of unique words in the dataset:", len(data['Word'].unique()) )# number of unique words in the dataset
print("Number of unique tags in the dataset:", len(data['Tag'].unique())) # number of unique tags in the dataset
print("Number of unique intents in the dataset:", len(data['Intent'].unique())) #number of unique intents in the dataset
print("Unique tags in the dataset:", data['Tag'].unique())

Number of unique words in the dataset: 304
Number of unique tags in the dataset: 6
Number of unique intents in the dataset: 1
Unique tags in the dataset: [' B-VAR' ' I-VAR' ' O' ' B-VAL' ' I-VAL' ' B-TYPE']


In [6]:
# getting unique words and labels from data
words = list(data['Word'].unique())
tags = list(data['Tag'].unique())

# word is key and its value is corresponding index
word_to_index = {word.strip() : i + 2 for i, word in enumerate(words)}
word_to_index["UNK"] = 1
word_to_index["PAD"] = 0

# label is key and value is index.
tag_to_index = {tag.strip() : i + 1 for i, tag in enumerate(tags)}
tag_to_index["PAD"] = 0

index_to_word = {i: word for word, i in word_to_index.items()}
index_to_tag = {i: tag for tag, i in tag_to_index.items()}

In [28]:
# save the mappings to file
import json
with open('word_to_index.json', 'w') as f:
    json.dump(word_to_index, f, indent=4)

with open('tag_to_index.json', 'w') as f:
    json.dump(tag_to_index, f)

In [29]:
with open('word_to_index.json', 'r') as f:
    data = json.load(f)
    _word2idx = data["word2idx"]
    words_list = [word for word in _word2idx.keys()]
    data["idx2word"] = words_list

with open('word_to_index.json', 'w') as f:
    json.dump(data, f, indent=4)

In [31]:
with open('tag_to_index.json', 'r') as f:
    data = json.load(f)
    _tag2idx = data["word2idx"]
    tag_list = [word for word in _tag2idx.keys()]
    data["idx2word"] = tag_list

with open('tag_to_index.json', 'w') as f:
    json.dump(data, f, indent=4)

In [22]:
# This is a class te get sentence. The each sentence will be list of tuples with its tag and pos.
class Sentence(object):
    def __init__(self, df):
        self.n_sent = 0
        self.df = df
        self.empty = False
        agg = lambda s : [(word.strip(), tag.strip(), intent.strip()) for word, tag, intent in zip(s['Word'].values.tolist(),
                                                       s['Tag'].values.tolist(),
                                                       s['Intent'].values.tolist())]
        self.grouped = self.df.groupby("Sentence #").apply(agg)
        self.sentences = [s for s in self.grouped]
        
    def get_text(self):
        try:
            s = self.grouped[self.n_sent]
            self.n_sent +=1
            return s
        except:
            return None
    
    def records_to_tuples(self):
        dataset = []

        grouped_data = data.groupby('Sentence #').agg({
        'Word': lambda x: ''.join(x),  # Join words into a single sentence
        'Tag': lambda x: list(x.str.strip()),       # Collect tags into a list
        'Intent': lambda x: x.str.strip()     # Collect intents into a list
        }).reset_index()

        for _, row in grouped_data.iterrows():
            sentence = row['Word']
            tags = row['Tag']
            intents = row['Intent'][0]
            dataset.append((sentence, tags, intents))

        return dataset
        
#Displaying one full sentence
getter = Sentence(data)

examples = getter.records_to_tuples()

print("Examples sample:", examples[0])

Examples sample: (' is approved equals clustering algorithms', ['B-VAR', 'I-VAR', 'O', 'B-VAL', 'I-VAL'], 'variable_declaration')


In [33]:
with open('dataset.txt', 'w') as f:
    for example in examples:
        f.write(str(example[0].split()) + '\t' + str(example[1]) + '\n')

In [23]:
# We should convert each sentence to integers
#Getting unique words and labels from data -> our vocab
words = list(data['Word'].unique())
tags = list(data['Tag'].unique())

# 1. Each word to integer
# word is key and its value is corresponding index
word_to_index = {word.strip() : i + 2 for i, word in enumerate(words)}
word_to_index['UNK'] = 1
word_to_index['PAD'] = 0

vocab_size = len(word_to_index)
print("Vocab size:", vocab_size)

# 2. Each label to integer
# label is key and value is index.
tag_to_index = {tag.strip() : i + 1 for i, tag in enumerate(tags)}
tag_to_index['PAD'] = 0

print("Tag to index:", tag_to_index)

# conver _ to index to index_ to word
index_to_word = dict(sorted({i: word for word, i in word_to_index.items()}.items(), key=lambda item: item[0]))
index_to_tag = dict(sorted({i: tag for tag, i in tag_to_index.items()}.items(), key=lambda item: item[0]))

print("Index to tag:", index_to_tag)

sentences = [example[0][1:] for example in examples]

max_len = max(len(s.split()) for s in sentences)

print("Max length of sentence:", max_len)

tags = [example[1] for example in examples]

print("Sample sentence:", sentences[0])

print("Sample tags:", tags[0])

Vocab size: 306
Tag to index: {'B-VAR': 1, 'I-VAR': 2, 'O': 3, 'B-VAL': 4, 'I-VAL': 5, 'B-TYPE': 6, 'PAD': 0}
Index to tag: {0: 'PAD', 1: 'B-VAR', 2: 'I-VAR', 3: 'O', 4: 'B-VAL', 5: 'I-VAL', 6: 'B-TYPE'}
Max length of sentence: 20
Sample sentence: is approved equals clustering algorithms
Sample tags: ['B-VAR', 'I-VAR', 'O', 'B-VAL', 'I-VAL']


In [42]:
# Padding function
# def pad_sequences(sequences, max_len, pad_value=0):
#     padded_sequences = []
#     for seq in sequences:
#         if len(seq) < max_len:
#             padded_seq = seq + [pad_value] * (max_len - len(seq))
#         else:
#             padded_seq = seq[:max_len]
#         padded_sequences.append(padded_seq)
#     return padded_sequences
test = "['is', '2', 'a', 'variable']"
test_list = list(map(str, test[1:-1].split(',')))

test_list = [word.strip().replace('\'', '') for word in test_list]
test_list
sentence = ["<START>"]
tago = ["<START>"]
line = "['is', 'approved', 'equals', 'clustering', 'algorithms']  ['B-VAR', 'I-VAR', 'O', 'B-VAL', 'I-VAL']"

line = line.split('\t')
line[0] = list(map(str, line[0][1:-1].split(',')))
line[0] = [word.strip().replace('\'', '') for word in line[0]]
line[0].append('<END>')

line[1] = list(map(str, line[1][1:-1].split(',')))
print(line[1])
line[1] = [tag.strip().replace('\'', '') for tag in line[1]]
line[1].append('<END>')

print(line[0])
print(line[1])


IndexError: list index out of range

#### The Dataset Class

In [24]:
# Dataset class
# class NERDataset(Dataset):
#     def __init__(self, sentences, tags, word_to_index, tag_to_index):
#         self.sentences = [[word_to_index[word] for word in sentence] for sentence in sentences]
#         self.tags = [[tag_to_index[tag] for tag in tag_seq] for tag_seq in tags]

#     def __len__(self):
#         return len(self.sentences)

#     def __getitem__(self, idx):
#         return torch.tensor(self.sentences[idx], dtype=torch.long), torch.tensor(self.tags[idx], dtype=torch.long)

# Dataset class
class NERDataset(Dataset):
    def __init__(self, sentences, tags):
        self.sentences = [torch.tensor(sentence, dtype=torch.long) for sentence in sentences]
        self.tags = [torch.tensor(tag, dtype=torch.long) for tag in tags]

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        return self.sentences[idx], self.tags[idx]


# Split the data
train_sentences, val_sentences, train_tags, val_tags = train_test_split(sentences, tags, test_size=0.1, random_state=42)

# # Create Dataset and DataLoader for training and validation
# train_dataset = NERDataset(train_sentences, train_tags, word_to_index, tag_to_index)
# val_dataset = NERDataset(val_sentences, val_tags, word_to_index, tag_to_index)

# train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
# val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False)

train_dataset = NERDataset(train_sentences, train_tags)
val_dataset = NERDataset(val_sentences, val_tags)

def collate_fn(batch):
    sentences, tags = zip(*batch)
    sentences_padded = pad_sequence(sentences, batch_first=True, padding_value=word_to_index['PAD'])
    tags_padded = pad_sequence(tags, batch_first=True, padding_value=tag_to_index['PAD'])
    return sentences_padded, tags_padded

train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)


TypeError: new(): invalid data type 'str'

### **2. Model Definition**

In [10]:
class BiLSTMCRF(nn.Module):
    def __init__(self, vocab_size, number_of_tags, embedding_dim=128, hidden_dim=128):
        super(BiLSTMCRF, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2, num_layers=1, bidirectional=True)
        self.hidden_to_tag = nn.Linear(hidden_dim, number_of_tags)
        self.crf = CRF(number_of_tags, batch_first=True)

    def forward(self, sentences):
        embeddings = self.embedding(sentences)
        lstm_out, _ = self.lstm(embeddings)
        emissions = self.hidden_to_tag(lstm_out)
        return emissions

    def loss(self, emissions, tags, mask):
        return -self.crf(emissions, tags, mask=mask, reduction='mean')

    def predict(self, sentences, mask):
        emissions = self.forward(sentences)
        return self.crf.decode(emissions, mask=mask)

In [11]:
# define model parameters
vocab_size = len(word_to_index)
number_of_tags = len(tag_to_index)
embedding_dimension = 128

### **3. Model Training**

In [12]:
model = BiLSTMCRF(vocab_size=vocab_size, number_of_tags=number_of_tags)
optimizer = optim.Adam(model.parameters(), lr=0.01)

  from .autonotebook import tqdm as notebook_tqdm
  _torch_pytree._register_pytree_node(


In [13]:
num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_train_loss = 0
    for sentences, tags in train_loader:
        mask = sentences != 0  # Mask for padding
        optimizer.zero_grad()
        emissions = model(sentences)
        loss = model.loss(emissions, tags, mask)
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()
    
    # Validation
    model.eval()
    total_val_loss = 0
    with torch.no_grad():
        for sentences, tags in val_loader:
            mask = sentences != 0
            emissions = model(sentences)
            loss = model.loss(emissions, tags, mask)
            total_val_loss += loss.item()

    print(f"Epoch {epoch + 1}, Training Loss: {total_train_loss / len(train_loader)}, Validation Loss: {total_val_loss / len(val_loader)}")

ValueError: the first two dimensions of emissions and tags must match, got (1, 11) and (1, 12)

### **4. Model Evaluation**

In [None]:
model.eval()
with torch.no_grad():
    for sentences, tags in val_loader:
        mask = sentences != 0  # Mask for padding
        predictions = model.predict(sentences, mask)
        for sentence, prediction, tag in zip(sentences, predictions, tags):
            sentence = sentence.tolist()
            prediction = [index_to_tag[p] for p in prediction]
            tag = [index_to_tag[t] for t in tag.tolist()]
            print(f"Sentence: {sentence}")
            print(f"Prediction: {prediction}")
            print(f"Ground Truth: {tag}")