In [1]:
import json
import copy

In [2]:
threshold = 2
smoothingProb = 0.000001

#### Vocabulary Creation

In [3]:
def readFile(path):
    file1 = open(path, "r+")
    lines = file1.readlines()
    file1.close()
    return lines
def prepareData(lines):
    vocab = {}
    tagVocab = {}
    tags = {}
    unkW = []
    unkF = 0

    for line in lines:
        word = line[:-1].split("\t")
        if(word[0] != ''):
            if word[1] in vocab:
                vocab[word[1]] += 1
            else:
                vocab[word[1]] = 1

            if word[2] in tags:
                tags[word[2]] += 1
            else:
                tags[word[2]] = 1

            if word[2] in tagVocab:
                if(word[1] in tagVocab[word[2]]):
                    tagVocab[word[2]][word[1]] += 1
                else:
                    tagVocab[word[2]][word[1]] = 1
            else:
                tagVocab[word[2]] = {}
                tagVocab[word[2]][word[1]] = 1        
    for word in vocab:
        if vocab[word] < threshold:
            unkW.append(word)

    for word in unkW:
        unkF += vocab.pop(word)
    
    
    vocab['<unk>'] = unkF
    vocab = dict(sorted(vocab.items(), key=lambda x:x[1], reverse=True))

    tagVocabD = copy.deepcopy(tagVocab)
    for tagV in tagVocab:
        for word in tagVocab[tagV]:
            if(word in unkW):
                tagVocabD[tagV].pop(word)
                
    tagVocab = copy.deepcopy(tagVocabD)
    
    return vocab, unkW, tags, tagVocab

In [4]:
def writeVocab(vocab):
    file2 = open("vocab.txt", "a")
    idx=1
    file2.write(str(idx)+"\t<unk>\t"+str(vocab['<unk>'])+"\n")
    idx+=1
    for word in vocab:
        if(word == '<unk>'):
            continue
        file2.write(str(idx)+"\t"+word+"\t"+str(vocab[word])+"\n")
        idx+=1
    file2.close()

### HMM Model Learning

In [5]:
def prepareTransitionProb(lines, tags):
    transitionDict = {}
    transitionProb = {}

    countPseudoState = 0
    flag = True

    for i in range(1, len(lines)):
        line1 = lines[i-1][:-1].split('\t')
        line2 = lines[i][:-1].split('\t')

        if(flag):
            transitionDict[line1[2]] = 1
            countPseudoState += 1
            flag = False

        if(line1[0]=='' and line2[0]!=''):
            if(line2[2] in transitionDict):
                transitionDict[line2[2]]+=1
                countPseudoState += 1
            else:
                transitionDict[line2[2]]=1
                countPseudoState += 1

        elif(line1[0]!='' and line2[0]!=''):
            transition = (line1[2], line2[2])

            if(transition in transitionDict):
                transitionDict[transition]+=1
            else:
                transitionDict[transition]=1
                
    for transition in transitionDict:
        if(type(transition) == tuple):
            transitionProb[transition] = transitionDict[transition]/tags[transition[0]]
        else:
            transitionProb[transition] = transitionDict[transition]/countPseudoState
    return transitionProb

def prepareEmissionProb(tagVocab, tags):
    emissionDict = {}
    emissionProb = {}
    for tag in tagVocab:
        for word in tagVocab[tag]:
            emission = (tag, word)
            emissionProb[emission] = tagVocab[tag][word]/tags[tag]
    return emissionProb

In [6]:
def writeJSON(transitionProb, emissionProb):
    modelParameters = {
        "transition" : transitionProb,
        "emission" : emissionProb
    }

    HMM = json.dumps(str(modelParameters))

    HMMfile = open('HMM.json', 'w')
    HMMfile.write(HMM)

In [7]:
def createSentence(sentence):
    wordList = []
    realTagList = []
    for words in sentence:
        word = words[:-1].split('\t')
        wordList.append(word[1])
        realTagList.append(word[2])
    return wordList, realTagList

def accuracy(real ,pred):
    correct=0
    for i in range(len(real)):
        if(real[i] == pred[i]):
            correct+=1
    return (correct/len(real))

def createTestSentence(sentence):
    wordList = []
    for words in sentence:
        word = words[:-1].split('\t')
        wordList.append(word[1])
    return wordList
    
def testAlgorithm(testLines, outputPath, algo):
    sentence = []
    for line in testLines:
        if line == '\n':
            wordList = createTestSentence(sentence)
            if(algo == 'G'):
                predTagList = greedyAlgo(wordList)
            elif(algo == 'V'):
                predTagList = greedyAlgo(wordList)
                
            file2 = open(outputPath, "a")
            for idx in range(len(wordList)):
                file2.write(str(idx+1)+"\t"+wordList[idx]+"\t"+predTagList[idx]+"\n")
            file2.write("\n")
            file2.close()
            sentence = []
        else:
            sentence.append(line)

