In [1]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import seaborn as sns
from tqdm import tqdm
import random
from matplotlib import pyplot as plt # show graph
from sklearn.model_selection import GroupShuffleSplit
from nltk import word_tokenize,pos_tag
import warnings
warnings.filterwarnings('ignore')
from hmmlearn import hmm
from sklearn.metrics import confusion_matrix, classification_report,accuracy_score, precision_score, recall_score, f1_score, roc_auc_score




In [2]:
import re
# Regex fragments shared by split_into_sentences. Patterns containing a
# backslash class (\s) are raw strings so Python does not treat them as
# (invalid) string escape sequences.
alphabets = "([A-Za-z])"
prefixes = "(Mr|St|Mrs|Ms|Dr)[.]"
suffixes = "(Inc|Ltd|Jr|Sr|Co)"
starters = r"(Mr|Mrs|Ms|Dr|He\s|She\s|It\s|They\s|Their\s|Our\s|We\s|But\s|However\s|That\s|This\s|Wherever)"
acronyms = "([A-Z][.][A-Z][.](?:[A-Z][.])?)"
websites = "[.](com|net|org|io|gov)"
digits = "([0-9])"


def split_into_sentences(text):
    """Split ``text`` into a list of sentence strings using regex heuristics.

    Periods that do not end a sentence (titles such as "Mr.", single-letter
    initials, acronyms, decimals, website suffixes, "...", "Ph.D.") are
    temporarily replaced with a ``<prd>`` marker. Every remaining ``.``,
    ``?`` or ``!`` is then followed by a ``<stop>`` marker, and the text is
    split on those markers.

    Parameters
    ----------
    text : str
        Raw text; newlines are treated as spaces.

    Returns
    -------
    list of str
        Whitespace-stripped sentences in original order. A trailing fragment
        without terminal punctuation is kept as the last sentence; an empty
        trailing fragment is dropped.
    """
    # Pad so patterns anchored on a leading/trailing space can match at the edges.
    text = " " + text + "  "
    text = text.replace("\n", " ")

    # Protect periods that are part of a token rather than a sentence end.
    text = re.sub(prefixes, "\\1<prd>", text)
    text = re.sub(websites, "<prd>\\1", text)
    text = re.sub(digits + "[.]" + digits, "\\1<prd>\\2", text)
    text = text.replace("...", "<prd><prd><prd>")
    text = text.replace("Ph.D.", "Ph<prd>D<prd>")
    text = re.sub(r"\s" + alphabets + "[.] ", " \\1<prd> ", text)
    # An acronym or company suffix followed by a typical sentence starter
    # does end the sentence, so mark it with <stop> before protecting the rest.
    text = re.sub(acronyms + " " + starters, "\\1<stop> \\2", text)
    text = re.sub(alphabets + "[.]" + alphabets + "[.]" + alphabets + "[.]", "\\1<prd>\\2<prd>\\3<prd>", text)
    text = re.sub(alphabets + "[.]" + alphabets + "[.]", "\\1<prd>\\2<prd>", text)
    text = re.sub(" " + suffixes + "[.] " + starters, " \\1<stop> \\2", text)
    text = re.sub(" " + suffixes + "[.]", " \\1<prd>", text)
    text = re.sub(" " + alphabets + "[.]", " \\1<prd>", text)

    # Move terminal punctuation outside a closing quote so the quote stays
    # attached to the sentence it closes.
    text = text.replace(".”", "”.")
    text = text.replace(".\"", "\".")
    text = text.replace("!\"", "\"!")
    text = text.replace("?\"", "\"?")

    # Mark the real sentence boundaries, then restore the protected periods.
    text = text.replace(".", ".<stop>")
    text = text.replace("?", "?<stop>")
    text = text.replace("!", "!<stop>")
    text = text.replace("<prd>", ".")

    sentences = [s.strip() for s in text.split("<stop>")]
    # Only discard the final piece when it is empty padding; unconditionally
    # dropping it would lose a last sentence that lacks terminal punctuation.
    if sentences and not sentences[-1]:
        sentences = sentences[:-1]
    return sentences

In [3]:
import io
# Read the whole corpus file into memory as a single UTF-8 decoded string.
with io.open('harryp.txt', mode='r', encoding='utf8') as corpus_file:
    txt = corpus_file.read()

In [4]:
# Break the raw text into sentences and keep them in a one-column frame.
sentences = split_into_sentences(txt)
df = pd.DataFrame({'sentence': sentences})

