## Setting up the data

In [89]:
# Import libraries
import os
import numpy as np
from collections import Counter

# Set OS-independent paths, relative to current directory
en_train_path = os.path.join("dataset", "EN", "train")
en_dev_in_path = os.path.join("dataset", "EN", "dev.in")
en_dev_out_path = os.path.join("dataset", "EN", "dev.out")
en_dev_p1_out_path = os.path.join("dataset", "EN", "dev.p1.out")
en_dev_p2_out_path = os.path.join("dataset", "EN", "dev.p2.out")
en_dev_p3_out_path = os.path.join("dataset", "EN", "dev.p3.out")
en_dev_p4_out_path = os.path.join("dataset", "EN", "dev.p4.out")

en_test_in_path = os.path.join("Test", "EN", "test.in")
en_test_out_path = os.path.join("Test", "EN", "test.p4.out")

fr_train_path = os.path.join("dataset", "FR", "train")
fr_dev_in_path = os.path.join("dataset", "FR", "dev.in")
fr_dev_out_path = os.path.join("dataset", "FR", "dev.out")
fr_dev_p1_out_path = os.path.join("dataset", "FR", "dev.p1.out")
fr_dev_p2_out_path = os.path.join("dataset", "FR", "dev.p2.out")
fr_dev_p3_out_path = os.path.join("dataset", "FR", "dev.p3.out")
fr_dev_p4_out_path = os.path.join("dataset", "FR", "dev.p4.out")

fr_test_in_path = os.path.join("Test", "FR", "test.in")
fr_test_out_path = os.path.join("Test", "FR", "test.p4.out")

# Define constant variables
N = 7
labels = {"START": 0,
          "O": 1,
          "B-positive": 2,
          "I-positive": 3,
          "B-neutral": 4,
          "I-neutral": 5,
          "B-negative": 6,
          "I-negative": 7,
          "END": 8}
labels_list = ["START", "O", "B-positive", "I-positive", "B-neutral", "I-neutral", "B-negative", "I-negative", "END"]



In [None]:
# Create labels dictionary and list (for EN)
def create_labels_array_dict(filepath):

    output_labels_dict = {"START": 0}
    
    output_labels_array = ["START"]

    counter = 1

    with open(filepath, "r", encoding="utf-8") as file:
        lines = file.readlines()
        for line in lines:
            if len(line.strip().rsplit(" ", 1)) == 2: 
                _, label = line.strip().rsplit(" ", 1)
                
                # Reference to check if key exist in dictionary: https://www.geeksforgeeks.org/python-check-whether-given-key-already-exists-in-a-dictionary/
                if label not in output_labels_dict.keys():
                    output_labels_dict[label] = counter
                    output_labels_array.append(label)
                    counter += 1
            else:
                continue

    output_labels_dict["END"] = counter
    output_labels_array.append("END")

    # counter - 1 to replace global variable N. 
    return output_labels_array, output_labels_dict, counter - 1


print(create_labels_array_dict(en_train_path))
print(create_labels_array_dict(fr_train_path))


In [None]:
# Read training data
print(labels)
def read_training_data(filepath):
    ''' NOTE: This returns results in tuple form, but without start and end. Not sure if anyone needs this function to be in this form, 
                so I wrote another function called generate_data_tuple_list_with_start_end() which is the same as read_training_data but with
                start and end. - Jonah'''

    results = []

    with open(filepath, "r", encoding="utf-8") as file:
        lines = file.readlines()
        for line in lines:
            if len(line.strip().rsplit(" ", 1)) == 2: # Make sure the line has two elements: word and label
                word, label = line.strip().rsplit(" ", 1)
                results.append((word, labels[label]))
                
            else:
                continue
    return results

print(read_training_data(fr_train_path))

# Read dev.in data
# There are no labels, just list of words
def read_dev_in_data(filepath):
    results = []
    with open(filepath, "r", encoding="utf-8") as file:
        lines = file.readlines()
        for line in lines:
            results.append(line.strip())
    return results
# print(read_dev_in_data(fr_dev_in_path))


# Generate data in tuple form but with (" ", START_index) and (" ", END_index)
def generate_data_tuple_list_with_start_end(filepath):
    results = []

    with open(filepath, "r", encoding="utf-8") as file:
        lines = file.readlines()
        for line in lines:
            if len(line.strip().rsplit(" ", 1)) == 2: # Like in read_training_data, to make sure that this line has two elements: word and label
                word, label = line.strip().rsplit(" ", 1)
                results.append((word, labels[label]))
                
            else:
                # Enters here if it is the end of the sequence.
                results.append((" ", labels["END"]))
                results.append((" ", labels["START"]))

                
    return results

print(generate_data_tuple_list_with_start_end(fr_train_path))

In [None]:
# Calculate number of each labels, with the keys being the index of the label in labels_list
def calculate_number_of_labels(input_data):
    return Counter(elem[1] for elem in input_data)
print(calculate_number_of_labels(read_training_data(fr_train_path)))

# Print out all the words that are unique
def get_all_unique_words(input_data):
    print(len(set(item[0] for item in input_data)))
    return list(set(item[0] for item in input_data))
print(get_all_unique_words(read_training_data(fr_train_path)))




##################################
###### Part 1 Point 1 and 2 ######

# For the return value, we follow the matrix format defined in the slides accordingly
def calculate_emission_parameters(input_data, all_unique_tokens, k=1.0):
    
    ''' NOTE: input_data is a list of tuples. '''

    # Initialisation for emission_counts
    # Final index is for #UNK# tokens
    emission_counts = np.zeros((N, len(all_unique_tokens) + 1), dtype=np.longdouble)

    # Calculate number of each labels and store in a list
    label_counts = np.array(list(val[1] for val in sorted(calculate_number_of_labels(input_data).items())))
    print(label_counts)

    for token, labels_list_index in input_data:
        emission_counts[labels_list_index - 1][all_unique_tokens.index(token)] += 1

    # This is for the other case of #UNK# tokens
    emission_counts[:, -1] = np.full((1, N), k)[0]


    # Initialisation for emission_parameters
    emission_parameters = np.empty((N, len(all_unique_tokens) + 1), dtype=np.longdouble)

    for index, _ in enumerate(emission_counts):
        emission_parameters[index] = emission_counts[index] / (label_counts[index] + k)
    
    return emission_parameters
 


