In [None]:
# Paths for the ES dataset. The train file is parsed by
# _preprocess_training_file, the dev.in file by _preprocess_test_file, and the
# dev.out file is the reference passed to evaluateScores. The dev.pN.out files
# are where predictions are written (only part 1 is written in the cells shown
# here; parts 2-4 are presumably used by later cells).
ES_TRAIN_DATA_FILE = "./ES/train"
ES_TEST_DATA_FILE = "./ES/dev.in"
ES_ACTUAL_DATA_FILE = "./ES/dev.out"
ES_PART1_PREDICTED_DATA_FILE = "./ES/dev.p1.out"
ES_PART2_PREDICTED_DATA_FILE = "./ES/dev.p2.out"
ES_PART3_PREDICTED_DATA_FILE = "./ES/dev.p3.out"
ES_PART4_PREDICTED_DATA_FILE = "./ES/dev.p4.out"

# Paths for the RU dataset, laid out the same way as the ES paths above.
RU_TRAIN_DATA_FILE = "./RU/train"
RU_TEST_DATA_FILE = "./RU/dev.in"
RU_ACTUAL_DATA_FILE = "./RU/dev.out"
RU_PART1_PREDICTED_DATA_FILE = "./RU/dev.p1.out"
RU_PART2_PREDICTED_DATA_FILE = "./RU/dev.p2.out"
RU_PART3_PREDICTED_DATA_FILE = "./RU/dev.p3.out"
RU_PART4_PREDICTED_DATA_FILE = "./RU/dev.p4.out"

# Placeholder key used for any word that was not seen in the training data.
UNK_WORD = "#UNK#"
# Sentinel tags placed before the first and after the last tag of a sentence.
START_TAG = "START"
STOP_TAG = "STOP"

In [None]:
import itertools

def _preprocess_training_file(training_file):
    """Parse a labelled training file into per-sentence tag and word lists.

    The file is read as UTF-8. Sentences are separated by an empty line, and
    each non-blank line holds a token followed by its tag, separated by a
    space. Only the last space on the line separates token from tag, so a
    token may itself contain spaces.

    Blank (empty or whitespace-only) lines inside a sentence chunk, and chunks
    that end up with no tokens (for example an empty file), are skipped
    instead of failing with an IndexError.

    Args:
        training_file: Path of the training file to read.

    Returns:
        A tuple ``(tags, tags_with_start_stop, words)`` of three lists with one
        entry per sentence:
        ``tags`` holds the list of tags of each sentence,
        ``tags_with_start_stop`` holds the same lists wrapped in
        ``START_TAG`` / ``STOP_TAG``, and
        ``words`` holds the list of tokens of each sentence.

    Raises:
        ValueError: If a non-blank line contains no space, i.e. has no tag.
    """
    tags = []
    tags_with_start_stop = []
    words = []

    with open(training_file, "r", encoding="utf8") as f:
        document = f.read().rstrip()

    for sentence in document.split("\n\n"):
        tags_list = []
        words_list = []

        for word_tag in sentence.split("\n"):
            # Runs of 3+ newlines leave empty lines inside a chunk; skip them.
            if not word_tag.strip():
                continue

            # Split on the LAST space only so tokens containing spaces survive.
            word, separator, tag = word_tag.rpartition(" ")
            if not separator:
                raise ValueError(
                    f"Malformed training line (expected '<word> <tag>'): {word_tag!r}"
                )

            words_list.append(word)
            tags_list.append(tag)

        # An empty file (or a chunk of only blank lines) yields no sentence.
        if not words_list:
            continue

        tags.append(tags_list)
        tags_with_start_stop.append([START_TAG] + tags_list + [STOP_TAG])
        words.append(words_list)

    return tags, tags_with_start_stop, words

def _preprocess_test_file(testing_file):
    test_words = []

    with open(testing_file,"r",encoding="utf8") as f:
        document = f.read().rstrip()
        lines = document.split("\n\n")

        for line in lines:
            word_list = []
            for word in line.split("\n"):
                word_list.append(word)
            test_words.append(word_list)

    return test_words