# Tokenise and POS-tag each sentence; every token becomes one row that
# records the positional index of the sentence it came from.
dic1 = {'sentence': [], 'word': [], 'pos': []}
for sent_idx, sent_text in enumerate(df['sentence']):
    tagged_tokens = pos_tag(word_tokenize(sent_text))
    dic1['sentence'].extend([sent_idx] * len(tagged_tokens))
    dic1['word'].extend(token for token, _tag in tagged_tokens)
    dic1['pos'].extend(tag for _token, tag in tagged_tokens)

# Long-format table: one row per (sentence index, word, POS tag).
df2 = pd.DataFrame(dic1)
df2.head()

Unnamed: 0,sentence,word,pos
0,0,Mr,NNP
1,0,and,CC
2,0,Mrs,NNP
3,0,Dursley,NNP
4,0,",",","


In [5]:
# Load the tagged-token table; the first CSV column is used as the row index.
# NOTE(review): the 'â' token shown in later outputs looks like UTF-8 text
# decoded as latin1 — confirm the file's real encoding before changing it.
data = pd.read_csv(
    "words_pos1.csv",
    index_col=[0],
    encoding='latin1',
)
data.head()

Unnamed: 0,sentence,word,pos
0,0,Mr,NNP
1,0,and,CC
2,0,Mrs,NNP
3,0,Dursley,NNP
4,0,",",","


In [6]:
# Distinct POS tags and distinct word forms across the whole data set.
tags = list(set(data['pos'].values))
words = list(set(data['word'].values))
len(tags), len(words)

(38, 857)

In [7]:
# Split by sentence so that all tokens of one sentence land on the same side.
y = data.pos
X = data.drop('pos', axis=1)

gs = GroupShuffleSplit(n_splits=2, test_size=.33, random_state=42)
# Only the first of the generated splits is used.
train_ix, test_ix = next(gs.split(X, y, groups=data['sentence']))

# split() yields positional row indices, so select with .iloc rather than
# label-based .loc (which is only equivalent when the index is 0..n-1).
# .copy() makes both frames independent of `data`, because later cells
# modify them in place.
data_train = data.iloc[train_ix].copy()
data_test = data.iloc[test_ix].copy()

# Sanity checks: nothing lost, and no sentence shared between the two sides.
assert len(data_train) + len(data_test) == len(data)
assert set(data_train['sentence']).isdisjoint(set(data_test['sentence']))

data_train

Unnamed: 0,sentence,word,pos
0,0,Mr,NNP
1,0,and,CC
2,0,Mrs,NNP
3,0,Dursley,NNP
4,0,",",","
...,...,...,...
3070,207,Flocks,NNS
3071,207,of,IN
3072,207,owls,NN
3073,210,shooting,VBG


In [8]:
# Display the held-out test rows.
data_test

Unnamed: 0,sentence,word,pos
54,2,Mr,NNP
55,2,Dursley,NNP
56,2,was,VBD
57,2,the,DT
58,2,director,NN
...,...,...,...
3061,205,Dursleys,NNP
3062,205,â,NNP
3063,205,dark,JJ
3064,205,living-room,NN


In [9]:
# Restrict the tag set and the vocabulary to what occurs in the training rows.
tags = list(set(data_train['pos'].values))
words = list(set(data_train['word'].values))
len(tags), len(words)

(36, 673)

In [10]:
# Replace the word of a fixed 15% sample of training rows with the
# placeholder 'UNKNOWN', so the vocabulary contains an entry that can stand
# in for words not present in it.
dfupdate = data_train.sample(frac=.15, replace=False, random_state=42).assign(word='UNKNOWN')
data_train.loc[dfupdate.index, 'word'] = 'UNKNOWN'

# Vocabulary after masking.
words = list(set(data_train.word.values))

# Integer ids for words and tags, plus the reverse lookup for tags.
word2id = {word: idx for idx, word in enumerate(words)}
tag2id = {tag: idx for idx, tag in enumerate(tags)}
id2tag = dict(enumerate(tags))
len(tags), len(words)

(36, 616)

In [11]:
# How often each tag occurs in the training rows.
count_tags = dict(data_train.pos.value_counts())

# For each tag, a dict mapping word -> number of times the word carries that tag.
count_tags_to_words = (
    data_train
    .groupby(['pos'])
    .apply(lambda grp: grp.groupby('word')['pos'].count().to_dict())
    .to_dict()
)

# How often each tag is the tag of the first row of a sentence.
count_init_tags = dict(data_train.groupby('sentence').first().pos.value_counts())

