#  Preprocessing

In [117]:
import numpy as np
import matplotlib.pyplot as plt
import random

import nltk
import os
import string
#from HMM import supervised_HMM, unsupervised_HMM, HiddenMarkovModel
import re # regular expression

import keras.preprocessing.text

In [118]:
########################################
# CS/CNS/EE 155 2018
# Problem Set 6
#
# Author:       Andrew Kang
# Description:  Set 6 skeleton code
########################################

# You can use this (optional) skeleton code to complete the HMM
# implementation of set 5. Once each part is implemented, you can simply
# execute the related problem scripts (e.g. run 'python 2G.py') to quickly
# see the results from your code.
#
# Some pointers to get you started:
#
#     - Choose your notation carefully and consistently! Readable
#       notation will make all the difference in the time it takes you
#       to implement this class, as well as how difficult it is to debug.
#
#     - Read the documentation in this file! Make sure you know what
#       is expected from each function and what each variable is.
#
#     - Any reference to "the (i, j)^th" element of a matrix T means that
#       you should use T[i][j].
#
#     - Note that in our solution code, no NumPy was used. That is, there
#       are no fancy tricks here, just basic coding. If you understand HMMs
#       to a thorough extent, the rest of this implementation should come
#       naturally. However, if you'd like to use NumPy, feel free to.
#
#     - Take one step at a time! Move onto the next algorithm to implement
#       only if you're absolutely sure that all previous algorithms are
#       correct. We are providing you waypoints for this reason.
#
# To get started, just fill in code where indicated. Best of luck!

import random
import operator