###### Part 1 Point 1 and 2 ######
##################################

In [93]:
# Get tag from word
def get_label_from_word(input_word, all_unique_tokens, emission_parameters):
    if input_word not in all_unique_tokens:
        column_to_consider = emission_parameters[:, -1]
    else:
        column_to_consider = emission_parameters[:, all_unique_tokens.index(input_word)]

    # Randomly choose the index if there is more than one argmax value
    x = np.random.choice(np.argwhere(np.isclose(column_to_consider, column_to_consider.max())).flatten()) + 1
    return labels_list[x]
  
# print(get_label_from_word("nourriture", get_all_unique_words(read_training_data(fr_train_path)), calculate_emission_parameters(read_training_data(fr_train_path), get_all_unique_words(read_training_data(fr_train_path)))))

In [None]:
def write_prediction_output_to_file(language):
    if language == "EN":
        # Conduct training/supervised learning (M-Step)
        train_data = read_training_data(en_train_path)
        all_unique_tokens = get_all_unique_words(train_data)
        emission_parameters = calculate_emission_parameters(train_data, all_unique_tokens)

        # Execute testing/decoding (E-Step)
        predicted_results = []
        test_data = read_dev_in_data(en_dev_in_path)
        for token in test_data:
            if token:
                predicted_results.append(token + " " + get_label_from_word(token, all_unique_tokens, emission_parameters))
            else:
                predicted_results.append("")
        with open(en_dev_p1_out_path, "w+", encoding="utf-8") as file:
            for line in predicted_results:
                file.write(line + "\n")

    elif language == "FR":
        # Conduct training/supervised learning (M-Step)
        train_data = read_training_data(fr_train_path)
        all_unique_tokens = get_all_unique_words(train_data)
        emission_parameters = calculate_emission_parameters(train_data, all_unique_tokens)

        # Execute testing/decoding (E-Step)
        predicted_results = []
        test_data = read_dev_in_data(fr_dev_in_path)
        for token in test_data:
            if token:
                predicted_results.append(token + " " + get_label_from_word(token, all_unique_tokens, emission_parameters))
            else:
                predicted_results.append("")
        with open(fr_dev_p1_out_path, "w+", encoding="utf-8") as file:
            for line in predicted_results:
                file.write(line + "\n")
                
write_prediction_output_to_file("FR")

# Temporary store the FR labels, labels_list and N 
FR_labels_list, FR_labels, FR_N = labels_list, labels, N


labels_list, labels, N = create_labels_array_dict(en_train_path)
#print(labels_list, labels)
write_prediction_output_to_file("EN")



In [None]:
#############################
####### Part 1 Point 3 ######

def read_dev_out_data(filepath):
    results = []
    with open(filepath, "r", encoding="utf-8") as file:
        lines = file.readlines()
        for line in lines:
            results.append(line.strip())
    return results



####### Part 1 Point 3 ######
#############################









''' NOTE: Use the powershell command in the cell below this cell instead. '''

def count_number_of_entities(input_tuple_data, labels_list):
    # This is the function for counting entities, that is, when transit from START to B OR I and from O to B OR I and from I to B
    # Also need to consider whether there are cases of, for example, going from I-positive to I-neutral, or B-positive to B-neutral, etc.
    # By counting the number of transition from O to an entity, we should get the number of entities.

    total_count_of_entities = 0; 
    

    for i in range(0, len(input_tuple_data) - 1):
        # (len(input_tuple_data) - 1) is due to the accessing of index (i + 1) below. 
        # So subtract by one in the for loop range to prevent iteration from going out of the index range of input_tuple_data. 

        # To store the labels in one transition first 
        prev_label_index = input_tuple_data[i][1]
        next_label_index = input_tuple_data[i + 1][1]

        # Reference to check if a string contains a substring: https://stackoverflow.com/questions/3437059/does-python-have-a-string-contains-substring-method
        if((labels_list[prev_label_index] == "START" or labels_list[prev_label_index] == "O") 
           and ("B-" in labels_list[next_label_index] or "I-" in labels_list[next_label_index])):
            # Considers cases where labels transit from START or O to any B- OR I- labels. 
            total_count_of_entities += 1

        elif("B-" in labels_list[prev_label_index] or "I-" in labels_list[prev_label_index]):
            if("B-" in labels_list[next_label_index]):
                # Considers cases like transition from B-positive to B-positive, or B-positive to B-neutral, etc.
                # Also considers cases like transition from I-positive to B-positive or B-neutral or B-negative.
                total_count_of_entities += 1

            elif("I-" in labels_list[next_label_index] and labels_list[prev_label_index][2:] != labels_list[next_label_index][2:]):
                # Considers cases like transition from B-positive to I-neutral and I-negative, BUT NOT B-positive to I-positive.
                # Also considers cases like transition from I-positive to I-neutral and I-negative, BUT NOT I-positve to I-positive.

                # "labels_list[prev_label_index][2:] != labels_list[next_label_index][2:]" part of the if conditions is 
                #   to check that both labels are not both positive and positive, or neutral and neutral, or negative and negative. 

                total_count_of_entities += 1
      

    return total_count_of_entities
    

