In [11]:
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# def flatten_list(lists):
#     return functools.reduce(operator.iconcat, lists, [])

# def counter_sort(counter):
#     return sorted(counter.items(), key=lambda item: item[1], reverse=True)

In [12]:
import glob
import functools
import itertools
import operator
from collections import Counter
import numpy as np
import json

def write_data(json_file, N1, N2, to_file):
    # load json file to dictionary
    with open(json_file, "r", encoding='utf-8') as read_file:
        data = json.load(read_file)
    # data string
    sents = data["sentence"]
    res, temp = "", 0
    for n in range(N1 - 1, N2):
        for dict in sents[n]["morp"]:
            for dt in sents[n]["NE"]:
                if dict["lemma"] in dt["text"]:
                    res += dict["lemma"] + '\t' + dt['type'] + '\n'
                    temp = 1
                    break
            if temp == 0: 
                res += dict["lemma"] + '\t' + 'O' + '\n'
            temp = 0
        res += '\n'
    # write data to file
    with open(to_file, 'w', encoding='utf-8') as f:
        res = res.rstrip() + '\n'
        f.write(res)
        
data_hmm = 'HMM_train.data'
write_data("NEtaggedCorpus_train.json", 1, 3000, data_hmm)
write_data("NEtaggedCorpus_train.json", 3001, 3555, 'HMM_test_correct.data')
        
# initializing variables for future functions
lexicon_split = [line.rstrip('\n').split('\t') for line in open(data_hmm, encoding='utf-8') if line.rstrip('\n') != '']
lexicon_all = [line.rstrip('\n').split('\t') for line in open(data_hmm, encoding='utf-8')]
lexicon = sorted({row[0] for row in lexicon_split})
tags = sorted({row[1] for row in lexicon_split})
print("Total number of unique words in lexicon: ", len(lexicon))
print("Total number of unique tags: ", len(tags))

Total number of unique words in lexicon:  12142
Total number of unique tags:  6


In [13]:
# load json file to dictionary
with open("NEtaggedCorpus_train.json", "r", encoding='utf-8') as read_file:
    data = json.load(read_file)
# data string
sents = data["sentence"]
res, temp = "", 0
for n in range(3000, len(sents)):
    for dict in sents[n]["morp"]:
        res += dict["lemma"] + '\n'
    res += '\n'
# write data to file
with open("HMM_test.words", 'w', encoding='utf-8') as f:
    res = res.rstrip() + '\n'
    f.write(res)

57639

In [14]:
# gets emission probability only dependent on the current tag
# Laplace smoothing added
def get_emission(word, tag, helper1, helper2):
    V = len(lexicon)
    return (helper1[(word, tag)] + 1) / float(helper2[tag] + V)

# gets transition probability from tag1 to tag2
def get_transition(tag1, tag2):
    trans = []
    tags_raw = [row[1] for row in lexicon_split]
    for i in range(len(tags_raw) - 1):
        seq = ' '.join(tags_raw[i:i+2])
        trans.append(seq)
    return Counter(trans)[tag1+' '+tag2] / float(Counter(tags_raw)[tag1])

# gets starting probability based on the first tag of a sentence for all sentences
def get_start_prob(tag):
    start = [lexicon_all[i+1] for i in range(len(lexicon_all) - 2) if lexicon_all[i] == [''] or i == 0]
    return Counter([row[1] for row in start])[tag] / float(len(start))

# test commands
helper1, helper2 = Counter([tuple(x) for x in lexicon_split]), Counter([row[1] for row in lexicon_split])
get_emission('년', 'DT', helper1, helper2)
get_emission('김연아', 'PS', helper1, helper2)
get_transition('O', 'PS')
for tag in tags:
    print(tag)
    print(get_start_prob(tag))

0.019402074435631484

0.0008672448298865911

0.0273031885293456

DT
0.07833333333333334
LC
0.07833333333333334
O
0.5
OG
0.16866666666666666
PS
0.16566666666666666
TI
0.009


In [15]:
# stores some of the above probabilities for faster computation later on
# this has been done already
# csv files should be in the zip file / unzipped folder already
def store_to_csv():
    # transition matrix
    temp = np.zeros([len(tags), len(tags)])
    for i in range(len(tags)):
        for j in range(len(tags)):
            temp[i, j] = get_transition(tags[i], tags[j])
    np.savetxt("stored_transition.csv", temp, delimiter=",")
    # starting probability matrix
    temp = np.zeros([1, len(tags)])
    for i in range(len(tags)):
        temp[0, i] = get_start_prob(tags[i])
    np.savetxt("stored_start_prob.csv", temp, delimiter=",")
    # emission matrix
    temp = np.zeros([len(tags), len(lexicon)])
    for t in range(len(tags)):
        for l in range(len(lexicon)):
            temp[t, l] = get_emission(lexicon[l], tags[t], helper1, helper2)
    np.savetxt("stored_emission.csv", temp, delimiter=",")
    
