In [1]:
import numpy as np
import io
from itertools import permutations
import re
from operator import itemgetter

read training file and create 

In [2]:
def read_file_init_table(fname):
    tag_count = {}
    tag_count['<start>'] = 0
    word_tag = {}
    tag_trans = {}
    with open(fname) as f:
        content = f.readlines()
    content = [x.strip() for x in content]
    idx_line = 0
    is_first_word = 0
    
    while idx_line < len(content):
        prev_tag = '<start>'
        while not content[idx_line].startswith('</kalimat'):
            if  not content[idx_line].startswith('<kalimat'):
                content_part = content[idx_line].split('\t')
                if content_part[1] in tag_count:
                    tag_count[content_part[1]] += 1
                else:
                    tag_count[content_part[1]] = 1
                    
                current_word_tag = content_part[0]+','+content_part[1]
                if current_word_tag in word_tag:
                    word_tag[current_word_tag] += 1
                else:    
                    word_tag[current_word_tag] = 1
                    
                if is_first_word == 1:
                    current_tag_trans = '<start>,'+content_part[1]
                    is_first_word = 0
                else:
                    current_tag_trans = prev_tag+','+content_part[1]
                    
                if current_tag_trans in tag_trans:
                    tag_trans[current_tag_trans] += 1
                else:
                    tag_trans[current_tag_trans] = 1                    
                prev_tag = content_part[1]   
                
            else:
                tag_count['<start>'] += 1
                is_first_word = 1
            idx_line = idx_line + 1
        idx_line = idx_line+1 
    return tag_count, word_tag, tag_trans

In [3]:
tag_count, word_tag, tag_trans = read_file_init_table('Indonesian_Manually_Tagged_Corpus_ID.tsv')

In [4]:
def create_trans_prob_table(tag_trans, tag_count):
    trans_prob = {}
    for tag1 in tag_count.keys():
        for tag2 in tag_count.keys():
            trans_idx = tag1+','+tag2
            if trans_idx in tag_trans:
                trans_prob[trans_idx] = tag_trans[trans_idx]/tag_count[tag1]
    return trans_prob

In [5]:
trans_prob = create_trans_prob_table(tag_trans, tag_count)

In [6]:
def create_emission_prob_table(word_tag, tag_count):
    emission_prob = {}
    for word_tag_entry in word_tag.keys():
        word_tag_split = re.split(r'[,](?=[a-zA-Z])', word_tag_entry)
        current_word = word_tag_split[0]
        current_tag = word_tag_split[1]
        emission_key = current_word+','+current_tag
        emission_prob[emission_key] = word_tag[word_tag_entry]/tag_count[current_tag]    
    return emission_prob

In [7]:
emission_prob = create_emission_prob_table(word_tag, tag_count) # {'word,tag':chance}

# Baseline Method

In [8]:
def baseline_method(trans_prob, emission_prob, sentence_words):
    '''
    params format:
        trans_prob     : {'given_tag,predict_tag' : probability}
        emission_prob  : {'word,tag' : probability}
        sentence_words : ['word1', 'word2', 'word3', ...]
    '''
    tag_sequence = []
    # looping untuk setiap kata dalam sentence_words
    for word in sentence_words: 
        # possible_tag diisi dengan semua kemungkinan tag untuk kata (emission_prob)
        possible_tag = [(key, value) for key, value in emission_prob.items() if key.startswith(word+',')]
        # jika tag terdaftar maka ambil tag dengan kemungkinan tertinggi
        if (len(possible_tag) > 0):
            max_tag = max(possible_tag,key=itemgetter(1))[0].split(',')[1]
        # jika kata tidak terdaftar maka tag dijadikan NNO
        else:
            max_tag = 'NNO'
        # masukkan tag ke dalam tag_sequence
        tag_sequence.append(max_tag)
    return tag_sequence

In [9]:
sentence = 'Ia mengharapkan formalisasi dari tersebut'
baseline_method(trans_prob,emission_prob,sentence.split())

['PRP', 'VB', 'X', 'IN', 'PR']