def compare_data(dev_out_tuple_data, dev_p1_out_tuple_data):
    
    total_count_of_correct_entities = 0
    whole_entity_correct = False

    for i in range(1, len(dev_out_tuple_data) - 1):
        # Use dev_out range as anything more in dev_p1_out means that the rest in dev_p1_out are wrong?


        # To store the labels in transitions 
        prev_gold_label_index = dev_out_tuple_data[i - 1][1]
        current_gold_label_index = dev_out_tuple_data[i][1]
        next_gold_label_index = dev_out_tuple_data[i + 1][1]


        if("B-" in labels_list[current_gold_label_index]):
            # Below if and elif are all checking if the end of the entity has been reached. As long as current label is B-xxxx , it is the start of the entity.

            if dev_out_tuple_data[i] == dev_p1_out_tuple_data[i]:
               # Boolean turns false when one part of gold entity does not match predicted
               whole_entity_correct = True


            if("B-" in labels_list[next_gold_label_index]):
                # Considers cases like transition from B-positive to B-positive, or B-positive to B-neutral, etc.
                

                if(whole_entity_correct):
                    # Have reached the end of one entity before proceeding to a subsequent entity.
                    # Only considering the previous entity (whose end we have reached) without considering the subsequent entity yet.
                    # If the previous gold entity had no part that is different from the predicted data, then add to the count of correctly predicted entities.
                    
                    total_count_of_correct_entities += 1

                
                # Reset boolean to false, which would be turned to true later if the start of the next gold entity is the same as the predicted 
                whole_entity_correct = False

                

            elif("I-" in labels_list[next_gold_label_index] and labels_list[current_gold_label_index][2:] != labels_list[next_gold_label_index][2:]):
                # Considers cases like transition from B-positive to I-neutral and I-negative, BUT NOT B-positive to I-positive.
  
                # "labels_list[current_gold_label_index][2:] != labels_list[next_gold_label_index][2:]" part of the if conditions is 
                #   to check that both labels are not both positive and positive, or neutral and neutral, or negative and negative. 

                if(whole_entity_correct):
                    # Have reached the end of one entity before proceeding to a subsequent entity.
                    # Only considering the previous entity (whose end we have reached) without considering the subsequent entity yet.
                    # If the previous gold entity had no part that is different from the predicted data, then add to the count of correctly predicted entities.
                    
                    total_count_of_correct_entities += 1

                
                # Reset boolean to false, which would be turned to true later if the start of the next gold entity is the same as the predicted 
                whole_entity_correct = False



            elif(labels_list[next_gold_label_index] == "O" or labels_list[next_gold_label_index] == "END"):
                if(whole_entity_correct):
                    # Have reached the end of one entity before continuing or ending current sequence.
                    # If the previous gold entity had no part that is different from the predicted data, then add to the count of correctly predicted entities.
                    # Set boolean to false, which would be turned to true later if the start of the next gold entity is the same as the predicted 

                    total_count_of_correct_entities += 1

                
                # Reset boolean to false, which would be turned to true later if the start of the next gold entity is the same as the predicted 
                whole_entity_correct = False


        elif("I-" in labels_list[current_gold_label_index]):
            
            # Checking if it is still in the same entity
            if(("I-" in labels_list[prev_gold_label_index] or "B-" in labels_list[prev_gold_label_index]) 
               and labels_list[prev_gold_label_index][2:] == labels_list[current_gold_label_index][2:]):
                # Considers cases of transition from B-positive to I-positive, B-neutral to I-neutral and B-negative to I-negative.
                # Also considers cases of transition from I-positive to I-positive, I-neutral to  I-neutral, and I-negative to I-negative.

                # "labels_list[prev_gold_label_index][2:] == labels_list[current_gold_label_index][2:]" part of the if conditions is 
                #   to check that both labels are both positive and positive, or neutral and neutral, or negative and negative. 

                # In the middle of an entity.
                # If this part of the gold entity is not the same as the predicted, the whole predicted entity is wrongly predicted.
                whole_entity_correct = dev_out_tuple_data[i] == dev_p1_out_tuple_data[i]



            # Checking if the start of entity has been reached and the start label of the entity is I-xxxxx .
            elif(("I-" in labels_list[prev_gold_label_index] or "B-" in labels_list[prev_gold_label_index]) and labels_list[prev_gold_label_index][2:] != labels_list[current_gold_label_index][2:]):
                # Considers cases like transition from B-positive to I-neutral and I-negative, BUT NOT B-positive to I-positive.
                # Also considers cases like transition from I-positive to I-neutral and I-negative, BUT NOT I-positve to I-positive.
                # if dev_out_tuple_data[i] == dev_p1_out_tuple_data[i], whole_entity_correct becomes True
                whole_entity_correct = dev_out_tuple_data[i] == dev_p1_out_tuple_data[i]

            elif labels_list[prev_gold_label_index] == "O":
                whole_entity_correct = dev_out_tuple_data[i] == dev_p1_out_tuple_data[i]
                
            
            # Checking if end of entity. Use if and not elif as this block must still be checked even if the above if and elif were true.
            if("I-" in labels_list[next_gold_label_index] and labels_list[current_gold_label_index][2:] != labels_list[next_gold_label_index][2:]):
                # Considers cases like transition from I-positive to I-neutral and I-negative, BUT NOT I-positve to I-positive.

                # "labels_list[current_gold_label_index][2:] != labels_list[next_gold_label_index][2:]" part of the if conditions is 
                #   to check that both labels are not both positive and positive, or neutral and neutral, or negative and negative. 

                if whole_entity_correct:
                    # Have reached the end of one entity before proceeding to a subsequent entity.
                    # Only considering the previous entity (whose end we have reached) without considering the subsequent entity yet.
                    # If the previous gold entity had no part that is different from the predicted data, then add to the count of correctly predicted entities.
                    
                    total_count_of_correct_entities += 1

                # Reset boolean to false, which would be turned to true later if the start of the next gold entity is the same as the predicted 
                whole_entity_correct = False


            elif("B-" in labels_list[next_gold_label_index] or labels_list[next_gold_label_index] == "O" or labels_list[next_gold_label_index] == "END"):
                if whole_entity_correct:
                    # Have reached the end of one entity before continuing or ending current sequence.
                    # If the previous gold entity had no part that is different from the predicted data, then add to the count of correctly predicted entities.
                    # Set boolean to false, which would be turned to true later if the start of the next gold entity is the same as the predicted 

                    total_count_of_correct_entities += 1

                
                # Reset boolean to false, which would be turned to true later if the start of the next gold entity is the same as the predicted 
                whole_entity_correct = False

                
    return total_count_of_correct_entities

def precision_or_recall_calculation(correct_count, total_count):
    return correct_count / total_count