### Greedy Decoding with HMM

In [8]:
def bestWordTag(word, flag, prevTag):
    maxProb = -1
    bestTag = ""
    for tag in tags:
        if(flag == True):
            try:
                trp = transitionProb[tag]
            except:
                trp = 0
            try:
                emp = emissionProb[(tag, word)]
            except:
                emp = smoothingProb
            prob = (trp *emp)
            if(prob > maxProb):
                maxProb = prob
                bestTag = tag
        else:
            try:
                trp = transitionProb[(prevTag, tag)]
            except:
                trp = 0
            try:
                emp = emissionProb[(tag, word)]
            except:
                emp = smoothingProb
            prob = (trp *emp)
            if(prob > maxProb):
                maxProb = prob
                bestTag = tag
    return bestTag
            
def greedyAlgo(sentence):
    flag=True
    predTagList = []
    for word in sentence:
        if(flag):
            bestTag = bestWordTag(word, flag, '')
            flag = False
        else:
            bestTag = bestWordTag(word, flag, bestTag)
        predTagList.append(bestTag)
    return predTagList

def greedyMain(lines):
    realTags = []
    predTags = []
    sentence = []
    
    for line in lines:
        if line == '\n':
            wordList, realTagList = createSentence(sentence)
            predTagList = greedyAlgo(wordList)
            predTags += predTagList
            realTags += realTagList
            sentence = []
        else:
            sentence.append(line)
    return accuracy(realTags, predTags)

### Viterbi Algorithm

In [9]:
def viterbiAlgo(sentence):
    backTrackMat = [[-1 for j in range(len(sentence))] for i in range(len(tags))]
    viterbiDP = [[-1 for j in range(len(sentence))] for i in range(len(tags))]
    for i in range(len(tags)):
        try:
            trp = transitionProb[tags[i]]
        except:
            trp = 0
        try:
            emp = emissionProb[(tags[i], sentence[0])]
        except:
            emp = smoothingProb 
        viterbiDP[i][0] = (trp *emp)
          
    for j in range(1, len(sentence)):
        for i in range(len(tags)):
            maxTagProb = -1
            maxTag = -1
            for ip in range(len(tags)):
                try:
                    trp = transitionProb[(tags[ip], tags[i])]
                except:
                    trp = 0
                try:
                    emp = emissionProb[(tags[i], sentence[j])]
                except:
                    emp = smoothingProb
                    
                prob = (trp *emp*viterbiDP[ip][j-1])
                
                if prob > maxTagProb:
                    maxTagProb = prob
                    maxTag = ip
            viterbiDP[i][j] = maxTagProb
            backTrackMat[i][j] = maxTag
    
    maxTagProb = -1
    maxTag = -1
    for i in range(len(tags)):
        if(viterbiDP[i][len(sentence)-1] > maxTagProb):
            maxTagProb = viterbiDP[i][len(sentence)-1]
            maxTag = i
    
    return findTagList(backTrackMat, maxTag, len(sentence))

def findTagList(backTrackMat, maxTag, sentenceLength):
    tagList = [tags[maxTag]]
    idx = maxTag
    for j in range(sentenceLength-1, 0, -1):
        idx = backTrackMat[idx][j]
        tagList.insert(0, tags[idx])
    return tagList
            
def viterbiMain(lines):
    realTags = []
    predTags = []
    sentence = []
    
    for line in lines:
        if line == '\n':
            wordList, realTagList = createSentence(sentence)
            predTagList = viterbiAlgo(wordList)
            predTags += predTagList
            realTags += realTagList
            sentence = []
        else:
            sentence.append(line)
    return accuracy(realTags, predTags)        

In [10]:
lines = readFile("data/train")
vocab, unkW, tags, tagVocab = prepareData(lines)
writeVocab(vocab)
transitionProb = prepareTransitionProb(lines, tags)
emissionProb = prepareEmissionProb(tagVocab, tags)
writeJSON(transitionProb, emissionProb)
devLines = readFile("data/dev")
testLines = readFile("data/test")
greedyAcc = greedyMain(devLines)
testAlgorithm(testLines, 'greedy.out', 'G')
tags = list(tags.keys())
viterbiAcc = viterbiMain(devLines)
testAlgorithm(testLines, 'viterbi.out', 'V')

In [11]:
print("Accuracy with Greedy algorithm: ", greedyAcc)

Accuracy with Greedy algorithm:  0.9257007536944691


In [12]:
print("Accuracy with Viterbi algorithm: ", viterbiAcc)

Accuracy with Viterbi algorithm:  0.942421689398942
