# Part 1

Write a function that estimates the emission parameters from the training set using MLE (maximum
likelihood estimation):

In [1]:
def read_file(file_path):
    with open(file_path, "r", encoding="utf-8") as file:
        file_content = file.read()
    return file_content

def write_file(file_path, content):
    with open(file_path, 'w', encoding="utf-8") as file:
        file.write(content)

In [2]:
def estimate_emission_parameters(file_content):
    tags = {}  # Dictionary to store counts of each observation
    emission_params = {}  # Dictionary to store emission probabilities

    # Split the file content into lines
    lines = file_content.strip().split("\n")

    # Iterate through each line and extract observations and tags
    for line in lines:
        if len(line) == 0:
            continue
        if line[-1] == 'O':
            observation, tag = line[:-1], line[-1]
        elif ('B-positive' in line 
            or 'I-positive' in line 
            or 'B-negative' in line 
            or 'I-negative' in line):
            observation, tag = line[:-10], line[-10:]
        elif ('B-neutral' in line
            or 'I-neutral' in line):
            observation, tag = line[:-9], line[-9:]
#         observation, tag = line.split()
    
        if tag in tags:
            tags[tag][observation] = tags[tag].get(observation, 0) + 1
        else:
            tags[tag] = {observation: 1}

    # Calculate emission probabilities for each observation and tag
    for tag, observations_count in tags.items():
        total_count = sum(observations_count.values())
        emission_params[tag] = {
            observation: count / total_count for observation, count in observations_count.items()
        }

    return emission_params

file_content = read_file("ES/train.txt")

emission_parameters = estimate_emission_parameters(file_content)
# print(emission_parameters, sum(len(words) for words in emission_parameters.values()))

One problem with estimating the emission parameters is that some words that appear in the test set
do not appear in the training set. One simple idea to handle this issue is as follows. We introduce
a special word token #UNK#, and make the following modifications to the computation of emission
probabilities.During the testing phase, if the word does not appear in the training set, we replace that word with
#UNK#. Set k to 1, implement this fix into your function for computing the emission parameters.

In [3]:
def estimate_emission_parameters_modified(training_file_content, test_file_content, k=1):
    tags = {}  # Dictionary to store counts of each observation
    emission_params = {}  # Dictionary to store emission probabilities
    train_words = []
    test_words = []
    
    # Split the file content into lines
    train_data_lines = training_file_content.strip().split("\n")
    test_data_lines = test_file_content.strip().split("\n")

    # Iterate through each line and extract observations and tags
    for line in train_data_lines:
        if len(line) == 0:
            continue
        if ' O' in line:
            observation, tag = line[:-2], line[-1]
        elif ('B-positive' in line 
            or 'I-positive' in line 
            or 'B-negative' in line 
            or 'I-negative' in line):
            observation, tag = line[:-11], line[-10:]
        elif ('B-neutral' in line
            or 'I-neutral' in line):
            observation, tag = line[:-10], line[-9:]
        if observation not in train_words:
            train_words.append(observation)
        if tag in tags:
            tags[tag][observation] = tags[tag].get(observation, 0) + 1
        else:
            tags[tag] = {observation: 1}
    # Iterate through each line to extract observations from test set
    for line in test_data_lines:
        if len(line) == 0:
            continue
        if ' O' in line:
            observation, tag = line[:-2], line[-1]
        elif ('B-positive' in line 
            or 'I-positive' in line 
            or 'B-negative' in line 
            or 'I-negative' in line):
            observation, tag = line[:-11], line[-10:]
        elif ('B-neutral' in line
            or 'I-neutral' in line):
            observation, tag = line[:-10], line[-9:]
        else:
            observation = line
        if observation not in test_words:
            test_words.append(observation)
    # Extract words that are in test set but not in train set   
    unique_words = find_unique_words(test_words, train_words)

    # Calculate emission probabilities for each observation and tag
    for tag, observations_count in tags.items():
        total_count = sum(observations_count.values())
        emission_params[tag] = {
            observation: count / (total_count + k) for observation, count in observations_count.items()
        }
        for word in unique_words:
            emission_params[tag][word] = k / (total_count + k)


    return emission_params, list(set(test_words).union(train_words))

