# Welcome!

This notebook is a tutorial of how to do named entity recognition (NER) using HMM model. 
In a HMM model, we assume that the hidden state (named entity state) transition from one to another, then emit an observation (the token)
Therefore we need to train the transition and emission probabilities. 
|
The inference will find a path that maximizes the tag sequence's probability.


## Import

In [5]:

import numpy as np
import pandas as pd


## Util functions for data preparation


In [6]:
def preprocess(filename,delimiter=','):
    
    with open(filename) as myfile:
            data = myfile.readlines()
            data = [i.rstrip('\n') for i in data]

    data = [i.split(delimiter) for i in data]



    for i in data:
                if i != [''] and i!=[]:
                    del i[1]
                    del i[1]  # delete the middle 2 columns from the data
    for i in range(0, len(data)):
                if data[i] == [''] or data[i]==[]:
                    data[i] = ["", "O"]


    return data


In [7]:
def word2sents(data_string_list):
    data = list()
    X = list()
    Y = list()
    for data_string in data_string_list:

        if data_string == ['', 'O'] or data_string == ['']:
            if X == ['-DOCSTART-']:
                X = list()
                Y = list()
                continue

            data.append((X, Y))
            X = list()
            Y = list()
        else:

            X.append(data_string[0])
            Y.append(data_string[-1])

    if len(X) > 0:
        data.append((X, Y))

    data = [x for x in data if len(x) != 0]

    return data


## Data preparation

In [19]:

tags_space=['B-PER', 'B-LOC', 'B-ORG','B-MISC', 'I-PER', 'I-LOC', 'I-ORG', 'I-MISC', 'O']

"""
the data should be in CoNLL-2003 dataset's format.
each line looks like this:
(this tutorial does not provide the CoNLL-2003 annotated dataset)

-DOCSTART- -X- O O

EU NNP I-NP I-ORG
rejects VBZ I-VP O
German JJ I-NP I-MISC
call NN I-NP O
to TO I-VP O
boycott VB I-VP O
British JJ I-NP I-MISC
lamb NN I-NP O
. . O O

"""

train_path='/Users/terenceau1/PycharmProjects/pythonProject/data/eng.train'

conll_train = preprocess(train_path, delimiter=' ')

train_set = word2sents(conll_train)
#each element is a tuple of (tokens, tags)



test_path='/Users/terenceau1/PycharmProjects/pythonProject/data/eng.testb'
conll_test=preprocess(test_path,delimiter=' ')

test_set=word2sents(conll_test)

y_truth=[sent[1] for sent in test_set]  #the ground truth labels for the test set.




In [20]:
#create the vocabulary of the training set

sentences=[x[0] for x in train_set]
words=[]
for sentence in sentences:
# Iterate over each item in the sublist
    for word in sentence:
    # Append each item to the flattened list
        words.append(word)



words=[x for x in words if x!='']
words=list(set(words))
words=sorted(words, key=lambda L: (L.lower(), L))



tag2idx= {w: i for i, w in enumerate(tags_space)}
word2idx= {w: i for i, w in enumerate(words)}



# Training

## Util functions for training

In [None]:
def t2_given_t1(t2, t1, train_dataset):


    count_t2_t1 = 0
    for k in range(0 ,len(train_dataset)):

        tags =train_dataset[k][1]

        for index in range(len(tags ) -1):
            if tags[index ]==t1 and tags[index +1] == t2:
                count_t2_t1 += 1
    return count_t2_t1

In [None]:

def word_given_tag(word, tag, train_dataset):
    count_tag =0
    count_w_given_tag =0
    for k in range(0 ,len(train_dataset)):
        sent =train_dataset[k]
        for j in range(0 ,len(sent[0])):
            if sent[1][j ]==tag:
                count_tag+=1
                if sent[0][j ]==word:
                    count_w_given_tag+=1


    return (count_w_given_tag, count_tag)





## Training step