# Viterbi

In [10]:
def populate_trans_prob(trans_prob, tag_count):
    '''
        this function will add all possbile tag trans that are not yet
        in tag trans and made it so that the trans prob are 0
    '''
    new_trans_prob = trans_prob
    for tag_a in list(tag_count):
        for tag_b in list(tag_count):
            if (f'{tag_a},{tag_b}' not in trans_prob):
                new_trans_prob[f'{tag_a},{tag_b}'] = 0
    return new_trans_prob

trans_prob = populate_trans_prob(trans_prob, tag_count)

In [11]:
def viterbi(trans_prob, emission_prob, sentence_words):
    '''
    Params format:
        trans_prob     : {'given_tag,predict_tag' : probability}
        emission_prob  : {'word,tag' : probability}
        sentence_words : ['word1', 'word2', 'word3', ...]
    '''
    
    # assign prev_seq menjadi <start> with the probability of 1
    prev_seq = [('<start>',1)]
    last_possible_seq = []
    
    # looping untuk setiap kata dalam sentence_words
    for curr_word in sentence_words:
        '''
        Variable Format:
            curr      = [(word, tag)]
            c_tag     = (word, tag)
            p_seq     = [(prev_squence, prob)]
            pre_tag   = previous tag
            cur_tag   = current tag
        '''
        # assign curr_tag with all possible tag for the current word
        curr_tag = [(key, value) for key, value in emission_prob.items() if key.startswith(curr_word+',')]
        best_seq = []
        # looping untuk setiap tag
        for c_tag in curr_tag:
            possible_seq = {}
            # looping untuk semua kemungkinan sequence sebelumnya
            for p_seq in prev_seq:
                prev_prob = p_seq[1]
                # karena p_seq & c_tag disimpan dalam format 'tag1,tag2,tag3'
                # maka akan di split by ',' dan diambil index paling terakhirnya
                pre_tag = p_seq[0].split(',')[-1]
                cur_tag = c_tag[0].split(',')[-1]
                # hitung emission dan transition
                emission = emission_prob[f'{curr_word},{cur_tag}']
                transition = trans_prob[f'{pre_tag},{cur_tag}']
                # hitung probability
                prob = emission * transition * prev_prob
                possible_seq[f'{p_seq[0]},{cur_tag}'] = prob
            # ambil sequence dengan probability terbesar dan masukkan ke dalam best_seq
            best_key = sorted(possible_seq, key=lambda x:x[1], reverse=True)[0]
            best_seq.append((best_key, possible_seq[best_key]))
            # apabila kata sekarang adalah kata terakhir, maka langsung mencari sequence terbaik
            if (curr_word == sentence_words[-1]):
                last_possible_seq.append((best_key, possible_seq[best_key]))
        prev_seq = best_seq
    # ambil possible sequence yg terbaik
    # dan ambil hanya sequencenya saja, tidak dengan probability dan sudah di split by (',')
    return sorted(last_possible_seq, key=lambda x:x[1], reverse=True)[0][0].split(',')[1:]

In [12]:
sentence = 'Banyak orang menduga mereka ingin membujuk para tamu agar lebih banyak menyumbang untuk amal'
viterbi(trans_prob, emission_prob, sentence.split())

['CD',
 'NN',
 'VB',
 'PRP',
 'RB',
 'VB',
 'DT',
 'NN',
 'SC',
 'RB',
 'CD',
 'VB',
 'SC',
 'NN']

# Classification

In [13]:
def read_dataset(fname):
    with open(fname) as f:
        content = f.readlines()
    content = [x.strip() for x in content]
    idx_line = 0
    is_first_word = 0
    sentences = []
    tags = []
    
    while idx_line < len(content):
        curr_word_list = []
        curr_tag_list  = []
        while not content[idx_line].startswith('</kalimat'):
            if not content[idx_line].startswith('<kalimat'):
                split_cont = content[idx_line].split('\t')
                curr_word_list.append(split_cont[0])
                curr_tag_list.append(split_cont[1])
            idx_line += 1
        sentences.append(curr_word_list)
        tags.append(curr_tag_list)
        idx_line += 1
    
    return sentences,tags

