# **Part of Speech Tagging using Hidden Markov Models**


My **OWN** Hidden Markov Model to predict part of speech tags of words.

In [62]:
import subprocess
import shutil
import os

def clone_turkish_treebanks():
  # We will Clone the Turkish treebanks repository from Google Research.
  repo_url = "https://github.com/google-research-datasets/turkish-treebanks.git"
  target_dir = "turkish-treebanks"
  result = subprocess.run(["git", "clone", repo_url], capture_output=True, text=True)

def move_conllu_file():
  # We will copy the conllu files to /content directory.
  # web.conllu is our Web domain text
  # wiki.conllu is our Wikipedia domain text
  source = "turkish-treebanks/data/web.conllu"
  dest_dir = "/content"
  dest_file = os.path.join(dest_dir, "web.conllu")
  shutil.copy2(source, dest_file)
  dest_file2 = os.path.join(dest_dir, "wiki.conllu")
  shutil.copy2(source, dest_file2)

# Run the functions
if __name__ == "__main__":
  clone_turkish_treebanks()
  move_conllu_file()

In [63]:
def read_conll(file_path):
    sentences = []
    sentence = []

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()

            # Skip metadata lines starting with "#"
            if line.startswith("#"):
                continue

            # Sentence boundary (blank line)
            if not line:
                if sentence:
                    sentences.append(sentence)
                    sentence = []
            else:
                # Split the line into columns and extract word and POS tag
                columns = line.split('\t')
                if len(columns) > 3:  # Check to ensure the line has expected columns
                    word = columns[2]
                    if word == '_':
                      continue

                    pos_tag = columns[3]
                    sentence.append((word, pos_tag))

        # Add the last sentence if the file doesn't end with a blank line
        if sentence:
            sentences.append(sentence)

    return sentences

# Usage
web_file_path = "/content/web.conllu"
wiki_file_path = "/content/wiki.conllu"

web_sentences = read_conll(web_file_path)
wiki_sentences = read_conll(wiki_file_path)

sentences = web_sentences + wiki_sentences

In [64]:
len(sentences)

5082

## 1. Preprocessing function

In [103]:
def prepare_data(sentences, tag_list):

  # Considering only the tags that are in the tag_list

  # We will store filtered list of sentences that have valid tags
  prepared_sentences = []
  for sentence in sentences:
    filtered_words = [] # We will store filtered words of a sentence
    for (word, tag) in sentence: # Loop over words and tags
      if tag in tag_list: # List of POS tags that we keep
        filtered_words.append((word, tag)) # We add if the tag is in list
    if filtered_words: # If the sentence is not empty
      prepared_sentences.append(filtered_words) # We add the sentence to the list
  return prepared_sentences

## 2. Train test split

In [104]:
import random

# This is for our data splitting
# Initialize empty set to store unique tags
unique_tags = set()

# We loop through each sentence in corpus
for sentence in sentences:
  # Iterate through each word-tag pair in the sentence
  for word, tag in sentence:
    # Add each tag to the set -> we removed duplicates automatically
    unique_tags.add(tag)

# Convert set to sorted list
all_tags = list(unique_tags)

# We need two versions of the dataset
# 1 ->> Full dataset with all POS tags
full_sentences = prepare_data(sentences, all_tags)
# 2 ->> Limited dataset with only asked POS tags from Professor
limited_sentences = prepare_data(sentences, ["ADJ", "ADV", "NOUN", "VERB", "PUNC"])

# Set random seed for all random operations
random.seed(42)

# Calculate split sizes ->>> 80% train and 20% test
train_size_full = int(0.8 * len(full_sentences))
train_size_limited = int(0.8 * len(limited_sentences))

# Split full tagset data
train_sentences_full = full_sentences[:train_size_full]
test_sentences_full = full_sentences[train_size_full:]

# Split limited tagset data
train_sentences_limited = limited_sentences[:train_size_limited]
test_sentences_limited = limited_sentences[train_size_limited:]

## 3. Model Training

