# Libraries Import 📚

In [2]:
import os
import glob

import torch
from torch.optim import AdamW
from torch.utils.data import DataLoader

from datasets import Dataset

from transformers import AutoTokenizer, BertForTokenClassification, TrainingArguments, Trainer,DataCollatorForTokenClassification,get_scheduler


import matplotlib.pyplot as plt

import evaluate

from tqdm import tqdm

import time

import numpy as np


# File Parsing 📝

In [3]:
def read_conllx(filepath):
    """Parse a CoNLL-X style file into per-sentence token and tag lists.

    Sentences are delimited by blank (whitespace-only) lines. Every other
    line is split on whitespace; its second field is taken as the token and
    its fifth field as the tag. Lines with fewer than five fields are
    skipped.

    Args:
        filepath: Path of the file to read (decoded as UTF-8).

    Returns:
        A ``(sentences_tokens, sentences_tags)`` tuple of lists of lists,
        aligned so that ``sentences_tags[i][j]`` is the tag of
        ``sentences_tokens[i][j]``.
    """
    sentences_tokens, sentences_tags = [], []
    words, pos = [], []

    with open(filepath, 'r', encoding='utf8') as handle:
        for raw_line in handle:
            fields = raw_line.split()
            if fields:
                # Rows with too few columns are dropped silently.
                if len(fields) >= 5:
                    words.append(fields[1])
                    pos.append(fields[4])
            elif words:
                # A blank line closes the sentence collected so far.
                sentences_tokens.append(words)
                sentences_tags.append(pos)
                words, pos = [], []

    # The file may end without a trailing blank line.
    if words:
        sentences_tokens.append(words)
        sentences_tags.append(pos)
    return sentences_tokens, sentences_tags

def load_data_from_folders(folders, base_path="wsj"):
    """Load every ``*.conllx`` file found in the given sub-folders.

    Args:
        folders: Iterable of folder names located directly under
            ``base_path``.
        base_path: Directory that contains the folders (default ``"wsj"``).

    Returns:
        A ``(all_tokens, all_tags)`` tuple holding the per-sentence token
        lists and tag lists of all files, concatenated in the order the
        folders were given and, within a folder, in sorted file-path order.
        Folders that do not exist or contain no matching file contribute
        nothing.
    """
    all_tokens = []
    all_tags = []
    for folder in folders:
        pattern = os.path.join(base_path, folder, "*.conllx")
        # glob yields matches in arbitrary, OS-dependent order; sorting makes
        # the resulting dataset order reproducible across runs and machines.
        for file in sorted(glob.glob(pattern)):
            tokens, tags = read_conllx(file)
            all_tokens.extend(tokens)
            all_tags.extend(tags)
    return all_tokens, all_tags


# Data Split 🗂️

In [4]:
# WSJ section split: folders 00-18 train, 19-21 validation, 22-24 test.
train_folders = [f"{i:02d}" for i in range(19)]
val_folders = [f"{i:02d}" for i in range(19, 22)]
test_folders = [f"{i:02d}" for i in range(22, 25)]

# Read every sentence (tokens + tags) of each split from disk.
train_tokens, train_tags = load_data_from_folders(train_folders, base_path="wsj")
val_tokens, val_tags = load_data_from_folders(val_folders, base_path="wsj")
test_tokens, test_tags = load_data_from_folders(test_folders, base_path="wsj")

# Column-oriented dicts, one per split.
data_train = dict(tokens=train_tokens, pos_tags=train_tags)
data_val = dict(tokens=val_tokens, pos_tags=val_tags)
data_test = dict(tokens=test_tokens, pos_tags=test_tags)

# Show all the unique POS tags in each split: flatten the per-sentence tag
# lists and keep the distinct values.
unique_train_tags = {tag for sentence_tags in train_tags for tag in sentence_tags}
print("a ", unique_train_tags)

unique_val_tags = {tag for sentence_tags in val_tags for tag in sentence_tags}
print("b ", unique_val_tags)

unique_test_tags = {tag for sentence_tags in test_tags for tag in sentence_tags}
print("c ", unique_test_tags)

# Number of distinct tags in train / validation / test.
print(len(unique_train_tags), len(unique_val_tags), len(unique_test_tags))