In [14]:
sentences,tags = read_dataset('Indonesian_Manually_Tagged_Corpus_ID.tsv')

sentences,tags = sentences[:1000],tags[:1000]

In [15]:
def features(sentence, index):
    """ sentence: [w1, w2, ...], index: the index of the word """
    return {
        'word': sentence[index],
        'prev_word': '' if index == 0 else sentence[index - 1],
        'next_word': '' if index == len(sentence) - 1 else sentence[index + 1],
        'prefix-1': sentence[index][0],
        'prefix-2': sentence[index][:2],
        'prefix-3': sentence[index][:3],
        'suffix-1': sentence[index][-1],
        'suffix-2': sentence[index][-2:],
        'suffix-3': sentence[index][-3:],
        'is_last' : True if index == len(sentence) - 1 else False,
        'is_first': True if index == 0 else False,
    }
 
def transform_to_dataset(sentences, tags):
    X, y = [], []
 
    for sentence_idx in range(len(sentences)):
        for index in range(len(sentences[sentence_idx])):
            X.append(features(sentences[sentence_idx], index))
            y.append(tags[sentence_idx][index])
 
    return X, y

In [16]:
# # Train test split with ratio of 3:1
cutoff = int(.75 * len(sentences))
training_sentences = sentences[:cutoff]
test_sentences = sentences[cutoff:]
training_tags = tags[:cutoff]
test_tags = tags[cutoff:]  

In [17]:
X, y = transform_to_dataset(training_sentences, training_tags)

from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction import DictVectorizer
from sklearn.pipeline import Pipeline
 
clf = Pipeline([
    ('vectorizer', DictVectorizer(sparse=False)),
    ('classifier', RandomForestClassifier(n_estimators=10))
])

clf.fit(X, y)   
 
print('Training completed')

X_test, y_test = transform_to_dataset(test_sentences, test_tags)

Training completed


In [18]:
# # Test model yang sudah dilatih dengan kalimat masukan bebas

def pos_tag(sentence):
    tags = clf.predict([features(sentence, index) for index in range(len(sentence))])
#     return list(zip(sentence, tags))
    return tags
 
print(pos_tag('Banyak orang menduga mereka ingin membujuk para tamu agar lebih banyak menyumbang untuk amal'.split()))

['CD' 'NN' 'VB' 'PRP' 'RB' 'VB' 'DT' 'NN' 'SC' 'RB' 'CD' 'VB' 'IN' 'NN']


## Accuracy

In [19]:
def checkAccuracy(y_pred, y_train):
    idx = 0
    true_predicted_tag = 0
    number_of_tag = 0
    # looping sebanyak panjang dari data yang di test
    while idx < len(y_pred):
        # kalau panjang list sama (karena bisa saja tidak)
        if len(y_pred[idx]) == len(y_train[idx]):
            pred = np.array(y_pred[idx])
            true = np.array(y_train[idx])
            # hitung jumlah yang sama
            true_predicted_tag += sum(pred == true)
            # hitung jumlah tag yang dicek
            number_of_tag += len(pred)
        idx += 1
    # kembalikan akurasinya
    return true_predicted_tag/number_of_tag

In [20]:
viterbi_pred = []
baseline_pred = []
for sent in test_sentences:
    viterbi_pred.append(viterbi(trans_prob, emission_prob, sent))
    baseline_pred.append(baseline_method(trans_prob,emission_prob,sent))

In [21]:
# Accuracy for Baseline

print(f'from {len(test_tags)} test data:')
print(f'    Baseline      : {checkAccuracy(baseline_pred, test_tags)}')
print(f'    Random Forest : {clf.score(X_test, y_test)}')
print(f'    Viterbi       : {checkAccuracy(viterbi_pred, test_tags)}')

from 250 test data:
    Baseline      : 0.8681524083393243
    Random Forest : 0.9350107836089144
    Viterbi       : 0.9529359693124816