def f_score_calculation(precision, recall):
    return 2 / ( (1/precision) + (1/recall) )







#######################
####### FR Data #######

labels_list, labels, N = FR_labels_list, FR_labels, FR_N
fr_dev_out_storage = generate_data_tuple_list_with_start_end(fr_dev_out_path)
fr_dev_p1_out_storage = generate_data_tuple_list_with_start_end(fr_dev_p1_out_path)




'''COUNTING PREDICTED'''
total_count_of_predicted_entities = count_number_of_entities(fr_dev_p1_out_storage, labels_list)

'''COUNTING GOLD'''
total_count_of_gold_entities = count_number_of_entities(fr_dev_out_storage, labels_list)

correct_count = compare_data(fr_dev_out_storage, fr_dev_p1_out_storage)
precision = precision_or_recall_calculation(correct_count, total_count_of_predicted_entities)
recall = precision_or_recall_calculation(correct_count, total_count_of_gold_entities)
f_score = f_score_calculation(precision, recall)


print("Total count of FR predicted entities: ", total_count_of_predicted_entities)
print("Total count of FR gold entities: ", total_count_of_gold_entities)
print("Total number of correctly predicted entities: ", correct_count)
print()
print("FR Values:")
print("Precision: ", precision)
print("Recall: ", recall)
print("FR F_Score: ", f_score)



print("\n\n\n\n")



#######################
####### EN Data #######

labels_list, labels, N = create_labels_array_dict(en_train_path)
en_dev_out_storage = generate_data_tuple_list_with_start_end(en_dev_out_path)
en_dev_p1_out_storage = generate_data_tuple_list_with_start_end(en_dev_p1_out_path)

'''COUNTING PREDICTED'''
total_count_of_predicted_entities = count_number_of_entities(en_dev_p1_out_storage, labels_list)

'''COUNTING GOLD'''
total_count_of_gold_entities = count_number_of_entities(en_dev_out_storage, labels_list)

correct_count = compare_data(fr_dev_out_storage, fr_dev_p1_out_storage)
precision = precision_or_recall_calculation(correct_count, total_count_of_predicted_entities)
recall = precision_or_recall_calculation(correct_count, total_count_of_gold_entities)
f_score = f_score_calculation(precision, recall)

print("Total count of EN predicted entities: ", total_count_of_predicted_entities)
print("Total count of EN gold entities: ", total_count_of_gold_entities)
print("Total number of correctly predicted entities: ", correct_count)
print()
print("EN Values:")
print("Precision: ", precision)
print("Recall: ", recall)
print("EN F_Score: ", f_score)




''' NOTE: Use the powershell command in the cell below this cell instead. '''

In [96]:
!python .\dataset\FR\evalResult.py .\dataset\FR\dev.out .\dataset\FR\dev.p1.out

!python .\dataset\EN\evalResult.py .\dataset\EN\dev.out .\dataset\EN\dev.p1.out


#Entity in gold data: 238
#Entity in prediction: 1114

#Correct Entity : 186
Entity  precision: 0.1670
Entity  recall: 0.7815
Entity  F: 0.2751

#Correct Sentiment : 79
Sentiment  precision: 0.0709
Sentiment  recall: 0.3319
Sentiment  F: 0.1169

#Entity in gold data: 802
#Entity in prediction: 1081

#Correct Entity : 587
Entity  precision: 0.5430
Entity  recall: 0.7319
Entity  F: 0.6235

#Correct Sentiment : 448
Sentiment  precision: 0.4144
Sentiment  recall: 0.5586
Sentiment  F: 0.4758


In [None]:
#############################
####### Part 2 Point 1 ######

def count_transition_from_u_to_v(input_tuple_data, index_of_u_label, index_of_v_label):

    # This is the function for Count(y_(i-1), y_i), that is, Count(u, v).

    total_count_of_such_transition = 0; 

    for i in range(0, len(input_tuple_data) - 1):
        # (len(input_tuple_data) - 1) is due to the accessing of index (i + 1) below. 
        # So subtract by one in the for loop range to prevent iteration from going out of the index range of input_tuple_data. 

        # To store the labels in one transition first 
        prev_label_index = input_tuple_data[i][1]
        next_label_index = input_tuple_data[i + 1][1]

        if(prev_label_index == index_of_u_label and next_label_index == index_of_v_label):
            total_count_of_such_transition += 1

    return total_count_of_such_transition



def maximum_likelihood_estimation(transition_from_u_to_v_count, total_count_of_u):
    # This is the MLE function, which is q(y_i | y_(i-1)), that is, q(v | u).

    return transition_from_u_to_v_count / total_count_of_u



''' NOTE: Using generate_data_tuple_list_with_start_end() to generate tuple data with start and end included.'''
#######################
####### FR Data #######
print("FR")
print()

labels_list, labels, N = FR_labels_list, FR_labels, FR_N
input_tuple_data = generate_data_tuple_list_with_start_end(fr_train_path) # Note the function used.
labels_count = calculate_number_of_labels(input_tuple_data)



for u in range(0, len(labels_list) - 1):
    # From START to y_n. This is the index for u.

    for v in range(1, len(labels_list)):
        # From y_1 to END. This is the index for v.

        val_of_count_transition_from_u_to_v = count_transition_from_u_to_v(input_tuple_data, u, v)

        print("Count(" + labels_list[u] + ", " + labels_list[v] + "): ", val_of_count_transition_from_u_to_v)
        print("Maximum likelihood estimation is:", maximum_likelihood_estimation(val_of_count_transition_from_u_to_v, labels_count[u]))
        print()
       

#######################
####### EN Data #######
print()
print()
print("EN")
print()

labels_list, labels, N = create_labels_array_dict(en_train_path)
input_tuple_data = generate_data_tuple_list_with_start_end(en_train_path) # Note the function used.
labels_count = calculate_number_of_labels(input_tuple_data)


