Assignment 2

Pranjal Malik (M24CSA021)

Shivani Tiwari (M24CSA029)

Suvigya Sharma (M24CSA033)



---



Dataset Preprocessing

In [1]:
import re

def preprocess_twitter_text(text):

    # Replace user mentions with "@USER"
    text = re.sub(r'@\w+', '@USER', text)

    # Replace URLs with "URL"
    text = re.sub(r'http\S+|www\S+', 'URL', text)

    # Normalize repeated punctuation (e.g., "!!!" -> "!", "???" -> "?")
    text = re.sub(r'([!?.,])\1+', r'\1', text)

    # Normalize whitespace: Remove leading/trailing spaces & collapse multiple spaces
    text = re.sub(r'\s+', ' ', text).strip()

    return text

Loading Dataset

In [3]:
def load_dataset(file_name):

    dataset = []
    sentence = []

    with open(file_name, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if line:
                parts = line.split("\t")
                if len(parts) == 2:
                    word, tag = parts
                    word = preprocess_twitter_text(word)  # Apply preprocessing
                    sentence.append((word, tag))
            else:  # Empty line indicates end of a sentence
                if sentence:
                    dataset.append(sentence)
                    sentence = []

    # Add last sentence if file does not end with a newline
    if sentence:
        dataset.append(sentence)

    return dataset
train_data = load_dataset("train.txt")
dev_data = load_dataset("dev.txt")
test_data = load_dataset("test.txt")

for i, sentence in enumerate(train_data[:3]):
    print(f"Sentence {i+1}:")
    for word, tag in sentence:
        print(f"({word}, {tag})", end=" ")
    print("\n")

Sentence 1:
(@USER, O) (@USER, O) (they, O) (will, O) (be, O) (all, O) (done, O) (by, O) (Sunday, O) (trust, O) (me, O) (*wink*, O) 

Sentence 2:
(Made, O) (it, O) (back, O) (home, O) (to, O) (GA, B-loc) (., O) (It, O) (sucks, O) (not, O) (to, O) (be, O) (at, O) (Disney, B-facility) (world, I-facility) (,, O) (but, O) (its, O) (good, O) (to, O) (be, O) (home, O) (., O) (Time, O) (to, O) (start, O) (planning, O) (the, O) (next, O) (Disney, B-facility) (World, I-facility) (trip, O) (., O) 

Sentence 3:
(', O) (Breaking, B-movie) (Dawn, I-movie) (', O) (Returns, O) (to, O) (Vancouver, B-loc) (on, O) (January, O) (11th, O) (URL, O) 



Calculating:

Start probability (π)

Transition probability (A)

Emission probability (B)

For both bigram and trigram model

In [4]:
from pickle import FALSE
from collections import defaultdict

def calculate_hmm_parameters(dataset, model="bigram", laplace_k=0.1, boost_factor=0.75, use_context=True):

    state_counts = defaultdict(int)  # Count occurrences of each state
    start_counts = defaultdict(int)  # Count occurrences of states at the beginning of sentences
    transition_counts = defaultdict(lambda: defaultdict(int))  # Transition counts (bigram or trigram)

    # initialize emission and total emissions based on use_context
    if use_context:
        emission_counts = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
        total_emissions = defaultdict(lambda: defaultdict(int))
    else:
        emission_counts = defaultdict(lambda: defaultdict(int))  # P(word | tag)
        total_emissions = defaultdict(int)  # Total emissions for tag only

    total_transitions = defaultdict(int)  # Total transitions per state or state-pair
    vocabulary = set()  #Stores unique words

    # Step 1: Identify states and count occurrences
    for sentence in dataset:
        prev_tag = None
        prev_prev_tag = None  # Needed for trigram

        for i, (word, tag) in enumerate(sentence):
            word = word.lower()
            state_counts[tag] += 1  # Count tag occurrence
            vocabulary.add(word)  # Store unique words in vocabulary

            if use_context:
                emission_counts[tag][prev_tag][word] += 1  #
                total_emissions[tag][prev_tag] += 1
            else:
                emission_counts[tag][word] += 1  # Standard P(word | tag)
                total_emissions[tag] += 1

            if i == 0:
                start_counts[tag] += 1  # Count start state
            else:
                if model == "bigram":
                    transition_counts[prev_tag][tag] += 1  # Count bigram transitions
                    total_transitions[prev_tag] += 1  # Count total transitions from prev_tag
                elif model == "trigram":
                    if prev_prev_tag is not None:
                        transition_counts[(prev_prev_tag, prev_tag)][tag] += 1  # Count trigram transitions
                        total_transitions[(prev_prev_tag, prev_tag)] += 1  # Count total transitions

            prev_prev_tag = prev_tag
            prev_tag = tag

    # Total unique states and words
    num_states = len(state_counts)
    vocab_size = len(vocabulary)

    # Step 2: Calculate start probability π with Laplace smoothing
    total_sentences = len(dataset)
    states = list(state_counts.keys())
    start_prob = {state: (start_counts[state] + laplace_k) / (total_sentences + laplace_k * num_states) for state in states}

    # Step 3: Calculate transition probability A with Laplace smoothing
    trans_prob = {}

    if model == "bigram":
        trans_prob = {state1: {} for state1 in states}
        for state1 in states:
            total = total_transitions[state1] + (laplace_k * num_states)  # Apply smoothing
            for state2 in states:
                trans_prob[state1][state2] = (transition_counts[state1][state2] + laplace_k) / total

    elif model == "trigram":
        trans_prob = {state_pair: {} for state_pair in transition_counts}
        for state_pair in transition_counts:
            total = total_transitions[state_pair] + (laplace_k * num_states)  # Apply smoothing
            for state3 in states:
                trans_prob[state_pair][state3] = (transition_counts[state_pair][state3] + laplace_k) / total

    # Step 4: Calculate emission probability B with Laplace smoothing
    emit_prob = {}

    if use_context:
        emit_prob = {state: defaultdict(dict) for state in states}
        for state in states:
            for prev_tag in states:
                total = total_emissions[state][prev_tag] + (laplace_k * vocab_size)
                for word in vocabulary:
                    prob = (emission_counts[state][prev_tag][word] + laplace_k) / total

                    # Apply controlled boost to named entity tags
                    if state.startswith("B-") or state.startswith("I-"):
                        prob *= boost_factor

                    emit_prob[state][prev_tag][word] = prob

    else:
        emit_prob = {state: {} for state in states}
        for state in states:
            total = total_emissions[state] + (laplace_k * vocab_size)
            for word in vocabulary:
                prob = (emission_counts[state][word] + laplace_k) / total  # ✅ Standard P(word | tag)

                # Apply controlled boost to named entity tags
                if state.startswith("B-") or state.startswith("I-"):
                    prob *= boost_factor

                emit_prob[state][word] = prob

    # Print results
    print("States:", states)
    print("Start Probabilities:", start_prob)
    print(f"Transition Probabilities ({model} model, Laplace k={laplace_k}):", {k: v for k, v in list(trans_prob.items())[:3]})  # Print a few
    print("Emission Probabilities:", {k: v for k, v in list(emit_prob.items())[:3]})  # Print a few

    return states, start_prob, trans_prob, emit_prob

print("\nUsing Bigram Model:")
states_bigram, start_prob_bigram, trans_prob_bigram, emit_prob_bigram = calculate_hmm_parameters(train_data, model="bigram", laplace_k=0.1, boost_factor=0.75, use_context=False)

print("\nUsing Trigram Model:")
states_trigram, start_prob_trigram, trans_prob_trigram, emit_prob_trigram = calculate_hmm_parameters(train_data, model="trigram", laplace_k=0.1, boost_factor=0.75, use_context=False)


Using Bigram Model:
States: ['O', 'B-loc', 'B-facility', 'I-facility', 'B-movie', 'I-movie', 'B-company', 'B-product', 'B-person', 'B-other', 'I-other', 'B-sportsteam', 'I-sportsteam', 'I-product', 'I-company', 'I-person', 'I-loc', 'B-tvshow', 'B-musicartist', 'I-musicartist', 'I-tvshow']
Start Probabilities: {'O': 0.9394849964525688, 'B-loc': 0.005467217561871374, 'B-facility': 0.0033804933016151244, 'I-facility': 4.1734485205125e-05, 'B-movie': 0.0025458035975126246, 'I-movie': 4.1734485205125e-05, 'B-company': 0.007136596970076375, 'B-product': 0.005049872709820124, 'B-person': 0.019239597679562626, 'B-other': 0.011310045490588875, 'I-other': 4.1734485205125e-05, 'B-sportsteam': 0.0037978381536663743, 'I-sportsteam': 4.1734485205125e-05, 'I-product': 4.1734485205125e-05, 'I-company': 4.1734485205125e-05, 'I-person': 4.1734485205125e-05, 'I-loc': 4.1734485205125e-05, 'B-tvshow': 4.1734485205125e-05, 'B-musicartist': 0.0021284587454613747, 'I-musicartist': 4.1734485205125e-05, 'I-tvs

Viterbi Algorithm for finding best tag sequence for bigram and trigram

In [5]:
#viterbi for finding best tag for bigram and trigram
import numpy as np

# Change this variable to switch between "bigram" and "trigram"
MODEL_TYPE = "trigram"  # Change to "trigram" to use the trigram model

# Assign model-specific variables dynamically
if MODEL_TYPE == "bigram":
    states = states_bigram
    start_prob = start_prob_bigram
    trans_prob = trans_prob_bigram
    emit_prob = emit_prob_bigram
elif MODEL_TYPE == "trigram":
    states = states_trigram
    start_prob = start_prob_trigram
    trans_prob = trans_prob_trigram
    emit_prob = emit_prob_trigram

def viterbi_algorithm(sentence, states, start_prob, trans_prob, emit_prob, model_type="bigram"):
    n = len(sentence)
    m = len(states)
    epsilon = 1e-6  # Small probability to handle unseen cases

    # Convert states to an index mapping
    state_idx = {state: i for i, state in enumerate(states)}

    # Convert words to lowercase before looking them up
    sentence = [word.lower() for word in sentence]

    # Initialize DP tables
    dp = np.zeros((m, n))  # DP table to store probabilities
    backpointer = np.zeros((m, n), dtype=int)  # Stores the best previous state

    # **Initialization step**
    for i, state in enumerate(states):
        dp[i, 0] = start_prob.get(state, epsilon) * emit_prob.get(state, {}).get(sentence[0], epsilon)  # Handle unseen words

    # **Recursion step**
    if model_type == "bigram":
        for t in range(1, n):
            for i, state in enumerate(states):
                max_prob = -1
                best_prev_state = 0

                for j, prev_state in enumerate(states):
                    prob = dp[j, t-1] * trans_prob.get(prev_state, {}).get(state, epsilon) * emit_prob.get(state, {}).get(sentence[t], epsilon)

                    if prob > max_prob:
                        max_prob = prob
                        best_prev_state = j

                dp[i, t] = max_prob
                backpointer[i, t] = best_prev_state

    elif model_type == "trigram":
        if n == 1:
            # If only one word, use bigram initialization
            return viterbi_algorithm(sentence, states, start_prob, trans_prob, emit_prob, model_type="bigram")

        # **Handling first two words using bigram model**
        for i, state in enumerate(states):
            for j, prev_state in enumerate(states):
                dp[i, 1] = dp[j, 0] * trans_prob.get(prev_state, {}).get(state, epsilon) * emit_prob.get(state, {}).get(sentence[1], epsilon)
                backpointer[i, 1] = j

        # **Trigram-based recursion from t=2 onwards**
        for t in range(2, n):
            for i, state in enumerate(states):
                max_prob = -1
                best_prev_state = 0
                best_prev_prev_state = 0  # Track second previous state

                for j, prev_state in enumerate(states):
                    for k, prev_prev_state in enumerate(states):  # Add extra loop for trigram
                        prob = (
                            dp[k, t-2] *
                            trans_prob.get((prev_prev_state, prev_state), {}).get(state, epsilon) *
                            emit_prob.get(state, {}).get(sentence[t], epsilon)
                        )

                        if prob > max_prob:
                            max_prob = prob
                            best_prev_state = j
                            best_prev_prev_state = k  # Store second previous state

                dp[i, t] = max_prob
                backpointer[i, t] = best_prev_state  # Track previous state

    # **Termination step: Find the best last state**
    best_last_state = np.argmax(dp[:, -1])
    best_tags = [states[best_last_state]]

    # **Backtracking to reconstruct the best tag sequence**
    for t in range(n-1, 0, -1):
        best_last_state = backpointer[best_last_state, t]
        best_tags.insert(0, states[best_last_state])

    return best_tags

Evaluation on Validation data and Test Data

In [6]:
def evaluate_viterbi(data, states, start_prob, trans_prob, emit_prob, dataset_name="dev.txt"):

    correct = 0
    total = 0
    predictions = []

    for sentence in data:
        words = [word for word, _ in sentence]
        actual_tags = [tag for _, tag in sentence]
        predicted_tags = viterbi_algorithm(words, states, start_prob, trans_prob, emit_prob, model_type = MODEL_TYPE)

        # Count correct predictions
        for actual, predicted in zip(actual_tags, predicted_tags):
            if actual == predicted:
                correct += 1
            total += 1

        predictions.append((words, actual_tags, predicted_tags))  # Store for display

    accuracy = correct / total if total > 0 else 0

    # Print Accuracy
    print(f"\n📊 **Model Accuracy on {dataset_name} ({MODEL_TYPE} HMM):** {accuracy:.4f}")

    # Print a few sample sentences
    print(f"\n🔍 **Sample Predictions using {MODEL_TYPE} HMM (First 3 sentences from {dataset_name}):**")
    for words, actual, predicted in predictions[:3]:
        print("\n📌 Sentence:", " ".join(words))
        print("✅ Actual Tags:  ", actual)
        print("🔮 Predicted Tags:", predicted)

    return accuracy, predictions


# Load dev.txt and test.txt data
dev_data = load_dataset("dev.txt")
test_data = load_dataset("test.txt")

# Evaluate on Dev Data (Hyperparameter Tuning)
accuracy_dev, predictions_dev = evaluate_viterbi(dev_data, states, start_prob, trans_prob, emit_prob, dataset_name="dev.txt")

# Evaluate on Test Data (Final Accuracy Calculation)
accuracy_test, predictions_test = evaluate_viterbi(test_data, states, start_prob, trans_prob, emit_prob, dataset_name="test.txt")


📊 **Model Accuracy on dev.txt (trigram HMM):** 0.8452

🔍 **Sample Predictions using trigram HMM (First 3 sentences from dev.txt):**

📌 Sentence: STOP WHAT YOU'RE DOING AND GO GET #ExpelledMovieToNumberOne ON ITUNES BECAUSE IT'S ONLY 2ND ! @USER Shs
✅ Actual Tags:   ['O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-other', 'O', 'O', 'O', 'O', 'O', 'O', 'O']
🔮 Predicted Tags: ['I-tvshow', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-movie', 'I-movie']

📌 Sentence: RT @USER : Safe to say Super Bowl Sunday is my favourite holiday of the year
✅ Actual Tags:   ['O', 'O', 'O', 'O', 'O', 'O', 'B-other', 'I-other', 'I-other', 'O', 'O', 'O', 'O', 'O', 'O', 'O']
🔮 Predicted Tags: ['I-tvshow', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O']

📌 Sentence: RT @USER : @USER second is Bawana constituency on Feb 4 - Final
✅ Actual Tags:   ['O', 'O', 'O', 'O', 'O', 'O', 'B-loc', 'O', 'O', 'O', 'O', 'O', 'O']
🔮 Predicted Tags: ['I-tvshow', 'O', 'O', '

Per Class Accuracy, precion, Recall, F score

In [None]:
from collections import defaultdict

def analyze_per_class_accuracy(data, states, start_prob, trans_prob, emit_prob, dataset_name="Validation"):

    correct = 0
    total = 0

    # Track per-class statistics
    class_correct = defaultdict(int)   # True Positives (TP)
    class_total = defaultdict(int)     # Actual occurrences (Total Ground Truth)
    class_predicted = defaultdict(int) # Total predictions of this class

    predictions = []

    for sentence in data:
        words = [word for word, _ in sentence]
        actual_tags = [tag for _, tag in sentence]
        predicted_tags = viterbi_algorithm(words, states, start_prob, trans_prob, emit_prob, model_type = MODEL_TYPE)

        for actual, predicted in zip(actual_tags, predicted_tags):
            if actual == predicted:
                correct += 1
                class_correct[actual] += 1  # Correct prediction for this class
            class_total[actual] += 1  # Total occurrences of this class
            class_predicted[predicted] += 1  # How often this class is predicted

            total += 1

        predictions.append((words, actual_tags, predicted_tags))

    accuracy = correct / total if total > 0 else 0

    # Compute per-class precision, recall, F1-score, and accuracy
    print(f"\n📊 **Per-Class Performance Analysis on {dataset_name} Data:**")
    print("{:<15} {:<10} {:<10} {:<10} {:<10}".format("Class", "Precision", "Recall", "F1-score", "Accuracy"))
    print("-" * 60)

    total_precision = 0
    total_recall = 0
    total_f1 = 0
    class_count = 0

    for tag in states:
        tp = class_correct[tag]  # True Positives
        total_actual = class_total[tag]  # Total in ground truth (actual occurrences)
        total_predicted = class_predicted[tag]  # Total predictions of this class

        precision = tp / total_predicted if total_predicted > 0 else 0
        recall = tp / total_actual if total_actual > 0 else 0
        f1 = (2 * precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        accuracy_class = tp / total_actual if total_actual > 0 else 0

        # Sum up values for macro-averaged computation
        total_precision += precision
        total_recall += recall
        total_f1 += f1
        class_count += 1

        print("{:<15} {:<10.4f} {:<10.4f} {:<10.4f} {:<10.4f}".format(tag, precision, recall, f1, accuracy_class))

    # Compute macro-averaged precision, recall, and F1-score
    macro_precision = total_precision / class_count if class_count > 0 else 0
    macro_recall = total_recall / class_count if class_count > 0 else 0
    macro_f1 = total_f1 / class_count if class_count > 0 else 0

    # Compute micro-averaged precision, recall, and F1-score
    total_tp = sum(class_correct.values())
    total_actual = sum(class_total.values())
    total_predicted = sum(class_predicted.values())

    micro_precision = total_tp / total_predicted if total_predicted > 0 else 0
    micro_recall = total_tp / total_actual if total_actual > 0 else 0
    micro_f1 = (2 * micro_precision * micro_recall) / (micro_precision + micro_recall) if (micro_precision + micro_recall) > 0 else 0

    print("\n📌 **Overall Performance Metrics on {} Data**:".format(dataset_name))
    print(f"✅ **Accuracy:** {accuracy:.4f}")
    print(f"✅ **Macro Precision:** {macro_precision:.4f}")
    print(f"✅ **Macro Recall:** {macro_recall:.4f}")
    print(f"✅ **Macro F1-score:** {macro_f1:.4f}")
    print(f"✅ **Micro Precision:** {micro_precision:.4f}")
    print(f"✅ **Micro Recall:** {micro_recall:.4f}")
    print(f"✅ **Micro F1-score:** {micro_f1:.4f}")

    return accuracy, macro_precision, macro_recall, macro_f1, micro_precision, micro_recall, micro_f1, predictions


# Run Per-Class Accuracy Analysis for Validation Data
print("\n🔍 **Analyzing Per-Class Accuracy on Validation Data...**")
acc_dev, macro_prec_dev, macro_rec_dev, macro_f1_dev, micro_prec_dev, micro_rec_dev, micro_f1_dev, predictions_dev = analyze_per_class_accuracy(
    dev_data, states, start_prob, trans_prob, emit_prob, dataset_name="Validation"
)

# Run Per-Class Accuracy Analysis for Test Data
print("\n🔍 **Analyzing Per-Class Accuracy on Test Data...**")
acc_test, macro_prec_test, macro_rec_test, macro_f1_test, micro_prec_test, micro_rec_test, micro_f1_test, predictions_test = analyze_per_class_accuracy(
    test_data, states, start_prob, trans_prob, emit_prob, dataset_name="Test"
)



🔍 **Analyzing Per-Class Accuracy on Validation Data...**
