In [48]:
from collections import defaultdict
import re

class HMM:
  def __init__(self):
    self.states = [
    "B-PER",  # Beginning of a person named entity
    "I-PER",  # Inside a person named entity
    "B-ORG",  # Beginning of an organization named entity
    "I-ORG",  # Inside an organization named entity
    "B-LOC",  # Beginning of a location named entity
    "I-LOC",  # Inside a location named entity
    "B-MISC",  # Beginning of a miscellaneous named entity
    "I-MISC",  # Inside a miscellaneous named entity
    "O"       # Outside of any named entity
    ]
    self.dataset = []
    self.transition_probs = defaultdict(lambda: defaultdict(float))
    self.emission_probs = defaultdict(lambda: defaultdict(float))
    self.initial_probs = defaultdict(float)

  def process_dataset(self, filename):
    """
    Process the CoNLL 2003 dataset.

    Args:
    - filename: The path to the CoNLL 2003 dataset file.

    Returns:
    - dataset: A list of tuples, where each tuple contains the first and last words from each line.
    """

    with open(filename, 'r') as file:
        for line in file:
            line = line.strip()  # Remove leading and trailing whitespace

            # Skip lines starting with '-DOCSTART-'
            if line.startswith("-DOCSTART-"):
                continue

            # If line is empty (contains only newline character), add newline character to dataset
            if not line:
                self.dataset.append(('\n', '\n'))
            else:
                # Split the line into words
                words = line.split(' ')

                # Extract the first and last words
                first_word = words[0]
                last_word = words[-1]

                self.dataset.append((first_word, last_word))

  def train(self, filename):
    """
    Train a Hidden Markov Model (HMM) using the given dataset.

    Args:
    - dataset: A list of tuples, where each tuple contains the observed word and its corresponding NER tag.

    Returns:
    - transition_probs: Transition probabilities between NER tags.
    - emission_probs: Emission probabilities of words given NER tags.
    - initial_probs: Initial probabilities of NER tags.
    """
    self.process_dataset(filename)
    transition_counts = defaultdict(lambda: defaultdict(int))
    emission_counts = defaultdict(lambda: defaultdict(int))
    initial_counts = defaultdict(int)
    # Count occurrences of NER tags and transitions
    prev_tag = None
    for word, ner_tag in self.dataset:
        if prev_tag is None or prev_tag == '\n':
            initial_counts[ner_tag] += 1
        elif word == '\n':
            prev_tag = ner_tag
            continue
        else:
            transition_counts[prev_tag][ner_tag] += 1
        emission_counts[ner_tag][word] += 1
        prev_tag = ner_tag

    # Calculate initial probabilities
    total_initial = sum(initial_counts.values())
    for tag, count in initial_counts.items():
        self.initial_probs[tag] = count / total_initial

    # Calculate transition probabilities
    for prev_tag, next_tags in transition_counts.items():
        total_transitions = sum(next_tags.values())
        for tag, count in next_tags.items():
            self.transition_probs[prev_tag][tag] = count / total_transitions

    # Calculate emission probabilities
    for tag, word_counts in emission_counts.items():
        total_emissions = sum(word_counts.values())
        for word, count in word_counts.items():
            self.emission_probs[tag][word] = count / total_emissions
  def viterbi(self, sentence):
    """
    Perform NER tagging on a given sentence using the Viterbi algorithm.

    Args:
    - sentence: A list of words in the sentence.

    Returns:
    - tagged_sentence: A list of tuples, where each tuple contains a word and its corresponding NER tag.
    """
    if not hasattr(self, 'transition_probs') or not self.transition_probs \
        or not hasattr(self, 'emission_probs') or not self.emission_probs \
        or not hasattr(self, 'initial_probs') or not self.initial_probs:
        raise RuntimeError("Model has not been trained. Please call the train method first.")

    # Define the time pattern
    time_pattern = r'\d{1,2}:\d{2}(?:am|pm)?|\d+(?:am|pm)'
    time_regex = re.compile(time_pattern)

    # Check if the sentence is empty
    if not sentence:
        return []

    # Initialize matrices for dynamic programming
    dp = [{} for _ in range(len(sentence))]
    backpointer = [{} for _ in range(len(sentence))]

    # Initialize initial probabilities
    for state in self.states:
        # Use a small non-zero value instead of directly using the emission probability
        dp[0][state] = self.initial_probs[state] * self.emission_probs[state].get(sentence[0], 1e-10)
        # Set the backpointer for the initial state to itself
        backpointer[0][state] = state

    # Iterate over each word in the sentence
    for t in range(1, len(sentence)):
        for state in self.states:
            # Calculate the maximum probability and corresponding backpointer
            max_prob = float('-inf')
            max_state = None
            for prev_state in self.states:
                prob = dp[t - 1][prev_state] * self.transition_probs[prev_state][state]
                if prob > max_prob:
                    max_prob = prob
                    max_state = prev_state
            dp[t][state] = max_prob * self.emission_probs[state].get(sentence[t], 0)
            backpointer[t][state] = max_state

    # Backtrack to find the most likely sequence of states
    best_path_prob = float('-inf')
    best_path_end_state = None
    for state in self.states:
        if dp[-1][state] > best_path_prob:
            best_path_prob = dp[-1][state]
            best_path_end_state = state
    best_path = [best_path_end_state]
    for t in range(len(sentence) - 1, 0, -1):
        best_path.append(backpointer[t][best_path[-1]])
    best_path.append(backpointer[0][best_path[-1]])  # Add backpointer at index 0
    best_path.reverse()

    # Check if a word matches the time pattern and tag it as "O" if it does
    tagged_sentence = []
    for word, tag in zip(sentence, best_path):
        if time_regex.match(word):
            tag = "O"  # Tag as non-NER
        tagged_sentence.append((word, tag))

    return tagged_sentence



  def viterbi_batch(self, filename):
    """
    Perform NER tagging on a list of sentences using the Viterbi algorithm.

    Args:
    - sentences: A list of lists, where each inner list contains words in a sentence.

    Returns:
    - tagged_sentences: A list of tagged sentences, where each tagged sentence is a list of tuples
      containing words and their corresponding NER tags.
    """
    sentences = self.tokenize_file(filename)
    tagged_sentences = []
    for sentence in sentences:
        tagged_sentence = self.viterbi(sentence)
        tagged_sentences.append(tagged_sentence)
    return tagged_sentences
  def tokenize_file(self,filename):
    """
    Tokenize the sentences in a file based on the CoNLL 2003 dataset.

    Args:
    - filename: The path to the file to be tokenized.

    Returns:
    - tokenized_sentences: A list of tokenized sentences.
    """
    tokenized_sentences = []

    # Regular expression to identify time expressions like "12:30", "3pm", "3:30am", etc.
    time_pattern = r'\d{1,2}:\d{2}(?:am|pm)?|\d+(?:am|pm)'

    with open(filename, 'r') as file:
        for line in file:
            # Tokenize the sentence using whitespace as the delimiter
            tokens = line.strip().split(' ')
            # Initialize a list to store the tokenized words
            tokenized_sentence = []
            for token in tokens:
                # Check if the token matches the time pattern
                if re.match(time_pattern, token):
                    # If it's a time expression, split it into separate tokens
                    time_tokens = re.findall(r'\d+|\D+', token)
                    tokenized_sentence.extend(time_tokens)
                else:
                    # Otherwise, treat the token as a single word
                    # Handle punctuation marks, numbers, symbols, special characters, and quotation marks as separate words
                    tokenized_sentence.extend(re.findall(r'\w+|[^\w\s]', token))
            # Add the tokenized sentence to the list
            tokenized_sentences.append(tokenized_sentence)

    return tokenized_sentences