# TODO use panda solution
# Tag bigram counts: cell [a, b] counts rows tagged b that directly follow a
# row tagged a within the same sentence.
count_tags_to_next_tags = np.zeros((len(tags), len(tags)), dtype=int)
sentences = list(data_train.sentence)
pos = list(data_train.pos)
for idx in range(1, len(sentences)):
    if sentences[idx] != sentences[idx - 1]:
        # Sentence boundary: the previous row belongs to another sentence.
        continue
    count_tags_to_next_tags[tag2id[pos[idx - 1]], tag2id[pos[idx]]] += 1

In [12]:
# Turn the count tables into the three HMM parameter arrays:
#   mystartprob[t]       relative frequency of tag t as first tag of a sentence
#   mytransmat[t, t2]    relative frequency of tag t2 directly after tag t
#   myemissionprob[t, w] relative frequency of word w among rows tagged t
n_tags, n_words = len(tags), len(words)
mystartprob = np.zeros((n_tags,))
mytransmat = np.zeros((n_tags, n_tags))
myemissionprob = np.zeros((n_tags, n_words))

num_sentences = sum(count_init_tags.values())
sum_tags_to_next_tags = np.sum(count_tags_to_next_tags, axis=1)

for tag, tagid in tag2id.items():
    mystartprob[tagid] = count_init_tags.get(tag, 0) / num_sentences

    tag_total = float(count_tags.get(tag, 0))
    if tag_total > 0:
        words_for_tag = count_tags_to_words.get(tag, {})
        for word, wordid in word2id.items():
            myemissionprob[tagid, wordid] = words_for_tag.get(word, 0) / tag_total
    else:
        # No observations for this tag: use a uniform row instead of dividing by zero.
        myemissionprob[tagid, :] = 1.0 / n_words

    outgoing_total = sum_tags_to_next_tags[tagid]
    if outgoing_total > 0:
        mytransmat[tagid, :] = count_tags_to_next_tags[tagid] / outgoing_total
    else:
        # A tag that is never followed by another tag (it only ever ends a
        # sentence) has an all-zero count row; 0/0 would silently fill the row
        # with NaN because warnings are suppressed. Fall back to a uniform row
        # so every row remains a valid probability distribution.
        mytransmat[tagid, :] = 1.0 / n_tags

# NOTE(review): these are unsmoothed relative frequencies, so unseen
# tag->tag or tag->word pairs get probability exactly 0; consider additive
# smoothing if decoding produces degenerate tag runs.

In [13]:
# One hidden state per POS tag; decoding uses the Viterbi algorithm.
model = hmm.CategoricalHMM(
    n_components=len(tags),
    algorithm='viterbi',
    random_state=42,
)

# Install the count-based estimates directly as the model parameters.
model.emissionprob_ = myemissionprob
model.transmat_ = mytransmat
model.startprob_ = mystartprob

In [14]:
# Work on an explicit copy so the in-place edits below never write into a
# frame that shares data with `data`; re-running this cell is harmless.
data_test = data_test.copy()

# Words that are not in the training vocabulary share the 'UNKNOWN' id.
data_test.loc[~data_test['word'].isin(words), 'word'] = 'UNKNOWN'

# Gold tags that never occur in training have no id in tag2id. They are
# replaced by a single known tag; the pick uses a fixed seed and a sorted
# tag list so the substitution (and therefore the reported metrics) is the
# same on every run, instead of depending on global random state and set order.
unseen_tag_mask = ~data_test['pos'].isin(tags)
fallback_tag = random.Random(42).choice(sorted(tags, key=str))
data_test.loc[unseen_tag_mask, 'pos'] = fallback_tag

# One single-feature observation per token: [[word_id], [word_id], ...].
word_test = list(data_test.word)
samples = [[word2id[word]] for word in word_test]


In [15]:
# TODO use panda solution
lengths = []
count = 0
sentences = list(data_test.sentence)
for i in range(len(sentences)) :
    if (i > 0) and (sentences[i] == sentences[i - 1]):
        count += 1
    elif i > 0:
        lengths.append(count)
        count = 1
    else:
        count = 1
len(lengths),len(samples)

(58, 976)

In [16]:
# Convert samples to a 2D NumPy array
samples_2d = np.array(samples)

# Normalize the rows of the emissionprob_ matrix
emissionprob_normalized = model.emissionprob_ / model.emissionprob_.sum(axis=1, keepdims=True)
model.emissionprob_ = emissionprob_normalized

# Predict POS tags
pos_predict = model.predict(samples_2d, lengths)
pos_predict