In [105]:
def create_HMM(train_sentences):
  '''
    type train_sentences: list of tuples
    param train_sentences: The list of tuples  (word, POS tag) for the training data sentences
    rtype: dict
    return: the transitions count dictionary between tags
    rtype: dict
    return: the emissions count dictionary for tags and words
    rtype: dict
    return: the tag count dictionary for the POS tags
    rtype: set
    return: the vocabulary of the corpus

  '''

  transitions = {} # We stores tag transition probabilities
  emissions = {} # We stores word emission probabilities
  tag_counts = {} # To Keep count of each tag
  vocabulary = set() # Stores unique words
  total_word_count = 0  # Total word count

  # We go through each sentence
  for sentence in train_sentences:
    current_tag = "<START>" # Mark sentence beginning

  # Iterate through each word-tag pair in the sentence
    for current_word, next_tag in sentence:
      # We track transition from current_tag to next_tag
      if current_tag not in transitions:
        transitions[current_tag] = {}
      transitions[current_tag][next_tag] = transitions[current_tag].get(next_tag, 0) + 1

      # We track emission of word from its tag
      if next_tag not in emissions:
        emissions[next_tag] = {}
      emissions[next_tag][current_word] = emissions[next_tag].get(current_word, 0) + 1

      # We update statistics
      tag_counts[next_tag] = tag_counts.get(next_tag, 0) + 1
      vocabulary.add(current_word)
      total_word_count += 1

      # We prepare for next iteration
      current_tag = next_tag

    # Mark sentence end here
    if current_tag not in transitions:
      transitions[current_tag] = {}
    transitions[current_tag]["<END>"] = transitions[current_tag].get("<END>", 0) + 1

  return transitions, emissions, tag_counts, vocabulary, total_word_count

In [106]:
transitions_full, emissions_full, tag_counts_full, vocab_full, word_count_full = create_HMM(train_sentences_full)
transitions_limited, emissions_limited, tag_counts_limited, vocab_limited, word_count_limited = create_HMM(train_sentences_limited)

## 4. POS tag prediction for test data

Viterbi algorithm to predict the POS tags of the test data.

In [107]:
import numpy as np
import math
def viterbi(test_sentence, transitions, emissions, tag_counts, word_count):
    '''
    type test_sentence: list of strings
    param test_sentence: list of words in a sentence
    type transitions: dict
    param: the transitions count dictionary between tags
    type emissions: dict
    param: the emissions count dictionary for tags and words
    type tag_counts: dict
    param: the tag count dictionary for the POS tags
    type word_count: int
    param: the word count of the training corpus
    rtype: list
    return: the list of predicted tags for the test sentence

    '''
    #viterbi matrix for the test sentence. The matrix has +2 in each side since we add
    #START and END tags for probability calculation.
    v_matrix = np.empty(shape=(len(tag_counts)+2,len(sentences)+2))
    v_matrix.fill(-1)
    v_matrix[0][0] = 0

    #fill up the viterbi matrix for each cell, starting with the first word.
    # for every word, check each tag's probability of appearing here.
    ## iterate over the tags:
    ## calculate the best transition probability for the current tag from a previous tag (again iteration of all possible tags)
    ## calculate the emission probability of word coming up from the current tag
    ## sum them up to get to the best probability of P(tag|word).
    ## keep the best path coming to that cell
    #after filling up the viterbi matrix, follow the best path back to predict the tags from the final cell.
    # in the end you are looking at len(tags)xlen(tags) possibilities for each word

    tags = list(tag_counts.keys())  # We will get unique POS tags
    backpointer = np.zeros((len(test_sentence), len(tags)), dtype=int)  # And will take track of best paths

    # This is for Forward Pass -> We will calculate probabilities for each position
    for t in range(len(test_sentence)):
      word = test_sentence[t] # Get the word

      # We check each possible current tag
      for j, curr_tag in enumerate(tags):
        # We try to find max_probability
        max_prob = float('-inf')  # Track maximum probability
        best_prev = 0  # Index of best previous tag

        # Calculate emission probability P(word|tag) with smoothing
        emit_p = math.log(emissions.get(curr_tag, {}).get(word, 1) / (tag_counts[curr_tag] + word_count))

        # Find best previous tag
        for i, prev_tag in enumerate(tags):
          # Check first word (transition from START) compared to other positions
          if t == 0:
            trans_p = math.log(transitions.get("<START>", {}).get(curr_tag, 1) / (tag_counts.get("<START>", len(tags))))
          else:
            trans_p = math.log(transitions.get(prev_tag, {}).get(curr_tag, 1) / (tag_counts[prev_tag]))

          # Calculate total path prob coming from transition and emission
          curr_prob = v_matrix[i][t] + trans_p + emit_p

          # We update if there exist better path
          if curr_prob > max_prob:
            max_prob = curr_prob
            best_prev = i

        # In the end, we store best prob and backpointer
        v_matrix[j][t+1] = max_prob
        backpointer[t][j] = best_prev

    # BACKWARD PASS: We reconstruct the best tag sequence here
    predicted_tags = []  # Store final sequence of predicted tags here

    # Find the tag with highest prob in final position
    # 1 is reserved for END tag
    # We use -2 index
    curr_best = np.argmax(v_matrix[:, -2])  # Index of maximum prob

    # Backtrack through the sentence from right to left
    for t in range(len(test_sentence)-1, -1, -1):
        # Add current best tag to front of seq
        predicted_tags.insert(0, tags[curr_best])
        # Follow backpointer to previous best tag
        curr_best = backpointer[t][curr_best]

    return predicted_tags