def find_unique_words(list1, list2):
    set1 = set(list1)
    set2 = set(list2)

    unique_in_list1 = set1 - set2

    return list(unique_in_list1)


training_file_content = read_file("ES/train.txt")
test_file_content = read_file("ES/dev.in")
emission_parameters = estimate_emission_parameters_modified(training_file_content, test_file_content, 1)[0]
# print(emission_parameters, sum(len(words) for words in emission_parameters.values()))
count = 0
for emission_probabilities in emission_parameters.values():
    count += len(emission_probabilities)
# print(count)

In [4]:
def sentiment_analysis(path_to_train_file, path_to_test_file):
    training_file_content = read_file(path_to_train_file)
    test_file_content = read_file(path_to_test_file)
    emission_parameters, words = estimate_emission_parameters_modified(training_file_content, test_file_content)
    word_to_label = {}
    for word in words:
        probabilities = []
        for label, freqs in emission_parameters.items():
            if word in freqs:
                probabilities.append((label, freqs[word]))
        word_to_label[word] = max(probabilities, key=lambda x: x[1])[0]
    return word_to_label

def write_result_to_file(word_to_label, path_to_dev_set, path_to_output):
    result = ""
    dev_file_content = read_file(path_to_dev_set)
    dev_set_lines = dev_file_content.strip().split("\n")
    counter = 0
    for line in dev_set_lines:
        if line == "":
            result += "\n"
            continue  
        result += line + " " + word_to_label.get(line) + "\n"
 
    write_file(path_to_output, result)        

In [5]:
mapping = sentiment_analysis("ES/train.txt", "ES/dev.in")
write_result_to_file(mapping, "ES/dev.in", "ES/dev.p1.out")

In [6]:
mapping = sentiment_analysis("RU/train.txt", "RU/dev.in")
write_result_to_file(mapping, "RU/dev.in", "RU/dev.p1.out")

## ES
#Entity in gold data: 229
#Entity in prediction: 1466

#Correct Entity : 178
Entity  precision: 0.1214
Entity  recall: 0.7773
Entity  F: 0.2100

#Correct Sentiment : 97
Sentiment  precision: 0.0662
Sentiment  recall: 0.4236
Sentiment  F: 0.1145

## RU
#Entity in gold data: 389
#Entity in prediction: 1816

#Correct Entity : 266
Entity  precision: 0.1465
Entity  recall: 0.6838
Entity  F: 0.2413

#Correct Sentiment : 129
Sentiment  precision: 0.0710
Sentiment  recall: 0.3316
Sentiment  F: 0.1170

# Part 2

Write a function that estimates the transition parameters from the training set using MLE (maximum
likelihood estimation):