class HiddenMarkovModel:
    '''
    Class implementation of Hidden Markov Models.
    '''

    def __init__(self, A, O):
        '''
        Initializes an HMM. Assumes the following:
            - States and observations are integers starting from 0.
            - There is a start state (see notes on A_start below). There
              is no integer associated with the start state, only
              probabilities in the vector A_start.
            - There is no end state.

        Arguments:
            A:          Transition matrix with dimensions L x L.
                        The (i, j)^th element is the probability of
                        transitioning from state i to state j. Note that
                        this does not include the starting probabilities.

            O:          Observation matrix with dimensions L x D.
                        The (i, j)^th element is the probability of
                        emitting observation j given state i.

        Parameters:
            L:          Number of states.

            D:          Number of observations.

            A:          The transition matrix.

            O:          The observation matrix.

            A_start:    Starting transition probabilities. The i^th element
                        is the probability of transitioning from the start
                        state to state i. For simplicity, we assume that
                        this distribution is uniform.
        '''

        self.L = len(A)
        self.D = len(O[0])
        self.A = A
        self.O = O
        self.A_start = [1. / self.L for _ in range(self.L)]


    def viterbi(self, x):
        '''
        Uses the Viterbi algorithm to find the max probability state
        sequence corresponding to a given input sequence.

        Arguments:
            x:          Input sequence in the form of a list of length M,
                        consisting of integers ranging from 0 to D - 1.

        Returns:
            max_seq:    State sequence corresponding to x with the highest
                        probability.
        '''

        M = len(x)      # Length of sequence.

        # The (i, j)^th elements of probs and seqs are the max probability
        # of the prefix of length i ending in state j and the prefix
        # that gives this probability, respectively.
        #
        # For instance, probs[1][0] is the probability of the prefix of
        # length 1 ending in state 0.
        probs = [[0. for _ in range(self.L)] for _ in range(M + 1)]
        seqs = [['' for _ in range(self.L)] for _ in range(M + 1)]
        t = []
        # initialize row 2 of probs and seqs as 0
        for i in range(self.L):
            probs[1][i] = self.A_start[i] * self.O[i][x[0]]

        # start w row 2 and use top-down approach
        for row in range(2, M+1):
            for col1 in range(self.L):
                temp = []
                for col2 in range(self.L):
                    temp.append(probs[row-1][col2] * self.O[col1][x[row-1]] *
                                self.A[col2] [col1])
                max_index, max_value = max(enumerate(temp), key=operator.itemgetter(1))
                probs[row][col1] = max_value
                seqs[row][col1] = seqs[row-1][max_index] + str(max_index)

        max_index, max_value = max(enumerate(probs[M]), key=operator.itemgetter(1))
        max_seq = seqs[len(probs)-1][max_index] + str(max_index)

        return max_seq


    def forward(self, x, normalize=True):
        '''
        Uses the forward algorithm to calculate the alpha probability
        vectors corresponding to a given input sequence.

        Arguments:
            x:          Input sequence in the form of a list of length M,
                        consisting of integers ranging from 0 to D - 1.

            normalize:  Whether to normalize each set of alpha_j(i) vectors
                        at each i. This is useful to avoid underflow in
                        unsupervised learning.

        Returns:
            alphas:     Vector of alphas.

                        The (i, j)^th element of alphas is alpha_j(i),
                        i.e. the probability of observing prefix x^1:i
                        and state y^i = j.

                        e.g. alphas[1][0] corresponds to the probability
                        of observing x^1:1, i.e. the first observation,
                        given that y^1 = 0, i.e. the first state is 0.
        '''
        M = len(x)
        alphas = [[0. for _ in range(self.L)] for _ in range(M + 1)]

        for i in range(self.L):
            alphas[1][i] = self.O[i][x[0]] * self.A_start[i]

        for i in range(1, M):
            for j in range(self.L):
                sum = 0
                for l in range(self.L):
                    sum += alphas[i][l] * self.A[l][j]
                alphas[i + 1][j] = self.O[j][x[i]] * sum

        if (normalize):
            for i in range(1, M):
                sum = 0
                for a in alphas[i + 1]:
                    sum += a
                if sum != 0:
                    alphas[i + 1] = [a / sum for a in alphas[i + 1]]

        return alphas


    def backward(self, x, normalize=False):
        '''
        Uses the backward algorithm to calculate the beta probability
        vectors corresponding to a given input sequence.

        Arguments:
            x:          Input sequence in the form of a list of length M,
                        consisting of integers ranging from 0 to D - 1.

            normalize:  Whether to normalize each set of alpha_j(i) vectors
                        at each i. This is useful to avoid underflow in
                        unsupervised learning.

        Returns:
            betas:      Vector of betas.

                        The (i, j)^th element of betas is beta_j(i), i.e.
                        the probability of observing prefix x^(i+1):M and
                        state y^i = j.

                        e.g. betas[M][0] corresponds to the probability
                        of observing x^M+1:M, i.e. no observations,
                        given that y^M = 0, i.e. the last state is 0.
        '''
        M = len(x)      # len of sequence
        if normalize:
            betas = [[0. for i in range(self.L)] for j in range(M + 1)]

            for i in range(self.L):
                betas[M][i] = 1

            for row in range(M-1, 0, -1):
                for col in range(self.L):
                    for col2 in range(self.L):
                        betas[row][col] += (betas[row+1][col2] *
                                            self.O[col2][x[row]] * self.A[col] [col2])
                norm = sum(betas[row])
                for nCol in range(self.L):
                    betas[row][nCol] /= norm
            return betas

        betas = [[0. for i in range(self.L)] for j in range(M + 1)]

        for i in range(self.L):
            betas[M][i] = 1

        for row in range(M-1,0, -1):
            for col in range(self.L):
                for col2 in range(self.L):
                    betas[row][col] += (betas[row+1][col2] * self.O[col2][x[row]] *
                                self.A[col] [col2])
        return betas

    def count_transitions(self, a, b, X, Y):
        '''
        y_i^j = b
        and y_i^(j-1) = a
        '''
        den = 0
        num = 0

        for i in range(len(X)):
            for j in range(1, len(X[i])):
                if Y[i][j-1] == a:
                    den += 1
                    if Y[i][j] == b:
                        num += 1

        return num, den

    def count_observations(self, w, a, X, Y):
        '''
        y_i^j = w
        and x_i^j = w
        '''
        den = 0
        num = 0

        for i in range(len(X)):
            for j in range(len(X[i])):
                if Y[i][j] == a:
                    den += 1
                    if X[i][j] == w:
                        num += 1
        return num, den

    def supervised_learning(self, X, Y):
        '''
        Trains the HMM using the Maximum Likelihood closed form solutions
        for the transition and observation matrices on a labeled
        datset (X, Y). Note that this method does not return anything, but
        instead updates the attributes of the HMM object.

        Arguments:
            X:          A dataset consisting of input sequences in the form
                        of lists of variable length, consisting of integers
                        ranging from 0 to D - 1. In other words, a list of
                        lists.

            Y:          A dataset consisting of state sequences in the form
                        of lists of variable length, consisting of integers
                        ranging from 0 to L - 1. In other words, a list of
                        lists.

                        Note that the elements in X line up with those in Y.
        '''

        for a in range(self.L):
            for b in range(self.L):
                num, den = self.count_transitions(a, b, X, Y)
                self.A[a][b] = num / den

        for a in range(len(self.O)):
            for w in range(len(self.O[0])):
                num, den = self.count_observations(w, a, X, Y)
                self.O[a][w] = num / den

        pass


    def unsupervised_learning(self, X, N_iters):
        '''
        Trains the HMM using the Baum-Welch algorithm on an unlabeled
        datset X. Note that this method does not return anything, but
        instead updates the attributes of the HMM object.

        Arguments:
            X:          A dataset consisting of input sequences in the form
                        of lists of length M, consisting of integers ranging
                        from 0 to D - 1. In other words, a list of lists.

            N_iters:    The number of iterations to train on.
        '''

        for x in range(N_iters):
            print("Iteration " + str(x))

            A_den = [0. for i in range(self.L)]
            O_den = [0. for i in range(self.L)]

            A_num = [[0. for i in range(self.L)] for i in range(self.L)]
            O_num = [[0. for i in range(self.D)] for i in range(self.L)]

            for sequence in X:
                M = len(sequence)

                alpha = self.forward(sequence, normalize=True)
                beta = self.backward(sequence, normalize=True)

                for w in range(1, M+1):
                    temp = [0. for i in range(self.L)]
                    for z in range(self.L):
                        temp[z] = alpha[w][z] * beta[w][z]

                    # normalize
                    norm = sum(temp)

                    for z in range(len(temp)):
                        temp[z] /= norm

                    for z in range(self.L):
                        if w != M:
                            A_den[z] += temp[z]
                        O_num[z][sequence[w-1]] += temp[z]
                        O_den[z] += temp[z]

                for w in range(1, M):
                    norm = 0

                    temp = [[0. for _ in range(self.L)] for _ in range(self.L)]

                    for i in range(self.L):
                        for j in range(self.L):
                            temp[i][j] = alpha[w][i] * self.A[i][j] * self.O[j][sequence[w]] * beta[w+1][j]

                    for row in temp:
                        norm += sum(row)

                    for i in range(self.L):
                        for j in range(self.L):
                            temp[i][j] /= norm

                    for i in range(self.L):
                        for j in range(self.L):
                            A_num[i][j] += temp[i][j]

            for i in range(self.L):
                for j in range(self.L):
                    self.A[i][j] = A_num[i][j] / A_den[i]

            for i in range(self.L):
                for j in range(self.D):
                    self.O[i][j] = O_num[i][j] / O_den[i]


    def generate_emission(self, M):
        '''
        Generates an emission of length M, assuming that the starting state
        is chosen uniformly at random.

        Arguments:
            M:          Length of the emission to generate.

        Returns:
            emission:   The randomly generated emission as a list.

            states:     The randomly generated states as a list.
        '''

        emission = []
        states = []
        state = random.choice(range(self.L))

        for _ in range(M):
            states.append(state)

            t = self.A[state]
            o = self.O[state]

            emission.append(int(np.random.choice(range(self.D), 1, p=o)))

            state = int(np.random.choice(range(self.L), 1, p=t))

        return emission, states

    def generate_emission_syllables(self, M, syllable_dic, mapping_r):
        '''
        Generates an emission of length M, assuming that the starting state
        is chosen uniformly at random.

        Returns:
            emission:   The randomly generated emission as a list.
            states:     The randomly generated states as a list.
        '''

        emission = []
        states = []
        state = random.choice(range(self.L))


        count = 0

        while count < M:
            states.append(state)
            bool = False

            rand = random.uniform(0, 1)
            next_obs = 0

            while rand > 0:
                rand -= self.O[states[len(emission)]][next_obs]
                next_obs += 1

            next_obs -= 1

            try:
                if syllable_dic[mapping_r[next_obs]]:
                    syllable_length = syllable_dic[mapping_r[next_obs]]
                    if len(syllable_length) == 1:
                        if int(syllable_length[0]) == 1:
                            count += 1
                            bool = True

                        else:
                            if (count+int(syllable_length[0]) > M):
                                pass
                            else:
                                count += int(syllable_length[0])
                                bool = True
                    else:
                        if 'E' in sorted(syllable_length)[1]:
                            place = int(sorted(syllable_length)[1][1:])
                            if count + place == M:
                                count += place
                                bool = True
                            elif count + int(sorted(syllable_length)[0]) < M:
                                count += int(sorted(syllable_length)[0])
                                bool = True

                        else:
                            if count + int(syllable_length[0]) <= M:
                                count += int(syllable_length[0])
                                bool = True
                            elif count + int(syllable_length[1]) <= M:
                                count += int(syllable_length[1])
                                bool = True
                            else:
                                pass

            except KeyError:
                pass
            if bool:
                emission.append(next_obs)
            else:
                del states[-1]

            #  next
            rand = random.uniform(0, 1)
            next = 0

            while rand > 0:
                rand -= self.A[states[len(emission)-1]][next]
                next += 1

            next -= 1
            state = next

        return emission, states


    def probability_alphas(self, x):
        '''
        Finds the maximum probability of a given input sequence using
        the forward algorithm.

        Arguments:
            x:          Input sequence in the form of a list of length M,
                        consisting of integers ranging from 0 to D - 1.

        Returns:
            prob:       Total probability that x can occur.
        '''

        # Calculate alpha vectors.
        alphas = self.forward(x)

        # alpha_j(M) gives the probability that the state sequence ends
        # in j. Summing this value over all possible states j gives the
        # total probability of x paired with any state sequence, i.e.
        # the probability of x.
        prob = sum(alphas[-1])
        return prob


    def probability_betas(self, x):
        '''
        Finds the maximum probability of a given input sequence using
        the backward algorithm.

        Arguments:
            x:          Input sequence in the form of a list of length M,
                        consisting of integers ranging from 0 to D - 1.

        Returns:
            prob:       Total probability that x can occur.
        '''

        betas = self.backward(x)

        # beta_j(1) gives the probability that the state sequence starts
        # with j. Summing this, multiplied by the starting transition
        # probability and the observation probability, over all states
        # gives the total probability of x paired with any state
        # sequence, i.e. the probability of x.
        prob = sum([betas[1][j] * self.A_start[j] * self.O[j][x[0]] \
                    for j in range(self.L)])

        return prob


