# 50.007 Machine Learning, Spring 2025
# Design Project

Due 27 Apr 2025, 5:00pm

By: Aishwarya Iyer (1007141) and Khoo Zi Qi (1006984)

## Part 1 (30points)

In [5]:
import os
train_file_path = "EN/train"  # Adjust if necessary
print("File exists:", os.path.exists(train_file_path))


File exists: True


Write a function that estimates the emission parameters from the training set using MLE (maximum likelihood estimation):

In [6]:
"""
Computes emission parameters for an HMM: e(x|y) = Count(y → x) / Count(y)
where:
- x: observed word
- y: corresponding tag (e.g., 'B-NP', 'I-VP', 'O')
"""
# Use defaultdict to automatically handles missing keys
from collections import defaultdict

def compute_emission_parameters(train_file_path):
    """
    Args:
        train_file_path: Path to training file (word-tag pairs separated by whitespace)
    
    Returns:
        Dictionary of dictionaries: emission_parameters[tag][word] = probability
    """
    
    # Initialize counters:
    # - emission_counts[tag][word] = times word appears with tag
    # - tag_counts[tag] = total occurrences of tag
    emission_counts = defaultdict(lambda: defaultdict(int))
    tag_counts = defaultdict(int)

    # Count word-tag co-occurrences and tag frequencies
    with open(train_file_path, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if line:  # Skip empty lines
                try:
                    word, tag = line.split()  # Split by any whitespace
                    emission_counts[tag][word] += 1
                    tag_counts[tag] += 1
                except ValueError:
                    print(f"Skipping invalid line: {line}")

    # Calculate emission probabilities
    emission_parameters = defaultdict(dict)
    for tag in emission_counts:
        total_tag_occurrences = tag_counts[tag]
        for word in emission_counts[tag]:
            emission_parameters[tag][word] = (
                emission_counts[tag][word] / total_tag_occurrences
            )
    
    return emission_parameters

emission_parameters = compute_emission_parameters(train_file_path)
# print(emission_parameters)

Use smoothing
- Identify words that appear less than 3 times
- Replace those words with #UNK#


In [7]:
def compute_emission_parameters_smoothing(train_file_path, k):

    """
    Args:
        train_file_path: Path to training file (word-tag pairs separated by whitespace)
        k: minimum count of word. If word count less than k, replace word with #UNK#.
    
    Returns:
        Dictionary of dictionaries: emission_parameters[tag][word] = probability
    """
    
    word_counts = defaultdict(int)
    with open(train_file_path, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:  # not an empty line
                word, tag = line.split()
                word_counts[word] += 1
    
    # Identify rare words
    rare_words = {word for word, count in word_counts.items() if count < k}

    # Initialize counters:
    # - emission_counts[tag][word] = times word appears with tag
    # - tag_counts[tag] = total occurrences of tag
    emission_counts = defaultdict(lambda: defaultdict(int))
    tag_counts = defaultdict(int)

    # Count word-tag co-occurrences and tag frequencies
    with open(train_file_path, 'r', encoding='utf-8') as file:
        for line in file:
            line = line.strip()
            if line:  # Skip empty lines
                try:
                    word, tag = line.split()  # Split by any whitespace
                    processed_word = word if word not in rare_words else '#UNK#' #modify training set
                    emission_counts[tag][processed_word] += 1
                    tag_counts[tag] += 1
                except ValueError:
                    print(f"Skipping invalid line: {line}")

    # Calculate emission probabilities
    emission_parameters = defaultdict(dict)
    for tag in emission_counts:
        total_tag_occurrences = tag_counts[tag]
        for word in emission_counts[tag]:
            emission_parameters[tag][word] = (
                emission_counts[tag][word] / total_tag_occurrences
            )
    
    return emission_parameters

emission_parameters_smoothing = compute_emission_parameters_smoothing(train_file_path, k = 3)
# print(emission_parameters_smoothing)

Implement a simple system that produces the tag
y∗= arg maxy e(x|y)
for each word x in the sequence.

In [8]:
dev_in_file_path = 'EN/dev.in'

In [9]:
def predict_tags(dev_in_file_path, emission_parameters, unknown_tag='O'):
    """
    Predicts tags for a sentence using emission probabilities.
    
    Args:
        sentence: List of words to tag
        emission_params: Dictionary from compute_emission_parameters()
        unknown_tag: Default tag for unseen words
    
    Returns:
        List of (word, predicted_tag) tuples
    """
    predicted = []
    
    with open(dev_in_file_path, 'r', encoding='utf-8') as file:
        for line in file:
            word = line.strip()
            if word:  # Skip empty lines, each line has one word
                max_prob = -1
                best_tag = unknown_tag  # Default fallback
                try:
                    # Find tag with highest emission probability for this word
                    for tag in emission_parameters:
                        if word in emission_parameters[tag]:
                            if emission_parameters[tag][word] > max_prob:
                                max_prob = emission_parameters[tag][word]
                                best_tag = tag
                    
                    predicted.append((word, best_tag))
                except ValueError:
                    print(f"Skipping invalid line: {line}")         
    return predicted

predicted_list = predict_tags(dev_in_file_path, emission_parameters_smoothing)

Learn these parameters with train, and evaluate your system on the development set dev.in for
each of the dataset. Write your output to dev.p2.out.

In [10]:
def write_predictions(predicted_list, output_file_path):
    with open(output_file_path, 'w', encoding='utf-8') as fout:
        for word, tag in predicted_list:
            fout.write(f"{word} {tag}\n")

output_file_path = 'outputs/dev.p2.out'
write_predictions(predicted_list, output_file_path)

Compare your outputs and the gold-standard outputs in dev.out and report the precision, recall and F scores of such a baseline system

In [11]:
def extract_chunks(tag_sequence):
    """Convert tag sequence to list of (start_idx, end_idx, chunk_type) tuples"""
    chunks = []
    current_chunk = None
    
    for i, tag in enumerate(tag_sequence):
        if tag.startswith('B-'):
            if current_chunk:
                chunks.append(current_chunk)
            current_chunk = (i, i+1, tag[2:])
        elif tag.startswith('I-'):
            if current_chunk and current_chunk[2] == tag[2:]:
                current_chunk = (current_chunk[0], i+1, current_chunk[2])
            else:
                # Invalid transition (O → I), treat as B-
                if current_chunk:
                    chunks.append(current_chunk)
                current_chunk = (i, i+1, tag[2:])
        else:  # O
            if current_chunk:
                chunks.append(current_chunk)
            current_chunk = None
    
    if current_chunk:
        chunks.append(current_chunk)
    
    return chunks

In [14]:
def evaluate(gold_file, pred_file):
    """Calculate precision, recall and F1"""
    gold_chunks = []
    pred_chunks = []
    
    # Read both files simultaneously
    with open(gold_file, 'r', encoding='utf-8') as fgold, \
         open(pred_file, 'r', encoding='utf-8') as fpred:
        
        gold_sentence = []
        pred_sentence = []
        
        for gold_line, pred_line in zip(fgold, fpred):
            gold_line = gold_line.strip()
            pred_line = pred_line.strip()
            
            if gold_line and pred_line:
                # Get tags (assuming format: word\tTag)
                gold_tag = gold_line.split()[1]
                pred_tag = pred_line.split()[1]
                gold_sentence.append(gold_tag)
                pred_sentence.append(pred_tag)
            else:
                # End of sentence
                if gold_sentence and pred_sentence:
                    gold_chunks.extend(extract_chunks(gold_sentence))
                    pred_chunks.extend(extract_chunks(pred_sentence))
                gold_sentence = []
                pred_sentence = []
    
    # Calculate metrics
    gold_set = set(gold_chunks)
    pred_set = set(pred_chunks)
    
    tp = len(gold_set & pred_set)  # True positives
    fp = len(pred_set - gold_set)  # False positives
    fn = len(gold_set - pred_set)  # False negatives
    
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    return {
        'precision': precision,
        'recall': recall,
        'f1': f1,
        'tp': tp,
        'fp': fp,
        'fn': fn
    }

def print_metrics(metrics):
    """Pretty-print evaluation metrics"""
    print(f"Precision: {metrics['precision']:.4f}")
    print(f"Recall:    {metrics['recall']:.4f}")
    print(f"F1 Score:  {metrics['f1']:.4f}")
    print(f"True Positives:  {metrics['tp']}")
    print(f"False Positives: {metrics['fp']}")
    print(f"False Negatives: {metrics['fn']}")

metrics = evaluate('EN/dev.out', 'outputs/dev.p2.out')
print_metrics(metrics)

Precision: 0.6478
Recall:    0.6575
F1 Score:  0.6526
True Positives:  526
False Positives: 286
False Negatives: 274


In [15]:
def compute_transition_parameters(train_file_path):
    """
    Args:
        train_file_path: Path to training file (word-tag pairs separated by whitespace)
    
    Returns:
        Dictionary of dictionaries: transition_parameters[prev_tag][tag] = probability
    """
    # Initialize counters:
    # - transition_counts[prev_tag][tag] = times prev_tag transitions to tag
    # - prev_tag_counts[prev_tag] = total occurrences of prev_tag
    transition_counts = defaultdict(lambda: defaultdict(int))
    prev_tag_counts = defaultdict(int)

    # Count tag transitions and previous tag frequencies
    with open(train_file_path, 'r', encoding='utf-8') as file:
        prev_tag = 'START'
        for line in file:
            line = line.strip()
            if line:  # Skip empty lines
                try:
                    _, tag = line.split()  # Split by any whitespace
                    transition_counts[prev_tag][tag] += 1
                    prev_tag_counts[prev_tag] += 1
                    prev_tag = tag
                except ValueError:
                    print(f"Skipping invalid line: {line}")

    # Calculate transition probabilities
    transition_parameters = defaultdict(dict)
    total_prev_tag_occurrences = sum(prev_tag_counts.values())
    for prev_tag in transition_counts:
        total_tag_occurrences = prev_tag_counts[prev_tag]
        for tag in transition_counts[prev_tag]:
            transition_parameters[prev_tag][tag] = (
                transition_counts[prev_tag][tag] / total_tag_occurrences
            )
    
    # Add special cases for q(STOP|yn) and q(y1|START)
    transition_parameters['STOP'] = defaultdict(int)
    transition_parameters['START'] = defaultdict(int)
    transition_parameters['START']['y1'] = 1.0
    
    return transition_parameters

transition_parameters = compute_transition_parameters(train_file_path)
#print(transition_parameters)