In [7]:
def estimate_transition_parameters(file_content):
    transition_counts = {}
    transition_params = {}  # Dictionary to store emission probabilities

    # Split the file content into lines
    lines = file_content.strip().split("\n")

    # Iterate through each line and extract observations and tags
    for line_index in range(len(lines)):
        next_line = ""
        curr_line = lines[line_index]
        if line_index < len(lines) - 1:
            next_line = lines[line_index + 1]
        
        if len(curr_line) == 0:
            continue
        if curr_line[-1] == 'O':
            curr_observation, curr_tag = curr_line[:-1], curr_line[-1]
        elif ('B-positive' in curr_line 
            or 'I-positive' in curr_line 
            or 'B-negative' in curr_line 
            or 'I-negative' in curr_line):
            curr_observation, curr_tag = curr_line[:-10], curr_line[-10:]
        elif ('B-neutral' in curr_line
            or 'I-neutral' in curr_line):
            curr_observation, curr_tag = curr_line[:-9], curr_line[-9:]
        
        if len(next_line) == 0: #true if line_index = len(lines - 1) or line_index is last index of a document
            pass
        elif next_line[-1] == 'O':
            next_observation, next_tag = next_line[:-1], next_line[-1]
        elif ('B-positive' in next_line 
            or 'I-positive' in next_line 
            or 'B-negative' in next_line 
            or 'I-negative' in next_line):
            next_observation, next_tag = next_line[:-10], next_line[-10:]
        elif ('B-neutral' in next_line
            or 'I-neutral' in next_line):
            next_observation, next_tag = next_line[:-9], next_line[-9:]        
        
        # Handle transitions from START
        if line_index == 0 or lines[line_index - 1] == "":
            if 'START' in transition_counts:
                transition_counts['START'][curr_tag] = transition_counts['START'].get(curr_tag, 0) + 1
            else:
                transition_counts['START'] = {curr_tag: 1}
                
        # Handle transitions to STOP
        if len(next_line) == 0:
            if curr_tag in transition_counts:
                transition_counts[curr_tag]['STOP'] = transition_counts[curr_tag].get('STOP', 0) + 1
            else:
                transition_counts[curr_tag] = {'STOP': 1}
                
        # Rest of the transitions
        else:
            if curr_tag in transition_counts:
                transition_counts[curr_tag][next_tag] = transition_counts[curr_tag].get(next_tag, 0) + 1
            else:
                transition_counts[curr_tag] = {next_tag: 1}
        

    # Calculate transition probabilities 
    for tag, next_tag_counts in transition_counts.items():
        total_count = sum(next_tag_counts.values())
        transition_params[tag] = {
            next_tag: count / total_count for next_tag, count in next_tag_counts.items()
        }

    return transition_params

file_content = read_file("ES/train.txt")

transition_parameters = estimate_transition_parameters(file_content)
print(transition_parameters)

{'START': {'O': 0.9289176090468497, 'B-positive': 0.052234787291330104, 'B-negative': 0.014001077005923533, 'B-neutral': 0.004846526655896607}, 'O': {'O': 0.8856896848630963, 'B-positive': 0.03650766316514551, 'STOP': 0.06344067504735663, 'B-negative': 0.012226623041157224, 'B-neutral': 0.0021353538832443605}, 'B-positive': {'O': 0.871551724137931, 'I-positive': 0.11637931034482758, 'STOP': 0.008620689655172414, 'B-neutral': 0.0008620689655172414, 'B-positive': 0.002586206896551724}, 'B-negative': {'O': 0.8110236220472441, 'I-negative': 0.1784776902887139, 'STOP': 0.010498687664041995}, 'B-neutral': {'I-neutral': 0.20833333333333334, 'O': 0.7916666666666666}, 'I-neutral': {'I-neutral': 0.6511627906976745, 'O': 0.3488372093023256}, 'I-positive': {'I-positive': 0.5700636942675159, 'O': 0.4267515923566879, 'STOP': 0.0031847133757961785}, 'I-negative': {'O': 0.39766081871345027, 'I-negative': 0.6023391812865497}}


In [8]:
def viterbi(observation_sequence, emission_params, transition_params):
    states = list(emission_params.keys())
    T = len(observation_sequence)
    N = len(states)

    # Initialization step
    viterbi_table = [{}]
    backpointer = [{}]
    for state in states:
        viterbi_table[0][state] = transition_params['START'].get(state, 0) * emission_params[state].get(observation_sequence[0], 0)
        backpointer[0][state] = None

    # Recursion step
    for t in range(1, T):
        viterbi_table.append({})
        backpointer.append({})
        for state in states:
            max_prob = max(
                viterbi_table[t - 1][prev_state] * transition_params[prev_state].get(state, 0) * emission_params[state].get(observation_sequence[t], 0)
                for prev_state in states
            )
            viterbi_table[t][state] = max_prob
            backpointer[t][state] = max(states, key=lambda prev_state: viterbi_table[t - 1][prev_state] * transition_params[prev_state].get(state, 0))

    # Termination step
    max_prob_last_state = max(viterbi_table[T - 1].values())
    best_last_state = max(states, key=lambda state: viterbi_table[T - 1][state] * transition_params[state].get('STOP', 0))

    # Backtrack to find the best sequence
    best_sequence = [best_last_state]
    prev_state = best_last_state
    for t in range(T - 2, -1, -1):
        best_sequence.insert(0, backpointer[t + 1][prev_state])
        prev_state = backpointer[t + 1][prev_state]

    return best_sequence