a  {'$', 'CD', 'IN', 'RP', 'CC', 'WP', 'WRB', '#', 'MD', '``', 'VBG', '-LRB-', 'RBS', 'RBR', 'EX', 'VB', 'RB', 'FW', 'JJR', 'NNS', 'SYM', 'NNPS', 'UH', 'PRP$', 'VBN', 'PDT', 'LS', 'POS', 'WP$', 'TO', 'DT', 'VBP', 'JJS', 'WDT', 'NN', "''", 'PRP', 'JJ', 'NNP', 'VBD', ':', '.', ',', 'VBZ', '-RRB-'}
b  {'$', 'CD', 'IN', 'RP', 'CC', 'WP', 'WRB', '#', 'MD', '``', 'VBG', '-LRB-', 'RBS', 'RBR', 'EX', 'VB', 'RB', 'FW', 'JJR', 'NNS', 'SYM', 'NNPS', 'UH', 'PDT', 'VBN', 'PRP$', 'LS', 'POS', 'WP$', 'TO', 'DT', 'VBP', 'JJS', 'WDT', 'NN', "''", 'PRP', 'JJ', 'NNP', 'VBD', ':', '.', ',', 'VBZ', '-RRB-'}
c  {'$', 'CD', 'IN', 'RP', 'CC', 'WP', 'WRB', '#', 'MD', '``', 'VBG', '-LRB-', 'RBS', 'RBR', 'EX', 'VB', 'RB', 'FW', 'JJR', 'NNS', 'SYM', 'NNPS', 'LS', 'PRP$', 'VBN', 'PDT', 'UH', 'POS', 'WP$', 'TO', 'DT', 'VBP', 'JJS', 'WDT', 'NN', "''", 'PRP', 'JJ', 'NNP', 'VBD', ':', '.', ',', 'VBZ', '-RRB-'}
45 45 45


# Load Tokenizer and Model 🤖

In [5]:
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")

# Label vocabulary: every tag seen in the training split, sorted so that the
# tag -> id mapping is the same on every run.
label_list = sorted(set(tag for sentence_tags in data_train["pos_tags"] for tag in sentence_tags))
label_to_id = dict(zip(label_list, range(len(label_list))))

# BERT with a token-classification head sized to the label vocabulary.
model = BertForTokenClassification.from_pretrained(
    "bert-base-cased",
    num_labels=len(label_list),
    label2id=label_to_id,
    id2label=dict(enumerate(label_list)),
)

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


