In [216]:
from pathlib import Path
from collections import defaultdict
import numpy as np

# Root directory of the data; dataset paths below are built as data_dir / dataset / <file name>
data_dir = Path("data/")
# Name of the dataset sub-directory to train and decode on
dataset = "EN" # EN or ES

# Part 1(i): Emission scores
Compute $e(x|y) = \frac{\text{Count}(y \rightarrow x)}{\text{Count}(y)}$ and store $\log e(x|y)$ in a dictionary $f$, keyed by the feature string `emission:y+x`.

In [217]:
def calculate_emission_scores(path):
    '''
    Estimate log emission scores from a labelled dataset file.

    Every non-blank line is split on single spaces; the first field is the word x
    (lowercased before counting) and the second field is its label y. Blank lines
    are ignored. The estimate is e(x|y) = Count(y -> x) / Count(y).

    Inputs:
        path (Path object or str): path on the local directory to the dataset to load from.
    Outputs:
        f (dict): Maps the feature string "emission:{y}+{x}" to log(e(x|y)).
    '''

    pair_counts = defaultdict(int)   # (x, y) -> Count(y -> x)
    label_counts = defaultdict(int)  # y -> Count(y)

    with open(path) as dataset_file:
        for raw_line in dataset_file:
            stripped = raw_line.strip()

            # Blank lines only separate sentences; they carry no (x, y) pair
            if not stripped:
                continue

            fields = stripped.split(" ")
            word, label = fields[0].lower(), fields[1]

            pair_counts[(word, label)] += 1
            label_counts[label] += 1

    # Turn the raw counts into log relative frequencies, one entry per observed pair
    return {
        f"emission:{label}+{word}": np.log(pair_count / label_counts[label])
        for (word, label), pair_count in pair_counts.items()
    }

# Part 1(ii): Transition scores
Compute $q(y_i|y_{i-1}) = \frac{\text{Count}(y_{i-1}, y_i)}{\text{Count}(y_{i-1})}$ and store $\log q(y_i|y_{i-1})$ in a dictionary $f$, keyed by the feature string `transition:y_{i-1}+y_i`.

In [218]:
def calculate_transition_scores(path):
    '''
    Estimate log transition scores q(y_i | y_{i-1}) = Count(y_{i-1}, y_i) / Count(y_{i-1}).

    A sentence is a run of non-blank lines; blank lines separate sentences. Each
    sentence contributes one START -> y_1 transition and one y_n -> STOP transition,
    and increments Count(START) exactly once. Leading, trailing and repeated blank
    lines are ignored, and a final sentence that is not followed by a blank line is
    still closed with its STOP transition.

    Inputs:
        path (Path object or str): path on the local directory to the dataset to load from.
    Outputs:
        f (dict): Maps the feature string "transition:{y_{i-1}}+{y_i}" to
            log(q(y_i|y_{i-1})).
    '''

    count_transition = defaultdict(int) # Key is tuple (y_{i-1}, y_i), value is Count(y_{i-1}, y_i)
    count_labels = defaultdict(int) # Key is y_{i-1} (including "START"), value is Count(y_{i-1})

    # Label of the previous token in the current sentence; None while between sentences
    prev_y = None

    # Read from dataset path
    with open(path) as dataset_file:
        for line in dataset_file:
            formatted_line = line.strip()

            if formatted_line:
                # Second space-separated field is the label; the word itself is not needed here
                y = formatted_line.split(" ")[1]

                if prev_y is None:
                    # First token of a sentence: count START here, so that Count(START)
                    # equals the number of sentences regardless of blank-line layout
                    prev_y = "START"
                    count_labels[prev_y] += 1

                count_transition[(prev_y, y)] += 1
                count_labels[y] += 1

                # Update for next word
                prev_y = y
            elif prev_y is not None:
                # First blank line after a sentence: store Count(y_n -> STOP)
                count_transition[(prev_y, "STOP")] += 1
                prev_y = None

    # The file may end without a trailing blank line; close the open sentence
    if prev_y is not None:
        count_transition[(prev_y, "STOP")] += 1

    # Result dictionary that maps "transition:{y_{i-1}}+{y_i}" to log(q(y_i|y_{i-1}))
    f = {}

    # Estimate q(y_i|y_{i-1}) based on counts
    for (prev_label, label), transition_count in count_transition.items():
        feature_str = f"transition:{prev_label}+{label}"
        f[feature_str] = np.log(transition_count / count_labels[prev_label])

    return f