def supervised_HMM(X, Y):
    '''
    Helper function to train a supervised HMM. The function determines the
    number of unique states and observations in the given data, initializes
    the transition and observation matrices, creates the HMM, and then runs
    the training function for supervised learning.

    Arguments:
        X:          A dataset consisting of input sequences in the form
                    of lists of variable length, consisting of integers
                    ranging from 0 to D - 1. In other words, a list of lists.

        Y:          A dataset consisting of state sequences in the form
                    of lists of variable length, consisting of integers
                    ranging from 0 to L - 1. In other words, a list of lists.
                    Note that the elements in X line up with those in Y.
    '''
    # Make a set of observations.
    observations = set()
    for x in X:
        observations |= set(x)

    # Make a set of states.
    states = set()
    for y in Y:
        states |= set(y)

    # Compute L and D.
    L = len(states)
    D = len(observations)

    # Randomly initialize and normalize matrix A.
    A = [[random.random() for i in range(L)] for j in range(L)]

    for i in range(len(A)):
        norm = sum(A[i])
        for j in range(len(A[i])):
            A[i][j] /= norm

    # Randomly initialize and normalize matrix O.
    O = [[random.random() for i in range(D)] for j in range(L)]

    for i in range(len(O)):
        norm = sum(O[i])
        for j in range(len(O[i])):
            O[i][j] /= norm

    # Train an HMM with labeled data.
    HMM = HiddenMarkovModel(A, O)
    HMM.supervised_learning(X, Y)

    return HMM