for u in range(0, len(labels_list) - 1):
    # From START to y_n. This is the index for u.

    for v in range(1, len(labels_list)):
        # From y_1 to END. This is the index for v.

        val_of_count_transition_from_u_to_v = count_transition_from_u_to_v(input_tuple_data, u, v)

        print("Count(" + labels_list[u] + ", " + labels_list[v] + "): ", val_of_count_transition_from_u_to_v)
        print("Maximum likelihood estimation is:", maximum_likelihood_estimation(val_of_count_transition_from_u_to_v, labels_count[u]))
        print()





In [None]:
#############################
####### Part 2 Point 2 ######

def get_emission_from_words(word, unique_words, emission_params):
    if word not in unique_words:
        b = emission_params[:, -1]
    else:
        b = emission_params[:, unique_words.index(word)]
    return np.reshape(np.pad(b, 1), (-1, 1))


def get_parent_state(probs):
    parent_indices = []
    for i in range(probs.shape[1]):
        column = probs[1:-1, i]
        max_value = column.max(axis=0)
        max_value_indices = np.argwhere(column == max_value).flatten()
        chosen_index = np.random.choice(max_value_indices) + 1
        parent_indices.append(chosen_index)
    return parent_indices


def viterbi(transition, emission, tokens, labels, data):
    # Initial step
    log_transition = np.log(transition)
    pi = [np.log(np.zeros([log_transition.shape[0], 1]))]
    pi[0][0] = 0
    parents = []
    time_step = 0
    results = []
    
    for word in data:
        if not word: 
            # End of sentence, perform backtracking
            probs = pi[time_step] + log_transition[:,-1:]
            output = get_parent_state(probs)
            while time_step > 1:
                time_step -= 1
                output.insert(0, parents[time_step][output[0]])
            for i in output:
                results.append(labels[i])
            results.append('')
            # Reset variables
            pi = [np.log(np.zeros([log_transition.shape[0], 1]))]
            pi[0][0] = 0
            parents = []
            time_step = 0
        else:
            # Calculate the probability of the current observation
            log_emissions = np.log(get_emission_from_words(word, tokens, emission))
            probs = np.matmul(pi[time_step], np.transpose(np.ones(log_emissions.shape))) \
                    + np.matmul(np.ones(log_emissions.shape), np.transpose(log_emissions)) \
                    + log_transition
            pi.append(np.reshape(probs.max(axis=0), (-1, 1)))
            parents.append(get_parent_state(probs))
            time_step += 1
    
    return results

def calculate_transition_parameters(data):
    transition_counts = np.zeros([N+2, N+2])
    priori_counts = np.zeros([N+2, 1])
    prev = 0
    for pair in data:
        curr = pair[1]
        priori_counts[prev] += 1
        transition_counts[prev, curr] += 1
        prev = curr
    transition = transition_counts / priori_counts
    transition[-1, 0] = 0 
    
    return transition

labels_list, labels, N = create_labels_array_dict(fr_train_path)
input_data_start_end = generate_data_tuple_list_with_start_end(fr_train_path)
input_data = read_training_data(fr_train_path)
all_unique_words = get_all_unique_words(input_data)
transition_params = calculate_transition_parameters(input_data_start_end)
emission_params = calculate_emission_parameters(input_data, all_unique_words)
test_data = read_dev_in_data(fr_dev_in_path)

predicted_results = viterbi(transition_params, emission_params, all_unique_words, labels_list, test_data)

In [99]:
def write_viterbi_output_to_file(language):
    if language == "EN":
        labels_list, labels, N = create_labels_array_dict(en_train_path)
        input_data = read_training_data(en_train_path)
        train_data_w_start_end = generate_data_tuple_list_with_start_end(en_train_path)
        test_data = read_dev_in_data(en_dev_in_path)
        output_path = en_dev_p2_out_path

    elif language == "FR":
        labels_list, labels, N = create_labels_array_dict(fr_train_path)
        input_data = read_training_data(fr_train_path)
        train_data_w_start_end = generate_data_tuple_list_with_start_end(fr_train_path)
        test_data = read_dev_in_data(fr_dev_in_path)
        output_path = fr_dev_p2_out_path

    # Conduct training/supervised learning (M-Step)
    all_unique_words = get_all_unique_words(input_data)
    emission_parameters = calculate_emission_parameters(input_data, all_unique_words)
    transition_parameters = calculate_transition_parameters(train_data_w_start_end)
    
    
    # Execute testing/decoding with Viterbi Algorithm (E-Step)
    predicted_results = viterbi(transition_parameters, emission_parameters, all_unique_words, labels_list,  test_data)
    with open(output_path, "w+", encoding="utf-8") as file:
        for i in range(len(test_data)):
            if test_data[i] and predicted_results[i]:
                file.write("{} {}\n".format(test_data[i], predicted_results[i]))
            else:
                file.write("\n")

In [None]:
labels_list, labels, N = create_labels_array_dict(fr_train_path)

write_viterbi_output_to_file("FR")

# Temporary store the FR labels, labels_list and N 
FR_labels_list, FR_labels, FR_N = labels_list, labels, N


labels_list, labels, N = create_labels_array_dict(en_train_path)
#print(labels_list, labels)
write_viterbi_output_to_file("EN")

# Temporary store the FR labels, labels_list and N 
EN_labels_list, EN_labels, EN_N = labels_list, labels, N

In [102]:
!python .\dataset\FR\evalResult.py .\dataset\FR\dev.out .\dataset\FR\dev.p2.out

!python .\dataset\EN\evalResult.py .\dataset\EN\dev.out .\dataset\EN\dev.p2.out


#Entity in gold data: 238
#Entity in prediction: 456

#Correct Entity : 136
Entity  precision: 0.2982
Entity  recall: 0.5714
Entity  F: 0.3919

#Correct Sentiment : 76
Sentiment  precision: 0.1667
Sentiment  recall: 0.3193
Sentiment  F: 0.2190

#Entity in gold data: 802
#Entity in prediction: 859

#Correct Entity : 543
Entity  precision: 0.6321
Entity  recall: 0.6771
Entity  F: 0.6538

#Correct Sentiment : 444
Sentiment  precision: 0.5169
Sentiment  recall: 0.5536
Sentiment  F: 0.5346


----

# Part 3

