# Import Required Libraries

In [9]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from sklearn.metrics import classification_report


# Load and Preprocess Dataset

In [12]:
# Load CoNLL-2003 through the Hugging Face `datasets` loader.
dataset = load_dataset("eriktks/conll2003", trust_remote_code=True)

# Label feature describing the NER tag set of the training split.
tags = dataset['train'].features['ner_tags'].feature
# tag2idx stays a callable (tag name -> id) because encode_data calls it as
# tag2idx("O"): call it, do not index it or take its len().
tag2idx = tags.str2int
# idx2tag is a real dict (id -> tag name): the evaluation and inference code
# index it (idx2tag[i]) and iterate it (idx2tag.items()), which the bound
# method tags.int2str supports neither of.
idx2tag = {idx: name for idx, name in enumerate(tags.names)}

# Vocabulary over every split. "PAD" is registered first so that index 0 is
# reserved for padding and the remaining ids stay contiguous (1, 2, 3, ...)
# even if the literal token "PAD" occurs in the corpus.
word2idx = {"PAD": 0}
for split in ["train", "validation", "test"]:
    for sentence in dataset[split]["tokens"]:
        for word in sentence:
            if word not in word2idx:
                word2idx[word] = len(word2idx)

# Every sentence is truncated or padded to this many tokens.
MAX_LEN = 50

def encode_data(dataset_split, max_len, word2idx, tag2idx):
    """Convert a dataset split into fixed-length index tensors.

    Each sentence is truncated to ``max_len`` tokens or right-padded up to it.
    Token positions are padded with index 0 and tag positions with the id of
    the "O" tag. Words missing from ``word2idx`` are also encoded as 0.

    Args:
        dataset_split: Object indexable by "tokens" and "ner_tags", yielding
            parallel sequences of word lists and integer tag lists.
        max_len: Length of every encoded sequence.
        word2idx: Mapping from word to integer index.
        tag2idx: Callable or mapping from tag name to integer id; only the id
            of "O" is looked up.

    Returns:
        A ``(sentences, labels)`` pair of ``torch.long`` tensors, each of
        shape ``(number of sentences, max_len)``.
    """
    # This file passes a callable here although the name suggests a dict,
    # so both forms are accepted.
    pad_tag = tag2idx("O") if callable(tag2idx) else tag2idx["O"]

    sentences = []
    labels = []
    for tokens, ner_tags in zip(dataset_split["tokens"], dataset_split["ner_tags"]):
        token_ids = [word2idx.get(word, 0) for word in tokens][:max_len]
        tag_ids = list(ner_tags[:max_len])

        # After truncation the remaining width is never negative.
        token_ids += [0] * (max_len - len(token_ids))
        tag_ids += [pad_tag] * (max_len - len(tag_ids))

        sentences.append(token_ids)
        labels.append(tag_ids)

    # Explicit dtype and shape keep the result well-formed for an empty split.
    shape = (len(sentences), max_len)
    return (
        torch.tensor(sentences, dtype=torch.long).reshape(shape),
        torch.tensor(labels, dtype=torch.long).reshape(shape),
    )

# Encode the training and validation splits with the shared vocabulary.
train_sentences, train_tags = encode_data(
    dataset["train"], max_len=MAX_LEN, word2idx=word2idx, tag2idx=tag2idx
)
val_sentences, val_tags = encode_data(
    dataset["validation"], max_len=MAX_LEN, word2idx=word2idx, tag2idx=tag2idx
)


# Define Dataset and DataLoader

In [13]:
class NERDataset(Dataset):
    """Dataset pairing each encoded sentence with its tag sequence."""

    def __init__(self, sentences, tags):
        # Parallel containers: entry i of one belongs to entry i of the other.
        self.sentences, self.tags = sentences, tags

    def __len__(self):
        """Return the number of stored sentences."""
        n_items = len(self.sentences)
        return n_items

    def __getitem__(self, idx):
        """Return the ``(sentence, tags)`` pair stored at position ``idx``."""
        sentence = self.sentences[idx]
        tag_row = self.tags[idx]
        return sentence, tag_row

# Wrap the encoded tensors so DataLoader can batch them.
train_dataset = NERDataset(sentences=train_sentences, tags=train_tags)
val_dataset = NERDataset(sentences=val_sentences, tags=val_tags)

# Only the training batches are shuffled; validation order stays fixed.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(dataset=val_dataset, batch_size=32, shuffle=False)


# Define LSTM Model

In [None]:
class NERLSTM(nn.Module):
    """Token tagger: embedding -> bidirectional LSTM -> per-token linear layer."""

    def __init__(self, vocab_size, tagset_size, embedding_dim, hidden_dim):
        super().__init__()
        # Index 0 is the padding index of the embedding table.
        self.embedding = nn.Embedding(
            num_embeddings=vocab_size,
            embedding_dim=embedding_dim,
            padding_idx=0,
        )
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            batch_first=True,
            bidirectional=True,
        )
        # The LSTM is bidirectional, so its output feature size is 2 * hidden_dim.
        self.fc = nn.Linear(in_features=2 * hidden_dim, out_features=tagset_size)

    def forward(self, x):
        """Return per-token tag logits for a batch of word-index sequences."""
        embedded = self.embedding(x)
        # Keep only the per-step outputs; the final (h, c) state is unused.
        hidden_states = self.lstm(embedded)[0]
        return self.fc(hidden_states)

# The embedding table must have a row for every index stored in word2idx,
# i.e. 0 .. max index; derive the size from the indices, not the entry count.
VOCAB_SIZE = max(word2idx.values()) + 1
# tag2idx is the bound method tags.str2int and has no len(); take the number
# of tag classes from the label feature itself.
TAGSET_SIZE = tags.num_classes
EMBEDDING_DIM = 100
HIDDEN_DIM = 128

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = NERLSTM(VOCAB_SIZE, TAGSET_SIZE, EMBEDDING_DIM, HIDDEN_DIM).to(device)