In [219]:
# Estimate both parameter sets from the training split
f_emission_train = calculate_emission_scores(data_dir / dataset / "train")
f_transition_train = calculate_transition_scores(data_dir / dataset / "train")

# Merge the two feature dictionaries into one lookup table
f = dict(f_emission_train)
f.update(f_transition_train)
# Emission and transition keys must not collide, so the sizes have to add up
assert len(f) == len(f_emission_train) + len(f_transition_train)

# Part 2(i)
Compute CRF scores for a given input and output sequence pair.

In [220]:
def compute_crf_score(x, y, feature_dict):
    '''
    Score an (input sentence, label sequence) pair as a weighted sum of its features.

    The features are one "emission:{y_i}+{x_i}" per position (word lowercased) and one
    "transition:{y_{i-1}}+{y_i}" per adjacent label pair after wrapping y in START/STOP.
    Each distinct feature contributes weight * (number of occurrences).

    Inputs:
        x (list[str]): Complete input word sentence (without START or STOP tags)
        y (list[str]): Complete output label sequence
        feature_dict (dict[str] -> float): Dictionary that maps a given feature to its score.
    Outputs:
        score (float): Sum over features of feature_dict[feature] * count.
    Raises:
        AssertionError: if x and y differ in length.
        KeyError: if a feature of (x, y) is missing from feature_dict.
    '''

    n = len(x)
    # Every word needs exactly one label
    assert(n == len(y))

    # Emission features, in sentence order
    emission_keys = [f"emission:{y[pos]}+{x[pos].lower()}" for pos in range(n)]

    # Transition features over the START/STOP-padded label sequence
    padded = ["START"] + y + ["STOP"]
    transition_keys = [
        f"transition:{padded[pos - 1]}+{padded[pos]}" for pos in range(1, n + 2)
    ]

    # Number of times each feature fires in (x, y)
    occurrences = defaultdict(int)
    for key in emission_keys + transition_keys:
        occurrences[key] += 1

    # Weighted sum, accumulated in first-occurrence order of the features
    return sum(feature_dict[key] * times for key, times in occurrences.items())

# Example sentence, tokenised on whitespace, with one candidate label per token
x = "Great food with an awesome atmosphere !".split()
y = "O B-positive O O O B-positive O".split()
# Score the (x, y) pair using the combined emission + transition dictionary f
compute_crf_score(x, y, f)

-44.57667948595218

# Part 2(ii)
Viterbi algorithm for decoding.

In [222]:
def get_states(path):
    '''
    Collect the distinct labels that occur in a labelled dataset file.

    Every non-blank line is split on single spaces and its second field is taken as
    the label. The result is sorted so that the state order (and therefore the state
    indices, tie-breaking and fallback state used during decoding) is the same on
    every run, independent of string hash randomisation.

    Inputs:
        path (Path object or str): path on the local directory to the dataset to load from.
    Outputs:
        states (list[str]): Unique states in the dataset, in sorted order.
    '''
    states = set()

    # Read from dataset path
    with open(path) as dataset_file:
        for line in dataset_file:
            formatted_line = line.strip()

            # Blank lines only separate sentences
            if formatted_line:
                # Second field of the "x y" pair is the label
                states.add(formatted_line.split(" ")[1])

    return sorted(states)

