# SUTD 2021 50.007 Machine Learning HMM Project Part 1

In [1]:
# Setup and install dependencies
!pip3 install numpy

# Import libraries
import os
import numpy as np
from collections import Counter

# Report floating-point underflow as a warning
np.seterr(under="warn")

# Dataset locations, built with os.path.join so they stay OS-independent and
# relative to the current directory
es_train_path, es_dev_in_path, es_dev_out_path, es_dev_p1_out_path = (
    os.path.join("data", "ES", filename)
    for filename in ("train", "dev.in", "dev.out", "dev.p1.out")
)
ru_train_path, ru_dev_in_path, ru_dev_out_path, ru_dev_p1_out_path = (
    os.path.join("data", "RU", filename)
    for filename in ("train", "dev.in", "dev.out", "dev.p1.out")
)

# Label names in id order; "START" and "END" sit at the two ends of the list
labels_list = ["START", "O", "B-positive", "I-positive", "B-neutral", "I-neutral", "B-negative", "I-negative", "END"]

# Integer id of each label, equal to its position in labels_list
START, O, BPOS, IPOS, BNEU, INEU, BNEG, INEG, END = range(9)

# Label name -> integer id lookup
labels = dict(zip(labels_list, (START, O, BPOS, IPOS, BNEU, INEU, BNEG, INEG, END)))

# Number of labels excluding START and END (row count of the emission matrix)
N = 7



In [2]:
def read_dev_in_data(filepath):
    """Load an unlabelled dev.in file.

    Args:
        filepath: Path to a UTF-8 encoded text file.

    Returns:
        list[str]: Every line of the file, in order, with leading and
        trailing whitespace removed. Blank lines are kept as empty strings.
    """
    with open(filepath, "r", encoding="utf-8") as handle:
        return [raw_line.strip() for raw_line in handle]