# Define Loss and Optimizer

In [None]:
# Targets equal to 0 do not contribute to the loss. encode_data pads label
# sequences with the id of the "O" tag.
# NOTE(review): if "O" has id 0, genuine "O" tokens are excluded from training
# as well as the padding - confirm this is intended.
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(params=model.parameters(), lr=1e-3)


# Train the Model

In [None]:
def train_model(model, train_loader, optimizer, criterion, epochs):
    """Train ``model`` for ``epochs`` passes over ``train_loader``.

    After every epoch the loss summed over all batches of that epoch is
    printed. The model is left in training mode.

    Args:
        model: Module mapping a batch of index sequences to per-token logits.
        train_loader: Iterable yielding ``(sentences, tags)`` tensor batches.
        optimizer: Optimizer already bound to the model's parameters.
        criterion: Loss taking ``(logits, targets)`` flattened over tokens.
        epochs: Number of passes over ``train_loader``.
    """
    # Run the batches on whatever device the model lives on. Hard-coding "cpu"
    # fails with a device mismatch once the model has been moved to CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()

    for epoch in range(epochs):
        total_loss = 0.0

        for sentences, tags in train_loader:
            sentences, tags = sentences.to(device), tags.to(device)

            optimizer.zero_grad()
            outputs = model(sentences)

            # Flatten so every token position is one classification example.
            outputs = outputs.reshape(-1, outputs.shape[-1])
            tags = tags.reshape(-1)

            loss = criterion(outputs, tags)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        print(f"Epoch {epoch + 1}/{epochs}, Loss: {total_loss:.4f}")

# Run five training epochs over the training loader.
train_model(
    model=model,
    train_loader=train_loader,
    optimizer=optimizer,
    criterion=criterion,
    epochs=5,
)


# Evaluate the Model

In [None]:
from sklearn.metrics import classification_report

def evaluate_model(model, val_loader, idx2tag):
    """Print a per-tag classification report for ``model`` on ``val_loader``.

    Positions whose gold label id is 0 are excluded from the report. The model
    is switched to evaluation mode and left in it.

    Args:
        model: Module mapping a batch of index sequences to per-token logits.
        val_loader: Iterable yielding ``(sentences, tags)`` tensor batches.
        idx2tag: Mapping or callable turning an integer tag id into its name.
    """
    # Run on the model's own device; hard-coding "cpu" breaks once the model
    # has been moved to CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    # Accept both a dict-like id -> name mapping and a lookup callable.
    to_tag = idx2tag if callable(idx2tag) else idx2tag.__getitem__

    model.eval()
    all_preds = []
    all_tags = []
    with torch.no_grad():
        for sentences, tags in val_loader:
            outputs = model(sentences.to(device))

            predictions = torch.argmax(outputs, dim=2)

            # .tolist() yields plain Python ints, which work both as dict keys
            # and as arguments to a lookup callable.
            all_preds.extend(predictions.reshape(-1).tolist())
            all_tags.extend(tags.reshape(-1).tolist())

    # Drop every position whose gold label id is 0.
    kept = [(p, t) for p, t in zip(all_preds, all_tags) if t != 0]
    valid_preds = [to_tag(p) for p, _ in kept]
    valid_tags = [to_tag(t) for _, t in kept]

    # The labels are already tag names, so no target_names are passed:
    # sklearn pairs target_names with the *sorted* labels, so names listed in
    # id order would be attached to the wrong rows.
    print(classification_report(valid_tags, valid_preds, zero_division=0))

# Report per-tag metrics on the validation loader.
evaluate_model(model=model, val_loader=val_loader, idx2tag=idx2tag)


# Inference Function

In [None]:
def predict(model, sentence, word2idx, idx2tag, max_len):
    """Tag a single tokenized sentence.

    The sentence is encoded with ``word2idx`` (unknown words become 0),
    truncated or padded with 0 to ``max_len`` and passed through ``model``.
    The model is switched to evaluation mode and left in it.

    Args:
        model: Module mapping a batch of index sequences to per-token logits.
        sentence: Sequence of word strings.
        word2idx: Mapping from word to integer index.
        idx2tag: Mapping or callable turning an integer tag id into its name.
        max_len: Length the encoded sentence is truncated/padded to.

    Returns:
        A list of ``(word, tag)`` pairs for the first
        ``min(len(sentence), max_len)`` words.
    """
    # Build the input on the model's own device; hard-coding "cpu" breaks once
    # the model has been moved to CUDA.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")
    # Accept both a dict-like id -> name mapping and a lookup callable.
    to_tag = idx2tag if callable(idx2tag) else idx2tag.__getitem__

    model.eval()
    token_ids = [word2idx.get(word, 0) for word in sentence][:max_len]
    token_ids += [0] * (max_len - len(token_ids))
    input_tensor = torch.tensor([token_ids], dtype=torch.long, device=device)

    with torch.no_grad():
        outputs = model(input_tensor)
        # Plain Python ints work as dict keys and as callable arguments.
        predictions = torch.argmax(outputs, dim=2)[0].tolist()

    # zip stops at the shorter input, so words beyond max_len receive no tag.
    return [(word, to_tag(pred)) for word, pred in zip(sentence, predictions[:len(sentence)])]

# Tag one hand-written sentence as a smoke test.
test_sentence = ["Mustafa", "Rizwan", "lives", "in", "Pakistan"]
print(
    predict(
        model=model,
        sentence=test_sentence,
        word2idx=word2idx,
        idx2tag=idx2tag,
        max_len=MAX_LEN,
    )
)