In [None]:
def get_transition_pairs(tags):
    """Count how often each ordered pair of adjacent tags occurs.

    Args:
        tags: A list of tag sequences (each a sliceable sequence of tags).

    Returns:
        A dict mapping ``(tag, next_tag)`` tuples to the number of times that
        pair appears consecutively within a sequence.
    """
    pair_totals = {}

    for sequence in tags:
        # Pair every tag with its immediate successor.
        for previous, following in zip(sequence, sequence[1:]):
            key = (previous, following)
            pair_totals[key] = pair_totals.get(key, 0) + 1

    return pair_totals

In [None]:
def get_unique(x):
    """Return the distinct items found across the nested iterables in ``x``.

    Items are returned in the order in which they are first encountered.
    Going through ``set`` (as before) produced an order that changes between
    interpreter runs for strings, which made anything that iterates over the
    result (for example tie-breaking between tags) non-reproducible.

    Args:
        x: An iterable of iterables of hashable items.

    Returns:
        A list of the distinct items, in first-seen order.
    """
    # dict keys are unique and keep insertion order.
    return list(dict.fromkeys(itertools.chain.from_iterable(x)))

In [None]:
def count_y(tag, tags):
    """Count the occurrences of ``tag`` across all sequences in ``tags``.

    Args:
        tag: The tag to look for.
        tags: An iterable of tag sequences.

    Returns:
        The total number of items equal to ``tag`` in the flattened input.
    """
    every_tag = [item for sequence in tags for item in sequence]
    return every_tag.count(tag)

In [None]:
def get_emission_using_MLE(unique_tags, unique_words, tags, words, k=1):
    """Estimate smoothed emission probabilities e(word | tag) from training data.

    For every tag the estimate is ``count(tag -> word) / (count(tag) + k)`` for
    each training word, and ``k / (count(tag) + k)`` for ``UNK_WORD``.

    Rows for ``START_TAG`` and ``STOP_TAG`` are always present in the result.
    These tags emit nothing, so their ``UNK_WORD`` probability is 0.0 (it used
    to come out as ``k / k == 1.0``, which made them the most likely tag for
    every unknown word).

    The ``unique_tags`` argument is not modified. Previously START/STOP were
    appended to the caller's list in place, so they leaked into every later
    use of that list and accumulated on each re-run.

    Args:
        unique_tags: Iterable of the distinct tags.
        unique_words: Iterable of the distinct training words.
        tags: List of tag sequences, one per sentence.
        words: List of word sequences, parallel to ``tags``.
        k: Pseudo-count reserved for ``UNK_WORD`` (default 1).

    Returns:
        A dict ``{tag: {word: probability}}`` with one row per tag (plus
        START/STOP) and, in each row, one entry per word in ``unique_words``
        plus ``UNK_WORD``.

    Raises:
        KeyError: If ``tags`` / ``words`` contain a tag or word that is missing
            from ``unique_tags`` / ``unique_words``.
    """
    pseudo_tags = (START_TAG, STOP_TAG)

    # Work on a copy so the caller's list is left untouched.
    all_tags = list(unique_tags)
    for pseudo_tag in pseudo_tags:
        if pseudo_tag not in all_tags:
            all_tags.append(pseudo_tag)

    emission = {}
    for tag in all_tags:
        emission_word = dict.fromkeys(unique_words, 0.0)
        emission_word[UNK_WORD] = 0.0
        emission[tag] = emission_word

    # Raw (tag, word) co-occurrence counts.
    for tag_sentence, word_sentence in zip(tags, words):
        for tag, word in zip(tag_sentence, word_sentence):
            emission[tag][word] += 1

    # Count every tag once, instead of re-flattening the corpus per tag.
    tag_totals = {}
    for tag_sentence in tags:
        for tag in tag_sentence:
            tag_totals[tag] = tag_totals.get(tag, 0) + 1

    for tag, emission_word in emission.items():
        denominator = tag_totals.get(tag, 0) + k
        if denominator == 0:
            # Unseen tag with k == 0: nothing to normalise, row stays all-zero.
            continue

        for word in emission_word:
            emission_word[word] = emission_word[word] / denominator

        # START/STOP never emit a word, so they get no unknown-word mass.
        emission_word[UNK_WORD] = 0.0 if tag in pseudo_tags else k / denominator

    return emission