array([ 7,  7, 11, 26, 12, 15, 26, 12, 32,  7, 21, 27, 11, 15, 26,  7, 22,
       35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35,  0,  9,  7, 10,
        7,  7, 11, 24, 15, 26, 12, 21,  7,  7,  7,  7,  7, 21, 28, 11, 12,
       15, 26, 12, 22, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35,
       35, 35,  0, 16, 11, 15, 12, 21,  7,  7, 11, 24, 33, 12, 21,  7,  7,
        7, 15, 26, 12, 21, 10, 11, 25,  7,  7, 21, 10,  7, 21, 15,  7, 11,
        0, 15, 26, 12, 10, 11, 33, 12, 15, 26, 12,  7,  7,  7, 21,  7, 22,
       35, 35, 35, 35, 35, 35,  0, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35,
       35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35,  0, 35, 35, 35, 35,
       35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35,
       35, 35,  0, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35, 35,
       35, 35, 35, 35,  0,  7,  7, 11, 16, 26, 12, 21, 10, 11, 26, 12, 15,
       15, 33, 12, 15, 16, 11, 15, 26, 31, 12, 15, 12, 21, 16,  2, 31, 17,
       22, 35, 15,  0, 11

In [17]:
# Map the predicted tag ids back to their tag strings.
predicted_sequence = [id2tag[tag_id] for tag_id in pos_predict]


In [18]:
# Print each test word next to its predicted tag; zip stops at the shorter input.
for test_word, predicted_tag in zip(data_test['word'], predicted_sequence):
    labelled_pair = ("Word = " + str(test_word), "Predicted POS = " + str(predicted_tag))
    print(labelled_pair)

('Word = Mr', 'Predicted POS = NNP')
('Word = Dursley', 'Predicted POS = NNP')
('Word = was', 'Predicted POS = VBD')
('Word = the', 'Predicted POS = DT')
('Word = UNKNOWN', 'Predicted POS = NN')
('Word = of', 'Predicted POS = IN')
('Word = a', 'Predicted POS = DT')
('Word = UNKNOWN', 'Predicted POS = NN')
('Word = called', 'Predicted POS = VBN')
('Word = UNKNOWN', 'Predicted POS = NNP')
('Word = ,', 'Predicted POS = ,')
('Word = which', 'Predicted POS = WDT')
('Word = made', 'Predicted POS = VBD')
('Word = UNKNOWN', 'Predicted POS = IN')
('Word = The', 'Predicted POS = DT')
('Word = Dursleys', 'Predicted POS = NNP')
('Word = UNKNOWN', 'Predicted POS = .')
('Word = to', "Predicted POS = ''")
('Word = think', "Predicted POS = ''")
('Word = what', "Predicted POS = ''")
('Word = the', "Predicted POS = ''")
('Word = UNKNOWN', "Predicted POS = ''")
('Word = would', "Predicted POS = ''")
('Word = say', "Predicted POS = ''")
('Word = if', "Predicted POS = ''")
('Word = the', "Predicted POS = '

In [19]:
# Gold tags of the test rows, converted to the integer ids used by the model.
tags_test = list(data_test.pos)
pos_test = np.array([tag2id[gold_tag] for gold_tag in tags_test], dtype=int)

# Compare the sizes of the prediction, gold-label and input sequences.
len(pos_predict), len(pos_test), len(samples), len(word_test)

(963, 976, 976, 976)

In [20]:
def reportTest(y_pred, y_test):
    """Print accuracy and weighted precision, recall and F1 for predictions.

    Parameters
    ----------
    y_pred : array-like
        Predicted labels.
    y_test : array-like
        Reference labels, aligned element-wise with ``y_pred``.
    """
    accuracy = accuracy_score(y_test, y_pred)
    print("The accuracy is {}".format(accuracy))

    precision = precision_score(y_test, y_pred, average='weighted')
    print("The precision is {}".format(precision))

    recall = recall_score(y_test, y_pred, average='weighted')
    print("The recall is {}".format(recall))

    f1 = f1_score(y_test, y_pred, average='weighted')
    print("The F1-Score is {}".format(f1))

# Score only the positions for which both a prediction and a gold id exist.
min_length = min(len(pos_predict), len(pos_test))
predicted_ids = pos_predict[:min_length]
gold_ids = pos_test[:min_length]

reportTest(predicted_ids, gold_ids)

The accuracy is 0.45586708203530635
The precision is 0.7590006443683556
The recall is 0.45586708203530635
The F1-Score is 0.5592902696546449
