In [None]:
#viterbi_function_definition
from collections import defaultdict, Counter
from math import log
import numpy as np
EPSILON = 1e-5

def viterbi(train, test):
    #1. Count tag, tag pairs, (word, tag) pairs  
    EPSILON_LOG = np.log(EPSILON)
    initial_counter = Counter() 
    tag_counter = Counter() 
    tag_tag_counter = defaultdict(Counter)
    word_tag_counter = defaultdict(Counter)

    for sentence in train:
        initial_counter[sentence[1][1]] += 1
        for i in range(len(sentence)-1):
            if sentence[i][0] not in ["START", "END"]:
                tag1 = sentence[i][1]
                tag2 = sentence[i+1][1]
                tag_counter[sentence[i][1]] += 1
                word_tag_counter[sentence[i][0]][sentence[i][1]] += 1
                tag_tag_counter[tag1][tag2] += 1
    
    #2. Find each probability and take the log of each probability

    #initial probability
    total_count_for_initial_counter = sum(initial_counter.values())
    initial_probabilities = defaultdict(lambda: defaultdict(float))
    for item, count in initial_counter.items():
        initial_probabilities[item] = np.log(count/ total_count_for_initial_counter) + EPSILON_LOG

    #P(tagB|tagA)
    transition_probabilities = defaultdict(lambda: defaultdict(float))
    for prior_tag, tag_counts in tag_tag_counter.items():
        for tag, count in tag_counts.items():
            transition_probabilities[prior_tag][tag] = np.log(count / tag_counter[prior_tag]) + EPSILON_LOG
    
    #P(word|tag)
    emission_probabilities = defaultdict(lambda: defaultdict(float))
    for word, tag_counts in word_tag_counter.items():
        for tag, count in tag_counts.items():
            emission_probabilities[word][tag] = np.log(count / tag_counter[tag]) + EPSILON_LOG

    #3. Find the maximum probability path
    tagged_sentences = []
    for sentence in test:
        tag_probability_matrix = {0: {tag: initial_probabilities[tag] for tag in tag_counter}}
        reminder = {}

        core_sentence = sentence[1:-1]

        for i in range(1, len(core_sentence)):
            tag_probability_matrix[i] = {}
            reminder[i] = {}
            valid_emissions = emission_probabilities.get(core_sentence[i],{})

            for current_tag in tag_counter:
                max_probability = float('-inf')
                best_previous_tag = None
                for previous_tag in tag_probability_matrix[i-1]:
                    probability = tag_probability_matrix[i-1][previous_tag] + transition_probabilities[previous_tag].get(current_tag, EPSILON_LOG)
                    probability += valid_emissions.get(current_tag, EPSILON_LOG)
                    if probability > max_probability:
                        max_probability = probability
                        best_previous_tag = previous_tag
                tag_probability_matrix[i][current_tag] = max_probability
                reminder[i][current_tag] = best_previous_tag
        
        max_final_probability = float('-inf')
        best_final_tag = None
        for tag, probability in tag_probability_matrix[len(core_sentence)-1].items():
            if  probability> max_final_probability:
                max_final_probability = probability
                best_final_tag = tag
        
        best_path = [best_final_tag]
        for i in range(len(core_sentence)-1, 0, -1):
            best_path.insert(0, reminder[i][best_path[0]])
        
        tagged_sentence = [("START", "START")] + list(zip(core_sentence, best_path)) + [("END", "END")]
        tagged_sentences.append(tagged_sentence)

    return tagged_sentences

In [1]:
#frameworks

import collections

START_TAG = "START"
END_TAG = "END"