In [17]:
#train the transition matrix


transition_matrix = np.zeros((len(tags_space) , len(tags_space)), dtype='float32')
for i, t1 in enumerate(list(tags_space)):
        occurrence_t1=0
        for j, t2 in enumerate(list(tags_space)):
            occurrence_t1+= t2_given_t1(t2,t1,train_set)



        for j, t2 in enumerate(list(tags_space)):

            try:

                transition_matrix[i, j] =  t2_given_t1(t2, t1, train_set) / occurrence_t1
                                    #t2 given t1 means, t1 first, then t2


            except:
                transition_matrix[i, j]=0


row_sums = transition_matrix.sum(axis=1)

#this handles cases where some entries in row_sums=0
for i in range(transition_matrix.shape[0]):
    if row_sums[i] != 0:
        transition_matrix[i] /= row_sums[i]
    else:
        # Handle rows with sum zero separately
        #which means, the "from" state is never encountered.
        transition_matrix[i] = np.zeros(transition_matrix.shape[1])


#we can visualize it clearer in pandas dataframe
transition_df=pd.DataFrame(transition_matrix, columns=tags_space, index=tags_space)


In [None]:
initial_dist=np.zeros((1,len(tags_space)),dtype='float32')

for j in range(0,len(train_set)):
    first_word_tag = train_set[j][1][0]

    initial_dist[0,tag2idx[first_word_tag]]+= 1 #the initial distribution
#this row is unnormalized for now. It will be normalized in the next step (to make them sum to 1)

row_sums = initial_dist.sum(axis=1)
initial_dist /= initial_dist.sum(axis=1)

In [22]:
#train emission matrix


emission_matrix = np.zeros((len(tags_space), len(word2idx)), dtype='float32')

for i in range(0,len(train_set)):
    sent=train_set[i]
    for j in range(0,len(sent[0])):
        word=sent[0][j]
        tag=sent[1][j]
        emission_matrix[tag2idx[tag],word2idx[word]]+=1



row_sums = emission_matrix.sum(axis=1)


#normalize

for i in range(emission_matrix.shape[0]):
    if row_sums[i] != 0:
        emission_matrix[i] /= row_sums[i]
    else:
        # Handle rows with sum zero separately
        emission_matrix[i] = np.zeros(emission_matrix.shape[1])


emission_df = pd.DataFrame(emission_matrix, columns=list(word2idx.keys()), index=list(tags_space))


# Inference

## Utils for inference

In [26]:

def viterbi_new(obs, trans_matrix, emit_matrix, initial_dist ,tag2idx ,word2idx):
    # the obs is of length T, the trans matrix is of size C time C.

    states =list(tag2idx.keys())

    # Step 2: Initialize Variables
    viterbi_table = [[0.0 for _ in range(len(states))] for _ in range(len(obs))]
    backpointer = [["O" for _ in range(len(states))] for _ in range(len(obs))]
    # list of length T, each being a sublist of length "C"

    backpointer[0 ] =["START" for _ in range(len(states))]

    if obs[0] in word2idx:

        emit_probs =emit_matrix[: ,word2idx[obs[0]]]
    else:
        emit_probs =np.ones((len(states),) ,dtype='float32')

    # viterbi_table[0]=initial_dist*emit_probs
    # the for-loop version of the element-wise vector multiplication above (for loop is slower, but clearer for readability)
    for s in range(0 ,len(states)):
        viterbi_table[0][s] = initial_dist[0, s] * emit_probs[s]

    probs = viterbi_table[0]


    nonzero_probs = [x for x in probs if x != 0]
    if nonzero_probs == []:
        probs =[0.01 for _ in range(len(states))]  # in the degenerate case where, all current state has prob=0, ie.
        # hmm encounters a situation where current observation is absolutely impossible
        # we assumes a uniform potential so the inference can continue at future time steps
    else:
        _, scale_constant = adjust_numbers_to_range(nonzero_probs, target_min=1 ,target_max=100) #scale them so they dont get too big or too small (for stability)
        probs =[ x *scale_constant for x in probs]


    viterbi_table[0] =probs



    # Step 3: Calculate Probabilities
    for t in range(1 ,len(obs)):

        if obs[t] in word2idx:

            emit_probs = emit_matrix[:, word2idx[obs[t]]]
        else:
            emit_probs = np.ones(( len(states),), dtype='float32')



        for s in states:  # s is current state
            trans_probs =[viterbi_table[ t -1][tag2idx[prev_s]] * trans_matrix[tag2idx[prev_s] ,tag2idx[s]] for prev_s in states]



            max_trans_probs = max(trans_probs)
            # max_trans_probs is the max transition prob, from the most likely previous state
            if max_trans_probs ==0.0:
                best_prev_state ='O'
            else:
                best_prev_state =np.argmax(trans_probs)
                best_prev_state =states[best_prev_state]


            viterbi_table[t][tag2idx[s]] = max_trans_probs * emit_probs[tag2idx[s]]

            backpointer[t][tag2idx[s]] = best_prev_state

        probs = viterbi_table[t]

        nonzero_probs = [x for x in probs if x != 0]
        if nonzero_probs==[]:
            probs = [0.01 for _ in
                     range(len(states))]
        else:
            _, scale_constant = adjust_numbers_to_range(nonzero_probs, target_min=1, target_max=100)
            probs = [x * scale_constant for x in probs]
        viterbi_table[t] = probs




    # Step 4: Traceback and Find Best Path
    best_path_prob = max(viterbi_table[-1])
    # best_path_pointer = max(range(len(states)), key=lambda s: viterbi_table[-1][s])
    best_end_state =states[np.argmax(viterbi_table[-1])]


    best_path = [best_end_state]
    for t in range(len(obs ) -1, 0, -1):
        best_path.insert(0, backpointer[t][tag2idx[best_path[0]]])

    # Step 5: Return Best Path
    return best_path, best_path_prob ,viterbi_table,backpointer





In [27]:
def adjust_numbers_to_range(numbers, target_max=1000, target_min=10, base=10):


    largest_number=max(numbers)
    scale_down_constant=1
    if largest_number>target_max:


        scale_down_power =0
        while largest_number * (base ** -scale_down_power) > target_max:
            scale_down_power += 1
        #scale_down_power-= 1


        scale_down_constant=base**(-scale_down_power)
        numbers = [num * scale_down_constant for num in numbers]


    # Find the smallest positive number in the list
    smallest_number = min(numbers)
    scale_up_constant=1

    if smallest_number<target_min:

        scale_up_power =0
        while smallest_number * (base ** scale_up_power) < target_min:
            scale_up_power += 1
        #scale_up_power-= 1

        # Calculate the constant (power of 10)
        scale_up_constant = base ** scale_up_power

        # Multiply all numbers by the constant
        numbers = [num * scale_up_constant for num in numbers]



    power=scale_up_constant*scale_down_constant

    return numbers,power


# Inference step

In [28]:

#just the tokens of the test dataset
test_dataset_tokens=[sent[0] for sent in test_set]

all_predictions=[]

for i in range(0,len(test_dataset_tokens)):

        sent=test_dataset_tokens[i]



        ans, prob, V, b = viterbi_new(obs=test_dataset_tokens[i],trans_matrix=  transition_matrix, emit_matrix=emission_matrix,
                                    initial_dist= initial_dist, tag2idx= tag2idx, word2idx=word2idx)


        all_predictions.append(ans)




# Evaluation

## Utils for evaluation

In [31]:

def endofphrase(prev, current):#if the previous word is the last word of a NE phrase, then returns true
    answer=False
    if prev.startswith("B") and current.startswith("B"):
        answer=True
    if prev.startswith("B") and current.startswith("O"):
        answer=True
    if prev.startswith("I") and current.startswith("B"):
        answer=True
    if prev.startswith("I") and current.startswith("O"):
        answer=True
    if prev!="O" and current!="O" and prev[2:]!=current[2:]:
        answer=True
    return answer