def read_dev_out_data(filepath):
    """Load a labelled dev.out file as (token, label id) pairs.

    Each stripped line is split once at its last space into a token and a
    label name. Lines that do not yield exactly two parts (for example blank
    lines) are skipped.

    Args:
        filepath: Path to a UTF-8 encoded text file.

    Returns:
        list[tuple]: (token, label id) pairs in file order; the token keeps
        its original casing and the id is looked up in `labels`.

    Raises:
        KeyError: If a label name is not a key of `labels`.
    """
    pairs = []
    with open(filepath, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            fields = raw_line.strip().rsplit(" ", 1)
            if len(fields) != 2:
                continue
            token, label = fields
            pairs.append((token, labels[label]))
    return pairs

def read_training_data(filepath):
    """Load a labelled training file as (lower-cased token, label id) pairs.

    Each stripped line is divided at its last space into a token and a label
    name. Lines without a space (for example blank lines) are skipped.

    Args:
        filepath: Path to a UTF-8 encoded text file.

    Returns:
        list[tuple]: (token.lower(), label id) pairs in file order; the id is
        looked up in `labels`.

    Raises:
        KeyError: If a label name is not a key of `labels`.
    """
    pairs = []
    with open(filepath, "r", encoding="utf-8") as handle:
        for raw_line in handle:
            # rpartition reports an empty separator when the line has no space
            token, separator, label = raw_line.strip().rpartition(" ")
            if separator:
                pairs.append((token.lower(), labels[label]))
    return pairs

In [3]:
def calculate_number_of_labels(input_data):
    """Count how many records carry each label.

    Args:
        input_data: Iterable of records whose second element is the label.

    Returns:
        Counter: Maps each label to its number of occurrences.
    """
    tally = Counter()
    for record in input_data:
        tally[record[1]] += 1
    return tally

def get_all_unique_tokens(input_data):
    """Return the distinct tokens of a dataset in a reproducible order.

    Args:
        input_data: Iterable of records whose first element is the token.

    Returns:
        list: Every distinct token exactly once, ordered by first appearance
        in `input_data`.
    """
    # A dict keeps insertion order, so the result is identical on every run.
    # list(set(...)) is not: string hashing is randomized per process, so the
    # iteration order of a set of strings can change between runs.
    return list(dict.fromkeys(item[0] for item in input_data))

def calculate_emission_parameters(input_data, all_unique_tokens, k=1.0, num_labels=None):
    """Estimate smoothed emission probabilities for every (label, token) pair.

    The returned matrix follows the layout used in the course slides: one row
    per label and one column per token. Row ``i`` belongs to label id
    ``i + 1``, column ``j`` belongs to ``all_unique_tokens[j]``, and the final
    extra column holds the probability reserved for #UNK# (unseen) tokens:

        e(token | label) = count(label -> token) / (count(label) + k)
        e(#UNK# | label) = k / (count(label) + k)

    Args:
        input_data: Iterable of (token, label id) pairs with label ids in
            ``1..num_labels``.
        all_unique_tokens: Sequence of tokens defining the column order. Every
            token in `input_data` must appear in it.
        k: Pseudo-count given to #UNK# for each label. Should be positive,
            otherwise a label with no training examples yields a 0/0 row.
        num_labels: Number of rows (labels). Defaults to the module-level `N`.

    Returns:
        numpy.ndarray: ``(num_labels, len(all_unique_tokens) + 1)`` array of
        ``np.longdouble`` whose rows each sum to 1.

    Raises:
        ValueError: If a label id is outside ``1..num_labels`` or a token is
            missing from `all_unique_tokens`.
    """
    if num_labels is None:
        num_labels = N

    # Resolve each token's column once. Calling list.index inside the loop
    # would rescan the whole vocabulary for every training record.
    # setdefault keeps the first position if a token is listed twice, which is
    # the position list.index would return.
    column_of = {}
    for column, token in enumerate(all_unique_tokens):
        column_of.setdefault(token, column)

    # Final index is for #UNK# tokens
    emission_counts = np.zeros((num_labels, len(all_unique_tokens) + 1), dtype=np.longdouble)
    # Counted per row so that a label absent from the training data keeps a
    # zero count in its own slot instead of shifting the counts of later labels
    label_counts = np.zeros(num_labels, dtype=np.longdouble)

    for token, label in input_data:
        if not 1 <= label <= num_labels:
            # Without this check, id 0 would silently wrap around to the last row
            raise ValueError("label id {!r} is outside 1..{}".format(label, num_labels))
        column = column_of.get(token)
        if column is None:
            raise ValueError("token {!r} is not in all_unique_tokens".format(token))
        emission_counts[label - 1, column] += 1
        label_counts[label - 1] += 1

    # Every label reserves k pseudo-counts for #UNK#
    emission_counts[:, -1] = k

    # Divide each row by its own (count(label) + k)
    emission_parameters = emission_counts / (label_counts[:, np.newaxis] + k)

    # Sanity check: each row is a probability distribution. A tolerance is
    # used because a sum over the whole vocabulary accumulates rounding error
    # well above a single machine epsilon.
    assert np.allclose(emission_parameters.sum(axis=1), 1.0), "emission rows must sum to 1"

    return emission_parameters

In [4]:
def get_label_from_token(input_word, all_unique_tokens, emission_parameters):
    """Pick the label name with the highest emission probability for a token.

    Args:
        input_word: Token to label.
        all_unique_tokens: List of known tokens; its order gives the column
            of each token in `emission_parameters`.
        emission_parameters: 2-D array with one row per label and one column
            per known token, plus a final column that is used for tokens not
            in `all_unique_tokens`.

    Returns:
        str: The entry of `labels_list` for the best-scoring row.
    """
    # Tokens outside the vocabulary fall back to the last column
    column = all_unique_tokens.index(input_word) if input_word in all_unique_tokens else -1
    best_row = np.argmax(emission_parameters[:, column])
    # Row 0 corresponds to labels_list[1] ("O"), hence the shift by one
    return labels_list[best_row + 1]

In [5]:
def write_prediction_output_to_file(language):
    """Train the emission-only tagger for one language and write its dev predictions.

    Emission parameters are estimated from the language's training file. Each
    non-empty line of its dev.in file is then labelled independently, and the
    result is written to its dev.p1.out file as "<token> <label>" lines. Empty
    lines of dev.in are copied through unchanged.

    Args:
        language: Dataset to process, either "ES" or "RU".

    Raises:
        ValueError: If `language` is not a supported dataset.
    """
    paths_by_language = {
        "ES": (es_train_path, es_dev_in_path, es_dev_p1_out_path),
        "RU": (ru_train_path, ru_dev_in_path, ru_dev_p1_out_path),
    }
    if language not in paths_by_language:
        # Fail loudly: the previous if/elif chain silently wrote nothing here
        raise ValueError("Unsupported language {!r}; expected one of {}".format(
            language, sorted(paths_by_language)))
    train_path, dev_in_path, dev_p1_out_path = paths_by_language[language]

    # Training: estimate the emission parameters from the labelled data
    train_data = read_training_data(train_path)
    all_unique_tokens = get_all_unique_tokens(train_data)
    emission_parameters = calculate_emission_parameters(train_data, all_unique_tokens)

    # Decoding: label every token on its own. The label depends only on the
    # lower-cased token, so it is looked up once per distinct token.
    label_of = {}
    predicted_results = []
    for token in read_dev_in_data(dev_in_path):
        if not token:
            predicted_results.append("")
            continue
        lowered = token.lower()
        if lowered not in label_of:
            label_of[lowered] = get_label_from_token(lowered, all_unique_tokens, emission_parameters)
        # The output keeps the token's original casing
        predicted_results.append(token + " " + label_of[lowered])

    with open(dev_p1_out_path, "w", encoding="utf-8") as file:
        file.writelines(line + "\n" for line in predicted_results)

In [6]:
# Produce the Part 1 prediction file for each dataset in turn
for language in ("ES", "RU"):
    write_prediction_output_to_file(language)