In [2]:
import io
import numpy as np
from collections import defaultdict
from sklearn.model_selection import train_test_split
import pandas as pd
import math

In [3]:
gold_corpus = io.open("./Data/gold.txt", encoding="utf-8").readlines()
gold_corpus[:5]

['thành_phố/N washington/N có/V 1/M kiến_trúc/N rất/R đa_dạng/A\n',
 'tuy_nhiên/C vì/C gặp/V nhiều/A khó_khăn/N trong/E cuộc_sống/N ông/N dần/R trở_nên/V khó_tính/A\n',
 'khí_hậu/N hồng_kông/N thuộc/V kiểu/N cận_nhiệt_đới/N và/C chịu/V ảnh_hưởng/V của/E gió_mùa/N\n',
 'khoảng/A hơn/A 70/M bề_mặt/N trái_đất/N được/V bao_phủ/V bởi/E các/D đại_dương/N nước_mặn/N phần/N còn_lại/V là/V các/D lục_địa/N và/C các/D đảo/N\n',
 'đà_lạt/N là/V thành_phố/N trực_thuộc/V tỉnh/N lâm_đồng/N nằm/V trên/E cao_nguyên/N lâm_viên/N thuộc/V vùng/N tây_nguyên/N việt_nam/N\n']

In [4]:
def format(data):
  formated = []
  for line in data:
    sentence = []
    words_tags = line.split()
    for word_tag in words_tags:
      word, tag = word_tag.split("/")
      sentence.append((word,tag))
    formated.append(sentence)
  return formated

In [5]:
data = format(gold_corpus)

In [6]:
train_set, test_set = train_test_split(data, train_size=0.8, random_state=23, shuffle=True)

In [7]:
print(len(train_set))
print(len(test_set))

49
13


In [8]:
with open('./Vocab/vocab.txt', encoding='utf8') as f:
  vocab = f.read().splitlines() 
vocab_dict = {}
index = 0
for word in sorted(vocab): 
    if word not in vocab_dict: 
        vocab_dict[word] = index  
        index += 1

In [9]:
len(vocab_dict)

23800

In [10]:
def preprocess(data, vocab_dict):
  all_tags = []
  all_words = []
  tagged_words = []
  for sent in data:
    all_tags.append("<s>")
    for word, tag in sent:
        all_tags.append(tag)
        if word not in vocab_dict:
          word = "<unk>"
        all_words.append(word)
        tagged_words.append((word, tag))
  return all_tags, all_words, tagged_words

In [11]:
train_tags, train_words, train_tagged_words = preprocess(train_set, vocab_dict)
test_tags, test_words, test_tagged_words = preprocess(test_set, vocab_dict)

In [12]:
len(train_words)

562

In [13]:
train_tags[:5]

['<s>', 'D', 'N', 'V', 'V']

In [14]:
train_tagged_words[:5]

[('những', 'D'),
 ('website', 'N'),
 ('được', 'V'),
 ('thiết_kế', 'V'),
 ('đẹp_mắt', 'A')]

In [15]:
# transition count
def transition_count(all_tags):
  transition_count = defaultdict(int)
  prev_tag = all_tags[0]
  for i in range(1, len(all_tags)):
    tag = all_tags[i]
    transition_count[(prev_tag, tag)] += 1
    prev_tag = tag
  return transition_count
transition = transition_count(train_tags)

In [16]:
# emission count
def emission_count(tagged_words):
  emission_count = defaultdict(int)
  for tagged_word in tagged_words:
    word, tag = tagged_word
    emission_count[(tag, word)] += 1
  return emission_count
emission = emission_count(train_tagged_words)

In [17]:
# tag count
def tag_count(all_tags):
  tag_count = defaultdict(int)
  for tag in all_tags:
    tag_count[tag] += 1
  return tag_count
tags = tag_count(train_tags)

## only emission

In [18]:
states = [k for k, _ in sorted(tags.items(), key=lambda item: item[1], reverse=True)]

In [19]:
states

['N', 'V', 'E', 'A', '<s>', 'R', 'P', 'D', 'C', 'M', 'I']

In [20]:
# predict with emission count
def predict_pos(tagged_words, emission, vocab, states):
  num_correct = 0
  valid_states = states.copy()
  valid_states.remove("<s>")
  for tagged_word in tagged_words:
    word, true_tag = tagged_word
    count_final = 0 
    count = 0
    tag_final = ''

    for tag in valid_states:
      if word not in vocab_dict:
        tag_final = valid_states[0]
        break
      if (tag, word) not in emission:
        continue
      count = emission[(tag, word)]

      if count > count_final:
        count_final = count
        tag_final = tag
    if tag_final == true_tag:
      num_correct += 1
  accuracy = num_correct / len(tagged_words)
  return accuracy


In [21]:
accuracy = predict_pos(train_tagged_words, emission, vocab_dict, states)
print('Độ chính xác trên tập train:', accuracy)