def startofphrase(prev, current):  #if the current word is the first word of a NE phrase, then returns true
    answer=False
    if current.startswith("B"):
        answer=True
    if prev.startswith("O") and current.startswith("I"):
        answer=True
    if prev!="O" and current!="O" and prev[2:]!=current[2:]:
        answer=True
    return answer


In [32]:
def performance_basic(predlist,truelist):
    if len(predlist)!=len(truelist):
        #sanity check
        print("not same!!!")
        return None
    total = len(predlist)
    tp = 0
    retrieved = 0
    relevant = 0
    correct=0

    check=False #checks if the prediction matches the true tag. this turns true when both detects a start a phrase. It remains true until an error occurs, or if the phrase ends


    if total>1:

        #loop through the sentence. True positive, retrieved and relevant are all counted on a phrase-basis (one phrase is counted as 1)

        for i in range(0,total):
            if i==0:
                trueprev='O'
                predprev='O'
            else:
                trueprev = truelist[i - 1]
                predprev = predlist[i - 1]


            truecurrent=truelist[i]
            predcurrent = predlist[i]


            if startofphrase(trueprev,truecurrent)==True:
                relevant+=1

            if startofphrase(predprev,predcurrent)==True:
                retrieved+=1

            if check==True:
                if endofphrase(trueprev,truecurrent) ==True and endofphrase(predprev,predcurrent)==True and trueprev[2:]==predprev[2:]:
                    tp+=1
                    check=False
                if truecurrent[2:]!=predcurrent[2:] or endofphrase(trueprev,truecurrent)!=endofphrase(predprev,predcurrent):
                    check=False

            if startofphrase(trueprev, truecurrent) == True and startofphrase(predprev,predcurrent) == True and truecurrent[2:] == predcurrent[2:]:
                        check = True
                
        if check==True: #this is to fill in the gap of the for-loop above. if the last word is in a NE and so far the check is ok , then this is also a true positive
            tp+=1

    elif total==1: #one-token sentence

        if truelist[0] == predlist[0]:
            correct += 1


        if truelist[0] != "O":
            relevant += 1

        if predlist[0] != "O":
            retrieved += 1
        if truelist[0] == predlist[0] and truelist[0]!='O':
            tp+=1


    """
    #you can print it if you want, to inspect it
    print("Total:",total)
    print("Releant:" , relevant)
    print("Retrieved:" , retrieved)
    print("TP:" , tp)
    """

    return relevant,retrieved,tp


## Evaulation step

In [None]:



tp = 0
retrieved = 0
relevant= 0


results=[]


for i in range(0,len(all_predictions)):

    truelist=y_truth[i]
    predlist=all_predictions[i]
    relevant,retrieved,tp=performance_basic(predlist,truelist)

    result=(relevant,retrieved,tp)



    results.append(result)



relevant = [x[0] for x in results]
relevant = sum(relevant)

retrieved = [x[1] for x in results]
retrieved = sum(retrieved)

tp = [x[2] for x in results]
tp = sum(tp)



try:
    precision = tp / retrieved
except:
    precision=0


try:
    recall = tp / relevant
except:
    recall=0

try:
    fscore = 2 * precision * recall / (precision + recall)
except:
    fscore=0


precision=round(precision*100,1)
recall=round(recall*100,1)
fscore=round(fscore*100,1)



#print(relevant)
print(f"HMM &  {precision} & {recall} & {fscore} & {tp} & {retrieved} & {relevant} ")




HMM &  70.9 & 55.5 & 62.2 & 3133 & 4418 & 5648 


# Conclusions

We have shown how to do NER using HMM. If trained and tested CoNLL-2003 dataset,
it should have 70.9% precision, 55.5% recall and 62.2% F-score.