def unsupervised_HMM(X, n_states, N_iters):
    '''
    Helper function to train an unsupervised HMM. The function determines the
    number of unique observations in the given data, initializes
    the transition and observation matrices, creates the HMM, and then runs
    the training function for unsupervised learing.

    Arguments:
        X:          A dataset consisting of input sequences in the form
                    of lists of variable length, consisting of integers
                    ranging from 0 to D - 1. In other words, a list of lists.

        n_states:   Number of hidden states to use in training.

        N_iters:    The number of iterations to train on.
    '''

    # Make a set of observations.
    observations = set()
    for x in X:
        observations |= set(x)

    # Compute L and D.
    L = n_states
    D = len(observations)

    # Randomly initialize and normalize matrix A.
    random.seed(2020)
    A = [[random.random() for i in range(L)] for j in range(L)]

    for i in range(len(A)):
        norm = sum(A[i])
        for j in range(len(A[i])):
            A[i][j] /= norm

    # Randomly initialize and normalize matrix O.
    random.seed(155)
    O = [[random.random() for i in range(D)] for j in range(L)]

    for i in range(len(O)):
        norm = sum(O[i])
        for j in range(len(O[i])):
            O[i][j] /= norm

    # Train an HMM with unlabeled data.
    HMM = HiddenMarkovModel(A, O)
    HMM.unsupervised_learning(X, N_iters)

    return HMM