In [None]:
def get_transition_using_MLE(unique_tags, tags_with_start_stop, transition_pair_count):
    """Estimate transition probabilities q(v | u) from adjacent-tag counts.

    Each probability is ``count(u, v) / count(u)``, where ``count(u)`` is the
    number of occurrences of ``u`` in ``tags_with_start_stop``. Rows whose
    source tag never occurs are left at 0.0.

    Tag occurrences are counted in a single pass over the corpus rather than
    re-flattening it once per source tag.

    Args:
        unique_tags: The distinct tags; ``START_TAG`` is prepended and
            ``STOP_TAG`` appended internally.
        tags_with_start_stop: List of tag sequences wrapped in START/STOP.
        transition_pair_count: Dict mapping ``(u, v)`` to how often ``v``
            directly follows ``u``.

    Returns:
        A dict ``{u: {v: probability}}`` where ``u`` ranges over every state
        except the final STOP and ``v`` over every state except the leading
        START.

    Raises:
        KeyError: If ``transition_pair_count`` contains a tag that is not among
            the states built from ``unique_tags``.
    """
    states = [START_TAG] + list(unique_tags) + [STOP_TAG]
    source_states = states[:-1]  # omit the trailing STOP
    target_states = states[1:]   # omit the leading START

    transition = {u: dict.fromkeys(target_states, 0.0) for u in source_states}

    # Fill in the raw pair counts.
    for (u, v), pair_count in transition_pair_count.items():
        transition[u][v] += pair_count

    # One pass over the corpus gives count(u) for every tag.
    occurrences = {}
    for sequence in tags_with_start_stop:
        for tag in sequence:
            occurrences[tag] = occurrences.get(tag, 0) + 1

    # Normalise each row by the number of times its source tag occurs.
    for u, transition_tag in transition.items():
        count_yi = occurrences.get(u, 0)
        for v in transition_tag:
            if count_yi == 0:
                transition_tag[v] = 0.0
            else:
                transition_tag[v] = transition_tag[v] / count_yi

    return transition

In [None]:
def get_unknown_words(train, test):
    """Return the words that occur in ``test`` but not in ``train``.

    Args:
        train: Nested word sequences from the training data.
        test: Nested word sequences from the test data.

    Returns:
        A set of the words found only in ``test``.
    """
    test_vocabulary = set(get_unique(test))
    train_vocabulary = set(get_unique(train))
    return test_vocabulary.difference(train_vocabulary)


In [None]:
# Preprocess for ES Dataset
# Parse the training file into per-sentence tag / word lists and the test file
# into per-sentence word lists, then collect the distinct tags and words.
ES_tags, ES_tags_with_start_stop, ES_train_words = _preprocess_training_file(ES_TRAIN_DATA_FILE)
ES_test_words = _preprocess_test_file(ES_TEST_DATA_FILE)
ES_unique_tags = get_unique(ES_tags)
ES_unique_words = get_unique(ES_train_words)

# Estimate the model parameters. Transition pairs are counted on the
# START/STOP-wrapped sequences so that pairs involving START and STOP are
# included. ES_unknown_words is the set of test words absent from training.
ES_emission_parameters = get_emission_using_MLE(ES_unique_tags, ES_unique_words, ES_tags, ES_train_words)
ES_transition_pair_count = get_transition_pairs(ES_tags_with_start_stop)
ES_transition_parameters = get_transition_using_MLE(ES_unique_tags, ES_tags_with_start_stop, ES_transition_pair_count)
ES_unknown_words = get_unknown_words(ES_train_words, ES_test_words)

# Cell output: the full emission table (one row per tag, one entry per
# training word plus UNK_WORD), so the rendered output can be very large.
ES_emission_parameters

In [None]:
def get_most_probable_tag(emission):
    """Pick, for every word, the tag with the highest emission probability.

    When several tags share the highest probability for a word, the tag that
    comes first in ``emission``'s iteration order is kept.

    Args:
        emission: Dict ``{tag: {word: probability}}``.

    Returns:
        A dict mapping each word to its best tag.
    """
    # word -> (best probability so far, tag that achieved it)
    best_so_far = {}

    for tag, word_probabilities in emission.items():
        for word, prob in word_probabilities.items():
            # Strict '>' keeps the earliest tag on ties.
            if word not in best_so_far or prob > best_so_far[word][0]:
                best_so_far[word] = (prob, tag)

    return {word: winner[1] for word, winner in best_so_far.items()}