In [103]:
def calculate_double_transition_parameters(data):
    transition_counts = np.zeros([N+2, N+2, N+2])
    priori_counts = np.zeros([N+2, N+2])

    prev2 = None
    prev = None
    
    for i in range(len(data)):
        # input_data[i] -> ('écoute', 1)
        sticky = data[i]
        curr = sticky[1]
        priori_counts[prev2, prev] += 1
        transition_counts[prev2, prev, curr] += 1
        
        prev2 = prev
        prev = curr

    for state1 in transition_counts:
        for state2 in state1:
            state2[0] = 0
            state2[-1] = 0

    # print(priori_counts)
    transition = transition_counts / priori_counts
    transition[np.isnan(transition)] =0
    return transition

In [None]:
labels_list, labels, N = create_labels_array_dict(fr_train_path)

input_data_start_end = generate_data_tuple_list_with_start_end(fr_train_path)
input_data = read_training_data(fr_train_path)
all_unique_words = get_all_unique_words(input_data)
transition_params = calculate_double_transition_parameters(input_data)
print(transition_params.shape)

In [None]:
def second_order_get_parent_state(probs):
    # collect indices of elements from list of probabilities that is max probabilities.
    choice_2 = [np.random.choice(np.argwhere(probs[1:-1, i,j] == probs[1:-1, i,j].max(axis=0)).flatten()) + 1 for i in range(probs.shape[1]) for j in range(probs.shape[0]) ]
    
    split_arr = np.array_split(choice_2, probs.shape[1])
    result_lst = []
    for sub_arr in split_arr:
        unique, counts = np.unique(sub_arr, return_counts=True)
        max_freq_idx = np.argmax(counts)
        most_common = unique[max_freq_idx]
        result_lst.append(most_common)

    return result_lst

def get_max_index_from_list(choice):
    # Same as second_order_get_parent_state, but for a single list.
    return [np.bincount(choice).argmax()]


def second_order_viterbi_log(transition, emission, tokens, labels, data):
    log_transition = np.log(transition)
    # initialisation
    time_step = 0
    pi = [np.log(np.zeros([log_transition.shape[0], log_transition.shape[1], N+2]))]
    pi[0][0][0] = 0
    parents = []
    results = []

    for word in data:

        if len(word)!=0: 
            # propagation
            log_emissions = np.log(get_emission_from_words(word, tokens, emission))
            k = np.arange(pi[time_step].shape[0]).reshape(pi[time_step].shape[1], 1)

            probs = np.matmul(k, np.transpose(np.ones(log_emissions.shape))) \
                    + np.matmul(np.ones(log_emissions.shape), np.transpose(log_emissions)) \
                    + log_transition
            pi.append(np.reshape(probs.max(axis=0), (log_transition.shape[0],log_transition.shape[1])))
            parents.append(second_order_get_parent_state(probs))
            time_step += 1
        else: 
            
            probs = pi[time_step] + log_transition[:,-1:,-1:]

            # backtracking 
            output = second_order_get_parent_state(probs)
            output = get_max_index_from_list(output)

            while time_step > 1: 
                time_step -= 1
                output.insert(0, parents[time_step][output[0]])


            for i in output:
                results.append(labels[i])
            # results.append(labels_list[i]) for i in output)
            results.append('')

            # Reset inits
            time_step = 0
            pi[0][0][0] = 0
            parents = []
    
    return results

labels_list, labels, N = create_labels_array_dict(fr_train_path)
input_data_start_end = generate_data_tuple_list_with_start_end(fr_train_path)
input_data = read_training_data(fr_train_path)
all_unique_words = get_all_unique_words(input_data)
transition_params = calculate_double_transition_parameters(input_data_start_end)
emission_params = calculate_emission_parameters(input_data, all_unique_words)
test_data = read_dev_in_data(fr_dev_in_path)

predicted_results = second_order_viterbi_log(transition_params, emission_params, all_unique_words, labels_list, test_data)
print(predicted_results)

In [106]:
def write_viterbi_output_to_file3(language):
    if language == "EN":
        labels_list, labels, N = create_labels_array_dict(en_train_path)
        input_data = read_training_data(en_train_path)
        train_data_w_start_end = generate_data_tuple_list_with_start_end(en_train_path)
        test_data = read_dev_in_data(en_dev_in_path)
        output_path = en_dev_p3_out_path

    elif language == "FR":
        labels_list, labels, N = create_labels_array_dict(fr_train_path)
        input_data = read_training_data(fr_train_path)
        train_data_w_start_end = generate_data_tuple_list_with_start_end(fr_train_path)
        test_data = read_dev_in_data(fr_dev_in_path)
        output_path = fr_dev_p3_out_path

    # Conduct training/supervised learning (M-Step)
    all_unique_words = get_all_unique_words(input_data)
    emission_parameters = calculate_emission_parameters(input_data, all_unique_words)
    transition_parameters = calculate_double_transition_parameters(train_data_w_start_end)

    # Execute testing/decoding with Viterbi Algorithm (E-Step)
    predicted_results = second_order_viterbi_log(transition_parameters, emission_parameters, all_unique_words, test_data)
    with open(output_path, "w+", encoding="utf-8") as file:
        for i in range(len(predicted_results)):
            if test_data[i] and predicted_results[i]:
                file.write("{} {}\n".format(test_data[i], predicted_results[i]))
            else:
                file.write("\n")

In [None]:
# Temporary store the FR labels, labels_list and N 
labels_list, labels, N = create_labels_array_dict(fr_train_path)
FR_labels_list, FR_labels, FR_N = labels_list, labels, N
write_viterbi_output_to_file3("FR")

# Temporary store the FR labels, labels_list and N 
labels_list, labels, N = create_labels_array_dict(en_train_path)
EN_labels_list, EN_labels, EN_N = labels_list, labels, N
#print(labels_list, labels)
write_viterbi_output_to_file3("EN")

In [111]:
!python .\dataset\FR\evalResult.py .\dataset\FR\dev.out .\dataset\FR\dev.p3.out

!python .\dataset\EN\evalResult.py .\dataset\EN\dev.out .\dataset\EN\dev.p3.out