def create_observation_sequences(file_path):
    with open(file_path, "r", encoding="utf-8") as file:
        file_content = file.read().strip()

    observation_sequences = []
    current_sequence = []

    # Split the file content by empty lines to get individual sequences
    sequence_lines = file_content.split("\n\n")

    for sequence_line in sequence_lines:
        # Split each sequence into individual observations (words)
        observation_sequence = sequence_line.strip().split("\n")
        current_sequence.extend(observation_sequence)
        observation_sequences.append(current_sequence)
        current_sequence = []

    return observation_sequences

def viterbi_for_sequences(observation_sequences, emission_params, transition_params):
    best_sequences = []
    for observation_sequence in observation_sequences:
        best_sequence = viterbi(observation_sequence, emission_params, transition_params)
        best_sequences.append(best_sequence)

    return best_sequences

def write_sequences_to_file(output_file_path, observation_sequences, best_sequences):
    with open(output_file_path, 'w', encoding='utf-8') as file:
        for obs_sequence, best_sequence in zip(observation_sequences, best_sequences):
            for word, tag in zip(obs_sequence, best_sequence):
                file.write(f"{word} {tag}\n")
            file.write("\n")  # Separate sequences with an empty line

In [9]:
training_file_content = read_file("ES/train.txt")
test_file_content = read_file("ES/dev.in")
emission_params = estimate_emission_parameters_modified(training_file_content, test_file_content, 1)[0]
transition_params = estimate_transition_parameters(training_file_content)
observation_sequences = create_observation_sequences("ES/dev.in")
best_sequences = viterbi_for_sequences(observation_sequences, emission_params, transition_params)
# print(best_sequences)
   
write_sequences_to_file("ES/dev.p2.out", observation_sequences, best_sequences)

In [10]:
training_file_content = read_file("RU/train.txt")
test_file_content = read_file("RU/dev.in")
emission_params = estimate_emission_parameters_modified(training_file_content, test_file_content, 1)[0]
transition_params = estimate_transition_parameters(training_file_content)
observation_sequences = create_observation_sequences("RU/dev.in")
best_sequences = viterbi_for_sequences(observation_sequences, emission_params, transition_params)
# print(best_sequences)
   
write_sequences_to_file("RU/dev.p2.out", observation_sequences, best_sequences)

## ES
#Entity in gold data: 229
#Entity in prediction: 542

#Correct Entity : 134
Entity  precision: 0.2472
Entity  recall: 0.5852
Entity  F: 0.3476

#Correct Sentiment : 97
Sentiment  precision: 0.1790
Sentiment  recall: 0.4236
Sentiment  F: 0.2516

## RU

#Entity in gold data: 389
#Entity in prediction: 514

#Correct Entity : 190
Entity  precision: 0.3696
Entity  recall: 0.4884
Entity  F: 0.4208

#Correct Sentiment : 129
Sentiment  precision: 0.2510
Sentiment  recall: 0.3316
Sentiment  F: 0.2857

# Part 4: Design Challenge
For our design, we propose the use of Higher Order Hidden Markov Models (HOHMM). 
In a Higher Order HMM, emission pand transition probabilities are conditioned on the current state and the previous several states. This allows the model to capture longer-range dependencies in the sequence. For sentiment analysis, this could help capture more complex sentiment patterns that extend over multiple words.