def viterbi_decode(x, states, feature_dict):
    '''
    Find the highest-scoring label sequence for x with the Viterbi algorithm.

    A path's score is the sum of its "transition:{prev}+{cur}" and
    "emission:{cur}+{word}" weights (words are lowercased). A step is only allowed
    when both of its features exist in feature_dict; unreachable cells stay NaN.
    When no allowed path reaches a cell (for example after an unseen word), the
    backpointer and the final state fall back to the state at index 1 (index 0 if
    there is only one state).

    Inputs:
        x (list[str]): Input sequence.
        states (list[str]): Possible output states.
        feature_dict (dict[str] -> float): Dictionary that maps a given feature to its score.
    Outputs:
        y (list[str]): Most probable output sequence (empty list for an empty input).
    '''

    n = len(x) # Number of words
    d = len(states) # Number of states

    # Nothing to decode; also avoids indexing x[0] below
    if n == 0:
        return []

    # Index used when no valid path exists; clamped so it stays in range for d == 1
    fallback = max(0, min(1, d - 1))

    scores = np.full((n, d), np.nan)
    # Builtin int instead of the np.int alias, which newer NumPy releases no longer provide
    bp = np.full((n, d), fallback, dtype=int)

    # Convert to lowercase
    x = [word.lower() for word in x]

    # Compute START transition scores
    for j, current_y in enumerate(states):
        transition_key = f"transition:START+{current_y}"
        emission_key = f"emission:{current_y}+{x[0]}"
        if transition_key in feature_dict and emission_key in feature_dict:
            scores[0, j] = feature_dict[transition_key] + feature_dict[emission_key]

    # Fill in the best score for every (position, state) cell from the previous position
    for i in range(1, n):
        for k, prev_y in enumerate(states):
            prev_score = scores[i-1, k]

            # An unreachable previous state cannot extend any path
            if np.isnan(prev_score):
                continue

            for j, current_y in enumerate(states):
                transition_key = f"transition:{prev_y}+{current_y}"
                emission_key = f"emission:{current_y}+{x[i]}"

                # Only consider if the feature exists
                if transition_key in feature_dict and emission_key in feature_dict:
                    transition_score = feature_dict[transition_key]
                    emission_score = feature_dict[emission_key]
                    overall_score = emission_score + transition_score + prev_score

                    # Better score is found: update backpointer and score arrays.
                    # Strict '>' keeps the earliest previous state on ties.
                    if (np.isnan(scores[i, j]) and not np.isnan(overall_score)) or overall_score > scores[i, j]:
                        scores[i, j] = overall_score
                        bp[i, j] = k

    # Compute for STOP
    highest_score = None
    highest_bp = fallback

    for j, prev_y in enumerate(states):
        if np.isnan(scores[n-1, j]):
            continue

        transition_key = f"transition:{prev_y}+STOP"
        if transition_key in feature_dict:
            overall_score = feature_dict[transition_key] + scores[n-1, j]
            if highest_score is None or overall_score > highest_score:
                highest_score = overall_score
                highest_bp = j

    # Follow backpointers from the last position; collect indices back-to-front
    state_indices = [highest_bp]
    for i in range(n-1, 0, -1):
        state_indices.append(bp[i, state_indices[-1]])

    return [states[index] for index in reversed(state_indices)]

# Collect the label set from the training split, then decode the example sentence x
states = get_states(data_dir / dataset / "train")
viterbi_decode(x, states, f)

['O', 'B-positive', 'O', 'O', 'O', 'B-positive', 'O']

In [225]:
# Perform decoding on dev sets
def inference(path, states, feature_dict):
    '''
    Given a path, perform inference on sentences in the dataset and writes it to disk.

    The input file holds one word per line, with blank lines between sentences.
    Predictions are written next to the input: a trailing ".in" extension is
    replaced by ".p2.out" (e.g. "dev.in" -> "dev.p2.out"); for any other file name
    ".p2.out" is appended, so the input file is never overwritten. Each output line
    is "<original word> <predicted tag>", and every sentence is followed by a blank
    line. Repeated blank lines are skipped, and a final sentence without a trailing
    blank line is still decoded.

    Inputs:
        path (Path object or str): path on the local directory to the dataset to perform inference on.
        states (list[str]): Unique states that can be predicted.
        feature_dict (dict[str] -> float): Dictionary that maps a given feature to its score.
    Outputs:
        None
    '''

    # TODO: Can downcase everything? Ask the TA
    input_path = Path(path)

    # Only touch the file extension; a plain string replace would also rewrite ".in"
    # inside directory names, or leave the name unchanged and clobber the input
    if input_path.suffix == ".in":
        output_path = input_path.with_suffix(".p2.out")
    else:
        output_path = input_path.with_name(input_path.name + ".p2.out")

    sentences = []
    sentence = []

    # Read from dataset path
    with open(input_path) as f:
        for line in f:
            formatted_line = line.strip()

            if formatted_line:
                # Not the end of sentence, add the word to the current sentence
                sentence.append(formatted_line)
            elif sentence:
                # End of sentence (empty sentences from repeated blank lines are skipped)
                sentences.append(sentence)
                sentence = []

    # Keep the last sentence when the file does not end with a blank line
    if sentence:
        sentences.append(sentence)

    # Write output file
    with open(output_path, "w") as wf:
        for sentence in sentences:
            # Run predictions
            pred_sentence = viterbi_decode(sentence, states, feature_dict)

            # Write original word and predicted tags
            for word, tag in zip(sentence, pred_sentence):
                wf.write(word + " " + tag + "\n")

            # End of sentence, write newline
            wf.write("\n")

# Decode every sentence in dev.in; predictions are written alongside it as dev.p2.out
inference(data_dir / dataset / "dev.in", states, f)