def evaluate_accuracies(predicted_sentences, tag_sentences):
    """
    :param predicted_sentences:
    :param tag_sentences:
    :return: (Accuracy, correct word-tag counter, wrong word-tag counter)
    """
    assert len(predicted_sentences) == len(tag_sentences), "The number of predicted sentence {} does not match the true number {}".format(len(predicted_sentences), len(tag_sentences))

    correct_wordtagcounter = {}
    wrong_wordtagcounter = {}
    correct = 0
    wrong = 0
    for pred_sentence, tag_sentence in zip(predicted_sentences, tag_sentences):
        assert len(pred_sentence) == len(tag_sentence), "The predicted sentence length {} does not match the true length {}".format(len(pred_sentence), len(tag_sentence))
        for pred_wordtag, real_wordtag in zip(pred_sentence, tag_sentence):
            assert pred_wordtag[0] == real_wordtag[0], "The predicted sentence WORDS do not match with the original sentence, you should only be predicting the tags"
            word = pred_wordtag[0]
            if real_wordtag[1] in [START_TAG, END_TAG]:
                continue
            if pred_wordtag[1] == real_wordtag[1]:
                if word not in correct_wordtagcounter.keys():
                    correct_wordtagcounter[word] = collections.Counter()
                correct_wordtagcounter[word][real_wordtag[1]] += 1
                correct += 1
            else:
                if word not in wrong_wordtagcounter.keys():
                    wrong_wordtagcounter[word] = collections.Counter()
                wrong_wordtagcounter[word][real_wordtag[1]] += 1
                wrong += 1

    accuracy = correct / (correct + wrong)

    return accuracy, correct_wordtagcounter, wrong_wordtagcounter


def specialword_accuracies(train_sentences, predicted_sentences, tag_sentences):
    """
    :param train_sentences:
    :param predicted_sentences:
    :param tag_sentences:
    :return: Accuracy on words with multiple tags, and accuracy on words that do not occur in the training sentences
    """
    seen_words, words_with_multitags_set = get_word_tag_statistics(train_sentences)
    multitags_correct = 0
    multitags_wrong = 0
    unseen_correct = 0
    unseen_wrong = 0
    for i in range(len(predicted_sentences)):
        for j in range(len(predicted_sentences[i])):
            word = tag_sentences[i][j][0]
            tag = tag_sentences[i][j][1]

            if tag in [START_TAG, END_TAG]:
                continue

            if predicted_sentences[i][j][1] == tag:
                if word in words_with_multitags_set:
                    multitags_correct += 1
                if word not in seen_words:
                    unseen_correct += 1
            else:
                if word in words_with_multitags_set:
                    multitags_wrong += 1
                if word not in seen_words:
                    unseen_wrong += 1
    multitag_accuracy = multitags_correct / (multitags_correct + multitags_wrong)
    total_unseen = unseen_correct + unseen_wrong
    unseen_accuracy = unseen_correct / total_unseen if total_unseen > 0 else 0

    return multitag_accuracy, unseen_accuracy


def topk_wordtagcounter(wordtagcounter, k):
    top_items = sorted(wordtagcounter.items(), key=lambda item: sum(item[1].values()), reverse=True)[:k]
    top_items = list(map(lambda item: (item[0], dict(item[1])), top_items))
    return top_items


def load_dataset(data_file):
    if not data_file.endswith(".txt"):
        raise ValueError("File must be a .txt file")

    sentences = []
    with open(data_file, 'r', encoding='UTF-8') as f:
        for line in f:
            sentence = [(START_TAG, START_TAG)]
            raw = line.split()
            for pair in raw:
                splitted = pair.split('=')
                if (len(splitted) < 2):
                    continue
                else:
                    tag = splitted[-1]

                    # find word
                    word = splitted[0]
                    for element in splitted[1:-1]:
                        word += '/' + element
                    sentence.append((word.lower(), tag))
            sentence.append((END_TAG, END_TAG))
            if len(sentence) > 2:
                sentences.append(sentence)
            else:
                print(sentence)
    return sentences


def strip_tags(sentences):
    '''
    Strip tags
    input:  list of sentences
            each sentence is a list of (word,tag) pairs
    output: list of sentences
            each sentence is a list of words (no tags)
    '''

    sentences_without_tags = []

    for sentence in sentences:
        sentence_without_tags = []
        for i in range(len(sentence)):
            pair = sentence[i]
            sentence_without_tags.append(pair[0])
        sentences_without_tags.append(sentence_without_tags)

    return sentences_without_tags


def get_word_tag_statistics(data_set):
    # get set of all seen words and set of words with multitags
    word_tags = collections.defaultdict(lambda: set())
    word_set = set()
    for sentence in data_set:
        for word, tag in sentence:
            word_tags[word].add(tag)
            word_set.add(word)
    return word_set, set(map(lambda elem: elem[0], filter(lambda elem: len(elem[1]) > 1, word_tags.items())))

   

In [None]:
from collections import defaultdict, Counter
from math import log
import numpy as np

EPSILON = 1e-5
EPSILON_LOG = np.log(EPSILON)