Độ chính xác trên tập train: 0.9733096085409253


In [22]:
accuracy = predict_pos(test_tagged_words, emission, vocab_dict, states)
print('Độ chính xác trên tập test:', accuracy)

Độ chính xác trên tập test: 0.4624277456647399


## hidden markov

In [23]:
def transition_matrix(alpha, transition, states, tags):
  valid_states = states.copy()
  valid_states.remove("<s>")
  len_tags = len(valid_states)
  A = np.zeros((len_tags, len_tags))
  
  for i in range(len_tags):
    for j in range(len_tags):
      count = 0
      if (valid_states[i], valid_states[j]) in transition:
        count = transition[(valid_states[i], valid_states[j])]
      
      count_prev_tag = tags[valid_states[i]]
      A[i, j] = (count + alpha) / (count_prev_tag + alpha * len_tags)
  return A
  

In [24]:
alpha = 0.000001
A = transition_matrix(alpha, transition, states, tags)

In [25]:
def emission_matrix(alpha, emission, vocab_dict, states, tags):
  vocab = [v for v in vocab_dict]
  valid_states = states.copy()
  valid_states.remove("<s>")
  len_tags = len(valid_states)
  len_vocab = len(vocab)

  B = np.zeros((len_tags, len_vocab))
  
  for i in range(len_tags):
    for j in range(len_vocab):
      count = 0
      if (valid_states[i], vocab[j]) in emission:
        count = emission[(valid_states[i], vocab[j])]
      
      count_tag = tags[valid_states[i]]
      B[i,j] = (count + alpha) / (count_tag + alpha * len_vocab)
  return B

In [26]:
B = emission_matrix(alpha, emission,vocab_dict, states, tags)

In [27]:
def viterbi_initialize(states, tags, A, B, corpus, vocab_dict):
  valid_states = states.copy()
  valid_states.remove("<s>")
  len_tags = len(valid_states)

  best_probs = np.zeros((len_tags, len(corpus)))
  best_paths = np.zeros((len_tags, len(corpus)), dtype=int)

  for i in range(len_tags):
      index = vocab_dict[corpus[0]]
      best_probs[i,0] = math.log(A[0, i]) + math.log(B[i, index])
  return best_probs, best_paths

In [28]:
best_probs_train, best_paths_train = viterbi_initialize(states, train_tags, A, B, train_words, vocab_dict)
best_probs_test, best_paths_test = viterbi_initialize(states, tags, A, B, test_words, vocab_dict)

In [29]:
def viterbi_forward(A, B, corpus, best_probs, best_paths, vocabs_dict):
    num_tags = best_probs.shape[0]
    for i in range(1, len(corpus)): 
        # if i % 5000 == 0: print(f'Processed {i} words...')
        for j in range(num_tags):
            best_prob_i = float('-inf')
            best_path_i = None
            for k in range(num_tags):
                index = vocabs_dict[corpus[i]]
                prob = best_probs[k, i - 1] + math.log(A[k, j]) + math.log(B[j, index])
                # prob = best_probs[k, i - 1] + math.log(A[k, j - 1]) + math.log(B[j - 1, index])

                if prob > best_prob_i:
                    best_prob_i = prob
                    best_path_i = k
                    
            best_probs[j, i] = best_prob_i
            best_paths[j, i] = best_path_i
            
    return best_probs, best_paths

In [30]:
best_probs_train, best_paths_train = viterbi_forward(A, B, train_words, best_probs_train, best_paths_train, vocab_dict)
best_probs_test, best_paths_test = viterbi_forward(A, B, test_words, best_probs_test, best_paths_test, vocab_dict)


best_probs_train[0, 1]: -11.051450965573057
best_paths_train[0, 4]: 1


In [32]:
def viterbi_backward(best_probs, best_paths, corpus, states):
    valid_states = states.copy()
    valid_states.remove("<s>")
    m = best_paths.shape[1] 
    z = [None] * m
    pred = [None] * m
    
    best_prob_for_last_word = float('-inf')
    num_tags = best_probs.shape[0]
    
    for k in range(num_tags):
        if best_probs[k, m - 1] > best_prob_for_last_word:
            best_prob_for_last_word = best_probs[k, m - 1]
            z[m - 1] = k
            
    pred[m - 1] = valid_states[z[m - 1]]
    for i in range(m - 1, -1, -1):
        z[i - 1] = best_paths[z[i], i]
        pred[i - 1] = valid_states[z[i - 1]]
    return pred

In [43]:
train_pred = viterbi_backward(best_probs_train, best_paths_train, train_words, states)
test_pred = viterbi_backward(best_probs_test, best_paths_test, test_words, states)

['đà_lạt', 'là', 'thành_phố', 'trực_thuộc', 'tỉnh', 'lâm_đồng', 'nằm', 'trên', 'cao_nguyên', '<unk>']
['N', 'V', 'N', 'C', 'I', 'P', 'V', 'E', 'P', 'R']