In [None]:
def write_to_predicted_file_part1(predicted_file, test_file, most_probable_tag):
    """Write one ``"<word> <tag>"`` line per test word to ``predicted_file``.

    Words missing from ``most_probable_tag`` receive the tag stored under
    ``UNK_WORD``. An empty line is written after every sequence; an empty
    string inside a sequence is written out as an empty line as well.

    The fallback is selected with an explicit membership test instead of a
    bare ``except:``, so unrelated errors (including KeyboardInterrupt) are no
    longer silently turned into an unknown-word lookup.

    Args:
        predicted_file: Path of the output file (written as UTF-8).
        test_file: Iterable of word sequences (the parsed test data, not a
            path).
        most_probable_tag: Dict mapping each known word, and ``UNK_WORD``, to
            a tag.

    Raises:
        KeyError: If a word is unknown and ``most_probable_tag`` has no
            ``UNK_WORD`` entry.
    """
    with open(predicted_file, "w", encoding="utf8") as f:
        for seq in test_file:
            for word in seq:
                if len(word) > 0:
                    if word in most_probable_tag:
                        y = most_probable_tag[word]
                    else:
                        y = most_probable_tag[UNK_WORD]
                    f.write(f"{word} {y}\n")
                else:
                    # Preserve empty entries as empty lines.
                    f.write("\n")
            # Empty line marks the end of the sequence.
            f.write("\n")

In [None]:
# Part 1 output
# Map each word to the tag with the highest emission probability, then write a
# "<word> <tag>" prediction for every ES test word to the part-1 output file.
ES_most_probable_tag = get_most_probable_tag(ES_emission_parameters)
write_to_predicted_file_part1(ES_PART1_PREDICTED_DATA_FILE, ES_test_words, ES_most_probable_tag)

In [None]:
from EvalScript.evalResult import get_observed, get_predicted,compare_observed_to_predicted

def evaluateScores(actual_file, predicted_file):
    """Compare a prediction file against a reference file with EvalScript.

    Both files are read as UTF-8 and split into lines; the reference lines go
    through ``get_observed``, the predicted lines through ``get_predicted``,
    and the two results are handed to ``compare_observed_to_predicted``.

    Args:
        actual_file: Path of the reference file.
        predicted_file: Path of the prediction file.
    """
    def _lines_of(path):
        # Read a whole file and return its lines without line endings.
        with open(path, encoding="utf8") as handle:
            return handle.read().splitlines()

    predicted_lines = _lines_of(predicted_file)
    actual_lines = _lines_of(actual_file)

    observed = get_observed(actual_lines)
    compare_observed_to_predicted(observed, get_predicted(predicted_lines))

# Compare the ES part-1 predictions with the ES reference output.
evaluateScores(ES_ACTUAL_DATA_FILE, ES_PART1_PREDICTED_DATA_FILE)

In [None]:
# import math
# import sys

# word_output_list = []  # list of tuple(word, predicted_tag) for writing to file
# viterbi_values = {}  # key: (n, tag)  value: float

# def NEW_init_viterbi_values(tags_unique):
#     global viterbi_values
#     viterbi_values = {(0, START_TAG): 0.0}

#     for tag in tags_unique:
#         viterbi_values[(0, tag)] = -sys.float_info.max


# def NEW_generate_viterbi_values(n, current_tag, word_list, words_unique, tags_unique, emission_params, transmission_params):
#     global viterbi_values
#     current_max_viterbi_value = -sys.float_info.max
#     value = -sys.float_info.max

#     if n == 0:
#         return

#     # Recursive call to generate viterbi_values for (n-1, tag)
#     for tag in tags_unique:
#         if (n-1, tag) not in viterbi_values:
#             NEW_generate_viterbi_values(n-1, tag, word_list, words_unique, tags_unique, emission_params, transmission_params)