BertForTokenClassification(
  (bert): BertModel(
    (embeddings): BertEmbeddings(
      (word_embeddings): Embedding(28996, 768, padding_idx=0)
      (position_embeddings): Embedding(512, 768)
      (token_type_embeddings): Embedding(2, 768)
      (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
      (dropout): Dropout(p=0.1, inplace=False)
    )
    (encoder): BertEncoder(
      (layer): ModuleList(
        (0-11): 12 x BertLayer(
          (attention): BertAttention(
            (self): BertSdpaSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.1, inplace=False)
            )
            (output): BertSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (LayerNorm): LayerNorm((768,), eps=1e-12

# Tokenize and Align Labels 📝

In [6]:
def tokenize_and_align_labels(examples):
    """Tokenize pre-split sentences and build sub-token level label ids.

    Uses the module-level ``tokenizer`` and ``label_to_id``. For each
    sentence, only the first sub-token of every word receives that word's
    label id; special tokens (word id ``None``) and the remaining sub-tokens
    of a word receive ``-100``. A tag missing from ``label_to_id`` also maps
    to ``-100``.

    Args:
        examples: Batch mapping with a ``"tokens"`` entry (list of word
            lists) and a ``"pos_tags"`` entry (list of tag lists).

    Returns:
        The tokenizer output for the batch with an added ``"labels"`` entry
        holding one list of label ids per sentence.
    """
    tokenized_inputs = tokenizer(examples["tokens"], truncation=True, is_split_into_words=True)

    def align(word_ids, word_tags):
        # Walk the sub-tokens once, remembering which word the previous
        # sub-token belonged to so word boundaries can be detected.
        aligned = []
        last_word = None
        for word in word_ids:
            starts_new_word = word is not None and word != last_word
            aligned.append(label_to_id.get(word_tags[word], -100) if starts_new_word else -100)
            last_word = word
        return aligned

    tokenized_inputs["labels"] = [
        align(tokenized_inputs.word_ids(batch_index=row), word_tags)
        for row, word_tags in enumerate(examples["pos_tags"])
    ]
    return tokenized_inputs

# Wrap each split in a Hugging Face Dataset and immediately tokenize it,
# aligning the word-level tags to sub-tokens in batched mode.
train_dataset = Dataset.from_dict(data_train).map(tokenize_and_align_labels, batched=True)
val_dataset = Dataset.from_dict(data_val).map(tokenize_and_align_labels, batched=True)
test_dataset = Dataset.from_dict(data_test).map(tokenize_and_align_labels, batched=True)

# Report how many sentences each split contains.
print(f"Train dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(val_dataset)}")
print(f"Test dataset size: {len(test_dataset)}")


Map:   0%|          | 0/38219 [00:00<?, ? examples/s]

Map:   0%|          | 0/5527 [00:00<?, ? examples/s]

Map:   0%|          | 0/5462 [00:00<?, ? examples/s]

Train dataset size: 38219
Validation dataset size: 5527
Test dataset size: 5462


# Training 🦾

In [7]:
# Expose only the columns the model consumes, returned as PyTorch tensors.
model_columns = ["input_ids", "attention_mask", "labels"]
for split in (train_dataset, val_dataset, test_dataset):
    split.set_format(type="torch", columns=model_columns)

# Pads inputs and labels of a batch to a common length.
data_collator = DataCollatorForTokenClassification(tokenizer)

# Hyper-parameters: 3 epochs, batch size 32, evaluation after every epoch.
# NOTE(review): newer transformers releases rename `evaluation_strategy` to
# `eval_strategy` and the Trainer's `tokenizer=` to `processing_class=` —
# confirm against the installed version.
training_args = TrainingArguments(
    output_dir="./results",
    logging_dir="./logs",
    logging_steps=10,
    evaluation_strategy="epoch",
    num_train_epochs=3,
    per_device_train_batch_size=32,
    per_device_eval_batch_size=32,
    learning_rate=2e-5,
    weight_decay=0.01,
)

trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    tokenizer=tokenizer,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)

# Fine-tune, then write the model and the tokenizer into the same directory
# so that "./results" can be loaded with from_pretrained later on.
trainer.train()
trainer.save_model("./results")
tokenizer.save_pretrained("./results")


  trainer = Trainer(


Epoch,Training Loss,Validation Loss
1,0.0835,0.072144
2,0.055,0.069548
3,0.0396,0.068169


('./results\\tokenizer_config.json',
 './results\\special_tokens_map.json',
 './results\\vocab.txt',
 './results\\added_tokens.json',
 './results\\tokenizer.json')

# Evaluation 👨‍🏫

In [8]:
# Evaluate on the test set with the Trainer. The metrics are kept and printed:
# a bare expression in the middle of a cell is not displayed, so the result
# was previously computed and then thrown away.
trainer_test_metrics = trainer.evaluate(test_dataset)
print(trainer_test_metrics)

# Reload the checkpoint written to "./results" and evaluate that copy.
model = BertForTokenClassification.from_pretrained("./results", num_labels=len(label_list), label2id=label_to_id, id2label={i: label for i, label in enumerate(label_list)})
model.to(device)

# NOTE(review): `metric` is not used in this cell; accuracy is computed by
# hand below. Kept in case another cell relies on it.
metric = evaluate.load("accuracy")

# Batches are padded by the collator; padded label positions are -100.
test_dataloader = DataLoader(test_dataset, batch_size=16, collate_fn=data_collator)

# One 1-D array per sentence (rows of different batches may differ in length).
predictions_list = []
labels_list = []

model.eval()
with torch.no_grad():  # inference only, no gradients needed
    for batch in tqdm(test_dataloader, desc="Evaluating"):
        batch = {k: v.to(device) for k, v in batch.items()}
        logits = model(**batch).logits
        # Highest-scoring label id for every sub-token position.
        predictions = torch.argmax(logits, dim=2)
        predictions_list.extend(predictions.cpu().numpy())
        labels_list.extend(batch["labels"].cpu().numpy())

# Flatten all sentences and keep only positions with a real label (!= -100).
# A single vectorised mask replaces the per-element Python loops.
if labels_list:
    all_labels = np.concatenate(labels_list)
    all_predictions = np.concatenate(predictions_list)
    scored_positions = all_labels != -100
    predictions_flat = all_predictions[scored_positions].tolist()
    labels_flat = all_labels[scored_positions].tolist()
else:
    predictions_flat, labels_flat = [], []

# Token-level accuracy over scored positions; NaN when there is nothing to score.
if labels_flat:
    accuracy = np.sum(np.array(predictions_flat) == np.array(labels_flat)) / len(labels_flat)
else:
    accuracy = float("nan")





Evaluating: 100%|██████████| 342/342 [01:24<00:00,  4.05it/s]


In [11]:
print(f"Test Accuracy: {accuracy:.4f}")
# Print classification report
from sklearn.metrics import classification_report,balanced_accuracy_score,f1_score,accuracy_score

# Pin the label ids so that row i of the report is always label_list[i].
# Without `labels=`, scikit-learn derives the classes from the data, and a tag
# that never occurs in labels/predictions makes `target_names` misaligned (or
# raises). The report is built once and reused for printing and saving.
report_label_ids = list(range(len(label_list)))
report_text = classification_report(
    labels_flat,
    predictions_flat,
    labels=report_label_ids,
    target_names=label_list,
)
print("Classification Report:")
print(report_text)

#save in a text file
with open("classification_report.txt", "w") as f:
    f.write(report_text)

# Calculate and print balanced accuracy
balanced_acc = balanced_accuracy_score(labels_flat, predictions_flat)
print(f"Balanced Accuracy: {balanced_acc:.4f}")
# Calculate and print F1 score (support-weighted average over classes)
f1 = f1_score(labels_flat, predictions_flat, average='weighted')
print(f"F1 Score: {f1:.4f}")
# Calculate accuracy score (only written to the metrics file, not printed)
acc_score = accuracy_score(labels_flat, predictions_flat)

#save these metrics in a text file
with open("metrics.txt", "w") as f:
    f.write(f"Balanced Accuracy: {balanced_acc:.4f}\n")
    f.write(f"F1 Score: {f1:.4f}\n")
    f.write(f"Accuracy Score: {acc_score:.4f}\n")


Test Accuracy: 0.9773
Classification Report:
              precision    recall  f1-score   support

           #       1.00      1.00      1.00        15
           $       1.00      1.00      1.00       943
          ''       1.00      1.00      1.00      1045
           ,       1.00      1.00      1.00      6876
       -LRB-       0.99      1.00      0.99       186
       -RRB-       1.00      1.00      1.00       187
           .       1.00      1.00      1.00      5381
           :       1.00      1.00      1.00       752
          CC       1.00      1.00      1.00      3250
          CD       0.99      0.99      0.99      4823
          DT       1.00      0.99      0.99     11183
          EX       0.97      1.00      0.98       126
          FW       0.43      0.30      0.35        30
          IN       0.98      0.99      0.98     13492
          JJ       0.94      0.94      0.94      8215
         JJR       0.91      0.95      0.93       423
         JJS       0.96      0.98   

# User Test

In [17]:
import random
import gradio as gr

# Define a function to pick a random sentence from the test dataset and perform POS tagging
def random_sentence_pos_tagging():
    """Tag one randomly chosen test-set sentence with the fine-tuned model.

    Picks a random folder from ``test_folders``, a random ``*.conllx`` file
    inside it and a random sentence of that file, then runs the module-level
    ``model`` on it.

    The model predicts one label per sub-token (special tokens and word-piece
    continuations included), so predictions are mapped back to words through
    the tokenizer's ``word_ids()``: each word gets the label predicted for
    its first sub-token, the same position that carries the label in
    ``tokenize_and_align_labels``.

    Returns:
        A 3-tuple ``(source, sentence, result)``: a string naming the folder
        and file, the sentence as space-joined text, and a list of
        ``{"word": ..., "label": ...}`` dicts, one per word. A word that was
        cut off by truncation has label ``None``.
    """
    # Pick a random folder and file
    random_folder = random.choice(test_folders)
    folder_path = os.path.join("wsj", random_folder)
    random_file = random.choice(glob.glob(os.path.join(folder_path, "*.conllx")))

    # Read sentences from the selected file
    sentences_tokens, _ = read_conllx(random_file)

    # Pick a random sentence from the file
    random_index = random.randint(0, len(sentences_tokens) - 1)
    sentence = " ".join(sentences_tokens[random_index])

    # Tokenize the input sentence
    words = sentence.split()
    encoding = tokenizer(words, return_tensors="pt", is_split_into_words=True, truncation=True)
    # word_ids() lives on the tokenizer output object, so read it before the
    # encoding is turned into a plain dict of device tensors.
    word_ids = encoding.word_ids(batch_index=0)
    model_inputs = {key: value.to(device) for key, value in encoding.items()}

    # Get model predictions (one label id per sub-token position)
    model.eval()
    with torch.no_grad():
        logits = model(**model_inputs).logits
        predictions = torch.argmax(logits, dim=2)
    token_label_ids = predictions[0].cpu().tolist()

    # Keep the prediction of the first sub-token of every word; skip special
    # tokens (word id None) and later sub-tokens of an already-seen word.
    word_labels = {}
    for position, word_idx in enumerate(word_ids):
        if word_idx is not None and word_idx not in word_labels:
            word_labels[word_idx] = model.config.id2label[token_label_ids[position]]

    # Create a result list with exactly one entry per word
    result = [
        {"word": word, "label": word_labels.get(index)}
        for index, word in enumerate(words)
    ]

    # Return the folder/file description, the sentence, and the tagged words
    return f"Folder: {random_folder}, File: {os.path.basename(random_file)}", sentence, result

# Build the demo UI: the function takes no inputs and produces three outputs
# (source folder/file as text, the sentence as text, the tagged words as JSON).
iface = gr.Interface(
    random_sentence_pos_tagging,
    None,
    ["text", "text", "json"],
    title="Random Sentence POS Tagging",
    description="Click the button to pick a random sentence from folders 22 to 24 and get its POS tags.",
)

# share=True additionally requests a public share link besides the local URL.
iface.launch(share=True)


* Running on local URL:  http://127.0.0.1:7866

Could not create share link. Please check your internet connection or our status page: https://status.gradio.app.