## 5. Evaluate the HMM

In [115]:
from sklearn.metrics import accuracy_score

# This function created to display the confusion matrix in a readable format.
def print_confusion_matrix(conf_matrix, tags):
  print("\nConfusion Matrix:")

  print("True\\Pred", end="\t") # Label for the true tags
  for tag in tags:
    print(f"{tag:>8}", end="") # Print each predicted tag as column head
  print()# Move to the next line after headers

  # Print each row of the confusion matrix
  for i, true_tag in enumerate(tags):
    print(f"{true_tag:8}", end="\t") # Print the actual tag at the start of the row
    for j in range(len(tags)):
      print(f"{conf_matrix[i][j]:8}", end="") # Print the count of predictions
    print()


# This function created to calculate f1 score
def calculate_f1_score(tp, fp, fn):

  # tp: True positives - correct predictions
  # fp: False positives - incorrect predictions
  # fn: False negatives - missed predictions

  # If no true positives -> to prevent 0 / x issue
  if tp == 0:
    return 0.0

  # Calculate precision
  # If denominator is 0 -> return 0 to avoid division by zero
  precision = tp / (tp + fp) if (tp + fp) > 0 else 0

  # Calculate recall
  # If denominator is 0 -> return 0 to avoid division by zero
  recall = tp / (tp + fn) if (tp + fn) > 0 else 0

  # Calculate F1 score: 2 * (precision * recall) / (precision + recall)
  # If denominator is 0 -> return 0 to avoid division by zero
  if precision + recall > 0:
    f1 = 2 * (precision * recall) / (precision + recall)
  else:
    f1 = 0.0

  return f1

