In [7]:
import re
from collections import Counter,defaultdict
import random
import numpy as np
import math
import collections

In [3]:

def import_corpus(path_to_file):
    sentences = []
    sentence = []
    states = set()
    emission_symbols = set()

    f = open(path_to_file)
    
    while True:
        line = f.readline()
        if not line: break
            
        line = line.strip()
        if len(line) == 0:
            sentences.append(sentence)    
            sentence = []
            continue
                
        parts = line.split(' ')
        sentence.append((parts[0], parts[-1]))
        states.add(parts[-1])
        
    f.close()        
    token_counts = Counter([token for sentence in sentences for token, _ in sentence])
    for i, sentence in enumerate(sentences):
        sentences[i] = [(token if token_counts[token] > 1 else '<unknown>', label) for token, label in sentence]
    emission_symbols.add('<unknown>')
    return sentences, states, token_counts

corpus_path = 'corpus_ner.txt'
sentences, states, emission_symbols = import_corpus(corpus_path)




In [4]:
states

{'B-LOC', 'B-MISC', 'B-ORG', 'B-PER', 'I-LOC', 'I-MISC', 'I-ORG', 'I-PER', 'O'}

In [33]:
internal_representation = {
    'Pπ': estimate_initial_state_probabilities(sentences, states),
    'Pa': estimate_transition_probabilities(sentences, states),
    'Pb':estimate_emission_probabilities(sentences, states, emission_symbols)
}

def initial_state_probabilities(state, internal_representation):
    return internal_representation['Pπ'][state]

def transition_probabilities(state_from, state_to, internal_representation):
    return internal_representation['Pa'][state_from].get(state_to, 1e-10)

def emission_probabilities(state, emission_symbol, internal_representation):
    return  internal_representation['Pb'][state].get(emission_symbol, 1e-10)
    
initial_probs = internal_representation['Pπ']
transition_probs = internal_representation['Pa']
emission_probs = internal_representation['Pb']

In [18]:
initial_probs

{'B-MISC': 0.051075268817204304,
 'B-LOC': 0.07984866587017124,
 'I-LOC': 0.0,
 'B-PER': 0.11210673038630029,
 'B-ORG': 0.08562325766626842,
 'I-PER': 0.0,
 'O': 0.6713460772600558,
 'I-MISC': 0.0,
 'I-ORG': 0.0}

In [19]:
transition_probs