In [119]:
def parse_map(lines):
    counter = 0
    x = []
    mapping = {}

    for line in lines:
        elem = []
        
        for word in line:
            # replace the following characters with '', i.e. remove them
            #word = re.sub(r'[]_`\', '', word)
            word = re.sub(r'[^\w]', '', word).lower()
                          # {|\}0123456789^\n\t
            # convert all words to lower case
            word = word.lower()
            
            if word not in mapping:
                mapping[word] = counter
                counter += 1
            elem.append(mapping[word])
        x.append(elem)

    return x, mapping



In [120]:
def mapping_reverser(mapping):
    mapping_r = {}

    for key in mapping:
        mapping_r[mapping[key]] = key

    return mapping_r

In [121]:
# get syllable info

syllable = open('data/Syllable_dictionary.txt','r')
syllable_dic = {}
    
for line in syllable:
    line = line.split()
    word = line[0]
    rem = line[1: len(line)]
    syllable_dic[word] = rem

In [122]:
def open_file(files):
    '''Keeps track of lines in an array and words in a dictionary mapping to word frequency'''
    words = {}
    lines = [] 
    
    for f in files:
        file = open(f, 'r')
        
        for line in file:
            if  len(line) < 10:
                continue
            line = line.strip()
            line = line.lower() # turn everything into lower case
            line = "".join(l for l in line if l not in string.punctuation) # take out all punctuation
            line = line.split() # split the line and then add it to lines

            lines.append(line)

            # increase frequency if item already exists, create it if it doesn't
            for w in line:
                try:
                    words[w] += 1
                except KeyError:
                    words[w] = 1
    
    return words, lines

In [123]:
words, lines = open_file(["data/shakespeare.txt","data/spenser.txt"])

In [124]:
states = 12
iterations = 3
x, mapping =  parse_map(lines)

In [125]:
HMM = unsupervised_HMM(x, states, iterations)

Iteration 0
Iteration 1
Iteration 2


In [116]:
x, mapping = parse_map(lines)
mapping_r =  mapping_reverser(mapping)

poem_length = 14
syllable_length = 10

for i in range(poem_length): 
        emission = HMM.generate_emission_syllables(syllable_length, syllable_dic, mapping_r) 
        sentence = [mapping_r[i] for i in emission[0]]

        print(' '.join(sentence).capitalize())

Bound oft old such that whom i own then do
The dost heart one stand my one bright belongs
Vex inhearse of love in making feeling
Drained o better bower her last honest as
Drink sought proud fair comfort darkness grace of
Like night haply love and me is written
Thy so burn proved possessed me every this
When above eye make the your put for your
I fire deface afresh her time weary
She or seemly my write then this doth lies
Art and once me knows but is write you burn
I the see of which when body times whose
And better praises the the and at in
Did in thine render what gives of love thine