def evaluate_models(test_sentences, transitions, emissions, tag_counts, word_count):
    # test_sentences: List of sentences
    # transitions: Transition probabilities between tags
    # emissions: Emission probabilities of words given tags
    # tag_counts: Count of how many times each tag appears in given data
    # word_count: Total number of words in data

    true_tags = []  # This is list to store all true tags from the test data
    prediction_tags = []  # This is list to store all predicted tags by the model

    # Loop through each sentence in the test data
    for sentence in test_sentences:
      # Here, we separate words and their true tags from the sentence
      words = list(map(lambda x: x[0], sentence))
      true = list(map(lambda x: x[1], sentence))

      # We get the predictions from Viterbi
      pred = viterbi(words, transitions, emissions, tag_counts, word_count)

      # We add the true and predicted tags to their respective lists
      true_tags.extend(true)
      prediction_tags.extend(pred)

    # Calculate the overall accuracy of the model
    accuracy = accuracy_score(true_tags, prediction_tags)

    # We prepare to build confusion matrix and calculate F1 scores here
    tag_list = list(tag_counts.keys()) # Get a list of all possible tags
    tag_count = len(tag_list)
    confusion_matrix = []
    for _ in range(tag_count):
        row = [0] * tag_count
        confusion_matrix.append(row)

    tag_metrics = {}
    for tag in tag_list: # Also, initialize metrics for each tag
        tag_metrics[tag] = {'tp': 0, 'fp': 0, 'fn': 0}

    # Fill the confusion matrix and update metrics for each tag
    for t, p in zip(true_tags, prediction_tags):
      i = tag_list.index(t)  # Find index of true tag
      j = tag_list.index(p)  # Find index of predicted tag
      confusion_matrix[i][j] += 1  # Increment count in confusion matrix

      if t == p:
        tag_metrics[t]['tp'] += 1  # True Positive
      else:
        tag_metrics[t]['fn'] += 1  # False Negative
        tag_metrics[p]['fp'] += 1  # False Positive

    # Calculate F1 scores for each tag
    f1_scores = {}
    for tag in tag_list:
      tp = tag_metrics[tag]['tp']
      fp = tag_metrics[tag]['fp']
      fn = tag_metrics[tag]['fn']

      if tp + fp > 0 and tp + fn > 0:
        precision = tp / (tp + fp)
        recall = tp / (tp + fn)
        f1_scores[tag] = 2 * (precision * recall) / (precision + recall)
      else:
        f1_scores[tag] = 0.0

    # Calculate the macro average of F1 scores (simple average)
    f1_macro = sum(f1_scores.values()) / len(f1_scores)

    # Calculate the weighted average of F1 scores based on tag frequency
    total_tags = len(true_tags)
    weighted_f1 = 0
    for tag in tag_list:
      weighted_f1 += f1_scores[tag] * true_tags.count(tag)
    weighted_f1 /= total_tags

    # Return all the evaluation metrics
    return accuracy, f1_macro, weighted_f1, confusion_matrix, tag_list


# Evaluation code starts here
print("--------------------------Results for full tag set--------------------------")

# Evaluate the model using the full set of tags
acc_full, f1_macro_full, f1_weighted_full, conf_full, tags_full = evaluate_models(
  test_sentences_full,
  transitions_full,
  emissions_full,
  tag_counts_full,
  word_count_full
)

# Print the accuracy and F1 scores for full tag set
print(f"Accuracy: {acc_full:.4f}")
print(f"F1 Macro: {f1_macro_full:.4f}")
print(f"F1 Weighted: {f1_weighted_full:.4f}")

print("\n--------------------------Results for limited tag set--------------------------")

# Evaluate the model using a limited set of tags
acc_limited, f1_macro_limited, f1_weighted_limited, conf_limited, tags_limited = evaluate_models(
  test_sentences_limited,
  transitions_limited,
  emissions_limited,
  tag_counts_limited,
  word_count_limited
)

# Print the accuracy and F1 scores for limited tag set
print(f"Accuracy: {acc_limited:.4f}")
print(f"F1 Macro: {f1_macro_limited:.4f}")
print(f"F1 Weighted: {f1_weighted_limited:.4f}")

# Print the confusion matrix for full tag set
print("\n--------------------------Confusion Matrix (Full Tags)--------------------------")
print_confusion_matrix(conf_full, tags_full)

# Print the confusion matrix for limited tag set
print("\n--------------------------Confusion Matrix (Limited Tags)--------------------------")
print_confusion_matrix(conf_limited, tags_limited)

--------------------------Results for full tag set--------------------------
Accuracy: 0.7551
F1 Macro: 0.5322
F1 Weighted: 0.7549

--------------------------Results for limited tag set--------------------------
Accuracy: 0.8889
F1 Macro: 0.7478
F1 Weighted: 0.8792

--------------------------Confusion Matrix (Full Tags)--------------------------