## 4.1 Data Processing
Here, we aim to use integers to index each possible state in order. This is done so as to enable the use of numpy arrays for calculating of Higher-order Transition Probabilities, that take into account not just the previous state but one before it that will be done in section `4.2`. 

In [46]:
import numpy as np
import pandas as pd

In [47]:
# Functions for this step 4.1
def read_data_from_file(file_path):
    """
    Seperates txt file into texts and labels arary
    """
    texts = []
    labels = []

    with open(file_path, 'r') as file:
        for line in file:
            parts = line.strip().split()
            if len(parts) == 2:
                text = parts[0]
                label = parts[1]
                texts.append(text)
                labels.append(label)
    
    return texts, labels

def find_HO_transition_probabilities(file_path):
    """
    Finds higher order transition probabilities
    INPUTS:
    - file_path (string): Path to the data file in txt format
    OUTPUTS:
    - texts: array containing all texts
    - labels: array containing coressponding labels of each text
    - states: array containing unique states
    - state_to_idx: mapping of states to a numerical index
    - label_idx: array of statesin numerical idx, corresponding to texts
    
    
    """
    texts, labels = read_data_from_file(file_path)
    # Get states
    states = list(set(labels))
    num_states = len(states)
    
    
    # Define a mapping from states to indices
    state_to_idx = {state: idx for idx, state in enumerate(states)}
    
    # Get array of state indices corresponding to each text in the texts array
    label_idx = [state_to_idx[label] for label in labels]

    return texts, labels, states, state_to_idx, label_idx

In [48]:
ES_train = "ES/train.txt"
RU_train = "RU/train.txt"

ES_texts, ES_labels, ES_states, ES_state_mappings, ES_label_idx = process_data_HOHMM(ES_train)
RU_texts, RU_labels, RU_states, RU_state_mappings, RU_label_idx = process_data_HOHMM(RU_train)

ValueError: not enough values to unpack (expected 5, got 4)

## 4.2 Calculation of Higher Order Transition Probabilities
In addition to standard HMM initialization, consider the higher order transition probabilities. These probabilities involve multiple previous states, capturing longer-range dependencies.

Simply put, we need to find the parameter $a_{u, v, q}$ which is the probability of state $q$ appearing after $u, v$ appears.
$$a_{u, v, q} = P(q|y, v) = \frac{Count(u, v, q)}{Count(u,v)}$$

whereby $Count(u, v, q)$ is the number of times `q` appears after `u, v` appears before it

In [40]:
def find_a_HOHMM(states):
    """
    Finds a_uvq, HO transition probabilities
    INPUTS:
    - states (arr): array containing states in dataset
    OUTPUTS:
    - a_uvq (np.array): np.array containign transition probabilities
    """
    # Find number of states
    num_states = len(states)
    
    # Initiate a np array to store HO a_uvq
    transition_probs = np.zeros((num_states, num_states, num_states))
    # Finding Count(u, v, q)
    for i in range(2, len(label_idx)):
        prev_state_1 = label_idx[i - 2]
        prev_state_2 = label_idx[i - 1]
        current_state = label_idx[i]
        
        transition_probs[prev_state_1, prev_state_2, current_state] += 1
        
    # Finding HO transition params
    count_uv = np.nansum(transition_probs, axis=2, keepdims=True) 
    a_uvq = transition_probs / count_uv
    
    # Handle NaNs, convert to 0 probability
    a_uvq = np.nan_to_num(a_uvq, nan=0.0)

    return a_uvq

In [41]:
ES_a = find_a_HOHMM(ES_states)
RU_a = find_a_HOHMM(RU_states)

NameError: name 'ES_states' is not defined

## 4.3 Calculating Emission Probabilities
Follows that of normal HMM
$$b_u(o) = \frac{Count(u \to o)}{Count(u)}$$