#Entity in gold data: 238
#Entity in prediction: 1100

#Correct Entity : 62
Entity  precision: 0.0564
Entity  recall: 0.2605
Entity  F: 0.0927

#Correct Sentiment : 21
Sentiment  precision: 0.0191
Sentiment  recall: 0.0882
Sentiment  F: 0.0314

#Entity in gold data: 802
#Entity in prediction: 680

#Correct Entity : 281
Entity  precision: 0.4132
Entity  recall: 0.3504
Entity  F: 0.3792

#Correct Sentiment : 50
Sentiment  precision: 0.0735
Sentiment  recall: 0.0623
Sentiment  F: 0.0675


----
# Part 4

In [None]:
def count_unique_words_for_each_label(input_tuple_data, labels_list):
    dict_of_count_of_words_in_each_label = {}
    dict_of_total_count_unique_words = {}

    # Initialisation by setting labels as keys of dictionary, and their values are also dictionaries using unique words as keys.
    for i in range(1, len(labels_list) - 1):
        # Exclude START and END
        dict_of_count_of_words_in_each_label[i] = {}

    
    for tuple in input_tuple_data:

        # Using "d[key] = d.get(key, 0) + 1" to set a default value to 0 if key not found in the first place. Reference: https://stackoverflow.com/questions/1602934/check-if-a-given-key-already-exists-in-a-dictionary
        dict_of_count_of_words_in_each_label[tuple[1]][tuple[0].lower()] = dict_of_count_of_words_in_each_label[tuple[1]].get(tuple[0].lower(), 0) + 1

        dict_of_total_count_unique_words[tuple[0].lower()] = dict_of_total_count_unique_words.get(tuple[0].lower(), 0) + 1


    return dict_of_count_of_words_in_each_label, dict_of_total_count_unique_words



def probabilities_of_words_in_each_label(dict_of_count_of_words_in_each_label, dict_of_total_count_unique_words):
    dict_of_words_prob_in_each_label = {}

    # Initialisation by setting labels as keys of dictionary, and their values are also dictionaries.
    for i in range(1, len(labels_list) - 1):
        # Exclude START and END
        dict_of_words_prob_in_each_label[i] = {}

    for label_index_key in dict_of_count_of_words_in_each_label:
        for unique_word_key in dict_of_count_of_words_in_each_label[label_index_key]:
            dict_of_words_prob_in_each_label[label_index_key][unique_word_key] = dict_of_count_of_words_in_each_label[label_index_key][unique_word_key] / (dict_of_total_count_unique_words[unique_word_key]) 

    return dict_of_words_prob_in_each_label




def predict_word_sentiment(word, dict_of_words_prob_in_each_label, labels_list, unique_words_list):

    current_max_probability = 0
    current_most_probable_label_index = 1
    
    word_lowercase = word.lower()

    for label_index_key in dict_of_words_prob_in_each_label:
        # Exclude START and END labels
        if word_lowercase in dict_of_words_prob_in_each_label[label_index_key]:

            if dict_of_words_prob_in_each_label[label_index_key][word_lowercase] > current_max_probability:
                current_max_probability = dict_of_words_prob_in_each_label[label_index_key][word_lowercase]
                current_most_probable_label_index = label_index_key

        

    if current_max_probability == 0:
        missing_letter_cost = 10             # If either word has a missing letter, continue to add a small value to the "distance" between the words. 

        min_unicode_diff = 1000000
        closest_word = ""

        # Use of .maketrans() to remove accented characters. Reference: https://stackoverflow.com/questions/41004941/python-replace-french-letters-with-english
        translation_table = str.maketrans("éàèùâêîôûç", "eaeuaeiouc")
        # Use of .translate() to remove accented characters. Reference: https://stackoverflow.com/questions/41004941/python-replace-french-letters-with-english
        translated_word = word.translate(translation_table)
        translated_word = translated_word.lower()

        for unique_word in unique_words_list:
            current_unicode_diff = 0

            if len(unique_word) < len(word):
                smaller_length = len(unique_word)
            else:
                smaller_length = len(word)

            # Use of .translate() to remove accented characters. Reference: https://stackoverflow.com/questions/41004941/python-replace-french-letters-with-english
            translated_unique_word = unique_word.translate(translation_table)
            
            for i in range(0, smaller_length - 1):
                current_unicode_diff += pow((ord(translated_word[i]) - ord(translated_unique_word[i])), 2)

            current_unicode_diff += abs(len(word) - len(unique_word)) * missing_letter_cost

            if current_unicode_diff < min_unicode_diff:
                min_unicode_diff = current_unicode_diff
                closest_word = unique_word

        for label_index_key in dict_of_words_prob_in_each_label:
            if closest_word in dict_of_words_prob_in_each_label[label_index_key] and dict_of_words_prob_in_each_label[label_index_key][closest_word] > current_max_probability:
                current_max_probability = dict_of_words_prob_in_each_label[label_index_key][closest_word]
                current_most_probable_label_index = label_index_key

        
    return labels_list[current_most_probable_label_index]