#     # Use viterbi values from n-1 to generate current viterbi value
#     if n == 1:
#         if word_list[n-1] not in words_unique:
#             viterbi_values[(n, current_tag)] = viterbi_values[(n-1, START_TAG)] + math.log(emission_params[current_tag][UNK_WORD] * transmission_params[START_TAG][current_tag])
#         else:
#             try:
#                 viterbi_values[(n, current_tag)] = viterbi_values[(n-1, START_TAG)] + math.log(emission_params[current_tag][word_list[n-1]] * transmission_params[START_TAG][current_tag])
#             except ValueError:
#                 viterbi_values[(n, current_tag)] = -sys.float_info.max
#         return
    
#     # For n >= 2, search through tags
#     for tag in tags_unique:
#         try:
#             if word_list[n-1] not in words_unique:
#                 value = viterbi_values[(n-1, tag)] + math.log(emission_params[current_tag][UNK_WORD] * transmission_params[tag][current_tag])
#             else:
#                 try:
#                     value = viterbi_values[(n-1, tag)] + math.log(emission_params[current_tag][word_list[n-1]] * transmission_params[tag][current_tag])
#                 except KeyError:
#                     continue
#         except ValueError:
#             continue

#         current_max_viterbi_value = max(current_max_viterbi_value, value)

#     viterbi_values[(n, current_tag)] = current_max_viterbi_value

# def NEW_start_viterbi(word_list, words_unique, tags_unique, emission_params, transmission_params):
#     global viterbi_values
#     max_final_viterbi_value = -sys.float_info.max
#     value = -sys.float_info.max

#     n = len(word_list)

#     # Recursive call to generate viterbi_values for (n, tag)
#     for tag in tags_unique:
#         NEW_generate_viterbi_values(n, tag, word_list, words_unique, tags_unique, emission_params, transmission_params)

#     # Use viterbi values from n to generate viterbi value for (n+1, STOP)
#     for tag in tags_unique:
#         try:
#             value = viterbi_values[(n, tag)] + math.log(transmission_params[tag][STOP_TAG])
#         except ValueError:
#             continue
#         max_final_viterbi_value = max(max_final_viterbi_value, value)

#     viterbi_values[(n+1, STOP_TAG)] = max_final_viterbi_value

#     print(viterbi_values)


# def generate_predictions_viterbi(word_list, tags_unique, emission_params, transmission_params):
#     global viterbi_values

#     n = len(word_list)

#     generated_tag_list = [''] * n

#     # Compute tag for n
#     current_best_tag = ''
#     current_best_tag_value = -sys.float_info.max

#     for tag in tags_unique:
#         try:
#             value = viterbi_values[(n, tag)] + math.log(transmission_params[tag][STOP_TAG])
#         except ValueError:
#             continue
#         if value > current_best_tag_value:
#             current_best_tag = tag
#             current_best_tag_value = value

#     # if current_best_tag == '':
#     #     current_best_tag = 'O'
#     generated_tag_list[n-1] = current_best_tag #! unable to generate best tag sometimes, stuck at ''

#     # Generate predictions starting from n-1 going down to 1
#     for i in range(n-1, 0, -1):
#         current_best_tag = ''
#         current_best_tag_value = -sys.float_info.max

#         for tag in tags_unique:
#             try:
#                 value = viterbi_values[(i, tag)] * transmission_params[tag][generated_tag_list[i]]

#             except ValueError:
#                 continue

#             if value > current_best_tag_value:
#                 current_best_tag = tag
#                 current_best_tag_value = value

#         generated_tag_list[i-1] = current_best_tag
    
#     return generated_tag_list

In [None]:
# test_word = ['Con', 'lo', 'cual', 'en', 'el', 'comedor', 'tienes', 'que', 'levantar', 'mas', 'la', 'voz', 'para', 'oirte', 'y', 'se', 'forma', 'un', 'ambiente', 'que', 'no', 'lo', 'que', 'se', 'espera', 'de', 'una', 'estrella', 'michelin', '.']
# NEW_init_viterbi_values(ES_unique_tags)
# NEW_start_viterbi(test_word, ES_unique_words, ES_unique_tags, ES_emission_parameters, ES_transition_parameters)
# print(viterbi_values)
# ES_generated_tag_list = generate_predictions_viterbi(test_word, ES_unique_tags, ES_emission_parameters, ES_transition_parameters)
# print(ES_generated_tag_list)