store_to_csv()

In [16]:
def get_stored(file1, file2, file3):
    stored_start_prob = np.genfromtxt(file1, delimiter=',')
    stored_transition = np.genfromtxt(file2, delimiter=',')
    stored_emission = np.genfromtxt(file3, delimiter=',')
    return stored_start_prob, stored_transition, stored_emission

file1, file2, file3 = 'stored_start_prob.csv', 'stored_transition.csv', 'stored_emission.csv'
stored_start_prob, stored_transition, stored_emission = get_stored(file1, file2, file3)
# adding a column of uniform probability (for unseen words) to emission matrix
# this uniform probability is calculated as per aforementioned Laplace smoothing
avg_per_tag, V = len(lexicon_split) / len(tags), len(lexicon)
stored_emission = np.append(stored_emission, [[1 / float(avg_per_tag + V)] for x in range(len(tags))], axis=1)

In [17]:
# A and B are transition and emission matrices, respectively
# initial (starting probability) and text (tokenized sentence) are lists
def viterbi_sentence(A, B, initial, text):
    def index_helper(w):
        try:
            return lexicon.index(w)
        except ValueError:
            return len(lexicon)
    M = A.shape[0]    
    N = len(text)
    textn = [index_helper(x) for x in text]
    # initialization
    best_prob = np.zeros([M, N])
    best_tags = np.zeros([M, N - 1])
    best_prob[:, 0] = np.multiply(initial, B[:, textn[0]])
    # dynamic programming for best_prob (which is our Viterbi function)
    for i in range(1, N):
        for t in range(M):
            temp = np.multiply(best_prob[:, i-1], A[:, t])
            best_prob[t, i] = np.max(temp) * B[t, textn[i]]
            best_tags[t, i-1] = np.argmax(temp)
    # recovering sequence based on max probability
    max_temp = np.zeros([1, N])
    max_temp[0, -1] = np.argmax(best_prob[:, -1])
    # backtracking
    for i in range(N - 2, -1, -1):
        max_temp[0, i] = best_tags[int(max_temp[0, i+1]), i]
    opt_tags = max_temp.astype(int)[0]
    return np.stack([text, [tags[x] for x in opt_tags]]).T.tolist()

# test sentence
text = ['한편',
 '지나',
 'ㄴ',
 '해',
 'K',
 '리그',
 '챔피언',
 '포항',
 '은',
 '중국',
 '창춘',
 '스타디움',
 '에서',
 '열리',
 'ㄴ',
 '지난해',
 '중국',
 '슈퍼',
 '리그',
 '우승팀',
 '창춘',
 '과',
 '의',
 'E',
 '조',
 '3',
 '차전',
 '원정',
 '경기',
 '에서',
 '후반',
 '40',
 '분',
 '결승골',
 '을',
 '내주',
 '고',
 '0',
 '-',
 '1',
 '로',
 '패하',
 '았',
 '다',
 '.']
viterbi_sentence(stored_transition, stored_emission, stored_start_prob, text)

[['한편', 'O'],
 ['지나', 'O'],
 ['ㄴ', 'O'],
 ['해', 'O'],
 ['K', 'OG'],
 ['리그', 'OG'],
 ['챔피언', 'OG'],
 ['포항', 'OG'],
 ['은', 'O'],
 ['중국', 'LC'],
 ['창춘', 'LC'],
 ['스타디움', 'O'],
 ['에서', 'O'],
 ['열리', 'O'],
 ['ㄴ', 'O'],
 ['지난해', 'DT'],
 ['중국', 'LC'],
 ['슈퍼', 'OG'],
 ['리그', 'OG'],
 ['우승팀', 'OG'],
 ['창춘', 'OG'],
 ['과', 'O'],
 ['의', 'O'],
 ['E', 'O'],
 ['조', 'O'],
 ['3', 'O'],
 ['차전', 'O'],
 ['원정', 'O'],
 ['경기', 'O'],
 ['에서', 'O'],
 ['후반', 'TI'],
 ['40', 'TI'],
 ['분', 'TI'],
 ['결승골', 'TI'],
 ['을', 'O'],
 ['내주', 'O'],
 ['고', 'O'],
 ['0', 'O'],
 ['-', 'O'],
 ['1', 'O'],
 ['로', 'O'],
 ['패하', 'O'],
 ['았', 'O'],
 ['다', 'O'],
 ['.', 'O']]