def viterbi(train, test):

    #1. Count tag, tag pairs, (word, tag) pairs  
    initial_counter = Counter() 
    tag_counter = Counter() 
    tag_tag_counter = defaultdict(Counter)
    word_tag_counter = defaultdict(Counter)

    for sentence in train:
        initial_counter[sentence[1][1]] += 1
        for i in range(len(sentence)-1):
            if sentence[i][0] not in ["START", "END"]:
                tag1 = sentence[i][1]
                tag2 = sentence[i+1][1]
                tag_counter[sentence[i][1]] += 1
                word_tag_counter[sentence[i][0]][sentence[i][1]] += 1
                tag_tag_counter[tag1][tag2] += 1
    
    #2. Find each probability and take the log of each probability

    #initial probability
    total_count_for_initial = sum(initial_counter.values())
    initial_probabilities = {tag: np.log(count / total_count_for_initial) \
                             for tag, count in initial_counter.items()}

    # Transition counter - P(tagB|tagA)
    transition_probabilities = {
        prior_tag: {tag: np.log(count / tag_counter[prior_tag]) \
                    for tag, count in tag_counts.items()}
        for prior_tag, tag_counts in tag_tag_counter.items()
    }

    # Emission probabilities - P(word|tag)
    emission_probabilities = {
        word: {tag: np.log(count / tag_counter[tag]) \
               for tag, count in tag_counts.items()}
        for word, tag_counts in word_tag_counter.items() 
    }

    #3. Find the maximum probability path
    tagged_sentences = []
    for sentence in test:
        core_sentence = sentence[1:-1]  
        n = len(core_sentence)
        tag_probability_matrix = [{} for _ in range(n)]
        reminder = [{} for _ in range(n)]
        valid_emissions = emission_probabilities.get(core_sentence[0], {})

        for tag in tag_counter:
            tag_probability_matrix[0][tag] = \
                initial_probabilities.get(tag, EPSILON_LOG) + valid_emissions.get(tag, EPSILON_LOG)

        for i in range(1, n):
            valid_emissions = emission_probabilities.get(core_sentence[i], {})
            if not valid_emissions:
                # 무조건 특정 태그로 설정
                best_path.append("WRONG_TAG")
                continue

            previous_tags = tag_probability_matrix[i-1].keys()
            for current_tag, emission_probability in valid_emissions.items():
                max_probability = float('-inf')
                best_previous_tag = None

                for previous_tag in previous_tags:
                    transition_probability = transition_probabilities.get(previous_tag, {}).get(current_tag, EPSILON_LOG)
                    probability = tag_probability_matrix[i-1][previous_tag] + transition_probability + emission_probability

                    if probability > max_probability:
                        max_probability = probability
                        best_previous_tag = previous_tag

                tag_probability_matrix[i][current_tag] = max_probability
                reminder[i][current_tag] = best_previous_tag

        max_final_probability = float('-inf')
        best_final_tag = None

        for tag, probability in tag_probability_matrix[-1].items():
            if probability > max_final_probability:
                max_final_probability = probability
                best_final_tag = tag

        best_path = [best_final_tag]

        for i in range(n-1, 0, -1):
            best_path.insert(0, reminder[i][best_path[0]])

        tagged_sentence = [("START", "START")] + list(zip(core_sentence, best_path)) + [("END", "END")]
        tagged_sentences.append(tagged_sentence)

    return tagged_sentences

In [14]:
#viterbi_execution
if __name__ == "__main__":

    import time

    train_set = load_dataset('data/brown-training.txt')
    dev_set = load_dataset('data/brown-test.txt')

    model = viterbi
    start_time = time.time()
    predicted = model(train_set, strip_tags(dev_set))
    time_spend = time.time() - start_time
    accuracy, _, _ = evaluate_accuracies(predicted, dev_set)
    multi_tag_accuracy, unseen_words_accuracy, = specialword_accuracies(train_set, predicted, dev_set)

    print(f"======== {model.__name__.capitalize()} =========")
    print("time spent: {0:.4f} sec".format(time_spend))
    print("accuracy: {0:.4f}".format(accuracy))
    print("multi-tag accuracy: {0:.4f}".format(multi_tag_accuracy))
    print("unseen word accuracy: {0:.4f}".format(unseen_words_accuracy))

     

time spent: 2.5922 sec
accuracy: 0.9310
multi-tag accuracy: 0.9402
unseen word accuracy: 0.0893