ner = HMM()
ner.train('engtrain.txt')

In [62]:
aadarsh = "Hello my name is Aadarsh".split()
print(aadarsh)
print(ner.viterbi(aadarsh))
tayaba = "Hello i am Tayaba Jameel and i am crazy".split()
print(ner.viterbi(tayaba))
blue = "EU rejects German call to boycott British lamb .".split()
print(ner.viterbi(blue))

['Hello', 'my', 'name', 'is', 'Aadarsh']
[('Hello', 'O'), ('my', 'O'), ('name', 'O'), ('is', 'O'), ('Aadarsh', 'O')]
[('Hello', 'B-ORG'), ('i', 'B-ORG'), ('am', 'I-ORG'), ('Tayaba', 'O'), ('Jameel', 'B-PER'), ('and', 'B-PER'), ('i', 'B-PER'), ('am', 'B-PER'), ('crazy', 'B-PER')]
[('EU', 'B-ORG'), ('rejects', 'B-ORG'), ('German', 'O'), ('call', 'B-MISC'), ('to', 'O'), ('boycott', 'O'), ('British', 'O'), ('lamb', 'B-MISC'), ('.', 'O')]


In [74]:
def process_dataset(filename):
    sentences = []
    sentence = []
    with open(filename, 'r') as file:
        for line in file:
            line = line.strip()  # Remove leading and trailing whitespace

            # Skip lines starting with '-DOCSTART-'
            if line.startswith("-DOCSTART-"):
                continue

            # If line is empty (contains only newline character), add newline character to dataset
            if not line:
                sentences.append(sentence)
                sentence = []
            else:
                # Split the line into words
                words = line.split(' ')

                # Extract the first word and append it as a list
                first_word = words[0]
                sentence.append(first_word)
    return sentences

data = process_dataset('engtesta.txt')
tagged_sentences = []
for sentence in data:
  tagged_sentences.append(ner.viterbi(sentence))
del tagged_sentences[0]

def calculate_accuracy(predicted_tags, actual_tags):
    """
    Calculate the accuracy of predicted NER tags compared to actual NER tags.

    Args:
    - predicted_tags: A list of predicted NER tags for each word in the test dataset.
    - actual_tags: A list of actual NER tags for each word in the test dataset.

    Returns:
    - accuracy: The accuracy of the predicted NER tags.
    """
    correct = 0
    total = 0

    for pred_sentence, actual_sentence in zip(predicted_tags, actual_tags):
        for pred_tag, actual_tag in zip(pred_sentence, actual_sentence):
            if pred_tag[1] == actual_tag[1]:  # Compare NER tags
                correct += 1
            total += 1

    accuracy = correct / total if total > 0 else 0
    return accuracy

def process_test_dataset(filename):
    sentences = []
    sentence = []
    with open(filename, 'r') as file:
        for line in file:
            line = line.strip()  # Remove leading and trailing whitespace

            # Skip lines starting with '-DOCSTART-'
            if line.startswith("-DOCSTART-"):
                continue

            # If line is empty (contains only newline character), add newline character to dataset
            if not line:
                sentences.append(sentence)
                sentence = []
            else:
                # Split the line into words
                words = line.split(' ')

                # Extract the first word and append it as a list
                first_word = words[0]
                last_word = words[-1]
                sentence.append((first_word,last_word))
    return sentences

# Load test dataset
test_dataset = process_test_dataset("engtesta.txt")
del test_dataset[0]


# Calculate accuracy
accuracy = calculate_accuracy(predicted_tags, actual_tags)
print("Accuracy:", accuracy)

Accuracy: 0.4307269966122815