In [44]:
i = 0
for tag in test_tags:
  if tag == "<s>":
    print()
    continue
  else:
    print(test_words[i], end="/")
    print(test_pred[i], end=" ")
    i+=1


đà_lạt/N là/V thành_phố/N trực_thuộc/C tỉnh/I lâm_đồng/P nằm/V trên/E cao_nguyên/P <unk>/R thuộc/V vùng/A tây_nguyên/C việt_nam/I 
song_song/P là/V 2/I cửa_sổ/P 2/V người/N ngồi/V trong/E cửa_sổ/V song_song/I 
nó/P có/V đầy_đủ/E các/D nhân_vật/N được/V phát_triển/V chuẩn/A và/C 1/M bối_cảnh/N trò_chơi/P vô_cùng/R sống_động/A 
nhưng/C đó/P là/V chuyện/I đã/V qua/I 
nếu_như/P là/V trước_đây/C chắc/I tôi/P sẽ/R sợ/R không/R muốn/A ai/C <unk>/D phần/N cơ_thể/E đó/P 
sau/N trận/C mưa_lũ/I lịch_sử/P đà_nẵng/C giá/I rau/P nhích/R lên/V chính_quyền/I cảnh_báo/P không/R lợi_dụng/A nâng/C giá/I 
gõ/V <unk>/D bằng/N 5/E <unk>/P sẽ/R giúp/V bạn/P có/V tốc_độ/N gõ/R nhanh/A hơn/A 
mỗi/D buổi/N chiều/N tôi/P thường/R dành/V thời_gian/N để/E tập/D gym/N hoặc/C đá/I bóng/P và/C tôi/P đang/R cố_gắng/V để/E duy_trì/N nó/P 
sứ_mệnh/V của/E <unk>/P là/V mang/V đến/E những/D trải_nghiệm/N thú_vị/A và/C độc_đáo/I cho/V người_dùng/N 
điều/N quan_trọng/R là/V bạn/P phải/V có/V mục_tiêu/I rõ_ràng/P và/C cụ_th

In [35]:
import warnings
warnings.filterwarnings('ignore')

In [36]:
from sklearn.metrics import classification_report
def report(pred, gold):
    y_pred = pred
    y_true = [t for t in gold if t != "<s>"]
    
    print(classification_report(y_pred, y_true))
    return y_pred, y_true

In [39]:
print('Kết quả của mô hình Hidden Markov kết hợp thuật toán Viterbi trên tập train:\n')
y_pred, y_true_train = report(train_pred, train_tags)

Kết quả của mô hình Hidden Markov kết hợp thuật toán Viterbi trên tập train:

              precision    recall  f1-score   support

           A       1.00      1.00      1.00        51
           C       0.94      0.88      0.91        17
           D       1.00      1.00      1.00        18
           E       0.98      0.95      0.96        58
           I       1.00      1.00      1.00         2
           M       1.00      1.00      1.00        12
           N       0.98      1.00      0.99       184
           P       0.97      1.00      0.99        38
           R       0.98      0.95      0.97        44
           V       0.98      0.98      0.98       138

    accuracy                           0.98       562
   macro avg       0.98      0.98      0.98       562
weighted avg       0.98      0.98      0.98       562



In [40]:
print('Kết quả của mô hình Hidden Markov kết hợp thuật toán Viterbi trên tập test:\n')
y_pred, y_true_test = report(test_pred, test_tags)

Kết quả của mô hình Hidden Markov kết hợp thuật toán Viterbi trên tập test:

              precision    recall  f1-score   support

           A       0.47      0.53      0.50        15
           C       0.78      0.39      0.52        18
           D       1.00      0.50      0.67         6
           E       0.58      0.64      0.61        11
           I       0.00      0.00      0.00        19
           M       0.40      0.67      0.50         3
           N       0.30      0.76      0.43        21
           P       0.85      0.38      0.52        29
           R       0.92      0.63      0.75        19
           V       0.49      0.72      0.58        32
           X       0.00      0.00      0.00         0

    accuracy                           0.51       173
   macro avg       0.53      0.47      0.46       173
weighted avg       0.57      0.51      0.50       173



# Custom Test

In [41]:
sentence = ["vì", "nó", "rất", "phức_tạp", "nên","tôi","đã","chú_ý"]
best_probs_sent, best_paths_sent = viterbi_initialize(states, tags, A, B, sentence, vocab_dict)
best_probs_sent, best_paths_sent = viterbi_forward(A, B, sentence, best_probs_sent, best_paths_sent, vocab_dict)
sent_pred = viterbi_backward(best_probs_sent, best_paths_sent, test_words, states)

In [42]:
for word, tag in zip(sentence,sent_pred):
  print(word + "/" + tag, end=" ")

vì/E nó/P rất/R phức_tạp/A nên/C tôi/P đã/R chú_ý/N 