{'B-MISC': {'B-MISC': 0.01796930830857186,
  'B-LOC': 0.02437751532703953,
  'I-LOC': 0.005242223643133862,
  'B-PER': 0.022755482002482418,
  'B-ORG': 0.018326625794561253,
  'I-PER': 0.0165870538232971,
  'O': 0.873128784744424,
  'I-MISC': 0.00592864933990296,
  'I-ORG': 0.015684357016587054},
 'B-LOC': {'B-MISC': 0.01796930830857186,
  'B-LOC': 0.02437751532703953,
  'I-LOC': 0.005242223643133862,
  'B-PER': 0.022755482002482418,
  'B-ORG': 0.018326625794561253,
  'I-PER': 0.0165870538232971,
  'O': 0.873128784744424,
  'I-MISC': 0.00592864933990296,
  'I-ORG': 0.015684357016587054},
 'I-LOC': {'B-MISC': 0.01796930830857186,
  'B-LOC': 0.02437751532703953,
  'I-LOC': 0.005242223643133862,
  'B-PER': 0.022755482002482418,
  'B-ORG': 0.018326625794561253,
  'I-PER': 0.0165870538232971,
  'O': 0.873128784744424,
  'I-MISC': 0.00592864933990296,
  'I-ORG': 0.015684357016587054},
 'B-PER': {'B-MISC': 0.01796930830857186,
  'B-LOC': 0.02437751532703953,
  'I-LOC': 0.005242223643133862,
 

In [20]:
emission_probs

{'O': {'SOCCER': 0.0011275421659201678,
  '-': 0.006079374811643301,
  '<unknown>': 0.03916260509004749,
  'WIN': 0.0001350972180365178,
  ',': 0.0440053207519719,
  'IN': 0.00043646793519490373,
  'SURPRISE': 1.0392093695116756e-05,
  'DEFEAT': 1.0392093695116756e-05,
  '.': 0.05213713406840076,
  'began': 0.00029097862346326913,
  'the': 0.050744593513255117,
  'defence': 0.0001662734991218681,
  'of': 0.024296715059182974,
  'their': 0.0024577301588951128,
  'title': 0.0002598023423779189,
  'with': 0.005975453874692134,
  'a': 0.02123104741912353,
  'lucky': 2.078418739023351e-05,
  '2-1': 0.00015588140542675133,
  'win': 0.0006858781838777058,
  'against': 0.0015068535857919296,
  'in': 0.024291519012335415,
  'Group': 1.5588140542675134e-05,
  'C': 3.637232793290864e-05,
  'championship': 0.00024421420183524375,
  'match': 0.0008417595893044572,
  'on': 0.013785112286572376,
  'Friday': 0.0020004780363099754,
  'But': 0.0009976409947312086,
  'saw': 0.00012990117118895944,
  'luc

In [16]:

def estimate_initial_state_probabilities(observations, states):
    total_sequences = len(observations)
    initial_state_counts = dict.fromkeys(states,0)
    # initial_state_counts = collections.defaultdict(int)
    for sequence in observations:
        initial_state_counts[sequence[0][-1]] += 1

    estimated_initial_state_probabilities = {state: count / total_sequences for state, count in initial_state_counts.items()}
    
    return estimated_initial_state_probabilities

def estimate_transition_probabilities(observations, states):
    transition_counts = dict.fromkeys(states,dict.fromkeys(states,0))
    # transition_counts = collections.defaultdict(lambda: defaultdict(int))
    for sequence in observations:
        for i in range(len(sequence) - 1):
            transition_counts[sequence[i][-1]][sequence[i+1][-1]] += 1

    estimated_transition_probabilities = {
        state_from: {state_to: count / sum(transition_counts[state_from].values())
                     for state_to, count in transitions.items()}
        for state_from, transitions in transition_counts.items()
    }

    return estimated_transition_probabilities

def estimate_emission_probabilities(observations, states, emission_symbols):
    emission_counts = defaultdict(lambda: defaultdict(int))

    for sequence in observations:
        for  symbol, state in sequence:
            emission_counts[state][symbol] += 1

    estimated_emission_probabilities = {
        state: {symbol: count / sum(emission_counts[state].values())
                for symbol, count in emissions.items()}
        for state, emissions in emission_counts.items()
    }

    return estimated_emission_probabilities



In [38]:
def most_likely_state_sequence(observed_symbols, initial_state_probabilities_parameters, transition_probabilities_parameters,
                               emission_probabilities_parameters):
    viterbi = [{}]
    path = {}

    epsilon = 1e-10

    for state, prob in initial_state_probabilities_parameters.items():
        initial_prob = prob if prob !=0 else epsilon
        emission_prob = emission_probabilities_parameters[state].get(observed_symbols[0], epsilon)
        viterbi[0][state] = math.log(initial_prob) + math.log(emission_prob)
        path[state] = [state]

    for t in range(1, len(observed_symbols)):
        viterbi.append({})
        new_path = {}

        for state, params in transition_probabilities_parameters.items():
            (prob, prev_state) = max(
                (viterbi[t - 1][prev_state] + math.log(params[prev_state] + epsilon) +
                 math.log(emission_probabilities_parameters[state].get(observed_symbols[t], epsilon)), prev_state)
                for prev_state in transition_probabilities_parameters
            )
            viterbi[t][state] = prob
            new_path[state] = path[prev_state] + [state]
        path = new_path
    (prob, state) = max((viterbi[len(observed_symbols) - 1][final_state], final_state) for final_state in viterbi[-1])

    return path[state]



In [22]:
input_sentence = ["Japan","coach" ,"Shu Kamo","said","The" ,"Syrian","own" ,"goal" ,"proved" ,"lucky","for" ,"us"]

result_sequence = most_likely_state_sequence(input_sentence, initial_probs,transition_probs,emission_probs)
print("Most Likely State Sequence:")
print(result_sequence)

Most Likely State Sequence:
['B-LOC', 'O', 'O', 'O', 'O', 'B-MISC', 'O', 'O', 'O', 'O', 'O', 'O']


In [36]:


def most_likely_state_sequence1(sentence, initial_state_probabilities_parameters, transition_probabilities_parameters, emission_probabilities_parameters):
    # Initialize the forward variables
    epsilon = 1e-10

    forward_probabilities = {}
    for state in initial_state_probabilities_parameters:
        prob=initial_state_probabilities(state,internal_representation)
        initial_prob = prob if prob !=0 else epsilon
        emission_prob = emission_probabilities(state,sentence[0],internal_representation)
        forward_probabilities[state] = math.log(initial_prob) +math.log(emission_prob)

    # Calculate the forward probabilities recursively
    for t in range(1, len(sentence)):
        for state in transition_probabilities_parameters:
            forward_probabilities[state] = 0
            for previous_state in transition_probabilities_parameters:
                if sentence[t] in emission_probabilities_parameters[state]:
                    emission_prob = emission_probabilities(state,sentence[t],internal_representation) 
                else:
                    emission_prob = emission_probabilities(state,'<unknown>',internal_representation)

                forward_probabilities[state] += math.exp(forward_probabilities[previous_state]+ 
                                                         transition_probabilities(previous_state,state,internal_representation)+
                                                         math.log(emission_prob))

    # Backtrack to find the most likely state sequence
    backpointers = {}
    for t in range(len(sentence) - 1, -1, -1):
        for state in transition_probabilities_parameters:
            argmax = -math.inf
            for previous_state in transition_probabilities_parameters:
                score = forward_probabilities[previous_state] + transition_probabilities(previous_state,state,internal_representation)
                if score > argmax:
                    argmax = score
                    backpointers[state] = previous_state

    states = []
    current_state = max(forward_probabilities, key=forward_probabilities.get)
    while current_state != 'START':
        states.append(current_state)
        current_state = backpointers[current_state]
    states.reverse()

    return states


In [37]:
input_sentence = ["Japan","coach" ,"Shu Kamo","said","The" ,"Syrian","own" ,"goal" ,"proved" ,"lucky","for" ,"us"]

result_sequence = most_likely_state_sequence1(input_sentence, initial_probs,transition_probs,emission_probs)
print("Most Likely State Sequence:")
print(result_sequence)

OverflowError: math range error