Confusion Matrix:
True\Pred	    PRON     PRT    VERB    CONJ   PUNCT    NOUN     ADJ     NUM     DET     ADP     ADV       X   AFFIX    ONOM
PRON    	     191       0      29       0       0      41       4       0      59       0       3       0       0       0
PRT     	       0     119      73      11       0      12       0       0       0       0       3       0       0       0
VERB    	      24       5    2014       0       6     219       8       0       7       4       1       0       0       0
CONJ    	       1       4       6     359       1      28       4       0       4      11       5       1       0       0
PUNCT   	     957     

In [116]:
# Bonus Enhancements
def replace_rare_words(sentences, min_freq=2):
  word_freq = {}

  # Count word frequencies
  for sentence in sentences:
    for word, _ in sentence:
        word_freq[word] = word_freq.get(word, 0) + 1

  # This part mostly same as prepare_data function.
  modified_sentences = []
  for sentence in sentences:
      modified_sentence = []
      for word, tag in sentence:
          if word_freq[word] < min_freq: # If lower than our freq
              modified_sentence.append(("<UNK>", tag)) # Replace with <unk> tag
          else:
              modified_sentence.append((word, tag)) # Unless, same tactic as prepare_data function
      modified_sentences.append(modified_sentence)
  return modified_sentences

# We replace here the rare words in both training and test sets
train_sentences_full = replace_rare_words(train_sentences_full, min_freq=2)
train_sentences_limited = replace_rare_words(train_sentences_limited, min_freq=2)

#  We replace here the rare words in the test sets based on the vocabulary of the training sets
test_sentences_full = replace_rare_words(test_sentences_full, min_freq=2)
test_sentences_limited = replace_rare_words(test_sentences_limited, min_freq=2)

# Recalculate HMM
transitions_full, emissions_full, tag_counts_full, vocab_full, word_count_full = create_HMM(train_sentences_full)
transitions_limited, emissions_limited, tag_counts_limited, vocab_limited, word_count_limited = create_HMM(train_sentences_limited)

# Evaluate the HMM
print("--------------------------Results after Rare Word Replacement (Full Tags)--------------------------")

acc_full, f1_macro_full, f1_weighted_full, conf_full, tags_full = evaluate_models(
    test_sentences_full,
    transitions_full,
    emissions_full,
    tag_counts_full,
    word_count_full
)

print(f"Accuracy: {acc_full:.4f}")
print(f"F1 Macro: {f1_macro_full:.4f}")
print(f"F1 Weighted: {f1_weighted_full:.4f}")

print("\n--------------------------Results after Rare Word Replacement (Limited Tags)--------------------------")

acc_limited, f1_macro_limited, f1_weighted_limited, conf_limited, tags_limited = evaluate_models(
    test_sentences_limited,
    transitions_limited,
    emissions_limited,
    tag_counts_limited,
    word_count_limited
)

print(f"Accuracy: {acc_limited:.4f}")
print(f"F1 Macro: {f1_macro_limited:.4f}")
print(f"F1 Weighted: {f1_weighted_limited:.4f}")

print("\n--------------------------Confusion Matrix (Full Tags after Rare Word Replacement)--------------------------")
print_confusion_matrix(conf_full, tags_full)

print("\n--------------------------Confusion Matrix (Limited Tags after Rare Word Replacement)--------------------------")
print_confusion_matrix(conf_limited, tags_limited)

--------------------------Results after Rare Word Replacement (Full Tags)--------------------------
Accuracy: 0.7596
F1 Macro: 0.5311
F1 Weighted: 0.7583

--------------------------Results after Rare Word Replacement (Limited Tags)--------------------------
Accuracy: 0.8833
F1 Macro: 0.7399
F1 Weighted: 0.8732

--------------------------Confusion Matrix (Full Tags after Rare Word Replacement)--------------------------

Confusion Matrix:
True\Pred	    PRON     PRT    VERB    CONJ   PUNCT    NOUN     ADJ     NUM     DET     ADP     ADV       X   AFFIX    ONOM
PRON    	     188       0      29       0       0      44       5       0      58       0       3       0       0       0
PRT     	       0     118      73      11       0      13       0       0       0       0       3       0       0       0
VERB    	      24       5    1916       0       1     323       7       0       7       4       1       0       0       0
CONJ    	       1       4       6     359       0      29       4     