def write_prediction_output_to_file_part4(language):

    if language == "EN":
        # Conduct training/supervised learning (M-Step)
        labels_list, labels, N = create_labels_array_dict(en_train_path)
        input_tuple_data = read_training_data(en_train_path)
        labels_count = calculate_number_of_labels(input_tuple_data)
        unique_words_list = get_all_unique_words(input_tuple_data)

        dict_of_count_of_words_in_each_label, dict_of_total_count_unique_words = count_unique_words_for_each_label(input_tuple_data, labels_list)
        dict_of_words_prob_in_each_label = probabilities_of_words_in_each_label(dict_of_count_of_words_in_each_label, dict_of_total_count_unique_words)
        

        # Execute testing/decoding (E-Step)
        predicted_results = []
        test_data = read_dev_in_data(en_dev_in_path)

        for token in test_data:
            if token:
                predicted_results.append(token + " " + predict_word_sentiment(token, dict_of_words_prob_in_each_label, labels_list, unique_words_list))
            else:
                predicted_results.append("")
        with open(en_dev_p4_out_path, "w+", encoding="utf-8") as file:
            for line in predicted_results:
                file.write(line + "\n")

    elif language == "FR":
        # Conduct training/supervised learning (M-Step)
        labels_list, labels, N = FR_labels_list, FR_labels, FR_N
        input_tuple_data = read_training_data(fr_train_path)
        labels_count = calculate_number_of_labels(input_tuple_data)
        unique_words_list = get_all_unique_words(input_tuple_data)


        dict_of_count_of_words_in_each_label, dict_of_total_count_unique_words = count_unique_words_for_each_label(input_tuple_data, labels_list)
        dict_of_words_prob_in_each_label = probabilities_of_words_in_each_label(dict_of_count_of_words_in_each_label, dict_of_total_count_unique_words)
        

        # Execute testing/decoding (E-Step)
        predicted_results = []
        test_data = read_dev_in_data(fr_dev_in_path)

        for token in test_data:
            if token:
                predicted_results.append(token + " " + predict_word_sentiment(token, dict_of_words_prob_in_each_label, labels_list, unique_words_list))
            else:
                predicted_results.append("")
        with open(fr_dev_p4_out_path, "w+", encoding="utf-8") as file:
            for line in predicted_results:
                file.write(line + "\n")


labels_list, labels, N = FR_labels_list, FR_labels, FR_N
input_tuple_data = read_training_data(fr_train_path)
labels_count = calculate_number_of_labels(input_tuple_data)


write_prediction_output_to_file_part4("FR")

labels_list, labels, N = create_labels_array_dict(en_train_path)
input_tuple_data = generate_data_tuple_list_with_start_end(en_train_path) # Note the function used.
labels_count = calculate_number_of_labels(input_tuple_data)
# labels_list, labels, N = create_labels_array_dict(en_train_path)
# en_dev_out_storage = generate_data_tuple_list_with_start_end(en_dev_out_path)
# en_dev_p2_out_storage = generate_data_tuple_list_with_start_end(en_dev_p2_out_path)

write_prediction_output_to_file_part4("EN")


In [113]:
!python .\dataset\FR\evalResult.py .\dataset\FR\dev.out .\dataset\FR\dev.p4.out

!python .\dataset\EN\evalResult.py .\dataset\EN\dev.out .\dataset\EN\dev.p4.out


#Entity in gold data: 238
#Entity in prediction: 174

#Correct Entity : 117
Entity  precision: 0.6724
Entity  recall: 0.4916
Entity  F: 0.5680

#Correct Sentiment : 81
Sentiment  precision: 0.4655
Sentiment  recall: 0.3403
Sentiment  F: 0.3932

#Entity in gold data: 802
#Entity in prediction: 971

#Correct Entity : 620
Entity  precision: 0.6385
Entity  recall: 0.7731
Entity  F: 0.6994

#Correct Sentiment : 534
Sentiment  precision: 0.5499
Sentiment  recall: 0.6658
Sentiment  F: 0.6024


In [None]:
def write_TEST_prediction_output_to_file_part4(language):

    if language == "EN":
        # Conduct training/supervised learning (M-Step)
        labels_list, labels, N = create_labels_array_dict(en_train_path)
        input_tuple_data = read_training_data(en_train_path)
        labels_count = calculate_number_of_labels(input_tuple_data)
        unique_words_list = get_all_unique_words(input_tuple_data)

        dict_of_count_of_words_in_each_label, dict_of_total_count_unique_words = count_unique_words_for_each_label(input_tuple_data, labels_list)
        dict_of_words_prob_in_each_label = probabilities_of_words_in_each_label(dict_of_count_of_words_in_each_label, dict_of_total_count_unique_words)
        

        # Execute testing/decoding (E-Step)
        predicted_results = []
        test_data = read_dev_in_data(en_test_in_path)

        for token in test_data:
            if token:
                predicted_results.append(token + " " + predict_word_sentiment(token, dict_of_words_prob_in_each_label, labels_list, unique_words_list))
            else:
                predicted_results.append("")
        with open(en_test_out_path, "w+", encoding="utf-8") as file:
            for line in predicted_results:
                file.write(line + "\n")

    elif language == "FR":
        # Conduct training/supervised learning (M-Step)
        labels_list, labels, N = FR_labels_list, FR_labels, FR_N
        input_tuple_data = read_training_data(fr_train_path)
        labels_count = calculate_number_of_labels(input_tuple_data)
        unique_words_list = get_all_unique_words(input_tuple_data)


        dict_of_count_of_words_in_each_label, dict_of_total_count_unique_words = count_unique_words_for_each_label(input_tuple_data, labels_list)
        dict_of_words_prob_in_each_label = probabilities_of_words_in_each_label(dict_of_count_of_words_in_each_label, dict_of_total_count_unique_words)
        

        # Execute testing/decoding (E-Step)
        predicted_results = []
        test_data = read_dev_in_data(fr_test_in_path)

        for token in test_data:
            if token:
                predicted_results.append(token + " " + predict_word_sentiment(token, dict_of_words_prob_in_each_label, labels_list, unique_words_list))
            else:
                predicted_results.append("")
        with open(fr_test_out_path, "w+", encoding="utf-8") as file:
            for line in predicted_results:
                file.write(line + "\n")


labels_list, labels, N = FR_labels_list, FR_labels, FR_N
input_tuple_data = read_training_data(fr_train_path)
labels_count = calculate_number_of_labels(input_tuple_data)


write_TEST_prediction_output_to_file_part4("FR")

labels_list, labels, N = create_labels_array_dict(en_train_path)
input_tuple_data = generate_data_tuple_list_with_start_end(en_train_path) # Note the function used.
labels_count = calculate_number_of_labels(input_tuple_data)
# labels_list, labels, N = create_labels_array_dict(en_train_path)
# en_dev_out_storage = generate_data_tuple_list_with_start_end(en_dev_out_path)
# en_dev_p2_out_storage = generate_data_tuple_list_with_start_end(en_dev_p2_out_path)

write_TEST_prediction_output_to_file_part4("EN")