In [26]:
# can input our pos files using this function
def viterbi_file(A, B, initial, file_from, file_to):
    lexicon_dev = [line.rstrip('\n') for line in open("HMM_test.words", encoding='utf-8')]
    lexicon_dev = [list(y) for x, y in itertools.groupby(lexicon_dev, lambda z: z == '') if not x]
    viterbi_res = [viterbi_sentence(A, B, initial, sent) for sent in lexicon_dev]
    res = '\n'.join([''.join(['\t'.join(w) + '\n' for w in viterbi_res[sent]]) for sent in range(len(viterbi_res))])
    with open(file_to, "w", encoding='utf-8') as text_file:
        text_file.write(res)
    return viterbi_res

# saving to WSJ_24.pos file -- score function applied later
viterbi_file(stored_transition, stored_emission, stored_start_prob,
             'HMM_test.words', 'HMM_test.data')

[[['코카콜라', 'O'],
  ['가', 'O'],
  ['호남', 'OG'],
  ['식품', 'OG'],
  ['의', 'O'],
  ['음료', 'O'],
  ['사업', 'O'],
  ['자산', 'O'],
  ['을', 'O'],
  ['인수', 'O'],
  ['하', 'O'],
  ['았', 'O'],
  ['다', 'O'],
  ['.', 'O']],
 [['세계', 'O'],
  ['최대', 'O'],
  ['인터넷', 'O'],
  ['검색', 'O'],
  ['업체', 'O'],
  ['이', 'O'],
  ['ㄴ', 'O'],
  ['구글', 'O'],
  ['이', 'O'],
  ['15', 'DT'],
  ['일', 'DT'],
  ['휴대', 'DT'],
  ['전화', 'O'],
  ['제', 'O'],
  ['조사', 'O'],
  ['이', 'O'],
  ['ㄴ', 'O'],
  ['모토로라', 'O'],
  ['모빌리티', 'O'],
  ['를', 'O'],
  ['현금', 'O'],
  ['125', 'O'],
  ['억', 'O'],
  ['달러', 'O'],
  ['(', 'O'],
  ['약', 'O'],
  ['13', 'O'],
  ['조', 'O'],
  ['5', 'O'],
  ['천', 'O'],
  ['125', 'O'],
  ['억', 'O'],
  ['원', 'O'],
  [')', 'O'],
  ['에', 'O'],
  ['인수', 'O'],
  ['하', 'O'],
  ['ㄴ다고', 'O'],
  ['발표', 'O'],
  ['하', 'O'],
  ['았', 'O'],
  ['다', 'O'],
  ['.', 'O']],
 [['10', 'DT'],
  ['일', 'DT'],
  ['MS', 'O'],
  ['는', 'O'],
  ['"', 'O'],
  ['MS', 'O'],
  ['가', 'O'],
  ['스카이프', 'O'],
  ['를', 'O'],
  ['85', 'O'],
  ['억', '

In [28]:
#!/usr/bin/python
#
# scorer for NLP class Spring 2016
# ver.1.0
#
# score a key file against a response file
# both should consist of lines of the form:   token \t tag
# sentences are separated by empty lines
#
def score (keyFileName, responseFileName):
    keyFile = open(keyFileName, 'r', encoding='utf-8')
    key = keyFile.readlines()
    responseFile = open(responseFileName, 'r', encoding='utf-8')
    response = responseFile.readlines()
    if len(key) != len(response):
        print ("length mismatch between key and submitted file")
        exit()
    correct = 0
    incorrect = 0
    for i in range(len(key)):
        key[i] = key[i].rstrip('\n')
        response[i] = response[i].rstrip('\n')
        if key[i] == "":
            if response[i] == "":
                continue
            else:
                print ("sentence break expected at line " + str(i))
                exit()
        keyFields = key[i].split('\t')
        if len(keyFields) != 2:
            print ("format error in key at line " + str(i) + ":" + key[i])
            exit()
        keyToken = keyFields[0]
        keyPos = keyFields[1]
        responseFields = response[i].split('\t')
        if len(responseFields) != 2:
            print ("format error at line " + str(i))
            exit()
        responseToken = responseFields[0]
        responsePos = responseFields[1]
        if responseToken != keyToken:
            print ("token mismatch at line " + str(i))
            exit()
        if responsePos == keyPos:
            correct = correct + 1
        else:
            incorrect = incorrect + 1
    print (str(correct) + " out of " + str(correct + incorrect) + " tags correct")
    accuracy = 100.0 * correct / (correct + incorrect)
    print ("  accuracy: %f" % accuracy)
    
score ('HMM_test_correct.data','HMM_test.data')

18841 out of 21451 tags correct
  accuracy: 87.832735
